Class: Smith::Messaging::Requeue

Inherits:
Object
  • Object
show all
Includes:
Logger
Defined in:
lib/smith/messaging/requeue.rb

Instance Method Summary collapse

Methods included from Logger

included

Constructor Details

#initialize(message, metadata, opts = {}) ⇒ Requeue

Returns a new instance of Requeue.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/smith/messaging/requeue.rb', line 8

def initialize(message, , opts={})
  @message = message
  @metadata = 
  @queue = opts[:queue]
  @exchange = opts[:exchange]

  @count = opts[:count] || 3
  @delay = opts[:delay] || 5
  @strategy = opts[:strategy] || :linear

  @on_requeue = opts[:on_requeue] || ->(count, total_count, cumulative_delay) {
    logger.info { "Requeuing (#{@strategy}) message on queue: #{@queue.name}, count: #{count} of #{total_count}." }
  }

  @on_requeue_limit = opts[:on_requeue_limit] || ->(message, count, total_count, cumulative_delay) {
    logger.info { "Not attempting any more requeues, requeue limit reached: #{total_count} for queue: #{@queue.name}, cummulative delay: #{cumulative_delay}s." }
  }
end

Instance Method Details

#requeueObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/smith/messaging/requeue.rb', line 27

def requeue
  requeue_with_strategy do
    opts = @queue.opts.clone.tap do |o|
      o.delete(:queue)
      o.delete(:exchange)

      o[:headers] = increment_requeue_count
      o[:routing_key] = @queue.name
      o[:type] = @metadata.type
    end

    logger.verbose { "Requeuing to: #{@queue.name} [options]: #{opts}" }
    logger.verbose { "Requeuing to: #{@queue.name} [message]: #{@message.to_hash}" }

    @exchange.publish(@message, opts)
  end
end