Module: Stooge::WorkQueue

Included in:
Stooge
Defined in:
lib/stooge/work_queue.rb

Instance Method Details

#aenqueue(queue_name, data, headers = {}) ⇒ EM::DefaultDeferrable

Asynchronous version of enqueue. If a block is given, it is yielded to once the job has been put onto the queue.

Parameters:

  • queue_name (String, Array<String>)

    The name of a work queue, or an array of queue names that the job passes through in order (a workflow).

  • data (Object)

    The data to send as input to the job. Needs to be JSON serializable.

  • headers (Hash) (defaults to: {})

    AMQP headers to include in the job that gets pushed. Needs to be a hash with key/value pairs.

Returns:

  • (EM::DefaultDeferrable)

    A deferrable that is marked as succeeded once the job has been published.



# File 'lib/stooge/work_queue.rb', line 37

def aenqueue(queue_name, data, headers = {})
  deferrable = EM::DefaultDeferrable.new
  with_channel do |channel|
    options = {
      :routing_key => queue_name,
      :mandatory => true,
      :persistent => true,
      :content_type => 'application/json',
      :headers => headers
    }
    channel.default_exchange.publish(MultiJson.encode(data), options) do
      Stooge.log("enqueue: #{queue_name}(#{data})")
      yield if block_given?
      deferrable.set_deferred_status :succeeded
    end
  end
  deferrable
end
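
To make the published message concrete, here is a minimal sketch of the body and publish options that one aenqueue call assembles. It assumes the stdlib JSON module as a stand-in for MultiJson.encode, and build_message is a hypothetical helper that is not part of Stooge.

```ruby
require 'json'

# Hypothetical helper (not part of Stooge): returns the message body and the
# publish options that correspond to a single aenqueue call.
def build_message(queue_name, data, headers = {})
  options = {
    :routing_key  => queue_name,          # the default exchange routes by queue name
    :mandatory    => true,                # unroutable messages are returned, not dropped
    :persistent   => true,                # ask the broker to store the message on disk
    :content_type => 'application/json',
    :headers      => headers
  }
  # Stand-in for MultiJson.encode(data)
  [JSON.generate(data), options]
end

body, options = build_message('example.work', { 'id' => 42 }, { 'attempt' => 1 })
puts body
puts options[:routing_key]
```

Data that cannot be serialized to JSON fails at the encoding step, before anything is published.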

#enqueue(queue_name, data, headers = {}) ⇒ Object

Push a job onto a named queue.

Example:

Stooge.enqueue('example.work', { :id => 42 })

Parameters:

  • queue_name (String, Array<String>)

    The name of a work queue, or an array of queue names that the job passes through in order (a workflow).

  • data (Object)

    The data to send as input to the job. Needs to be JSON serializable.

  • headers (Hash) (defaults to: {})

    AMQP headers to include in the job that gets pushed. Needs to be a hash with key/value pairs.



# File 'lib/stooge/work_queue.rb', line 19

def enqueue(queue_name, data, headers = {})
  EM::Synchrony.sync(aenqueue(queue_name, data, headers))
end
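
enqueue waits on the deferrable that aenqueue returns, whereas a direct aenqueue caller gets the deferrable back immediately and is notified later. The sketch below illustrates that ordering without EventMachine. TinyDeferrable and fake_aenqueue are hypothetical stand-ins that only mimic the callback and set_deferred_status calls used here. They are not Stooge or EventMachine code.

```ruby
# Hypothetical stand-in for the small part of EM::Deferrable used below.
class TinyDeferrable
  def initialize
    @callbacks = []
    @succeeded = false
  end

  # Run the block on success, or right away if success already happened.
  def callback(&blk)
    if @succeeded
      blk.call
    else
      @callbacks << blk
    end
    self
  end

  def set_deferred_status(status)
    return unless status == :succeeded
    @succeeded = true
    @callbacks.shift.call until @callbacks.empty?
  end
end

# Mimics the shape of aenqueue: return the deferrable immediately and mark it
# succeeded later, when the (simulated) publish completes.
pending = []
fake_aenqueue = lambda do |_queue_name|
  deferrable = TinyDeferrable.new
  pending << -> { deferrable.set_deferred_status(:succeeded) }
  deferrable
end

events = []
deferrable = fake_aenqueue.call('example.work')
deferrable.callback { events << :published }
events << :returned      # the caller continues before the publish completes
pending.each(&:call)     # simulate the publish completing
p events
```

With the real library, the equivalent asynchronous call would attach a callback to the object returned by aenqueue, while enqueue hides that step behind EM::Synchrony.sync.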

#job(queue, &blk) ⇒ Object

Creates a job handler for a named queue.

Example:

Stooge.job('example.work') do |args, headers|
  # Do the work here...
end

Parameters:

  • queue (String)

    The name of the work queue.

  • blk (Proc)

    A Proc that processes each job. It is called with the job's arguments and headers.



# File 'lib/stooge/work_queue.rb', line 68

def job(queue, &blk)
  handler = Stooge::Handler.new(queue, :queue_options => { :durable => true })
  handler.block = blk
  add_handler(handler)
end
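
The following sketch shows how a registered block might be invoked when a message arrives. It assumes the handler receives the JSON-decoded payload together with the message headers. HandlerRegistry and deliver are hypothetical names used only for illustration, and the stdlib JSON module stands in for MultiJson. Stooge's own handler bookkeeping may differ.

```ruby
require 'json'

# Hypothetical registry standing in for Stooge's handler bookkeeping.
class HandlerRegistry
  def initialize
    @handlers = {}
  end

  # Register a block for a named queue, in the spirit of Stooge.job.
  def job(queue, &blk)
    @handlers[queue] = blk
  end

  # Simulate the delivery of one message: decode the body, call the block.
  def deliver(queue, body, headers = {})
    @handlers.fetch(queue).call(JSON.parse(body), headers)
  end
end

registry = HandlerRegistry.new
results = []

registry.job('example.work') do |args, headers|
  # After a JSON round trip, symbol keys arrive as strings.
  results << [args['id'], headers['attempt']]
end

registry.deliver('example.work', JSON.generate({ :id => 42 }), { 'attempt' => 1 })
p results
```

Under this assumption, a payload enqueued with symbol keys is read back with string keys inside the handler.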