Class: Henchman::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/henchman/worker.rb

Overview

A class that handles incoming messages.

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, &block) ⇒ Worker

Returns a new instance of Worker.

Parameters:

  • queue_name (String)

    the name of the queue this worker listens to.

  • exchange_type (Symbol)

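    the type of exchange this worker will connect its queue to. Note: the constructor signature shown below does not accept this parameter; the exchange type is selected by calling #consume! (direct) or #subscribe! (fanout).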

  • block (Proc)

    the Proc that will handle the messages for this Henchman::Worker.



112
113
114
115
# File 'lib/henchman/worker.rb', line 112

# Build a worker bound to a queue name and a message handler.
#
# @param queue_name [String] the name of the queue this worker listens to.
# @param block [Proc] the handler stored for incoming messages.
def initialize(queue_name, &block)
  @queue_name = queue_name
  @block = block
end

Instance Attribute Details

#block ⇒ Object

Proc

the Proc handling the messages for this Henchman::Worker.



105
106
107
# File 'lib/henchman/worker.rb', line 105

# @return [Proc, nil] the handler block stored in @block by the constructor.
def block; @block; end

#consumer ⇒ Object

AMQP::Consumer

the consumer feeding this Henchman::Worker with messages.



100
101
102
# File 'lib/henchman/worker.rb', line 100

# @return [AMQP::Consumer, nil] the consumer stored in @consumer
#   (assigned by #subscribe_to), or nil if none has been set.
def consumer; @consumer; end

#queue_name ⇒ Object

String

the name of the queue this Henchman::Worker listens to.



95
96
97
# File 'lib/henchman/worker.rb', line 95

# @return [String] the queue name stored in @queue_name by the constructor.
def queue_name; @queue_name; end

Instance Method Details

#call(message, headers = nil) ⇒ Henchman::Worker::Task

Call this worker with some data.

Parameters:

  • headers (AMQP::Header) (defaults to: nil)

    the headers to handle.

  • message (Object)

    the message to handle.

Returns:



176
177
178
# File 'lib/henchman/worker.rb', line 176

# Handle one message by wrapping it in a Task and running that Task.
#
# @param message [Object] the message to handle.
# @param headers [AMQP::Header, nil] the headers to handle (defaults to nil).
# @return [Object] whatever Task#call returns.
def call(message, headers = nil)
  task = Task.new(self, headers, message)
  task.call
end

#consume! ⇒ Object

Make this Henchman::Worker subscribe to a direct exchange.



160
161
162
163
164
165
166
# File 'lib/henchman/worker.rb', line 160

# Subscribe this worker to the queue yielded by Henchman.with_queue for
# #queue_name, then wait on the subscription deferrable.
#
# @return [Object] the result of EM::Synchrony.sync on the deferrable.
def consume!
  ready = EM::DefaultDeferrable.new
  Henchman.with_queue(queue_name) { |direct_queue| subscribe_to(direct_queue, ready) }
  EM::Synchrony.sync(ready)
end

#subscribe! ⇒ Object

Make this Henchman::Worker subscribe to a fanout exchange.



149
150
151
152
153
154
155
# File 'lib/henchman/worker.rb', line 149

# Subscribe this worker to the queue yielded by Henchman.with_fanout_queue
# for #queue_name, then wait on the subscription deferrable.
#
# @return [Object] the result of EM::Synchrony.sync on the deferrable.
def subscribe!
  ready = EM::DefaultDeferrable.new
  Henchman.with_fanout_queue(queue_name) { |fanout_queue| subscribe_to(fanout_queue, ready) }
  EM::Synchrony.sync(ready)
end

#subscribe_to(queue, deferrable) ⇒ Object

Subscribe this Henchman::Worker to a queue.

Parameters:

  • queue (AMQP::Queue)

    the AMQP::Queue to subscribe the Henchman::Worker to.

  • deferrable (EM::Deferrable)

    an EM::Deferrable that will succeed when the subscription is done.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/henchman/worker.rb', line 123

# Subscribe this Henchman::Worker to a queue.
#
# Creates an AMQP::Consumer on the channel yielded by Henchman.with_channel,
# stores it in @consumer, routes every delivery through #call, and marks
# +deferrable+ as succeeded from the block passed to the consumer's #consume.
#
# @param queue [AMQP::Queue] the queue to subscribe this worker to.
# @param deferrable [EM::Deferrable] set to :succeeded when the subscription
#   is done.
def subscribe_to(queue, deferrable)
  Henchman.with_channel do |channel|
    @consumer = AMQP::Consumer.new(channel,
                                   queue,
                                   queue.generate_consumer_tag(queue.name), # consumer_tag
                                   false, # exclusive
                                   false) # no_ack
    consumer.on_delivery do |headers, data|
      # Deliveries are ignored unless the queue's channel reports :opened.
      next unless queue.channel.status == :opened

      begin
        call(MultiJson.decode(data), headers)
      rescue StandardError, ScriptError => e
        # Best effort: a failing handler is reported on STDERR and skipped.
        # Signals, exit requests and out-of-memory conditions are deliberately
        # NOT rescued here so they can propagate instead of being swallowed.
        STDERR.puts e
        STDERR.puts e.backtrace.join("\n")
      end
    end
    consumer.consume do
      deferrable.set_deferred_status :succeeded
    end
  end
end

#unsubscribe! ⇒ Object

Unsubscribe this Henchman::Worker from its queue.



183
184
185
186
187
188
189
190
191
# File 'lib/henchman/worker.rb', line 183

# Unsubscribe this Henchman::Worker from its queue by cancelling #consumer.
#
# The deferrable is marked :succeeded from the block given to
# consumer.cancel, and the wait on it runs inside a freshly created Fiber.
# NOTE(review): presumably the Fiber keeps the caller's own fiber from
# being suspended by EM::Synchrony.sync — confirm.
#
# @return [Object] the value returned by Fiber#resume.
def unsubscribe!
  cancelled = EM::DefaultDeferrable.new
  consumer.cancel { cancelled.set_deferred_status :succeeded }
  waiter = Fiber.new { EM::Synchrony.sync cancelled }
  waiter.resume
end