Class: Fluffle::Confirmer
- Inherits:
-
Object
- Object
- Fluffle::Confirmer
- Defined in:
- lib/fluffle/confirmer.rb
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
Instance Method Summary collapse
-
#confirm_select ⇒ Object
Enables confirms on the channel and sets up callback to receive and unblock corresponding ‘with_confirmation` call.
-
#initialize(channel:) ⇒ Confirmer
constructor
A new instance of Confirmer.
-
#with_confirmation(timeout:) ⇒ Object
Wraps a block (which should publish a message) with a blocking check that it received a confirmation from the RabbitMQ server that the message that was received and routed successfully.
Constructor Details
#initialize(channel:) ⇒ Confirmer
Returns a new instance of Confirmer.
5 6 7 8 9 |
# File 'lib/fluffle/confirmer.rb', line 5 def initialize(channel:) @channel = channel @pending_confirms = Concurrent::Map.new end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
3 4 5 |
# File 'lib/fluffle/confirmer.rb', line 3 def channel @channel end |
Instance Method Details
#confirm_select ⇒ Object
Enables confirms on the channel and sets up callback to receive and unblock corresponding ‘with_confirmation` call.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/fluffle/confirmer.rb', line 13 def confirm_select handle_confirm = ->(tag, _multiple, nack) do ivar = @pending_confirms.delete tag if ivar ivar.set nack else self.logger.error "Missing confirm IVar: tag=#{tag}" end end # Set the channel in confirmation mode so that we can receive confirms # of published messages @channel.confirm_select handle_confirm end |
#with_confirmation(timeout:) ⇒ Object
Wraps a block (which should publish a message) with a blocking check that it received a confirmation from the RabbitMQ server that the message that was received and routed successfully.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/fluffle/confirmer.rb', line 32 def with_confirmation(timeout:) tag = @channel.next_publish_seq_no confirm_ivar = Concurrent::IVar.new @pending_confirms[tag] = confirm_ivar result = yield nack = confirm_ivar.value timeout if confirm_ivar.incomplete? raise Errors::ConfirmTimeoutError.new('Timed out waiting for confirm') elsif nack raise Errors::NackError.new('Received nack from confirmation') end result end |