Class: Tackle::Worker

Inherits:
Object
  • Object
show all
Includes:
TackleLogger
Defined in:
lib/tackle/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from TackleLogger

#tackle_log

Constructor Details

#initialize(exchange_name, routing_key, queue_name, options = {}) ⇒ Worker

Initializes now worker

Options Hash (options):

  • :url (String)

    AMQP connection url. Defaults to ‘localhost’

  • :retry_limit (Integer)

    Number of times message processing should be retried in case of an exception.

  • :retry_delay (Integer)

    Delay between processing retries. Dafaults to 30 seconds. Cannot be changed without deleting or renameing a queue.

  • :logger (Logger)

    Logger instance. Defaults to standard output.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/tackle/worker.rb', line 23

def initialize(exchange_name, routing_key, queue_name, options = {})
  @queue_name = queue_name
  @amqp_url = options[:url] || "amqp://localhost:5672"
  @retry_limit = options[:retry_limit] || 8
  @retry_delay = (options[:retry_delay] || 30) * 1000 #ms
  @logger = options[:logger] || Logger.new(STDOUT)

  @rabbit = Tackle::Rabbit.new(exchange_name,
                               routing_key,
                               @queue_name,
                               @amqp_url,
                               @retry_delay,
                               @logger)

  @rabbit.connect
  @rabbit.on_uncaught_exception(options[:on_uncaught_exception]) if options[:on_uncaught_exception]
end

Instance Attribute Details

#rabbitObject (readonly)

Returns the value of attribute rabbit.



8
9
10
# File 'lib/tackle/worker.rb', line 8

def rabbit
  @rabbit
end

Instance Method Details

#process_message(delivery_info, properties, payload, block) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/tackle/worker.rb', line 64

def process_message(delivery_info, properties, payload, block)
  begin
    tackle_log("Calling message processor...")
    block.call(payload)
    @rabbit.channel.ack(delivery_info.delivery_tag)
    tackle_log("Successfully processed message")
  rescue Exception => ex
    tackle_log("Failed to process message. Received exception '#{ex}'")
    try_again = Tackle::DelayedRetry.new(@rabbit.dead_letter_queue,
                                         properties,
                                         payload,
                                         @retry_limit,
                                         @logger)
    try_again.schedule_retry
    tackle_log("Sending negative acknowledgement to source queue...")
    @rabbit.channel.nack(delivery_info.delivery_tag)
    tackle_log("Negative acknowledgement sent")

    raise ex
  end
end

#subscribe(&block) ⇒ Object

Subscribes for message deliveries



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/tackle/worker.rb', line 46

def subscribe(&block)
  tackle_log("Subscribing to queue '#{@queue_name}'...")
  rabbit.queue.subscribe(:manual_ack => true,
                         :block => true) do |delivery_info, properties, payload|

    tackle_log("Received message. Processing...")
    process_message(delivery_info, properties, payload, block)
    tackle_log("Done with processing message.")

  end
rescue Interrupt => _
  rabbit.close
rescue StandardError => ex
  tackle_log("An exception occured message='#{ex.message}'")

  raise ex
end