Module: Stooge
- Included in:
- Stooge
- Defined in:
- lib/stooge.rb,
lib/stooge/worker.rb,
lib/stooge/handler.rb,
lib/stooge/version.rb,
lib/stooge/work_queue.rb
Defined Under Namespace
Modules: WorkQueue Classes: Handler, Worker
Constant Summary
- VERSION =
"0.1.4"
- @@connection =
nil
- @@channel =
nil
- @@handlers =
[]
- @@logger =
Proc.new do |msg|
  puts "#{Time.now} :stooge: #{msg}"
  STDOUT.flush
end
- @@error_handler =
Proc.new do |exception, handler, payload, headers|
  Stooge.log "#{handler.queue_name} failed: #{exception.inspect}"
  raise exception
end
Instance Method Summary
-
#add_handler(handler) ⇒ Object
Add a new job handler.
-
#amqp_url ⇒ String
Will return a URL to the AMQP broker to use.
-
#amqp_url=(url) ⇒ Object
Set the URL to the AMQP broker to use.
-
#clear_handlers ⇒ Object
Remove all job handlers.
-
#error {|exception, handler, payload, headers| ... } ⇒ Object
Configure a global error handler for Stooge jobs.
-
#error_handler ⇒ Proc
The global error handler.
-
#has_handlers? ⇒ Boolean
Are there any job handlers defined? Used by Worker to check if it should start a worker process.
-
#log(msg) ⇒ Object
Log message using the Stooge logger.
-
#logger {|msg| ... } ⇒ Object
Configure a custom logger for Stooge.
-
#run_handler(queue_name, data, headers = {}) ⇒ Object
Execute a handler block without going through AMQP at all.
-
#start! ⇒ Object
Start a Stooge worker that consumes messages from the AMQP broker.
-
#start_handlers(channel) ⇒ Object
Start listening for new jobs on all queues using the specified channel.
-
#stop! ⇒ Object
Will stop and deactivate Stooge.
-
#with_channel(&block) ⇒ Object
Will yield an open and ready channel.
-
#with_connection(&block) ⇒ Object
Will yield an open and ready connection.
Methods included from WorkQueue
Instance Method Details
#add_handler(handler) ⇒ Object
Add a new job handler. Used by Stooge::WorkQueue#job.
# File 'lib/stooge.rb', line 118
def add_handler(handler)
  @@handlers << handler
end
#amqp_url ⇒ String
Returns the URL of the AMQP broker to use. The value is read from the AMQP_URL environment variable if present; otherwise it defaults to “amqp://guest:guest@localhost/” (the same as the default for a local RabbitMQ install).
# File 'lib/stooge.rb', line 34
def amqp_url
  @@amqp_url ||= ENV["AMQP_URL"] || "amqp://guest:guest@localhost/"
end
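The lookup order described above can be sketched outside of Stooge. The helper name resolve_amqp_url and the DEFAULT_AMQP_URL constant are hypothetical and not part of Stooge; the environment is passed in as a plain hash so the sketch can be tried without touching the real process environment.

```ruby
# Hypothetical helper (not part of Stooge) mirroring the lookup order of
# Stooge#amqp_url: explicit value, then AMQP_URL, then the local default.
DEFAULT_AMQP_URL = "amqp://guest:guest@localhost/"

def resolve_amqp_url(configured = nil, env = ENV)
  configured || env["AMQP_URL"] || DEFAULT_AMQP_URL
end

# Nothing configured and no environment variable: the default is used.
puts resolve_amqp_url(nil, {})

# The environment variable takes precedence over the default.
puts resolve_amqp_url(nil, { "AMQP_URL" => "amqp://broker.example.com/" })

# An explicitly configured URL takes precedence over both.
puts resolve_amqp_url("amqp://explicit.example.com/",
                      { "AMQP_URL" => "amqp://broker.example.com/" })
```

Note that the real method memoizes the result with ||=, so the environment variable is only consulted until a value has been stored.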
#amqp_url=(url) ⇒ Object
Set the URL to the AMQP broker to use. The format of the URL should be “amqp://username:password@hostname/vhost” (the vhost and the username/password are optional).
# File 'lib/stooge.rb', line 43
def amqp_url=(url)
  @@amqp_url = url
end
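To illustrate the URL format, the sketch below splits an AMQP URL into the parts named above using Ruby's standard URI library. The helper amqp_url_parts and the example host name are hypothetical; how Stooge itself interprets the vhost part is not shown on this page, so stripping the leading slash is an assumption made only for this illustration.

```ruby
require "uri"

# Hypothetical helper (not part of Stooge): split an AMQP URL into the
# components named in the documented format. URI.parse handles the amqp
# scheme as a generic URI.
def amqp_url_parts(url)
  uri = URI.parse(url)
  {
    username: uri.user,
    password: uri.password,
    hostname: uri.host,
    # Assumption for illustration: the vhost is the path without its
    # leading slash.
    vhost: uri.path.to_s.sub(%r{\A/}, "")
  }
end

p amqp_url_parts("amqp://alice:secret@broker.example.com/staging")
p amqp_url_parts("amqp://broker.example.com")
```

With the optional parts left out, username and password come back as nil and the vhost as an empty string.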
#clear_handlers ⇒ Object
Remove all job handlers. Mainly useful in tests to create a clean slate for each test case.
# File 'lib/stooge.rb', line 136
def clear_handlers
  @@handlers = []
end
#error {|exception, handler, payload, headers| ... } ⇒ Object
Configure a global error handler for Stooge jobs. The block is called when a job handler raises an exception. The default error handler logs the error and re-raises the exception. You can use this to, for example, re-queue a failed job or send email notifications when a job fails.
If the error handler does not raise an exception, the job is acked with the broker, and the broker considers the job done and removes it from the queue. To make sure the job is not lost, re-raise the same exception in your custom handler:
Stooge.error do |exception, handler, payload, headers|
  raise exception
end
# File 'lib/stooge.rb', line 91
def error(&blk)
  @@error_handler = blk
end
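The ack behaviour described above can be sketched with a small stand-in. StoogeSketch, FakeHandler and process are hypothetical names written for this illustration only; they model the documented rule (swallowing the exception means the job is acked, re-raising means it is not) and are not how Stooge is implemented internally.

```ruby
# Minimal stand-in for the documented behaviour; NOT the real Stooge module.
module StoogeSketch
  FakeHandler = Struct.new(:queue_name)

  # Default mirrors the documented default: re-raise the exception.
  @error_handler = proc { |exception, _handler, _payload, _headers| raise exception }

  def self.error(&blk)
    @error_handler = blk
  end

  # Runs the job block. Returns :acked when the job succeeds or the error
  # handler swallows the exception, :not_acked when the handler re-raises.
  def self.process(handler, payload, headers = {})
    yield payload
    :acked
  rescue StandardError => e
    begin
      @error_handler.call(e, handler, payload, headers)
      :acked
    rescue StandardError
      :not_acked
    end
  end
end

notifications = []
StoogeSketch.error do |exception, handler, _payload, _headers|
  # Record the failure (a real handler might send an email here) ...
  notifications << "#{handler.queue_name} failed: #{exception.message}"
  # ... then re-raise so the job is not treated as done.
  raise exception
end

handler = StoogeSketch::FakeHandler.new("example.work")
result = StoogeSketch.process(handler, "{}") { raise ArgumentError, "boom" }
puts result
puts notifications.inspect
```

A handler that records the failure but omits the final raise would make the same call return :acked in this sketch, which corresponds to the job being removed from the queue.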
#error_handler ⇒ Proc
The global error handler.
# File 'lib/stooge.rb', line 100
def error_handler
  @@error_handler
end
#has_handlers? ⇒ Boolean
Are there any job handlers defined? Used by Worker to check if it should start a worker process.
# File 'lib/stooge.rb', line 128
def has_handlers?
  @@handlers.empty? == false
end
#log(msg) ⇒ Object
Log message using the Stooge logger.
# File 'lib/stooge.rb', line 52
def log(msg)
  @@logger.call(msg)
end
#logger {|msg| ... } ⇒ Object
Configure a custom logger for Stooge. The block gets yielded with the log message. You can do whatever you want with it. The default behaviour is to simply output to stdout.
# File 'lib/stooge.rb', line 64
def logger(&blk)
  @@logger = blk
end
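As an illustration of a custom logger, the sketch below forwards messages to Ruby's standard Logger. LoggerSketch is a hypothetical stand-in with the same logger/log shape as the methods documented here, used so the example runs without an AMQP broker; with the real library the block would be passed to Stooge.logger instead.

```ruby
require "logger"
require "stringio"

# Stand-in with the same logger/log shape as documented above;
# NOT the real Stooge module.
module LoggerSketch
  # Default mirrors the documented default: print to stdout.
  @logger = proc { |msg| puts "#{Time.now} :stooge: #{msg}" }

  def self.logger(&blk)
    @logger = blk
  end

  def self.log(msg)
    @logger.call(msg)
  end
end

# Send log messages to a standard Logger instead of stdout. A StringIO is
# used as the sink here; a file path or $stderr would work the same way.
sink = StringIO.new
app_logger = Logger.new(sink)

LoggerSketch.logger { |msg| app_logger.info("stooge: #{msg}") }
LoggerSketch.log("worker started")

puts sink.string
```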
#run_handler(queue_name, data, headers = {}) ⇒ Object
Execute a handler block without going through AMQP at all. This is a helper method for use in tests. It allows you to test the business logic in a handler block without having to mess with the details of how Stooge works internally.
Examples
Stooge.run_handler('example.work', :foo => 'bar').should == 42
# File 'lib/stooge.rb', line 157
def run_handler(queue_name, data, headers = {})
  @@handlers.each do |handler|
    if handler.queue_name == queue_name
      return handler.run(MultiJson.encode(data), 'application/json', headers)
    end
  end
end
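The dispatch performed by run_handler can be sketched as follows. SketchHandler and run_handler_sketch are hypothetical stand-ins, the standard json library is swapped in for MultiJson, and the assumption that the handler decodes the JSON payload back into a hash with string keys is made only for this illustration.

```ruby
require "json"

# Hypothetical stand-in for a job handler: decodes a JSON payload and
# passes the result to the job block. NOT the real Stooge::Handler.
class SketchHandler
  attr_reader :queue_name

  def initialize(queue_name, &block)
    @queue_name = queue_name
    @block = block
  end

  def run(payload, content_type, headers)
    data = content_type == "application/json" ? JSON.parse(payload) : payload
    @block.call(data, headers)
  end
end

# Same idea as run_handler: find the handler for the queue, encode the
# data as JSON, and return whatever the job block returns.
def run_handler_sketch(handlers, queue_name, data, headers = {})
  handler = handlers.find { |h| h.queue_name == queue_name }
  return nil unless handler

  handler.run(JSON.generate(data), "application/json", headers)
end

handlers = [
  SketchHandler.new("example.work") { |args, _headers| args["foo"].length * 14 }
]

# The symbol key :foo arrives as the string key "foo" after the JSON
# round trip in this sketch.
puts run_handler_sketch(handlers, "example.work", { foo: "bar" })
```

One difference worth knowing: when no handler matches, this sketch returns nil, whereas the listing above falls through the each loop and returns the handler array, because Array#each returns its receiver.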
#start! ⇒ Object
Start a Stooge worker that consumes messages from the AMQP broker.
# File 'lib/stooge.rb', line 209
def start!
  EM.synchrony do
    with_channel do |channel|
      start_handlers(channel)
    end
  end
end
#start_handlers(channel) ⇒ Object
Start listening for new jobs on all queues using the specified channel.
# File 'lib/stooge.rb', line 109
def start_handlers(channel)
  @@handlers.each { |h| h.start(channel) }
end
#stop! ⇒ Object
Will stop and deactivate Stooge.
# File 'lib/stooge.rb', line 218
def stop!
  EM.next_tick do
    with_channel do |channel|
      channel.close
    end
    @@channel = nil
    with_connection do |connection|
      connection.close
    end
    @@connection = nil
    EM.stop
  end
end
#with_channel(&block) ⇒ Object
Will yield an open and ready channel.
# File 'lib/stooge.rb', line 194
def with_channel(&block)
  with_connection do |connection|
    if @@channel.nil? || @@channel.status == :closed
      @@channel = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, )
      @@channel.on_error do |ch, channel_close|
        Stooge.log "channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
      end
    end
    @@channel.once_open do
      yield @@channel
    end
  end
end
#with_connection(&block) ⇒ Object
Will yield an open and ready connection.
# File 'lib/stooge.rb', line 170
def with_connection(&block)
  if @@connection.nil? || @@connection.status == :closed
    @@connection = AMQP.connect(amqp_config)
    @@connection.on_tcp_connection_loss do
      Stooge.log "[network failure] trying to reconnect..."
      @@connection.reconnect
    end
    @@connection.on_recovery do
      Stooge.log "connection with broker recovered"
    end
    @@connection.on_error do |ch, connection_close|
      raise connection_close.reply_text
    end
  end
  @@connection.on_open do
    yield @@connection
  end
end