Class: Henchman::Worker
- Inherits:
-
Object
- Object
- Henchman::Worker
- Defined in:
- lib/henchman/worker.rb
Overview
A class that handles incoming messages.
Defined Under Namespace
Classes: Task
Instance Attribute Summary collapse
-
#block ⇒ Object
- Proc
-
the Proc handling the messages for this Worker.
-
#consumer ⇒ Object
- AMQP::Consumer
-
the consumer feeding this Worker with messages.
-
#queue_name ⇒ Object
- String
-
the name of the queue this Worker listens to.
Instance Method Summary collapse
-
#call(message, headers = nil) ⇒ Henchman::Worker::Task
Call this worker with some data.
-
#consume! ⇒ Object
Make this Worker subscribe to a direct exchange.
-
#initialize(queue_name, &block) ⇒ Worker
constructor
A new instance of Worker.
-
#subscribe! ⇒ Object
Make this Worker subscribe to a fanout exchange.
-
#subscribe_to(queue, deferrable) ⇒ Object
Subscribe this Worker to a queue.
-
#unsubscribe! ⇒ Object
Unsubscribe this Worker from its queue.
Constructor Details
#initialize(queue_name, &block) ⇒ Worker
Returns a new instance of Worker.
112 113 114 115 |
# File 'lib/henchman/worker.rb', line 112 def initialize(queue_name, &block) @block = block @queue_name = queue_name end |
Instance Attribute Details
#block ⇒ Object
- Proc
-
the Proc handling the messages for this Henchman::Worker.
105 106 107 |
# File 'lib/henchman/worker.rb', line 105 def block @block end |
#consumer ⇒ Object
- AMQP::Consumer
-
the consumer feeding this Henchman::Worker with messages.
100 101 102 |
# File 'lib/henchman/worker.rb', line 100 def consumer @consumer end |
#queue_name ⇒ Object
- String
-
the name of the queue this Henchman::Worker listens to.
95 96 97 |
# File 'lib/henchman/worker.rb', line 95 def queue_name @queue_name end |
Instance Method Details
#call(message, headers = nil) ⇒ Henchman::Worker::Task
Call this worker with some data.
176 177 178 |
# File 'lib/henchman/worker.rb', line 176 def call(, headers = nil) Task.new(self, headers, ).call end |
#consume! ⇒ Object
Make this Henchman::Worker subscribe to a direct exchange.
160 161 162 163 164 165 166 |
# File 'lib/henchman/worker.rb', line 160 def consume! deferrable = EM::DefaultDeferrable.new Henchman.with_queue(queue_name) do |queue| subscribe_to(queue, deferrable) end EM::Synchrony.sync deferrable end |
#subscribe! ⇒ Object
Make this Henchman::Worker subscribe to a fanout exchange.
149 150 151 152 153 154 155 |
# File 'lib/henchman/worker.rb', line 149 def subscribe! deferrable = EM::DefaultDeferrable.new Henchman.with_fanout_queue(queue_name) do |queue| subscribe_to(queue, deferrable) end EM::Synchrony.sync deferrable end |
#subscribe_to(queue, deferrable) ⇒ Object
Subscribe this Henchman::Worker to a queue.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/henchman/worker.rb', line 123 def subscribe_to(queue, deferrable) Henchman.with_channel do |channel| @consumer = AMQP::Consumer.new(channel, queue, queue.generate_consumer_tag(queue.name), # consumer_tag false, # exclusive false) # no_ack consumer.on_delivery do |headers, data| if queue.channel.status == :opened begin call(MultiJson.decode(data), headers) rescue Exception => e STDERR.puts e STDERR.puts e.backtrace.join("\n") end end end consumer.consume do deferrable.set_deferred_status :succeeded end end end |
#unsubscribe! ⇒ Object
Unsubscribe this Henchman::Worker from its queue.
183 184 185 186 187 188 189 190 191 |
# File 'lib/henchman/worker.rb', line 183 def unsubscribe! deferrable = EM::DefaultDeferrable.new consumer.cancel do deferrable.set_deferred_status :succeeded end Fiber.new do EM::Synchrony.sync deferrable end.resume end |