Class: ActiveMessaging::ThreadedPoller
- Inherits:
-
Object
- Object
- ActiveMessaging::ThreadedPoller
- Includes:
- Celluloid
- Defined in:
- lib/activemessaging/threaded_poller.rb
Instance Attribute Summary collapse
-
#busy ⇒ Object
Returns the value of attribute busy.
-
#configuration ⇒ Object
Returns the value of attribute configuration.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#receiver ⇒ Object
Returns the value of attribute receiver.
-
#running ⇒ Object
Returns the value of attribute running.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
- #died(worker, reason) ⇒ Object
- #dispatch(message, worker) ⇒ Object
- #executed(worker) ⇒ Object
-
#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller
constructor
connection is a string, name of the connection from broker.yml to use for this threaded poller instance.
-
#log_status ⇒ Object
recursive method, uses celluloid ‘after’ to keep calling.
- #logger ⇒ Object
- #receive(worker) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller
connection is a string, name of the connection from broker.yml to use for this threaded poller instance
configuration is a list of hashes, each of which describes a group of worker threads; for each group, define which message priorities its workers will process
[
{
:pool_size => 1, # number of workers of this type
:priorities => [1,2,3] # what message priorities this thread will process
}
]
33 34 35 36 37 |
# File 'lib/activemessaging/threaded_poller.rb', line 33 def initialize(connection='default', configuration={}) # default config is a pool size of 3 worker threads self.configuration = configuration || [{:pool_size => 3}] self.connection = connection end |
Instance Attribute Details
#busy ⇒ Object
Returns the value of attribute busy.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def busy @busy end |
#configuration ⇒ Object
Returns the value of attribute configuration.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def configuration @configuration end |
#connection ⇒ Object
Returns the value of attribute connection.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def connection @connection end |
#receiver ⇒ Object
Returns the value of attribute receiver.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def receiver @receiver end |
#running ⇒ Object
Returns the value of attribute running.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def running @running end |
#workers ⇒ Object
Returns the value of attribute workers.
18 19 20 |
# File 'lib/activemessaging/threaded_poller.rb', line 18 def workers @workers end |
Instance Method Details
#died(worker, reason) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/activemessaging/threaded_poller.rb', line 120 def died(worker, reason) busy.delete(worker) if running logger.info "uh oh, #{worker.inspect} died because of #{reason.class}" worker = Worker.new_link(current_actor) workers << worker receive(worker) else logger.info "check to see if busy is empty: #{busy.inspect}" if busy.empty? logger.info "all died: signal stopped" after(0){ signal(:shutdown) } end end end |
#dispatch(message, worker) ⇒ Object
99 100 101 102 103 |
# File 'lib/activemessaging/threaded_poller.rb', line 99 def dispatch(message, worker) workers.delete(worker) busy << worker worker.execute!(message) end |
#executed(worker) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/activemessaging/threaded_poller.rb', line 105 def executed(worker) busy.delete(worker) if running workers << worker receive(worker) else worker.terminate if worker.alive? if busy.empty? logger.info "all executed: signal stopped" after(0) { signal(:shutdown) } end end end |
#log_status ⇒ Object
recursive method, uses celluloid ‘after’ to keep calling
89 90 91 92 93 |
# File 'lib/activemessaging/threaded_poller.rb', line 89 def log_status return unless logger.debug? logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}") after(10){ log_status } end |
#logger ⇒ Object
141 |
# File 'lib/activemessaging/threaded_poller.rb', line 141 def logger; ActiveMessaging.logger; end |
#receive(worker) ⇒ Object
95 96 97 |
# File 'lib/activemessaging/threaded_poller.rb', line 95 def receive(worker) receiver.receive!(worker) if (receiver && running && worker) end |
#start ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/activemessaging/threaded_poller.rb', line 39 def start logger.info "ActiveMessaging::ThreadedPoller start" # these are workers ready to use self.workers = [] # these are workers already working self.busy = [] # this indicates if we are running or not, helps threads to stop gracefully self.running = true # subscribe will create the connections based on subscriptions in processors # (you can't find or use the connection until it is created by calling this) ActiveMessaging::Gateway.subscribe # create a message receiver actor, only need one, using connection receiver_connection = ActiveMessaging::Gateway.connection(connection) self.receiver = MessageReceiver.new(current_actor, receiver_connection) # start the workers based on the config configuration.each do |c| (c[:pool_size] || 1).times{ self.workers << Worker.new_link(current_actor, c) } end # once all workers are created, start them up self.workers.each{|worker| receive(worker)} # in debug level, log info about workers every 10 seconds log_status end |
#stop ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/activemessaging/threaded_poller.rb', line 71 def stop logger.info "ActiveMessaging::ThreadedPoller stop" # indicates to all busy workers not to pick up any more messages, but does not interrupt # also indicates to the message receiver to stop getting more messages self.running = false # tell each waiting worker to shut down. Running ones will be allowed to finish receiver.terminate! if receiver.alive? logger.info "ActiveMessaging::ThreadedPoller receiver terminated" workers.each { |w| w.terminate! if w.alive? } logger.info "ActiveMessaging::ThreadedPoller workers terminated" after(0) { signal(:shutdown) } if stopped? end |
#stopped? ⇒ Boolean
137 138 139 |
# File 'lib/activemessaging/threaded_poller.rb', line 137 def stopped? (!running && busy.empty?) end |