Class: Basquiat::Adapters::RabbitMq::DelayedDelivery

Inherits:
BaseStrategy
  • Object
Defined in:
lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb

Class Attribute Summary

Class Method Summary

Instance Method Summary

Methods inherited from BaseStrategy

#ack, #nack, session_options

Constructor Details

#initialize(session) ⇒ DelayedDelivery

Returns a new instance of DelayedDelivery.



# File 'lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb', line 18

def initialize(session)
  super
  setup_delayed_delivery
  @queue_name = session.queue_name
end

Class Attribute Details

.options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb', line 9

def options
  @options
end

Class Method Details

.setup(opts) ⇒ Object



# File 'lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb', line 11

def setup(opts)
  @options = { retries:            5,
               exchange_name:      'basquiat.dlx',
               queue_name_preffix: 'basquiat.ddlq' }.deep_merge(opts)
end
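
The merge of defaults and caller options in .setup can be illustrated with a minimal sketch. It assumes deep_merge behaves like ActiveSupport's Hash#deep_merge; the helper names deep_merge_sketch, default_options, and merged_options are hypothetical and exist only so the example runs without ActiveSupport.

```ruby
# Hypothetical stand-in for Hash#deep_merge (assumed to come from ActiveSupport).
# Nested hashes are merged recursively; any other value from `overrides` wins.
def deep_merge_sketch(base, overrides)
  base.merge(overrides) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge_sketch(old_value, new_value)
    else
      new_value
    end
  end
end

# The defaults listed in .setup, copied verbatim (including the key spelling).
def default_options
  { retries:            5,
    exchange_name:      'basquiat.dlx',
    queue_name_preffix: 'basquiat.ddlq' }
end

# Hypothetical helper: what .setup would store in @options for a given opts hash.
def merged_options(opts)
  deep_merge_sketch(default_options, opts)
end

p merged_options({ retries: 3 })
```

Under that assumption, only the keys passed by the caller change; the remaining defaults are kept as they are.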

Instance Method Details

#requeue(message) ⇒ Object

Parameters:

  • message (Message)

    the message to be requeued



# File 'lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb', line 31

def requeue(message)
  @exchange.publish(Basquiat::Json.encode(message), routing_key: requeue_route_for(message.di.routing_key))
  ack(message)
end
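
The order of operations in #requeue (publish a copy, then acknowledge the original) can be sketched as follows. RecordingExchange and RequeueSketch are hypothetical stand-ins, the message is a plain Hash, JSON.generate from the standard library replaces Basquiat::Json.encode, and the routing key is passed through unchanged because requeue_route_for is not shown on this page.

```ruby
require 'json'

# Hypothetical exchange that records what would be published.
class RecordingExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(payload, routing_key:)
    @published << { payload: payload, routing_key: routing_key }
  end
end

# Hypothetical strategy mirroring only the shape of #requeue.
class RequeueSketch
  attr_reader :acked

  def initialize(exchange)
    @exchange = exchange
    @acked = []
  end

  # Stand-in for the inherited #ack; it only records the message.
  def ack(message)
    @acked << message
  end

  # Publish the encoded message first, then acknowledge the original delivery.
  def requeue(message)
    @exchange.publish(JSON.generate(message[:body]), routing_key: message[:routing_key])
    ack(message)
  end
end

exchange = RecordingExchange.new
strategy = RequeueSketch.new(exchange)
strategy.requeue({ body: { 'id' => 1 }, routing_key: 'some.event' })
```

Acknowledging after publishing means the original delivery is only released once the copy has been handed to the exchange.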

#run(message) ⇒ Object



# File 'lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb', line 24

def run(message)
  message.routing_key = extract_event_info(message.routing_key)[0]
  yield
  public_send(message.action, message)
end
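
The dispatch step in #run, where the handler block runs first and the strategy then calls whichever method message.action names, can be sketched as below. FakeMessage and DispatchSketch are hypothetical stand-ins, and the routing key rewrite via extract_event_info is left out because that helper is not shown on this page.

```ruby
# Hypothetical message with only the fields the sketch needs.
FakeMessage = Struct.new(:routing_key, :action)

# Hypothetical strategy that records which action was dispatched.
class DispatchSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def ack(message)
    @calls << [:ack, message.routing_key]
  end

  def requeue(message)
    @calls << [:requeue, message.routing_key]
  end

  # Same shape as #run: yield to the handler, then dispatch on message.action.
  def run(message)
    yield
    public_send(message.action, message)
  end
end

strategy = DispatchSketch.new
message  = FakeMessage.new('some.event', :ack)

# The handler may change the action before the strategy dispatches it.
strategy.run(message) { message.action = :requeue }
```

Because the action is read after the block returns, a handler that changes message.action inside the block decides which strategy method is invoked.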