Class: Warren::DelayExchange
- Inherits:
- Object
- Warren::DelayExchange
- Extended by:
- Forwardable
- Defined in:
- lib/warren/delay_exchange.rb
Overview
Configures and wraps up a delay exchange on a Bunny Channel/Queue. A delay exchange routes messages immediately onto a queue with a TTL. Once messages on this queue expire, they are dead-lettered back onto the original exchange. Note: This does not currently support the rabbitmq-delayed-message-exchange plugin.
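The TTL and dead-letter behaviour described above corresponds to two standard RabbitMQ queue arguments, `x-message-ttl` and `x-dead-letter-exchange`. The following is a minimal sketch of how such arguments might be assembled; the helper `delay_queue_arguments`, its parameter names, and the exchange and queue names are hypothetical and are not taken from Warren itself.

```ruby
# Hypothetical helper: builds the RabbitMQ arguments for a delay queue.
# ttl_ms and original_exchange are illustrative parameter names.
def delay_queue_arguments(ttl_ms:, original_exchange:)
  {
    # Messages expire after this many milliseconds on the queue
    'x-message-ttl' => ttl_ms,
    # Expired messages are dead-lettered to this exchange
    'x-dead-letter-exchange' => original_exchange
  }
end

args = delay_queue_arguments(ttl_ms: 30_000, original_exchange: 'my_app.exchange')

# With Bunny, a hash like this would be passed when declaring the queue, e.g.:
#   channel.queue('my_app.delay', durable: true, arguments: args)
```

Whether Warren builds the arguments in this way is an assumption; in practice they would come from the `bindings` section of the delay configuration.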
Instance Attribute Summary
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
Instance Method Summary
-
#activate! ⇒ Object
Ensures the queues and channels are set up to receive messages. keys: additional routing_keys to bind.
-
#initialize(channel:, config:) ⇒ DelayExchange
constructor
Create a new delay exchange.
-
#publish(payload, routing_key:, headers: {}) ⇒ Void
Post a message to the delay exchange.
Constructor Details
#initialize(channel:, config:) ⇒ DelayExchange
Create a new delay exchange. Handles queue creation, binding and attaching consumers to the queues.
# File 'lib/warren/delay_exchange.rb', line 22
def initialize(channel:, config:)
  @channel = channel
  @exchange_config = config&.fetch('exchange', nil)
  @bindings = config&.fetch('bindings', []) || []
end
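To illustrate how the constructor tolerates a missing or partial configuration, here is a small sketch that reproduces its two assignments in a standalone method. The method name `read_delay_config` and the contents of the sample hash (everything nested under `'exchange'` and `'bindings'`) are assumptions made for illustration; only the two top-level keys come from the source.

```ruby
# Stand-in mirroring the two config assignments in the constructor.
def read_delay_config(config)
  exchange_config = config&.fetch('exchange', nil)
  bindings = config&.fetch('bindings', []) || []
  [exchange_config, bindings]
end

# Hypothetical config; the nested keys are assumptions, not Warren's schema.
config = {
  'exchange' => { 'name' => 'my_app.delay', 'options' => { 'type' => 'fanout' } },
  'bindings' => [{ 'queue' => { 'name' => 'my_app.delay' } }]
}

exchange_config, bindings = read_delay_config(config)

# A nil config yields no exchange and an empty bindings list
nil_exchange, nil_bindings = read_delay_config(nil)

# An explicit nil under 'bindings' is also normalised to an empty list
_partial_exchange, partial_bindings = read_delay_config({ 'bindings' => nil })
```

The trailing `|| []` matters because `fetch` only applies its default when the key is absent, not when the key is present with a `nil` value.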
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
# File 'lib/warren/delay_exchange.rb', line 13
def channel
  @channel
end
Instance Method Details
#activate! ⇒ Object
Ensures the queues and channels are set up to receive messages. keys: additional routing_keys to bind.
# File 'lib/warren/delay_exchange.rb', line 32
def activate!
  establish_bindings!
end
#publish(payload, routing_key:, headers: {}) ⇒ Void
Post a message to the delay exchange.
# File 'lib/warren/delay_exchange.rb', line 46
def publish(payload, routing_key:, headers: {})
  raise StandardError, 'No delay queue configured' unless configured?

  message = Warren::Message::Simple.new(routing_key, payload, headers)
  channel.publish(message, exchange: exchange)
end
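The sketch below shows the two outcomes a caller of `publish` should expect: a message handed to the channel, or a `StandardError` when no delay queue is configured. It uses simplified stand-ins rather than Warren's own classes: `SketchMessage`, `RecordingChannel` and `SketchDelayExchange` are hypothetical, and the `configured?` check shown here (an exchange is present) is an assumption, since that method's body is not part of this page.

```ruby
# Stand-in for Warren::Message::Simple; only the argument order is from the source.
SketchMessage = Struct.new(:routing_key, :payload, :headers)

# Records publishes instead of talking to a broker.
class RecordingChannel
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(message, exchange:)
    @published << [message, exchange]
  end
end

# Simplified stand-in for the documented class, not the real implementation.
class SketchDelayExchange
  def initialize(channel:, exchange:)
    @channel = channel
    @exchange = exchange
  end

  def publish(payload, routing_key:, headers: {})
    raise StandardError, 'No delay queue configured' unless configured?

    message = SketchMessage.new(routing_key, payload, headers)
    @channel.publish(message, exchange: @exchange)
  end

  private

  # Assumption: "configured" means an exchange is present.
  def configured?
    !@exchange.nil?
  end
end

channel = RecordingChannel.new
delay = SketchDelayExchange.new(channel: channel, exchange: 'my_app.delay')
delay.publish('{"id":1}', routing_key: 'retry.order', headers: { 'attempts' => 1 })

# Publishing without a configured exchange raises instead of silently dropping
error_message =
  begin
    SketchDelayExchange.new(channel: channel, exchange: nil).publish('x', routing_key: 'k')
    nil
  rescue StandardError => e
    e.message
  end
```

Callers that treat the delay exchange as optional may want to rescue this error or check the configuration before publishing.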