Module: ActionSubscriber::MessageRetry
- Defined in:
- lib/action_subscriber/message_retry.rb
Constant Summary collapse
- SCHEDULE =
{ 2 => 100, 3 => 500, 4 => 2_500, 5 => 12_500, 6 => 62_500, 7 => 312_500, 8 => 1_562_500, 9 => 7_812_500, 10 => 39_062_500, }.freeze
Class Method Summary collapse
-
.get_last_attempt_number(env) ⇒ Object
Private Implementation.
- .redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE) ⇒ Object
- .retry_headers(env, attempt) ⇒ Object
- .retry_options(env, attempt, retry_queue_name) ⇒ Object
- .with_exchange(env, ttl, retry_queue_name) ⇒ Object
Class Method Details
.get_last_attempt_number(env) ⇒ Object
Private Implementation
26 27 28 29 |
# File 'lib/action_subscriber/message_retry.rb', line 26
# Private Implementation
#
# Reads the delivery attempt number recorded on a message.
#
# The attempt is stored as a string under the "as-attempt" header; when the
# header is absent the message is treated as being on its first attempt.
# A nil headers value is treated like an empty hash so the lookup cannot
# raise NoMethodError (assumes some transports may hand over nil headers for
# header-less messages — TODO confirm against the Env implementation).
#
# @param env [#headers] message environment exposing a headers Hash (or nil)
# @return [Integer] the attempt number, 1 when no "as-attempt" header is set
def self.get_last_attempt_number(env)
  headers = env.headers || {}
  headers.fetch("as-attempt", "1").to_i
end
.redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE) ⇒ Object
15 16 17 18 19 20 21 22 23 |
# File 'lib/action_subscriber/message_retry.rb', line 15
# Republishes a message to a per-delay retry queue, following a backoff
# schedule keyed by attempt number.
#
# The next attempt number is the last recorded attempt plus one. If the
# schedule has no entry for that attempt, nothing is published and nil is
# returned (the schedule is exhausted). Otherwise the schedule value is used
# both as the TTL handed to with_exchange and as the suffix of the retry
# queue name ("<queue>.retry_<ttl>").
#
# @param env [#queue, #encoded_payload, #headers, #content_type] message environment
# @param backoff_schedule [Hash{Integer => Integer}] attempt number => TTL
#   (passed on as the queue's "x-message-ttl"; presumably milliseconds — confirm
#   against the RabbitMQ TTL documentation)
# @return [Object, nil] the result of with_exchange, or nil when no retry is scheduled
def self.redeliver_message_with_backoff(env, backoff_schedule = SCHEDULE)
  next_attempt = get_last_attempt_number(env) + 1
  ttl = backoff_schedule[next_attempt]
  return unless ttl

  retry_queue_name = "#{env.queue}.retry_#{ttl}"
  with_exchange(env, ttl, retry_queue_name) do |exchange|
    exchange.publish(env.encoded_payload, retry_options(env, next_attempt, retry_queue_name))
  end
end
.retry_headers(env, attempt) ⇒ Object
31 32 33 34 35 36 37 38 |
# File 'lib/action_subscriber/message_retry.rb', line 31
# Builds the headers for a republished (retried) message.
#
# Copies the incoming headers without the "x-death" entry, then sets
# "as-attempt" to the given attempt (as a string) and
# "x-dead-letter-routing-key" to the original queue name. The incoming
# headers hash is not modified. A nil headers value is treated as empty
# (assumes headers may be nil for header-less messages — TODO confirm).
#
# @param env [#headers, #queue] message environment
# @param attempt [#to_s] attempt number to record on the retried message
# @return [Hash] headers for the retry publish
def self.retry_headers(env, attempt)
  carried_over = (env.headers || {}).reject { |key, _value| key == "x-death" }
  carried_over.merge(
    "as-attempt" => attempt.to_s,
    "x-dead-letter-routing-key" => env.queue
  )
end
.retry_options(env, attempt, retry_queue_name) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/action_subscriber/message_retry.rb', line 40
# Builds the publish options for a retried message.
#
# @param env [#content_type, #headers, #queue] message environment
# @param attempt [#to_s] attempt number recorded in the retry headers
# @param retry_queue_name [String] name of the retry queue, used as the routing key
# @return [Hash] options with :content_type, :routing_key and :headers
def self.retry_options(env, attempt, retry_queue_name)
  {
    :content_type => env.content_type,
    :routing_key => retry_queue_name,
    :headers => retry_headers(env, attempt),
  }
end
.with_exchange(env, ttl, retry_queue_name) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/action_subscriber/message_retry.rb', line 48
# Yields an exchange on a freshly created channel and closes the channel
# afterwards.
#
# The channel is put in confirm mode, the retry queue is requested with
# dead-letter arguments pointing back at env.queue via the default exchange,
# the exchange is yielded to the caller, and publisher confirms are awaited.
# The channel is closed even when the block raises.
#
# NOTE(review): the result of wait_for_confirms is returned but never checked
# here — confirm whether negatively acknowledged publishes need handling.
#
# @param env [#queue] message environment; its queue is the dead-letter target
# @param ttl [Integer] value for the retry queue's "x-message-ttl" argument
# @param retry_queue_name [String] name of the retry queue
# @yieldparam exchange [Object] the exchange to publish on
# @return [Object] whatever channel.wait_for_confirms returns
def self.with_exchange(env, ttl, retry_queue_name)
  channel = RabbitConnection.with_connection { |connection| connection.create_channel }
  begin
    channel.confirm_select
    # an empty string is the default exchange [see bunny docs](http://rubybunny.info/articles/exchanges.html#default_exchange)
    exchange = channel.topic("")
    # Called for its side effect only; the returned queue object is not needed.
    channel.queue(retry_queue_name, :arguments => {
      "x-dead-letter-exchange" => "",
      "x-message-ttl" => ttl,
      "x-dead-letter-routing-key" => env.queue,
    })
    yield(exchange)
    channel.wait_for_confirms
  ensure
    # Best-effort close: a failure to close must not mask the block's outcome.
    begin
      channel.close
    rescue StandardError
      nil
    end
  end
end