Module: BunnyPublisher::Mandatory

Extended by:
ActiveSupport::Concern
Defined in:
lib/bunny_publisher/mandatory.rb

Overview

Enforces mandatory option for message publishing. Catches returned message if they are not routed. Creates queue/binding before re-publishing the same message again. This publisher DUPLICATES the connection for re-publishing messages!

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



39
40
41
# File 'lib/bunny_publisher/mandatory.rb', line 39

def queue_name
  @queue_name
end

#queue_optionsObject (readonly)

Returns the value of attribute queue_options.



39
40
41
# File 'lib/bunny_publisher/mandatory.rb', line 39

def queue_options
  @queue_options
end

Instance Method Details

#declare_republish_queueObject



55
56
57
58
59
60
61
# File 'lib/bunny_publisher/mandatory.rb', line 55

def declare_republish_queue
  name = queue_name || message_options[:routing_key]

  ensure_can_create_queue!(name)

  channel.queue(name, queue_options)
end

#declare_republish_queue_binding(queue) ⇒ Object



63
64
65
66
67
# File 'lib/bunny_publisher/mandatory.rb', line 63

def declare_republish_queue_binding(queue)
  routing_key = message_options[:routing_key] || queue_name

  queue.bind(exchange, routing_key: routing_key)
end

#initialize(queue: nil, queue_options: {}, timeout_at_exit: 5, **options) ⇒ Object



41
42
43
44
45
46
47
48
49
# File 'lib/bunny_publisher/mandatory.rb', line 41

def initialize(queue: nil, queue_options: {}, timeout_at_exit: 5, **options)
  super(**options)

  @queue_name = queue
  @queue_options = queue_options
  @returned_messages = ::Queue.new # ruby queue, not Bunny's one

  at_exit { wait_for_unrouted_messages_processing(timeout: timeout_at_exit) }
end

#publish(message, options = {}) ⇒ Object



51
52
53
# File 'lib/bunny_publisher/mandatory.rb', line 51

def publish(message, options = {})
  super(message, options.merge(mandatory: true))
end