Module: Stooge

Extended by:
Stooge, WorkQueue
Included in:
Stooge
Defined in:
lib/stooge.rb,
lib/stooge/worker.rb,
lib/stooge/handler.rb,
lib/stooge/version.rb,
lib/stooge/work_queue.rb

Defined Under Namespace

Modules: WorkQueue

Classes: Handler, Worker

Constant Summary

VERSION =
"0.1.4"
@@connection =
nil
@@channel =
nil
@@handlers =
[]
@@logger =
Proc.new do |msg|
  puts "#{Time.now} :stooge: #{msg}"
  STDOUT.flush
end
@@error_handler =
Proc.new do |exception, handler, payload, headers|
  Stooge.log "#{handler.queue_name} failed: #{exception.inspect}"
  raise exception
end

Instance Method Summary

Methods included from WorkQueue

aenqueue, enqueue, job

Instance Method Details

#add_handler(handler) ⇒ Object

Add a new job handler. Used by Stooge::WorkQueue#job.

Parameters:

  • handler (Stooge::Handler)

    the handler to add.

# File 'lib/stooge.rb', line 118

def add_handler(handler)
  @@handlers << handler
end

#amqp_url ⇒ String

Returns the URL of the AMQP broker to use. The value is taken from the AMQP_URL environment variable if present; otherwise it defaults to “amqp://guest:guest@localhost/” (the same as the default for a local RabbitMQ install).

Returns:

  • (String)

    a URL to an AMQP broker.



# File 'lib/stooge.rb', line 34

def amqp_url
  @@amqp_url ||= ENV["AMQP_URL"] || "amqp://guest:guest@localhost/"
end
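
The lookup order above can be sketched in isolation. The following is a minimal stand-in, not the gem itself: BrokerConfig, its url method, and reset! are hypothetical names introduced only for illustration, and the broker host in the example is made up. It also shows a consequence of the `||=` in the method: the value is cached on first use, so a later change to the environment is not picked up.

```ruby
# Hypothetical stand-in that mirrors the lookup in Stooge.amqp_url; not the gem itself.
module BrokerConfig
  DEFAULT_URL = "amqp://guest:guest@localhost/"

  # Resolve the URL once and cache it, like the `||=` in the method above.
  # `env` is any Hash-like object; it defaults to the real environment.
  def self.url(env = ENV)
    @url ||= env["AMQP_URL"] || DEFAULT_URL
  end

  # Forget the cached value (an addition for this sketch only).
  def self.reset!
    @url = nil
  end
end

# Nothing set, so the default is used and cached.
puts BrokerConfig.url({})

# The value is already cached, so this environment is ignored.
puts BrokerConfig.url({ "AMQP_URL" => "amqp://broker.example/jobs" })
```

If the URL has to change after the first lookup, assigning it explicitly with amqp_url= is the reliable route, since that overwrites the cached value.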

#amqp_url=(url) ⇒ Object

Set the URL of the AMQP broker to use. The URL should have the form “amqp://username:password@hostname/vhost” (the vhost and the username/password are optional).



# File 'lib/stooge.rb', line 43

def amqp_url=(url)
  @@amqp_url = url
end
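
To see which parts of such a URL are optional, it can be taken apart with Ruby's standard URI library. This is only an illustration of the URL shape: amqp_url_parts is a hypothetical helper, and how Stooge or the underlying AMQP client interprets the URL is not shown in this page.

```ruby
require "uri"

# Hypothetical helper: split an AMQP URL into its components using the
# generic URI parser from Ruby's standard library.
def amqp_url_parts(url)
  uri = URI.parse(url)
  {
    user: uri.user,          # nil when no credentials are given
    password: uri.password,  # nil when no credentials are given
    host: uri.host,
    path: uri.path           # "/vhost", or "" when no vhost is given
  }
end

p amqp_url_parts("amqp://username:password@hostname/vhost")
p amqp_url_parts("amqp://localhost")
```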

#clear_handlers ⇒ Object

Remove all job handlers. Mainly useful in tests to create a clean slate for each test case.



# File 'lib/stooge.rb', line 136

def clear_handlers
  @@handlers = []
end

#error {|exception, handler, payload, headers| ... } ⇒ Object

Configure a global error handler for Stooge jobs. The block is called when a job handler raises an exception. The default error handler simply logs the error and re-raises the exception. You can use this, for example, to re-queue a failed job or to send email notifications when a job fails.

If you don’t raise an exception in the error handler, the job will be acked with the broker, and the broker will consider the job done and remove it from the queue. To make sure the job is not lost, you can simply re-raise the same exception in your custom handler:

Stooge.error do |exception, handler, payload, headers|
  raise exception
end

Parameters:

  • block (Proc)

    a Proc to yield when an error happens.

Yield Parameters:

  • exception (Exception)

    the exception object raised.

  • handler (Stooge::Handler)

    the handler that failed.

  • payload (Object)

    the message payload that was processed when the handler failed.

  • headers (Hash)

    the message headers.



# File 'lib/stooge.rb', line 91

def error(&blk)
  @@error_handler = blk
end
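
The ack-or-raise contract described above can be sketched without a broker. The following is a simplified model, not Stooge's implementation: ErrorDemo and its process method are hypothetical, the handler argument is reduced to a queue name string instead of a Stooge::Handler, and :acked merely stands in for the job being acknowledged.

```ruby
# Hypothetical stand-in that models the error-handler contract; not the gem itself.
module ErrorDemo
  # Default: re-raise, so the job is not acknowledged.
  @error_handler = proc { |exception, _handler, _payload, _headers| raise exception }

  # Install a custom error handler block.
  def self.error(&blk)
    @error_handler = blk
  end

  # Run a job block. Returns :acked when the job succeeds, or when it fails
  # and the error handler returns normally. If the error handler raises,
  # the exception propagates and nothing is acknowledged.
  def self.process(queue_name, payload, headers = {})
    yield payload
    :acked
  rescue StandardError => e
    @error_handler.call(e, queue_name, payload, headers)
    :acked
  end
end

# A handler that records the failure and swallows the exception.
failures = []
ErrorDemo.error do |exception, handler, payload, _headers|
  failures << [handler, payload, exception.message]
end

result = ErrorDemo.process("example.work", { "id" => 1 }) { |_payload| raise ArgumentError, "boom" }
p result
p failures
```

In this model a handler that swallows the exception causes the failed job to be treated as done, which is why a handler that only records or notifies should usually end by re-raising.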

#error_handler ⇒ Proc

The global error handler.

Returns:

  • (Proc)

    the error handler block.



# File 'lib/stooge.rb', line 100

def error_handler
  @@error_handler
end

#has_handlers? ⇒ Boolean

Are there any job handlers defined? Used by Worker to check if it should start a worker process.

Returns:

  • (Boolean)

    true if any job handlers are defined, false otherwise.



# File 'lib/stooge.rb', line 128

def has_handlers?
  @@handlers.empty? == false
end

#log(msg) ⇒ Object

Log a message using the Stooge logger.

Parameters:

  • msg (String)

    the message to log.



# File 'lib/stooge.rb', line 52

def log(msg)
  @@logger.call(msg)
end

#logger {|msg| ... } ⇒ Object

Configure a custom logger for Stooge. The block is called with each log message and can do whatever it wants with it. The default behaviour is to print the message to stdout.

Parameters:

  • block (Proc)

    a Proc to yield when a message needs to be logged.

Yield Parameters:

  • msg (String)

    the message to log.



# File 'lib/stooge.rb', line 64

def logger(&blk)
  @@logger = blk
end
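
A common use of a logger block is to forward messages to Ruby's standard Logger. The sketch below shows that pattern against a stand-in module, since it is meant to run without the gem: LoggerDemo is hypothetical and only copies the logger and log methods shown in this page, while Logger and StringIO come from the standard library.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in that copies the logger/log pair; not the gem itself.
module LoggerDemo
  # Default behaviour: timestamped line on stdout.
  @logger = proc { |msg| puts "#{Time.now} :stooge: #{msg}" }

  # Install a custom logger block.
  def self.logger(&blk)
    @logger = blk
  end

  # Pass a message to whichever block is installed.
  def self.log(msg)
    @logger.call(msg)
  end
end

# Forward every message to a standard Logger (writing to memory here).
output = StringIO.new
ruby_logger = Logger.new(output)
LoggerDemo.logger { |msg| ruby_logger.info("stooge: #{msg}") }

LoggerDemo.log("worker started")
puts output.string
```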

#run_handler(queue_name, data, headers = {}) ⇒ Object

Execute a handler block without going through AMQP at all. This is a helper method for use in tests. It allows you to test the business logic in a handler block without having to mess with the details of how Stooge works internally.

Examples

Stooge.run_handler('example.work', :foo => 'bar').should == 42

Parameters:

  • queue_name (String)

    the name of the handler to run

  • data (Object)

    message data to send to the handler as arguments

  • headers (Hash) (defaults to: {})

    optional headers to send as second argument to the handler block

Returns:

  • the return value of the handler block



# File 'lib/stooge.rb', line 157

def run_handler(queue_name, data, headers = {})
  @@handlers.each do |handler|
    if handler.queue_name == queue_name
      return handler.run(MultiJson.encode(data), 'application/json', headers)
    end
  end
end
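
Two details of the method above are easy to miss. The data is encoded as JSON before it reaches the handler, and when no handler matches the queue name, `each` returns the handler list instead of raising. The sketch below models the first point and tightens the second. It is a stand-in under stated assumptions: DemoHandler is hypothetical, it assumes the handler decodes JSON payloads before calling its block, it uses the standard json library in place of MultiJson, and raising on an unknown queue is a choice made for this sketch.

```ruby
require "json"

# Hypothetical stand-in for a job handler; assumes JSON payloads are decoded
# before the block is called.
class DemoHandler
  attr_reader :queue_name

  def initialize(queue_name, &block)
    @queue_name = queue_name
    @block = block
  end

  # Decode the payload if it is JSON, then call the job block.
  def run(payload, content_type, headers)
    data = content_type == "application/json" ? JSON.parse(payload) : payload
    @block.call(data, headers)
  end
end

HANDLERS = [
  DemoHandler.new("example.work") { |args, _headers| args["foo"].upcase }
]

# Like run_handler above, but raises when no handler matches (sketch-only choice).
def run_handler(queue_name, data, headers = {})
  handler = HANDLERS.find { |h| h.queue_name == queue_name }
  raise ArgumentError, "no handler for #{queue_name}" unless handler
  handler.run(JSON.generate(data), "application/json", headers)
end

# The symbol key :foo arrives in the block as the string key "foo".
p run_handler("example.work", :foo => "bar")
```

Because of the JSON round trip, a handler block tested this way should read its arguments with string keys, as in this sketch.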

#start! ⇒ Object

Start a Stooge worker that consumes messages from the AMQP broker.



# File 'lib/stooge.rb', line 209

def start!
  EM.synchrony do
    with_channel do |channel|
      start_handlers(channel)
    end
  end
end

#start_handlers(channel) ⇒ Object

Start listening for new jobs on all queues using the specified channel.

Parameters:

  • channel (AMQP::Channel)

    an open AMQP channel



# File 'lib/stooge.rb', line 109

def start_handlers(channel)
  @@handlers.each { |h| h.start(channel) }
end

#stop! ⇒ Object

Stop and deactivate Stooge. Closes the channel and the connection, then stops the EventMachine reactor.



# File 'lib/stooge.rb', line 218

def stop!
  EM.next_tick do
    with_channel do |channel|
      channel.close
    end
    @@channel = nil
    with_connection do |connection|
      connection.close
    end
    @@connection = nil
    EM.stop
  end
end

#with_channel(&block) ⇒ Object

Yields an open and ready channel, creating a new one if none exists or the current one is closed.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready channel to.



# File 'lib/stooge.rb', line 194

def with_channel(&block)
  with_connection do |connection|
    if @@channel.nil? || @@channel.status == :closed
      @@channel = AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, channel_options)
      @@channel.on_error do |ch, channel_close|
        Stooge.log "channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
      end
    end
    @@channel.once_open do
      yield @@channel
    end
  end
end

#with_connection(&block) ⇒ Object

Yields an open and ready connection, connecting to the broker if there is no connection or the current one is closed.

Parameters:

  • block (Proc)

    a Proc to yield an open and ready connection to.



# File 'lib/stooge.rb', line 170

def with_connection(&block)
  if @@connection.nil? || @@connection.status == :closed
    @@connection = AMQP.connect(amqp_config)
    @@connection.on_tcp_connection_loss do
      Stooge.log "[network failure] trying to reconnect..."
      @@connection.reconnect
    end
    @@connection.on_recovery do
      Stooge.log "connection with broker recovered"
    end
    @@connection.on_error do |ch, connection_close|
      raise connection_close.reply_text
    end
  end
  @@connection.on_open do
    yield @@connection
  end
end