Class: Warren::DelayExchange

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/warren/delay_exchange.rb

Overview

Configures and wraps up delay exchange on a Bunny Channel/Queue A delay exchange routes immediately onto a queue with a ttl once messages on this queue expire they are dead-lettered back onto to original exchange Note: This does not currently support the rabbitmq-delayed-message-exchange plugin.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel:, config:) ⇒ DelayExchange

Create a new delay exchange. Handles queue creation, binding and attaching consumers to the queues

Parameters:



22
23
24
25
26
# File 'lib/warren/delay_exchange.rb', line 22

def initialize(channel:, config:)
  @channel = channel
  @exchange_config = config&.fetch('exchange', nil)
  @bindings = config&.fetch('bindings', []) || []
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



13
14
15
# File 'lib/warren/delay_exchange.rb', line 13

def channel
  @channel
end

Instance Method Details

#activate!Object

Ensures the queues and channels are set up to receive messages keys: additional routing_keys to bind



32
33
34
# File 'lib/warren/delay_exchange.rb', line 32

def activate!
  establish_bindings!
end

#publish(payload, routing_key:, headers: {}) ⇒ Void

Post a message to the delay exchange.

Parameters:

  • payload (String)

    The message payload

  • routing_key (String)

    The routing key of the re-sent message

  • headers (Hash) (defaults to: {})

    A hash of headers. Typically: { attempts: <Integer> }

Options Hash (headers:):

  • :attempts (Integer)

    The number of times the message has been processed

Returns:

  • (Void)

Raises:

  • (StandardError)


46
47
48
49
50
51
# File 'lib/warren/delay_exchange.rb', line 46

def publish(payload, routing_key:, headers: {})
  raise StandardError, 'No delay queue configured' unless configured?

  message = Warren::Message::Simple.new(routing_key, payload, headers)
  channel.publish(message, exchange: exchange)
end