Module: Henchman

Extended by:
Henchman
Included in:
Henchman
Defined in:
lib/henchman.rb,
lib/henchman/worker.rb

Overview

Thin wrapper around AMQP

Defined Under Namespace

Classes: Worker

Constant Summary collapse

# Connection shared through this class variable; unset until #with_connection assigns it.
@@connection =
nil
# Channel shared through this class variable; unset until #with_channel assigns it.
@@channel =
nil
# Default error handler: writes a description of the failure and the
# exception backtrace to STDERR.
# NOTE(review): queue_name, headers, message and exception are not block
# parameters — presumably the Proc is evaluated in a context that provides
# them; confirm against the code that invokes the handler.
@@error_handler =
Proc.new do
  STDERR.puts("consume(#{queue_name.inspect}, #{headers.inspect}, #{message.inspect}): #{exception.message}")
  STDERR.puts(exception.backtrace.join("\n"))
end
# Default log handler: prints the message with puts.
@@logger =
Proc.new do |msg|
  puts msg
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.error_handler ⇒ Proc

Returns the error handler.

Returns:

  • (Proc)

    the error handler



45
46
47
# File 'lib/henchman.rb', line 45

# Reader for the error handler currently stored in @@error_handler
# (the default, or whatever block was last passed to #error).
#
# @return [Proc] the error handler
def self.error_handler
  @@error_handler
end

Instance Method Details

#aenqueue(queue_name, message, headers = {}) ⇒ EM::Deferrable

Enqueue a message asynchronously.

Parameters:

  • queue_name (String)

    the name of the queue to enqueue on.

  • message (Object)

    the message to enqueue.

Returns:

  • (EM::Deferrable)

    a deferrable that will succeed when the publishing is done.



261
262
263
264
265
266
267
268
269
# File 'lib/henchman.rb', line 261

# Publishes +message+ on the direct exchange without blocking.
#
# The message is serialized with MultiJson.encode and published with
# +queue_name+ as the routing key.
#
# @param queue_name [String] the name of the queue to enqueue on
# @param message [Object] the message to enqueue
# @param headers [Hash] headers to publish along with the message
# @return [EM::Deferrable] marked as succeeded from the publish callback
def aenqueue(queue_name, message, headers = {})
  EM::DefaultDeferrable.new.tap do |done|
    with_direct_exchange do |direct|
      encoded = MultiJson.encode(message)
      direct.publish(encoded, :headers => headers, :routing_key => queue_name) { done.set_deferred_status(:succeeded) }
    end
  end
end

#amqp_options ⇒ Hash

Will return the default options when connecting to the AMQP broker.

Uses the URL from #amqp_url to construct these options.

Returns:

  • (Hash)

    a Hash of options to AMQP.connect.



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/henchman.rb', line 81

# Builds the default options for connecting to the AMQP broker from the
# URL returned by #amqp_url.
#
# Components missing from the URL fall back to "guest" (user and password)
# and 5672 (port).
#
# @return [Hash] options with the keys :vhost, :host, :user, :port and :pass
# @raise [RuntimeError] if the URL cannot be turned into options
def amqp_options
  url = amqp_url
  uri = URI.parse(url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user || "guest",
    :port => uri.port || 5672,
    :pass => uri.password || "guest"
  }
rescue StandardError => e
  # Only StandardError is translated, so SystemExit, Interrupt and the like
  # propagate untouched. The raw URL is reported because no parsed URI
  # exists when parsing itself is what failed.
  raise "invalid AMQP_URL: #{url.inspect} (#{e})"
end

#amqp_url ⇒ String

Will return a URL to the AMQP broker to use. Will get this from the ENV variable AMQP_URL if present.

Returns:

  • (String)

    a URL to an AMQP broker.



63
64
65
# File 'lib/henchman.rb', line 63

# URL of the AMQP broker to use.
#
# The value is memoized in @amqp_url. On first use it is taken from the
# AMQP_URL environment variable, or "amqp://localhost/" when that is unset.
#
# @return [String] a URL to an AMQP broker
def amqp_url
  return @amqp_url if @amqp_url

  @amqp_url = ENV.fetch("AMQP_URL", "amqp://localhost/")
end

#amqp_url=(url) ⇒ String

Sets the AMQP broker URL to use.

Returns:

  • (String)

    the AMQP broker url to use.



70
71
72
# File 'lib/henchman.rb', line 70

# Sets the AMQP broker URL that #amqp_url will return from now on.
#
# @param url [String] the AMQP broker URL to use
# @return [String] the assigned URL
def amqp_url=(url)
  @amqp_url = url
end

#apublish(exchange_name, message, headers = {}) ⇒ EM::Deferrable

Publish a message to multiple consumers asynchronously.

Parameters:

  • exchange_name (String)

    the name of the exchange to publish on.

  • message (Object)

    the object to publish

Returns:

  • (EM::Deferrable)

    a deferrable that will succeed when the publishing is done.



288
289
290
291
292
293
294
295
296
# File 'lib/henchman.rb', line 288

# Publishes +message+ on the fanout exchange +exchange_name+ without blocking.
#
# The message is serialized with MultiJson.encode before it is published.
#
# @param exchange_name [String] the name of the exchange to publish on
# @param message [Object] the object to publish
# @param headers [Hash] headers to publish along with the message
# @return [EM::Deferrable] marked as succeeded from the publish callback
def apublish(exchange_name, message, headers = {})
  EM::DefaultDeferrable.new.tap do |done|
    with_fanout_exchange(exchange_name) do |fanout|
      encoded = MultiJson.encode(message)
      fanout.publish(encoded, :headers => headers) { done.set_deferred_status(:succeeded) }
    end
  end
end

#channel_options ⇒ Hash

Will return the default options to use when creating channels.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.

Returns:

  • (Hash)

    a Hash of options to use when creating channels.



128
129
130
131
132
133
# File 'lib/henchman.rb', line 128

# Default options used when creating channels.
#
# The Hash is created once and memoized in @channel_options, so changes
# made to the returned Hash persist in this instance.
#
# @return [Hash] the options, initially :prefetch => 1 and :auto_recovery => true
def channel_options
  return @channel_options if @channel_options

  @channel_options = { :prefetch => 1, :auto_recovery => true }
end

#enqueue(queue_name, message, headers = {}) ⇒ Object

Enqueue a message synchronously.

Parameters:

  • queue_name (String)

    the name of the queue to enqueue on.

  • message (Object)

    the message to enqueue.



250
251
252
# File 'lib/henchman.rb', line 250

# Enqueues a message and waits for the publish to complete.
#
# Delegates to #aenqueue and hands the resulting deferrable to
# EM::Synchrony.sync.
#
# @param queue_name [String] the name of the queue to enqueue on
# @param message [Object] the message to enqueue
# @param headers [Hash] headers to publish along with the message
# @return [Object] whatever EM::Synchrony.sync returns
def enqueue(queue_name, message, headers = {})
  pending = aenqueue(queue_name, message, headers)
  EM::Synchrony.sync(pending)
end

#error(&block) ⇒ Object

Define an error handler.

Parameters:

  • block (Proc)

    the block that handles errors.



54
55
56
# File 'lib/henchman.rb', line 54

# Installs +block+ as the error handler, replacing the one stored in
# @@error_handler.
#
# @param block [Proc] the block that handles errors
# @return [Proc] the block that was installed
def error(&block)
  @@error_handler = block
end

#exchange_options ⇒ Hash

Will return the default options to use when creating exchanges.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.

Returns:

  • (Hash)

    a Hash of options to use when creating exchanges.



115
116
117
118
119
# File 'lib/henchman.rb', line 115

# Default options used when creating exchanges.
#
# The Hash is created once and memoized in @exchange_options, so changes
# made to the returned Hash persist in this instance.
#
# @return [Hash] the options, initially :auto_delete => true
def exchange_options
  return @exchange_options if @exchange_options

  @exchange_options = { :auto_delete => true }
end

#log(msg) ⇒ Object

Log a message.

Parameters:

  • msg (String)

    the message to log.



39
40
# File 'lib/henchman.rb', line 39

# Logs a message through the log handler stored in @@logger (the default
# one, or the block last passed to #logger).
#
# Does nothing when no handler is installed, which is the case after
# #logger has been called without a block.
#
# @param msg [String] the message to log
# @return [Object, nil] whatever the handler returns, or nil without a handler
def log(msg)
  @@logger.call(msg) if @@logger
end

#logger(&block) ⇒ Object

Define a log handler.

Parameters:

  • block (Proc)

    the block that handles log messages.



30
31
32
# File 'lib/henchman.rb', line 30

# Installs +block+ as the log handler, replacing the one stored in @@logger.
#
# @param block [Proc] the block that handles log messages
# @return [Proc] the block that was installed
def logger(&block)
  @@logger = block
end

#publish(exchange_name, message, headers = {}) ⇒ Object

Publish a message to multiple consumers synchronously.

Parameters:

  • exchange_name (String)

    the name of the exchange to publish on.

  • message (Object)

    the object to publish



277
278
279
# File 'lib/henchman.rb', line 277

# Publishes a message to multiple consumers and waits for the publish to
# complete.
#
# Delegates to #apublish and hands the resulting deferrable to
# EM::Synchrony.sync.
#
# @param exchange_name [String] the name of the exchange to publish on
# @param message [Object] the object to publish
# @param headers [Hash] headers to publish along with the message
# @return [Object] whatever EM::Synchrony.sync returns
def publish(exchange_name, message, headers = {})
  pending = apublish(exchange_name, message, headers)
  EM::Synchrony.sync(pending)
end

#queue_options ⇒ Hash

Will return the default options to use when creating queues.

If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.

Returns:

  • (Hash)

    a Hash of options to use when creating queues.



101
102
103
104
105
106
# File 'lib/henchman.rb', line 101

# Default options used when creating queues.
#
# The Hash is created once and memoized in @queue_options, so changes made
# to the returned Hash persist in this instance.
#
# @return [Hash] the options, initially :durable => true and :auto_delete => true
def queue_options
  return @queue_options if @queue_options

  @queue_options = { :durable => true, :auto_delete => true }
end

#stop! ⇒ Object

Will stop and deactivate Henchman.



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/henchman.rb', line 138

# Stops and deactivates Henchman.
#
# Asks for the channel and then the connection to be closed, clears both
# class variables so the next #with_channel / #with_connection call starts
# fresh, and finally calls AMQP.stop.
#
# @return [Object] whatever AMQP.stop returns
def stop!
  with_channel { |open_channel| open_channel.close }
  @@channel = nil

  with_connection { |open_connection| open_connection.close }
  @@connection = nil

  AMQP.stop
end

#with_channel(&block) ⇒ Object

Will yield an open and ready channel.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready channel to.



177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/henchman.rb', line 177

# Yields an open and ready channel.
#
# Reuses the channel stored in @@channel, creating a new one with
# #channel_options when there is none or when its status is :closed.
#
# @param block [Proc] a Proc to yield an open and ready channel to
def with_channel(&block)
  with_connection do |connection|
    if @@channel.nil? || @@channel.status == :closed
      @@channel = AMQP::Channel.new(connection, channel_options)
    end
    # Hold on to this exact channel: the callbacks below are registered on
    # it, so it is also the object that must be yielded.
    # NOTE(review): presumably once_open can fire after this method has
    # returned; by then @@channel may have been reassigned (#stop! sets it
    # to nil right after calling this method), and re-reading it would
    # yield nil.
    channel = @@channel
    channel.on_error do |failed_channel, channel_close|
      log("#{self} reinitializing #{failed_channel} due to #{channel_close}")
      failed_channel.reuse
    end
    channel.once_open do
      yield channel
    end
  end
end

#with_connection(&block) ⇒ Object

Will yield an open and ready connection.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready connection to.



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/henchman.rb', line 155

# Yields an open and ready connection.
#
# Reuses the connection stored in @@connection, connecting with
# #amqp_options when there is none or when its status is :closed. Each
# call also (re)registers the connection-loss, recovery and error
# callbacks.
#
# @param block [Proc] a Proc to yield an open and ready connection to
def with_connection(&block)
  if @@connection.nil? || @@connection.status == :closed
    @@connection = AMQP.connect(amqp_options)
  end
  # Hold on to this exact connection: the callbacks below are registered on
  # it, so they must act on it too.
  # NOTE(review): presumably these callbacks can fire after this method has
  # returned; by then @@connection may have been reassigned (#stop! sets it
  # to nil right after calling this method), and re-reading it would hand
  # out nil.
  connection = @@connection
  connection.on_tcp_connection_loss do
    log("#{self} reconnecting")
    connection.reconnect
  end
  connection.on_recovery do
    log("#{self} reconnected!")
  end
  connection.on_error do |failed_connection, connection_close|
    raise "#{failed_connection}: #{connection_close.reply_text}"
  end
  connection.on_open do
    yield connection
  end
end

#with_direct_exchange(&block) ⇒ Object

Will yield an open and ready direct exchange.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready direct exchange to.



195
196
197
198
199
# File 'lib/henchman.rb', line 195

# Yields an open and ready direct exchange.
#
# Asks the channel from #with_channel for the direct exchange named
# AMQ::Protocol::EMPTY_STRING, declared with #exchange_options, and hands
# +block+ to that call.
#
# @param block [Proc] a Proc to yield an open and ready direct exchange to
def with_direct_exchange(&block)
  with_channel { |open_channel| open_channel.direct(AMQ::Protocol::EMPTY_STRING, exchange_options, &block) }
end

#with_fanout_exchange(exchange_name, &block) ⇒ Object

Will yield an open and ready fanout exchange.

Parameters:

  • exchange_name (String)

    the name of the exchange to create or find.

  • block (Proc)

    a Proc to yield an open and ready fanout exchange to.



207
208
209
210
211
# File 'lib/henchman.rb', line 207

# Yields an open and ready fanout exchange.
#
# Asks the channel from #with_channel for the fanout exchange
# +exchange_name+, declared with #exchange_options, and hands +block+ to
# that call.
#
# @param exchange_name [String] the name of the exchange to create or find
# @param block [Proc] a Proc to yield an open and ready fanout exchange to
def with_fanout_exchange(exchange_name, &block)
  with_channel { |open_channel| open_channel.fanout(exchange_name, exchange_options, &block) }
end

#with_fanout_queue(exchange_name, &block) ⇒ Object

Will yield an open and ready queue bound to an open and ready fanout exchange.

Parameters:

  • exchange_name (String)

    the name of the exchange to create or find

  • block (Proc)

    the Proc to yield an open and ready queue bound to the found exchange to.



219
220
221
222
223
224
225
226
227
228
229
# File 'lib/henchman.rb', line 219

# Yields an open and ready queue bound to an open and ready fanout exchange.
#
# The queue is requested from the channel without a name or options, bound
# to the fanout exchange +exchange_name+, and yielded from the bind
# callback.
#
# @param exchange_name [String] the name of the exchange to create or find
# @param block [Proc] the Proc to yield the bound queue to
def with_fanout_queue(exchange_name, &block)
  with_channel do |open_channel|
    with_fanout_exchange(exchange_name) do |fanout|
      open_channel.queue do |bound_queue|
        bound_queue.bind(fanout) { yield bound_queue }
      end
    end
  end
end

#with_queue(queue_name, &block) ⇒ Object

Will yield an open and ready queue.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready queue to.



236
237
238
239
240
241
242
# File 'lib/henchman.rb', line 236

# Yields an open and ready queue.
#
# Asks the channel from #with_channel for the queue +queue_name+, declared
# with #queue_options, and yields the queue passed to that call's callback.
#
# @param queue_name [String] the name of the queue to request
# @param block [Proc] a Proc to yield an open and ready queue to
def with_queue(queue_name, &block)
  with_channel do |open_channel|
    open_channel.queue(queue_name, queue_options) { |ready_queue| yield ready_queue }
  end
end