Module: Henchman
Overview
Thin wrapper around AMQP
Defined Under Namespace
Classes: Worker
Constant Summary collapse
- @@connection =
nil
- @@channel =
nil
- @@error_handler =
Proc.new do
  STDERR.puts("consume(#{queue_name.inspect}, #{headers.inspect}, #{message.inspect}): #{exception.message}")
  STDERR.puts(exception.backtrace.join("\n"))
end
- @@logger =
Proc.new do |msg| puts msg end
Class Method Summary collapse
-
.error_handler ⇒ Proc
The error handler.
Instance Method Summary collapse
-
#aenqueue(queue_name, message, headers = {}) ⇒ EM::Deferrable
Enqueue a message asynchronously.
-
#amqp_options ⇒ Hash
Will return the default options when connecting to the AMQP broker.
-
#amqp_url ⇒ String
Will return a URL to the AMQP broker to use.
-
#amqp_url=(url) ⇒ String
The AMQP broker url to use.
-
#apublish(exchange_name, message, headers = {}) ⇒ EM::Deferrable
Publish a message to multiple consumers asynchronously.
-
#channel_options ⇒ Hash
Will return the default options to use when creating channels.
-
#enqueue(queue_name, message, headers = {}) ⇒ Object
Enqueue a message synchronously.
-
#error(&block) ⇒ Object
Define an error handler.
-
#exchange_options ⇒ Hash
Will return the default options to use when creating exchanges.
-
#log(msg) ⇒ Object
Log a message.
-
#logger(&block) ⇒ Object
Define a log handler.
-
#publish(exchange_name, message, headers = {}) ⇒ Object
Publish a message to multiple consumers synchronously.
-
#queue_options ⇒ Hash
Will return the default options to use when creating queues.
-
#stop! ⇒ Object
Will stop and deactivate Henchman.
-
#with_channel(&block) ⇒ Object
Will yield an open and ready channel.
-
#with_connection(&block) ⇒ Object
Will yield an open and ready connection.
-
#with_direct_exchange(&block) ⇒ Object
Will yield an open and ready direct exchange.
-
#with_fanout_exchange(exchange_name, &block) ⇒ Object
Will yield an open and ready fanout exchange.
-
#with_fanout_queue(exchange_name, &block) ⇒ Object
Will yield an open and ready queue bound to an open and ready fanout exchange.
-
#with_queue(queue_name, &block) ⇒ Object
Will yield an open and ready queue.
Class Method Details
.error_handler ⇒ Proc
Returns the error handler.
45 46 47 |
# File 'lib/henchman.rb', line 45
#
# Accessor for the module-wide error handler.
#
# @return [Proc] whatever is currently stored in @@error_handler.
#
def self.error_handler
  @@error_handler
end
Instance Method Details
#aenqueue(queue_name, message, headers = {}) ⇒ EM::Deferrable
Enqueue a message asynchronously.
261 262 263 264 265 266 267 268 269 |
# File 'lib/henchman.rb', line 261
#
# Enqueue a message asynchronously.
#
# The message is encoded with MultiJson and published on the exchange
# yielded by with_direct_exchange, with queue_name as the routing key.
#
# @param queue_name passed as the :routing_key of the publish.
# @param message the payload; handed to MultiJson.encode.
# @param [Hash] headers passed as the :headers of the publish.
#
# @return [EM::Deferrable] a deferrable that is set to :succeeded from
#   inside the publish callback.
#
def aenqueue(queue_name, message, headers = {})
  deferrable = EM::DefaultDeferrable.new
  with_direct_exchange do |exchange|
    exchange.publish(MultiJson.encode(message), :headers => headers, :routing_key => queue_name) do
      deferrable.set_deferred_status :succeeded
    end
  end
  deferrable
end
#amqp_options ⇒ Hash
Will return the default options when connecting to the AMQP broker.
Uses the URL from #amqp_url to construct these options.
81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/henchman.rb', line 81
#
# Build the default options used when connecting to the AMQP broker.
#
# The URL returned by amqp_url is parsed with URI.parse; user and password
# fall back to "guest" and the port falls back to 5672 when the URL does
# not carry them.
#
# @return [Hash] a Hash with the keys :vhost, :host, :user, :port and :pass.
#
# @raise [RuntimeError] if the URL can not be parsed. The message quotes the
#   offending URL (the parsed `uri` local is still nil when parsing fails,
#   so it is useless for diagnostics).
#
def amqp_options
  uri = URI.parse(amqp_url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user || "guest",
    :port => uri.port || 5672,
    :pass => uri.password || "guest"
  }
rescue StandardError => e
  # StandardError instead of Object: do not swallow signals or exit requests.
  raise "invalid AMQP_URL: #{amqp_url.inspect} (#{e})"
end
#amqp_url ⇒ String
Will return a URL to the AMQP broker to use. Will get this from the ENV variable AMQP_URL if present, otherwise it defaults to amqp://localhost/.
63 64 65 |
# File 'lib/henchman.rb', line 63
#
# The URL of the AMQP broker to use.
#
# The value is memoized in @amqp_url. On first use it is taken from the
# AMQP_URL environment variable, or "amqp://localhost/" when that variable
# is not set.
#
# @return [String] the broker URL.
#
def amqp_url
  @amqp_url ||= ENV.fetch("AMQP_URL", "amqp://localhost/")
end
#amqp_url=(url) ⇒ String
Returns the AMQP broker url to use.
70 71 72 |
# File 'lib/henchman.rb', line 70
#
# Override the AMQP broker URL by storing it in @amqp_url.
#
# @param url the broker URL to remember.
#
# @return the value that was assigned.
#
def amqp_url=(url)
  @amqp_url = url
end
#apublish(exchange_name, message, headers = {}) ⇒ EM::Deferrable
Publish a message to multiple consumers asynchronously.
288 289 290 291 292 293 294 295 296 |
# File 'lib/henchman.rb', line 288
#
# Publish a message to multiple consumers asynchronously.
#
# The message is encoded with MultiJson and published on the exchange
# yielded by with_fanout_exchange(exchange_name).
#
# @param exchange_name the name handed to with_fanout_exchange.
# @param message the payload; handed to MultiJson.encode.
# @param [Hash] headers passed as the :headers of the publish.
#
# @return [EM::Deferrable] a deferrable that is set to :succeeded from
#   inside the publish callback.
#
def apublish(exchange_name, message, headers = {})
  deferrable = EM::DefaultDeferrable.new
  with_fanout_exchange(exchange_name) do |exchange|
    exchange.publish(MultiJson.encode(message), :headers => headers) do
      deferrable.set_deferred_status :succeeded
    end
  end
  deferrable
end
#channel_options ⇒ Hash
Will return the default options to use when creating channels.
If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.
128 129 130 131 132 133 |
# File 'lib/henchman.rb', line 128
#
# Default options to use when creating channels.
#
# The Hash is memoized in @channel_options, so mutating the returned object
# changes what later calls return.
#
# @return [Hash] initially { :prefetch => 1, :auto_recovery => true }.
#
def channel_options
  @channel_options ||= {
    :prefetch => 1,
    :auto_recovery => true
  }
end
#enqueue(queue_name, message, headers = {}) ⇒ Object
Enqueue a message synchronously.
250 251 252 |
# File 'lib/henchman.rb', line 250
#
# Enqueue a message synchronously.
#
# Hands the deferrable returned by aenqueue to EM::Synchrony.sync.
#
# @param queue_name forwarded to aenqueue.
# @param message forwarded to aenqueue.
# @param [Hash] headers forwarded to aenqueue.
#
# @return whatever EM::Synchrony.sync returns.
#
def enqueue(queue_name, message, headers = {})
  EM::Synchrony.sync(aenqueue(queue_name, message, headers))
end
#error(&block) ⇒ Object
Define an error handler.
54 55 56 |
# File 'lib/henchman.rb', line 54
#
# Install an error handler.
#
# The given block replaces the Proc stored in @@error_handler.
#
# @param [Proc] block the new error handler.
#
def error(&block)
  @@error_handler = block
end
#exchange_options ⇒ Hash
Will return the default options to use when creating exchanges.
If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.
115 116 117 118 119 |
# File 'lib/henchman.rb', line 115
#
# Default options to use when creating exchanges.
#
# The Hash is memoized in @exchange_options, so mutating the returned object
# changes what later calls return.
#
# @return [Hash] initially { :auto_delete => true }.
#
def exchange_options
  @exchange_options ||= {
    :auto_delete => true
  }
end
#log(msg) ⇒ Object
Log a message.
39 40 |
# File 'lib/henchman.rb', line 39
#
# Log a message.
#
# Hands the message to the Proc stored in @@logger (replaceable through
# #logger). The listed body was empty, which left that Proc without any
# caller.
# NOTE(review): confirm the empty body was not a deliberate way of
# silencing log output.
#
# @param msg the message to log.
#
def log(msg)
  @@logger.call(msg)
end
#logger(&block) ⇒ Object
Define a log handler.
30 31 32 |
# File 'lib/henchman.rb', line 30
#
# Install a log handler.
#
# The given block replaces the Proc stored in @@logger.
#
# @param [Proc] block the new log handler.
#
def logger(&block)
  @@logger = block
end
#publish(exchange_name, message, headers = {}) ⇒ Object
Publish a a message to multiple consumers synchronously.
277 278 279 |
# File 'lib/henchman.rb', line 277
#
# Publish a message to multiple consumers synchronously.
#
# Hands the deferrable returned by apublish to EM::Synchrony.sync.
#
# @param exchange_name forwarded to apublish.
# @param message forwarded to apublish.
# @param [Hash] headers forwarded to apublish.
#
# @return whatever EM::Synchrony.sync returns.
#
def publish(exchange_name, message, headers = {})
  EM::Synchrony.sync(apublish(exchange_name, message, headers))
end
#queue_options ⇒ Hash
Will return the default options to use when creating queues.
If you change the returned Hash the changes will persist in this instance, so use this to configure stuff.
101 102 103 104 105 106 |
# File 'lib/henchman.rb', line 101
#
# Default options to use when creating queues.
#
# The Hash is memoized in @queue_options, so mutating the returned object
# changes what later calls return.
#
# @return [Hash] initially { :durable => true, :auto_delete => true }.
#
def queue_options
  @queue_options ||= {
    :durable => true,
    :auto_delete => true
  }
end
#stop! ⇒ Object
Will stop and deactivate Henchman.
138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/henchman.rb', line 138
#
# Stop and deactivate Henchman.
#
# In order: asks for the channel and closes it, forgets @@channel, asks for
# the connection and closes it, forgets @@connection, then calls AMQP.stop.
#
def stop!
  with_channel(&:close)
  @@channel = nil

  with_connection(&:close)
  @@connection = nil

  AMQP.stop
end
#with_channel(&block) ⇒ Object
Will yield an open and ready channel.
177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/henchman.rb', line 177
#
# Yield an open and ready channel.
#
# A new AMQP::Channel is created on the current connection, using
# channel_options, when there is no channel yet or its status is :closed.
# The channel's on_error handler logs the problem and calls channel.reuse.
# The caller's block runs from the channel's once_open callback.
#
# @param [Proc] block the block to yield the channel to.
#
# @yield [AMQP::Channel] the shared channel stored in @@channel.
#
def with_channel(&block)
  with_connection do |connection|
    @@channel = AMQP::Channel.new(connection, channel_options) if @@channel.nil? || @@channel.status == :closed
    @@channel.on_error do |channel, channel_close|
      log("#{self} reinitializing #{channel} due to #{channel_close}")
      channel.reuse
    end
    @@channel.once_open do
      yield @@channel
    end
  end
end
#with_connection(&block) ⇒ Object
Will yield an open and ready connection.
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/henchman.rb', line 155
#
# Yield an open and ready connection.
#
# A new connection is made with AMQP.connect(amqp_options) when there is no
# connection yet or its status is :closed. Handlers are then registered on
# the connection:
# - on_tcp_connection_loss logs and calls reconnect,
# - on_recovery logs,
# - on_error raises with the reply text of the connection close,
# - on_open yields the connection to the caller's block.
#
# @param [Proc] block the block to yield the connection to.
#
# @yield the shared connection stored in @@connection.
#
def with_connection(&block)
  @@connection = AMQP.connect(amqp_options) if @@connection.nil? || @@connection.status == :closed
  @@connection.on_tcp_connection_loss do
    log("#{self} reconnecting")
    @@connection.reconnect
  end
  @@connection.on_recovery do
    log("#{self} reconnected!")
  end
  @@connection.on_error do |connection, connection_close|
    raise "#{connection}: #{connection_close.reply_text}"
  end
  @@connection.on_open do
    yield @@connection
  end
end
#with_direct_exchange(&block) ⇒ Object
Will yield an open and ready direct exchange.
195 196 197 198 199 |
# File 'lib/henchman.rb', line 195
#
# Yield an open and ready direct exchange.
#
# Declares the exchange named AMQ::Protocol::EMPTY_STRING on the shared
# channel, using exchange_options, and hands the caller's block to that
# declaration.
#
# @param [Proc] block the block passed on to channel.direct.
#
def with_direct_exchange(&block)
  with_channel do |channel|
    channel.direct(AMQ::Protocol::EMPTY_STRING, exchange_options, &block)
  end
end
#with_fanout_exchange(exchange_name, &block) ⇒ Object
Will yield an open and ready fanout exchange.
207 208 209 210 211 |
# File 'lib/henchman.rb', line 207
#
# Yield an open and ready fanout exchange.
#
# Declares the fanout exchange exchange_name on the shared channel, using
# exchange_options, and hands the caller's block to that declaration.
#
# @param exchange_name the name of the fanout exchange.
# @param [Proc] block the block passed on to channel.fanout.
#
def with_fanout_exchange(exchange_name, &block)
  with_channel do |channel|
    channel.fanout(exchange_name, exchange_options, &block)
  end
end
#with_fanout_queue(exchange_name, &block) ⇒ Object
Will yield an open and ready queue bound to an open and ready fanout exchange.
219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/henchman.rb', line 219
#
# Yield an open and ready queue bound to an open and ready fanout exchange.
#
# Obtains the shared channel and the fanout exchange exchange_name, declares
# a queue on the channel without passing a name, binds it to the exchange
# and yields the queue from the bind callback.
#
# @param exchange_name the name of the fanout exchange to bind to.
# @param [Proc] block the block to yield the bound queue to.
#
def with_fanout_queue(exchange_name, &block)
  with_channel do |ch|
    with_fanout_exchange(exchange_name) do |fanout|
      ch.queue do |bound_queue|
        fanout_binding_done = proc { yield bound_queue }
        bound_queue.bind(fanout, &fanout_binding_done)
      end
    end
  end
end
#with_queue(queue_name, &block) ⇒ Object
Will yield an open and ready queue.
236 237 238 239 240 241 242 |
# File 'lib/henchman.rb', line 236
#
# Yield an open and ready queue.
#
# Declares the queue queue_name on the shared channel, using queue_options,
# and yields it from the declaration callback.
#
# @param queue_name the name of the queue.
# @param [Proc] block the block to yield the queue to.
#
def with_queue(queue_name, &block)
  with_channel do |channel|
    channel.queue(queue_name, queue_options) do |queue|
      yield queue
    end
  end
end