Module: Minion
- Extended by:
- Minion
- Included in:
- Minion
- Defined in:
- lib/mb-minion/handler.rb,
lib/mb-minion.rb,
lib/mb-minion/message.rb,
lib/mb-minion/version.rb
Overview
:nodoc
Defined Under Namespace
Constant Summary collapse
- VERSION =
"0.2.0"
Instance Method Summary collapse
-
#alert(exception) ⇒ Object
Handle when an error gets raised.
-
#config ⇒ Hash
Gets the hash of configuration options.
-
#enqueue(queues, data = {}) ⇒ Object
Add content to the supplied queue or queues.
-
#error(&block) ⇒ Object
Define an optional method of changing the ways errors get handled.
-
#execute_handlers ⇒ Object
Runs each of the handlers.
-
#info(message) ⇒ Object
Log the supplied information message.
-
#job(queue, options = {}, &block) ⇒ Object
Sets up a subscriber to a queue to process jobs.
-
#logger(&block) ⇒ Object
Define an optional method of changing the ways logging is handled.
-
#message_count(queue) ⇒ Fixnum
Get the message count for a specific queue.
-
#run ⇒ Object
Runs the minion subscribers.
-
#url ⇒ String
Get the url for the amqp server.
-
#url=(url) ⇒ String
Set the url to the amqp server.
Instance Method Details
#alert(exception) ⇒ Object
Handle when an error gets raised.
22 23 24 25 |
# File 'lib/mb-minion.rb', line 22
#
# Handle when an error gets raised. When an error handler is available
# (error_handling returns a truthy value) the exception is passed to it;
# otherwise the exception is raised.
#
# @param exception [Object] the error to handle; anything accepted by raise.
#
# @return [Object] the result of calling the error handler.
def alert(exception)
  if error_handling
    error_handling.call(exception)
  else
    raise(exception)
  end
end
#config ⇒ Hash
Gets the hash of configuration options.
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/mb-minion.rb', line 33
#
# Gets the hash of configuration options, built by parsing the value
# returned by #url.
#
# @return [Hash] the :vhost, :host, :user, :port and :pass taken from the
#   URL; :port falls back to 5672 when the URL does not specify one.
#
# @raise [RuntimeError] when the URL cannot be read or parsed. The message
#   contains the offending URL and the underlying error.
def config
  # Captured before parsing so the error message can show the actual value
  # even when URI.parse fails and no URI object exists.
  amqp_url = url
  uri = URI.parse(amqp_url)
  {
    :vhost => uri.path,
    :host => uri.host,
    :user => uri.user,
    :port => (uri.port || 5672),
    :pass => uri.password
  }
rescue StandardError => e
  # Only StandardError is translated; signals and exit requests propagate.
  raise("invalid AMQP_URL: #{amqp_url.inspect} (#{e})")
end
#enqueue(queues, data = {}) ⇒ Object
Add content to the supplied queue or queues. The hash will get converted to JSON and placed on the queue as the JSON string.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/mb-minion.rb', line 59
#
# Add content to the supplied queue or queues. The data is converted to JSON
# and published as a string to the first queue. When +queues+ responds to
# #shift (e.g. an Array), the remaining entries are sent along under the
# 'callbacks' key.
#
# Neither +queues+ nor +data+ is modified; copies are used internally.
#
# @param queues [String, Array<String>] the queue name, or a list whose first
#   entry is the target queue and whose remaining entries are callbacks.
# @param data [Object] the payload; it is wrapped as {'content' => data}
#   unless it is already a Hash with a truthy 'content' entry.
#
# @raise [RuntimeError] when +queues+ is nil or empty.
def enqueue(queues, data = {})
  raise "Cannot enqueue an empty or nil name" if queues.nil? || queues.empty?

  # Wrap raw data when we receive it
  data = {'content' => data} unless data.instance_of?(Hash) && data['content']

  if queues.respond_to?(:shift)
    # Work on a copy so the caller's list keeps its first entry.
    callbacks = queues.dup
    queue = callbacks.shift
    # merge builds a new hash, leaving the caller's data untouched.
    data = data.merge('callbacks' => callbacks)
  else
    queue = queues
  end

  # @todo: Durran: Any multi-byte character in the JSON causes a bad_payload
  #   error on the rabbitmq side. It seems a fix in the old amqp gem
  #   regressed in the new fork.
  encoded = JSON.dump(data).force_encoding("ISO-8859-1")

  Minion.info("Send: #{queue}:#{encoded}")

  connect do |bunny|
    q = bunny.queue(queue, :durable => true, :auto_delete => false)
    e = bunny.exchange('') # Connect to default exchange
    e.publish(encoded, :key => q.name)
  end
end
#error(&block) ⇒ Object
Define an optional method of changing the ways errors get handled.
103 104 105 |
# File 'lib/mb-minion.rb', line 103
#
# Define an optional way of changing how errors get handled. The supplied
# block is stored in @@error_handling.
#
# @param block [Proc] the block to store as the error handler.
#
# @return [Proc, nil] the stored block.
def error(&block)
  @@error_handling = block
end
#execute_handlers ⇒ Object
Runs each of the handlers.
111 112 113 |
# File 'lib/mb-minion.rb', line 111
#
# Runs each of the handlers by calling #execute on every entry of
# @@handlers, in order.
#
# @return [Object] the result of iterating @@handlers.
def execute_handlers
  @@handlers.each(&:execute)
end
#info(message) ⇒ Object
Log the supplied information message.
121 122 123 |
# File 'lib/mb-minion.rb', line 121
#
# Log the supplied information message by passing it to the callable
# returned by #logging.
#
# @param message [String] the message to log.
#
# @return [Object] whatever the logging callable returns.
def info(message)
  logging.call(message)
end
#job(queue, options = {}, &block) ⇒ Object
Sets up a subscriber to a queue to process jobs.
137 138 139 140 141 142 143 |
# File 'lib/mb-minion.rb', line 137
#
# Sets up a subscriber to a queue to process jobs. A Minion::Handler is
# built from the queue, the block and the options, and appended to
# @@handlers. When the first handler is registered, an at_exit hook that
# calls Minion.run is installed.
#
# @param queue [String] the queue to subscribe to.
# @param options [Hash] options passed through to Minion::Handler.new.
# @param block [Proc] the block passed to the handler as the job body.
#
# @return [Minion::Handler] the newly registered handler.
def job(queue, options = {}, &block)
  Minion::Handler.new(queue, block, options).tap do |handler|
    @@handlers ||= []
    # Install the exit hook only once, when the first handler is added.
    at_exit { Minion.run } if @@handlers.empty?
    @@handlers << handler
  end
end
#logger(&block) ⇒ Object
Define an optional method of changing the ways logging is handled.
153 154 155 |
# File 'lib/mb-minion.rb', line 153
#
# Define an optional way of changing how logging is handled. The supplied
# block is stored in @@logging.
#
# @param block [Proc] the block to store as the logger.
#
# @return [Proc, nil] the stored block.
def logger(&block)
  @@logging = block
end
#message_count(queue) ⇒ Fixnum
Get the message count for a specific queue.
89 90 91 92 93 |
# File 'lib/mb-minion.rb', line 89
#
# Get the message count for a specific queue. The queue is declared as
# durable and not auto-deleted on the connection yielded by #connect, and
# its message count is returned.
#
# @param queue [String] the name of the queue.
#
# @return [Fixnum] the number of messages reported for the queue.
def message_count(queue)
  connect do |bunny|
    # Returns from message_count itself, not just from the block.
    return bunny.queue(queue, :durable => true, :auto_delete => false).message_count
  end
end
#run ⇒ Object
Runs the minion subscribers.
161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/mb-minion.rb', line 161
#
# Runs the minion subscribers. Logs a start-up message, traps INT and TERM
# so that either signal stops AMQP and then EM, and finally starts EM, opens
# the AMQP connection using #config, creates a channel with a prefetch of 1
# and executes the registered handlers.
#
# @return [Object] the result of EM.run.
def run
  Minion.info("Starting minion")

  # Both signals get the same shutdown sequence.
  %w[INT TERM].each do |signal|
    Signal.trap(signal) { AMQP.stop { EM.stop } }
  end

  EM.run do
    AMQP.start(config) do
      AMQP::Channel.new.prefetch(1)
      execute_handlers
    end
  end
end
#url ⇒ String
Get the url for the amqp server.
180 181 182 |
# File 'lib/mb-minion.rb', line 180
#
# Get the url for the amqp server. The value is memoized in @@url; when it
# has not been set, it is taken from the AMQP_URL environment variable,
# falling back to a local guest account.
#
# @return [String] the url.
def url
  @@url ||= ENV.fetch("AMQP_URL", "amqp://guest:guest@localhost/")
end
#url=(url) ⇒ String
Set the url to the amqp server.
190 191 192 |
# File 'lib/mb-minion.rb', line 190
#
# Set the url to the amqp server. The value is stored in @@url.
#
# @param url [String] the url to use.
#
# @return [String] the url that was assigned.
def url=(url)
  @@url = url
end