Class: Qwirk::Adapter::Base::ExpandingWorkerConfig

Inherits:
WorkerConfig
  • Object
show all
Defined in:
lib/qwirk/adapter/base/expanding_worker_config.rb

Direct Known Subclasses

InMemory::WorkerConfig

Instance Attribute Summary

Attributes inherited from WorkerConfig

#adapter_factory, #default_options, #manager, #marshal_sym, #marshaler, #name, #options, #queue_name, #queue_options, #response_options, #topic_name, #worker_class

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from WorkerConfig

#bean_attributes_changed, default_marshal_sym, in_process?, #initialize, #marshal_response, #stop, #stopped?, #to_s, #unmarshal_response

Constructor Details

This class inherits a constructor from Qwirk::Adapter::Base::WorkerConfig

Class Method Details

.initial_default_config ⇒ Object

Define the default config values for the attributes all workers will share. These will be sent as options to the constructor.



13
14
15
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 13

# Default option values shared by every expanding worker config.
#
# Starts from the parent's defaults and overlays the settings specific to
# an expanding pool; on a key clash the values given here win.
#
# @return [Hash] the merged default options
def self.initial_default_config
  expanding_defaults = {
    :min_count           => 0,
    :max_count           => 0,
    :idle_worker_timeout => 60,
    :max_read_threshold  => 1.0
  }
  super.merge(expanding_defaults)
end

Instance Method Details

#count ⇒ Object



28
29
30
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 28

# Number of workers currently registered, read while holding @worker_mutex.
#
# @return [Integer] size of the @workers list
def count
  @worker_mutex.synchronize do
    @workers.size
  end
end

#init ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 17

# Set up the bookkeeping state for the worker pool after running the
# parent's init.
#
# The pool starts empty with all counters at zero. Two independent mutexes
# are created: @index_mutex and @worker_mutex. @worker_condition is the
# condition variable created alongside @worker_mutex.
def init
  super
  @min_count = @max_count = @index_count = 0
  @workers                    = []
  @index_mutex, @worker_mutex = Mutex.new, Mutex.new
  @worker_condition           = ConditionVariable.new
end

#join(timeout = nil) ⇒ Object

TODO: Need this? Should I only be calling worker.stop when stopping individual workers?

def stop

Qwirk.logger.debug { "#{self}: In expanding_worker_config stop" }
# First stop the impl.  For InMemory, this will not return until all the messages in the queue have
# been processed since these messages are not persistent.
@worker_mutex.synchronize do
  @workers.each { |worker| worker.stop }
  while @workers.size > 0
    @worker_condition.wait(@worker_mutex)
  end
  super
end

end



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 75

# Wait for the workers that are registered at the time of the call.
#
# A copy of the worker list is taken under @worker_mutex, and the joins
# themselves happen outside the lock.
#
# @param timeout [Numeric, nil] when given, a single overall budget shared by
#   all workers: each worker is joined with whatever is left of it (never
#   less than 0). When nil, each worker is joined without a timeout.
# @return [Array] the snapshot of workers that were joined
def join(timeout=nil)
  snapshot = @worker_mutex.synchronize { @workers.dup }
  return snapshot.each(&:join) unless timeout

  deadline = Time.now + timeout
  snapshot.each do |worker|
    remaining = deadline - Time.now
    worker.join(remaining < 0 ? 0 : remaining)
  end
end

#max_count=(new_max_count) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 43

# Change the upper bound on the number of workers.
#
# Does nothing when the value is unchanged. If the current minimum is above
# the new maximum it is lowered through min_count=, and a minimum of 0 is
# bumped to 1 when the new maximum is positive. Workers at positions beyond
# the new maximum are asked to stop while @worker_mutex is held, then joined
# after the lock has been released.
#
# @param new_max_count [Integer] the new maximum number of workers
# @raise [RuntimeError] if this config has already been stopped
def max_count=(new_max_count)
  return if @max_count == new_max_count
  if self.stopped?
    raise "#{self.worker_class.name}-#{self.name}: Can't change count since we've been stopped"
  end
  Qwirk.logger.info "#{self.worker_class.name}: Changing max number of workers from #{@max_count} to #{new_max_count}"

  self.min_count = new_max_count if @min_count > new_max_count
  @min_count = 1 if @min_count == 0 && new_max_count > 0

  surplus = []
  @worker_mutex.synchronize do
    @timer ||= ::Rumx::Beans::TimerAndError.new
    if @workers.size > new_max_count
      surplus = @workers[new_max_count..-1]
      surplus.each(&:stop)
    end
    @max_count = new_max_count
  end
  # Join outside the lock so the stopping workers are not waited on while it is held.
  surplus.each(&:join)
end

#min_count=(new_min_count) ⇒ Object



32
33
34
35
36
37
38
39
40
41
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 32

# Change the lower bound on the number of workers.
#
# Does nothing when the value is unchanged. If the current maximum is below
# the new minimum it is raised first through max_count=. Then, while
# holding @worker_mutex, workers are added via add_worker until the list
# holds at least new_min_count entries, and the new minimum is recorded.
#
# @param new_min_count [Integer] the new minimum number of workers
# @raise [RuntimeError] if this config has already been stopped
def min_count=(new_min_count)
  return if @min_count == new_min_count
  if self.stopped?
    raise "#{self.worker_class.name}-#{self.name}: Can't change count since we've been stopped"
  end
  Qwirk.logger.info "#{self.worker_class.name}: Changing min number of workers from #{@min_count} to #{new_min_count}"

  self.max_count = new_min_count if @max_count < new_min_count

  @worker_mutex.synchronize do
    add_worker until @workers.size >= new_min_count
    @min_count = new_min_count
  end
end

#periodic_call(poll_time) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 93

# Periodic housekeeping pass over the worker list, run under @worker_mutex.
#
# In one pass it may stop at most one worker that has been waiting on a read
# for longer than @idle_worker_timeout (only while the list is larger than
# @min_count), and it may add one worker via add_worker when no worker
# vetoed growth, the config is not stopped, and the list is below @max_count.
#
# @param poll_time [Numeric] added to @max_read_threshold to form the grace
#   period during which a recently started worker is skipped. Presumably the
#   polling interval in seconds, since it is compared against a difference
#   of Time values -- confirm against the caller.
#
# NOTE(review): @max_read_threshold and @idle_worker_timeout are not assigned
# in the code shown here; presumably they come from the config options named
# in initial_default_config -- confirm.
def periodic_call(poll_time)
  now = Time.now
  # Growth is allowed unless some worker below vetoes it.
  add_new_worker = true
  # Ensures at most one worker is stopped per call.
  worker_stopped = false
  @worker_mutex.synchronize do
    # reverse_each to remove later workers first
    @workers.reverse_each do |worker|
      start_worker_time = worker.start_worker_time
      start_read_time = worker.start_read_time
      # A worker that has no read start time yet, or that started less than
      # (poll_time + @max_read_threshold) ago, is skipped and blocks growth.
      if !start_read_time || (now - start_worker_time) < (poll_time + @max_read_threshold)
        #Qwirk.logger.debug { "#{self}: Skipping newly created worker" }
        add_new_worker = false
        next
      end
      end_read_time = worker.start_processing_time
      # If the processing time is actually from the previous processing, then we're probably still waiting for the read to complete.
      if !end_read_time || end_read_time < start_read_time
        # Still waiting on a read: stop this worker if it has waited longer
        # than @idle_worker_timeout and the list is above @min_count.
        if !worker_stopped && @workers.size > @min_count && (now - start_read_time) > @idle_worker_timeout
          worker.stop
          worker_stopped = true
        end
        # Measure the in-progress read as lasting until now.
        end_read_time = now
      end
      #Qwirk.logger.debug { "#{self}: start=#{start_read_time} end=#{end_read_time} thres=#{@max_read_threshold} add_new_worker=#{add_new_worker}" }
      # A read that took longer than @max_read_threshold blocks growth.
      add_new_worker = false if (end_read_time - start_read_time) > @max_read_threshold
    end
    add_worker if !self.stopped? && add_new_worker && @workers.size < @max_count
  end
end

#worker_stopped(worker) ⇒ Object



89
90
91
# File 'lib/qwirk/adapter/base/expanding_worker_config.rb', line 89

# Notification that the given worker has stopped; hands the worker to
# remove_worker (defined outside this listing).
#
# NOTE(review): presumably invoked by the worker itself once it has finished
# stopping -- confirm against the caller.
#
# @param worker [Object] the worker to remove
def worker_stopped(worker)
  remove_worker(worker)
end