Class: Qwirk::Adapter::Base::ExpandingWorkerConfig
- Inherits:
-
WorkerConfig
- Object
- WorkerConfig
- Qwirk::Adapter::Base::ExpandingWorkerConfig
- Defined in:
- lib/qwirk/adapter/base/expanding_worker_config.rb
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from WorkerConfig
#adapter_factory, #default_options, #manager, #marshal_sym, #marshaler, #name, #options, #queue_name, #queue_options, #response_options, #topic_name, #worker_class
Class Method Summary collapse
-
.initial_default_config ⇒ Object
Define the default config values for the attributes all workers will share.
Instance Method Summary collapse
- #count ⇒ Object
- #init ⇒ Object
-
#join(timeout = nil) ⇒ Object
TODO: Need this? Should I only be calling worker.stop when stopping individual workers? def stop Qwirk.logger.debug { "#{self}: In expanding_worker_config stop" } # First stop the impl.
- #max_count=(new_max_count) ⇒ Object
- #min_count=(new_min_count) ⇒ Object
- #periodic_call(poll_time) ⇒ Object
- #worker_stopped(worker) ⇒ Object
Methods inherited from WorkerConfig
#bean_attributes_changed, default_marshal_sym, in_process?, #initialize, #marshal_response, #stop, #stopped?, #to_s, #unmarshal_response
Constructor Details
This class inherits a constructor from Qwirk::Adapter::Base::WorkerConfig
Class Method Details
.initial_default_config ⇒ Object
Define the default config values for the attributes all workers will share. These will be sent as options to the constructor.
13 14 15 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 13
# Default config values shared by all workers of this config type.
#
# Starts from the inherited defaults and layers the expanding-pool settings
# on top: both worker counts start at 0, the idle timeout at 60 and the read
# threshold at 1.0 (both are compared against Time differences in
# #periodic_call, so they are in seconds).
#
# Returns the merged Hash of defaults.
def self.initial_default_config
  expanding_defaults = {
    :min_count           => 0,
    :max_count           => 0,
    :idle_worker_timeout => 60,
    :max_read_threshold  => 1.0
  }
  super.merge(expanding_defaults)
end
Instance Method Details
#count ⇒ Object
28 29 30 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 28
# Number of workers currently tracked by this config.
#
# The size is read while holding @worker_mutex; the value of the
# synchronize block is the method's result.
def count
  @worker_mutex.synchronize do
    @workers.size
  end
end
#init ⇒ Object
17 18 19 20 21 22 23 24 25 26 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 17
# Sets up the bookkeeping for the worker pool after the parent's init has run.
#
# The pool starts empty with both bounds at zero; the mutexes and the
# condition variable are created here so later calls can synchronize on them.
def init
  super

  # Pool contents and its bounds.
  @workers   = []
  @min_count = 0
  @max_count = 0

  # Counter plus its dedicated lock.
  @index_count = 0
  @index_mutex = Mutex.new

  # Lock and condition variable guarding @workers.
  @worker_mutex     = Mutex.new
  @worker_condition = ConditionVariable.new
end
#join(timeout = nil) ⇒ Object
TODO: Need this? Should I only be calling worker.stop when stopping individual workers?
def stop
Qwirk.logger.debug { "#{self}: In expanding_worker_config stop" }
# First stop the impl. For InMemory, this will not return until all the messages in the queue have
# been processed since these messages are not persistent.
@worker_mutex.synchronize do
@workers.each { |worker| worker.stop }
while @workers.size > 0
@worker_condition.wait(@worker_mutex)
end
super
end
end
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 75
# Waits for the workers that exist at the time of the call.
#
# timeout - optional overall limit; when given, it is a single deadline
#           shared by all workers, so each worker is joined with whatever
#           time is left (never less than 0). When nil, each worker is
#           joined without a limit.
#
# The worker list is copied under @worker_mutex and the joins happen on that
# copy, outside the lock. Returns the copied list.
def join(timeout=nil)
  snapshot = @worker_mutex.synchronize { @workers.dup }
  return snapshot.each { |worker| worker.join } unless timeout

  deadline = Time.now + timeout
  snapshot.each do |worker|
    remaining = deadline - Time.now
    # Once the deadline has passed, poll the remaining workers with a zero wait.
    remaining = 0 if remaining < 0
    worker.join(remaining)
  end
end
#max_count=(new_max_count) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 43
# Changes the upper bound on the number of workers.
#
# new_max_count - the new maximum.
#
# Does nothing when the value is unchanged. Raises a RuntimeError when this
# config has already been stopped. Workers beyond the new maximum are told to
# stop while holding @worker_mutex and are joined after the lock is released.
def max_count=(new_max_count)
  return if @max_count == new_max_count
  if self.stopped?
    raise "#{self.worker_class.name}-#{self.name}: Can't change count since we've been stopped"
  end
  Qwirk.logger.info("#{self.worker_class.name}: Changing max number of workers from #{@max_count} to #{new_max_count}")

  # Keep min <= max: lower the minimum first when it would exceed the new maximum.
  self.min_count = new_max_count if @min_count > new_max_count
  # A minimum of zero is bumped to one as soon as a non-zero maximum is set.
  @min_count = 1 if @min_count == 0 && new_max_count > 0

  surplus = []
  @worker_mutex.synchronize do
    @timer ||= ::Rumx::Beans::TimerAndError.new
    unless @workers.size <= new_max_count
      # Everything from index new_max_count onwards is over the limit.
      surplus = @workers[new_max_count..-1]
      surplus.each { |extra| extra.stop }
    end
    @max_count = new_max_count
  end

  # Joined outside the lock (presumably so stopping workers can still report
  # back through #worker_stopped - confirm against remove_worker).
  surplus.each { |extra| extra.join }
end
#min_count=(new_min_count) ⇒ Object
32 33 34 35 36 37 38 39 40 41 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 32
# Changes the lower bound on the number of workers.
#
# new_min_count - the new minimum.
#
# Does nothing when the value is unchanged. Raises a RuntimeError when this
# config has already been stopped. Workers are added under @worker_mutex
# until the pool holds at least new_min_count of them.
def min_count=(new_min_count)
  return if @min_count == new_min_count
  if self.stopped?
    raise "#{self.worker_class.name}-#{self.name}: Can't change count since we've been stopped"
  end
  Qwirk.logger.info("#{self.worker_class.name}: Changing min number of workers from #{@min_count} to #{new_min_count}")

  # Keep min <= max: raise the maximum first when it is below the new minimum.
  self.max_count = new_min_count if @max_count < new_min_count

  @worker_mutex.synchronize do
    add_worker until @workers.size >= new_min_count
    @min_count = new_min_count
  end
end
#periodic_call(poll_time) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 93
# Periodic housekeeping for the pool: stops at most one idle worker and adds
# at most one new worker per call.
#
# poll_time - the polling interval; a worker younger than
#             poll_time + @max_read_threshold is treated as newly created.
#
# A new worker is added only when no worker was newly created, no worker's
# read took longer than @max_read_threshold, this config is not stopped and
# the pool is below @max_count. All of this runs while holding @worker_mutex.
def periodic_call(poll_time)
  now            = Time.now
  spawn_allowed  = true
  retired_worker = false
  @worker_mutex.synchronize do
    # reverse_each to remove later workers first
    @workers.reverse_each do |worker|
      worker_started_at = worker.start_worker_time
      read_started_at   = worker.start_read_time
      newly_created     = !read_started_at || (now - worker_started_at) < (poll_time + @max_read_threshold)
      if newly_created
        #Qwirk.logger.debug { "#{self}: Skipping newly created worker" }
        spawn_allowed = false
        next
      end

      read_finished_at = worker.start_processing_time
      # If the processing time is actually from the previous processing, then
      # we're probably still waiting for the read to complete.
      still_reading = !read_finished_at || read_finished_at < read_started_at
      if still_reading
        idle_too_long = !retired_worker && @workers.size > @min_count && (now - read_started_at) > @idle_worker_timeout
        if idle_too_long
          worker.stop
          retired_worker = true
        end
        # Measure the still-pending read up to the present moment.
        read_finished_at = now
      end
      #Qwirk.logger.debug { "#{self}: start=#{start_read_time} end=#{end_read_time} thres=#{@max_read_threshold} add_new_worker=#{add_new_worker}" }
      spawn_allowed = false if (read_finished_at - read_started_at) > @max_read_threshold
    end

    unless self.stopped?
      add_worker if spawn_allowed && @workers.size < @max_count
    end
  end
end
#worker_stopped(worker) ⇒ Object
89 90 91 |
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 89
# Notification that a worker has stopped.
#
# worker - the worker that stopped; it is handed straight to remove_worker.
#
# Returns whatever remove_worker returns.
def worker_stopped(worker)
  remove_worker(worker)
end