Class: Tackle::Worker
- Inherits:
-
Object
- Object
- Tackle::Worker
- Includes:
- TackleLogger
- Defined in:
- lib/tackle/worker.rb
Instance Attribute Summary collapse
-
#rabbit ⇒ Object
readonly
Returns the value of attribute rabbit.
Instance Method Summary collapse
-
#initialize(exchange_name, routing_key, queue_name, options = {}) ⇒ Worker
constructor
Initializes now worker.
- #process_message(delivery_info, properties, payload, block) ⇒ Object
-
#subscribe(&block) ⇒ Object
Subscribes for message deliveries.
Methods included from TackleLogger
Constructor Details
#initialize(exchange_name, routing_key, queue_name, options = {}) ⇒ Worker
Initializes now worker
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/tackle/worker.rb', line 23 def initialize(exchange_name, routing_key, queue_name, = {}) @queue_name = queue_name @amqp_url = [:url] || "amqp://localhost:5672" @retry_limit = [:retry_limit] || 8 @retry_delay = ([:retry_delay] || 30) * 1000 #ms @logger = [:logger] || Logger.new(STDOUT) @rabbit = Tackle::Rabbit.new(exchange_name, routing_key, @queue_name, @amqp_url, @retry_delay, @logger) @rabbit.connect @rabbit.on_uncaught_exception([:on_uncaught_exception]) if [:on_uncaught_exception] end |
Instance Attribute Details
#rabbit ⇒ Object (readonly)
Returns the value of attribute rabbit.
8 9 10 |
# File 'lib/tackle/worker.rb', line 8 def rabbit @rabbit end |
Instance Method Details
#process_message(delivery_info, properties, payload, block) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/tackle/worker.rb', line 64 def (delivery_info, properties, payload, block) begin tackle_log("Calling message processor...") block.call(payload) @rabbit.channel.ack(delivery_info.delivery_tag) tackle_log("Successfully processed message") rescue Exception => ex tackle_log("Failed to process message. Received exception '#{ex}'") try_again = Tackle::DelayedRetry.new(@rabbit.dead_letter_queue, properties, payload, @retry_limit, @logger) try_again.schedule_retry tackle_log("Sending negative acknowledgement to source queue...") @rabbit.channel.nack(delivery_info.delivery_tag) tackle_log("Negative acknowledgement sent") raise ex end end |
#subscribe(&block) ⇒ Object
Subscribes for message deliveries
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/tackle/worker.rb', line 46 def subscribe(&block) tackle_log("Subscribing to queue '#{@queue_name}'...") rabbit.queue.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, payload| tackle_log("Received message. Processing...") (delivery_info, properties, payload, block) tackle_log("Done with processing message.") end rescue Interrupt => _ rabbit.close rescue StandardError => ex tackle_log("An exception occured message='#{ex.message}'") raise ex end |