Class: ActiveMessaging::ThreadedPoller
- Inherits:
-
Object
- Object
- ActiveMessaging::ThreadedPoller
- Includes:
- Celluloid
- Defined in:
- lib/activemessaging/threaded_poller.rb
Instance Attribute Summary collapse
-
#busy ⇒ Object
Returns the value of attribute busy.
-
#configuration ⇒ Object
Returns the value of attribute configuration.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#pause ⇒ Object
Returns the value of attribute pause.
-
#receiver ⇒ Object
Returns the value of attribute receiver.
-
#running ⇒ Object
Returns the value of attribute running.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
- #died(worker, reason) ⇒ Object
- #dispatch(message, worker) ⇒ Object
- #executed(worker) ⇒ Object
-
#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller
constructor
connection is a string, name of the connection from broker.yml to use for this threaded poller instance.
- #inspect ⇒ Object
-
#log_status ⇒ Object
recursive method, uses celluloid ‘after’ to keep calling.
- #logger ⇒ Object
- #receive(worker) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #to_s ⇒ Object
Constructor Details
#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller
connection is a string, name of the connection from broker.yml to use for this threaded poller instance
configuration is a list of hashes each has describes a group of worker threads for each group, define what priorities those workers will process
[
{
:pool_size => 1 # number of workers of this type
:priorities => [1,2,3] # what message priorities this thread will process
}
]
30 31 32 33 34 35 |
# File 'lib/activemessaging/threaded_poller.rb', line 30 def initialize(connection='default', configuration={}) # default config is a pool size of 3 worker threads self.configuration = configuration || [{:pool_size => 3}] self.connection = connection self.pause = 1 end |
Instance Attribute Details
#busy ⇒ Object
Returns the value of attribute busy.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def busy @busy end |
#configuration ⇒ Object
Returns the value of attribute configuration.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def configuration @configuration end |
#connection ⇒ Object
Returns the value of attribute connection.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def connection @connection end |
#pause ⇒ Object
Returns the value of attribute pause.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def pause @pause end |
#receiver ⇒ Object
Returns the value of attribute receiver.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def receiver @receiver end |
#running ⇒ Object
Returns the value of attribute running.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def running @running end |
#workers ⇒ Object
Returns the value of attribute workers.
15 16 17 |
# File 'lib/activemessaging/threaded_poller.rb', line 15 def workers @workers end |
Instance Method Details
#died(worker, reason) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/activemessaging/threaded_poller.rb', line 118 def died(worker, reason) busy.delete(worker) if running logger.info "uh oh, #{worker.inspect} died because of #{reason.class}" worker = Worker.new_link(current_actor) workers << worker receive(worker) else logger.info "check to see if busy is empty: #{busy.inspect}" if busy.empty? logger.info "all died: signal stopped" after(0){ signal(:shutdown) } end end end |
#dispatch(message, worker) ⇒ Object
97 98 99 100 101 |
# File 'lib/activemessaging/threaded_poller.rb', line 97 def dispatch(, worker) workers.delete(worker) busy << worker worker.execute!() end |
#executed(worker) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/activemessaging/threaded_poller.rb', line 103 def executed(worker) busy.delete(worker) if running workers << worker receive(worker) else worker.terminate if worker.alive? if busy.empty? logger.info "all executed: signal stopped" after(0) { signal(:shutdown) } end end end |
#inspect ⇒ Object
139 140 141 |
# File 'lib/activemessaging/threaded_poller.rb', line 139 def inspect "#<ThreadedPoller #{to_s}>" end |
#log_status ⇒ Object
recursive method, uses celluloid ‘after’ to keep calling
87 88 89 90 91 |
# File 'lib/activemessaging/threaded_poller.rb', line 87 def log_status return unless logger.debug? logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}") after(10){ log_status } end |
#logger ⇒ Object
147 |
# File 'lib/activemessaging/threaded_poller.rb', line 147 def logger; ActiveMessaging.logger; end |
#receive(worker) ⇒ Object
93 94 95 |
# File 'lib/activemessaging/threaded_poller.rb', line 93 def receive(worker) receiver.receive!(worker) if (receiver && running && worker) end |
#start ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/activemessaging/threaded_poller.rb', line 37 def start logger.info "ActiveMessaging::ThreadedPoller start" # these are workers ready to use self.workers = [] # these are workers already working self.busy = [] # this indicates if we are running or not, helps threads to stop gracefully self.running = true # subscribe will create the connections based on subscriptions in processsors # (you can't find or use the connection until it is created by calling this) ActiveMessaging::Gateway.subscribe # create a message receiver actor, ony need one, using connection receiver_connection = ActiveMessaging::Gateway.connection(connection) self.receiver = MessageReceiver.new(current_actor, receiver_connection, pause) # start the workers based on the config configuration.each do |c| (c[:pool_size] || 1).times{ self.workers << Worker.new_link(current_actor, c) } end # once all workers are created, start them up self.workers.each{|worker| receive(worker)} # in debug level, log info about workers every 10 seconds log_status end |
#stop ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/activemessaging/threaded_poller.rb', line 69 def stop logger.info "ActiveMessaging::ThreadedPoller stop" # indicates to all busy workers not to pick up another messages, but does not interrupt # also indicates to the message receiver to stop getting more messages self.running = false # tell each waiting worker to shut down. Running ones will be allowed to finish receiver.terminate! if receiver.alive? logger.info "ActiveMessaging::ThreadedPoller receiver terminated" workers.each { |w| w.terminate! if w.alive? } logger.info "ActiveMessaging::ThreadedPoller workers terminated" after(0) { signal(:shutdown) } if stopped? end |
#stopped? ⇒ Boolean
135 136 137 |
# File 'lib/activemessaging/threaded_poller.rb', line 135 def stopped? (!running && busy.empty?) end |
#to_s ⇒ Object
143 144 145 |
# File 'lib/activemessaging/threaded_poller.rb', line 143 def to_s @str ||= "#{Process.pid}-#{Thread.current.object_id}:#{self.object_id}" end |