Module: Minion

Extended by:
Minion
Included in:
Minion
Defined in:
lib/mb-minion/handler.rb,
lib/mb-minion.rb,
lib/mb-minion/message.rb,
lib/mb-minion/version.rb

Overview

:nodoc

Defined Under Namespace

Classes: Handler, Message

Constant Summary collapse

VERSION =
"0.2.0"

Instance Method Summary collapse

Instance Method Details

#alert(exception) ⇒ Object

Handle when an error gets raised.

Examples:

Handle the error.

Minion.error(exception)

Parameters:

  • exception (Exception)

    The error that was raised.

Returns:

  • (Object)

    The output og the error handler block.



22
23
24
25
# File 'lib/mb-minion.rb', line 22

def alert(exception)
  raise(exception) unless error_handling
  error_handling.call(exception)
end

#configHash

Gets the hash of configuration options.

Examples:

Get the configuration hash.

Minion.config

Returns:

  • (Hash)

    The configuration options.



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/mb-minion.rb', line 33

def config
  uri = URI.parse(url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    :port => (uri.port || 5672),
    :pass => uri.password
  }
rescue Object => e
  raise("invalid AMQP_URL: #{uri.inspect} (#{e})")
end

#enqueue(queues, data = {}) ⇒ Object

Add content to the supplied queue or queues. The hash will get converted to JSON and placed on the queue as the JSON string.

Examples:

Place data on a single queue.

Minion.enqueue("queue.name", { field: "value" })

Place data on multiple queues.

Minion.enqueue([ "queue.first", "queue.second" ], { field: "value" })

Parameters:

  • name (String, Array<String>)

    The name or names of the queues.

  • data (Hash) (defaults to: {})

    The payload to send.

Raises:

  • (RuntimeError)

    If the name is nil or empty.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/mb-minion.rb', line 59

def enqueue(queues, data = {})
  raise "Cannot enqueue an empty or nil name" if queues.nil? || queues.empty?
  # Wrap raw data when we receive it
  data = {'content' => data} unless data.class == Hash && data['content']
  if queues.respond_to? :shift
    queue = queues.shift
    data['callbacks'] = queues
  else
    queue = queues
  end
  
  # @todo: Durran: Any multi-byte character in the JSON causes a bad_payload
  #   error on the rabbitmq side. It seems a fix in the old amqp gem
  #   regressed in the new fork.
  encoded = JSON.dump(data).force_encoding("ISO-8859-1")
  
  Minion.info("Send: #{queue}:#{encoded}")
  connect do |bunny|
    q = bunny.queue(queue, :durable => true, :auto_delete => false)
    e = bunny.exchange('') # Connect to default exchange
    e.publish(encoded, :key => q.name) 
  end
end

#error(&block) ⇒ Object

Define an optional method of changing the ways errors get handled.

Examples:

Define a custom error handler.

Minion.error do |e|
  puts "I got an error - #{e.message}"
end

Parameters:

  • block (Proc)

    The block that will handle the error.



103
104
105
# File 'lib/mb-minion.rb', line 103

def error(&block)
  @@error_handling = block
end

#execute_handlersObject

Runs each of the handlers.

Examples:

Check all handlers.

Minion.check_handlers


111
112
113
# File 'lib/mb-minion.rb', line 111

def execute_handlers
  @@handlers.each { |handler| handler.execute }
end

#info(message) ⇒ Object

Log the supplied information message.

Examples:

Log the message.

Minion.info("something happened")

Returns:

  • (Object)

    The output of the logging block.



121
122
123
# File 'lib/mb-minion.rb', line 121

def info(message)
  logging.call(message)
end

#job(queue, options = {}, &block) ⇒ Object

Sets up a subscriber to a queue to process jobs.

Examples:

Set up the subscriber.

Minion.job "my.queue.name" do |attributes|
  puts "Here's the message data: #{attributes"
end

Parameters:

  • queue (String)

    The queue to subscribe to.

  • options (Hash) (defaults to: {})

    Options for the subscriber.

Options Hash (options):

  • :when (lambda)

    Conditionally process the job.

  • :ack (boolean)

    Should we automatically ack the message?



137
138
139
140
141
142
143
# File 'lib/mb-minion.rb', line 137

def job(queue, options = {}, &block)
  Minion::Handler.new(queue, block, options).tap do |handler|
    @@handlers ||= []
    at_exit { Minion.run } if @@handlers.size == 0
    @@handlers << handler
  end
end

#logger(&block) ⇒ Object

Define an optional method of changing the ways logging is handled.

Examples:

Define a custom logger.

Minion.logger do |message|
  puts "Something did something - #{message}"
end

Parameters:

  • block (Proc)

    The block that will handle the logging.



153
154
155
# File 'lib/mb-minion.rb', line 153

def logger(&block)
  @@logging = block
end

#message_count(queue) ⇒ Fixnum

Get the message count for a specific queue

Examples:

Get the message count for queue ‘minion.test’.

Minion.message_count('minion.test')

Returns:

  • (Fixnum)

    the number of messages



89
90
91
92
93
# File 'lib/mb-minion.rb', line 89

def message_count(queue)
  connect do |bunny|
    return bunny.queue(queue, :durable => true, :auto_delete => false).message_count
  end
end

#runObject

Runs the minion subscribers.

Examples:

Run the subscribers.

Minion.run


161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/mb-minion.rb', line 161

def run
  Minion.info("Starting minion")
  Signal.trap("INT") { AMQP.stop { EM.stop } }
  Signal.trap("TERM") { AMQP.stop { EM.stop } }

  EM.run do
    AMQP.start(config) do
      AMQP::Channel.new.prefetch(1)
      execute_handlers
    end
  end
end

#urlString

Get the url for the amqp server.

Examples:

Get the url.

Minion.url

Returns:



180
181
182
# File 'lib/mb-minion.rb', line 180

def url
  @@url ||= (ENV["AMQP_URL"] || "amqp://guest:guest@localhost/")
end

#url=(url) ⇒ String

Set the url to the amqp server.

Examples:

Set the url.

Minion.url = "amqp://user:password@host:port/vhost"

Returns:



190
191
192
# File 'lib/mb-minion.rb', line 190

def url=(url)
  @@url = url
end