Class: Rabbitek::Retryer

Inherits:
Object
  • Object
show all
Includes:
Loggable
Defined in:
lib/rabbitek/server/retryer.rb

Overview

A service to retry a failed message consuming

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#debug, #error, #info, #logger, logger, #warn

Constructor Details

#initialize(consumer, message) ⇒ Retryer

Returns a new instance of Retryer.



13
14
15
16
17
18
19
20
21
22
# File 'lib/rabbitek/server/retryer.rb', line 13

def initialize(consumer, message)
  @consumer = consumer
  @message = message

  headers = message.properties.headers || {}
  dead_headers = headers.fetch('x-death', []).last || {}

  @retry_count = headers.fetch('x-retry-count', 0)
  @expiration = dead_headers.fetch('original-expiration', 1000).to_i
end

Class Method Details

.call(*args) ⇒ Object



9
10
11
# File 'lib/rabbitek/server/retryer.rb', line 9

def self.call(*args)
  new(*args).call
end

Instance Method Details

#callObject



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rabbitek/server/retryer.rb', line 24

def call
  warn_log

  # acknowledge existing message
  @consumer.ack!(@message.delivery_info)

  if @retry_count <= 25
    # Set the new expiration with an increasing factor
    @expiration *= 1.5

    # Publish to retry queue with new expiration
    publish_to_retry_queue
  else
    publish_to_dead_queue
  end
end

#publish_to_dead_queueObject



60
61
62
# File 'lib/rabbitek/server/retryer.rb', line 60

def publish_to_dead_queue
  # TODO: implement dead queue
end

#publish_to_retry_queueObject



51
52
53
54
55
56
57
58
# File 'lib/rabbitek/server/retryer.rb', line 51

def publish_to_retry_queue
  @consumer.retry_or_delayed_exchange.publish(
    @message.raw_payload,
    expiration: @expiration.to_i,
    routing_key: @message.delivery_info.routing_key,
    headers: { 'x-retry-count': @retry_count + 1, 'x-dead-letter-routing-key': @message.delivery_info.routing_key }
  )
end

#warn_logObject



41
42
43
44
45
46
47
48
49
# File 'lib/rabbitek/server/retryer.rb', line 41

def warn_log
  warn(
    message: 'Failure!',
    retry_count: @retry_count,
    expiration: @expiration,
    consumer: @consumer.class.to_s,
    jid: @consumer.jid
  )
end