Module: AMQ::Client::Async::Extensions::RabbitMQ::Confirm::ChannelMixin
- Included in:
- Channel
- Defined in:
- lib/amq/client/async/extensions/rabbitmq/confirm.rb
Instance Attribute Summary
-
#publisher_index ⇒ Integer
Publisher index is the index of the last message published since confirmations were activated, starting at 0.
Instance Method Summary
-
#confirm_select(nowait = false) {|method| ... } ⇒ self
Turn on confirmations for this channel and, if a block is given, register it as the callback for Confirm.Select-Ok.
-
#handle_basic_ack(method) ⇒ Object
Handler for Basic.Ack.
-
#handle_basic_nack(method) ⇒ Object
Handler for Basic.Nack.
-
#handle_select_ok(method) ⇒ Object
Handler for Confirm.Select-Ok.
-
#increment_publisher_index! ⇒ Object
This method is executed after each message is published via Exchange#publish.
-
#on_ack(nowait = false) {|basic_ack| ... } ⇒ self
Turn on confirmations for this channel and, if a block is given, register it as the callback for Basic.Ack from the broker.
-
#on_nack(&block) ⇒ self
Register error callback for Basic.Nack.
-
#reset_publisher_index! ⇒ Object
Resets the publisher index to 0.
- #reset_state! ⇒ Object
- #uses_publisher_confirmations? ⇒ Boolean
Instance Attribute Details
#publisher_index ⇒ Integer
Publisher index is the index of the last message published since confirmations were activated, starting at 0. It is incremented by 1 every time a message is published. Because the client and the server both keep this count, acknowledged messages can be matched via their delivery-tag.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 67
def publisher_index
  @publisher_index ||= 0
end
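The matching described above can be tried without a broker. What follows is a minimal sketch, not amq-client code: `FakeConfirmChannel`, its `publish` and `ack` methods, and the `pending` hash are hypothetical names introduced for illustration. Only the counting rule (start at 0, add 1 per publish) comes from the text. The handling of the `multiple` flag assumes the usual AMQP meaning, where an acknowledgement covers every delivery-tag up to and including the one given.

```ruby
# Hypothetical stand-in for a channel in confirm mode; not part of amq-client.
class FakeConfirmChannel
  attr_reader :pending

  def initialize
    @publisher_index = 0 # reset when confirmations are activated
    @pending = {}        # publisher index => payload awaiting confirmation
  end

  def publisher_index
    @publisher_index ||= 0
  end

  # Mimics the after-publish hook: bump the index, then file the payload under it.
  def publish(payload)
    @publisher_index += 1
    @pending[@publisher_index] = payload
    @publisher_index
  end

  # Mimics receiving Basic.Ack. With multiple = true, every tag up to and
  # including delivery_tag is treated as confirmed (assumed AMQP semantics).
  def ack(delivery_tag, multiple = false)
    if multiple
      @pending.delete_if { |tag, _payload| tag <= delivery_tag }
    else
      @pending.delete(delivery_tag)
    end
    self
  end
end

channel = FakeConfirmChannel.new
%w[a b c].each { |payload| channel.publish(payload) }
channel.ack(2, true)
puts channel.pending.inspect
```

After the cumulative acknowledgement of tag 2, only the third message is still waiting for confirmation.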
Instance Method Details
#confirm_select(nowait = false) {|method| ... } ⇒ self
Turn on confirmations for this channel and, if a block is given, register it as the callback for Confirm.Select-Ok. Passing nowait = true together with a block raises ArgumentError, because no Confirm.Select-Ok reply is expected in that case.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 101
def confirm_select(nowait = false, &block)
  if nowait && block
    raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
  end

  @uses_publisher_confirmations = true
  reset_publisher_index!

  self.redefine_callback(:confirm_select, &block) unless nowait
  self.redefine_callback(:after_publish) do
    increment_publisher_index!
  end
  @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))

  self
end
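The argument check at the top of the listing can be exercised in isolation. The sketch below assumes a hypothetical `GuardedChannel` class that copies only the guard and the confirmation flag. Frame sending and callback registration from the real method are deliberately left out.

```ruby
# Hypothetical reduction of confirm_select: only the argument check and the
# confirmation flag are kept; nothing is sent to a broker.
class GuardedChannel
  def confirm_select(nowait = false, &block)
    if nowait && block
      raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
    end

    @uses_publisher_confirmations = true
    self
  end

  def uses_publisher_confirmations?
    @uses_publisher_confirmations
  end
end

channel = GuardedChannel.new

begin
  # nowait = true means no reply is requested, so a reply callback is rejected.
  channel.confirm_select(true) { |_select_ok| puts "never called" }
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end

# Accepted: no reply requested and no callback given.
channel.confirm_select(true)
puts channel.uses_publisher_confirmations?
```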
#handle_basic_ack(method) ⇒ Object
Handler for Basic.Ack. By default, it just executes the hook registered via #on_ack with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Ack).
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 175
def handle_basic_ack(method)
  self.exec_callback(:ack, method)
end
#handle_basic_nack(method) ⇒ Object
Handler for Basic.Nack. By default, it just executes the hook registered via #on_nack with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Nack).
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 186
def handle_basic_nack(method)
  self.exec_callback(:nack, method)
end
#handle_select_ok(method) ⇒ Object
Handler for Confirm.Select-Ok. By default, it just executes the hook registered via #confirm_select with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Confirm::SelectOk), and then deletes the callback, since Confirm.Select is supposed to be sent just once.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 165
def handle_select_ok(method)
  self.exec_callback_once(:confirm_select, method)
end
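The run-once behaviour described above can be pictured with a small callback registry. This is a sketch under assumptions: `OnceCallbacks` is a hypothetical class, and the bodies of `redefine_callback` and `exec_callback_once` are guesses that reproduce the documented effect (run the callback, then delete it). They are not the amq-client implementations.

```ruby
# Hypothetical miniature of a callback registry. Method names mirror the
# listing; the bodies are assumptions, not amq-client code.
class OnceCallbacks
  def initialize
    @callbacks = {}
  end

  # Replaces whatever callback was stored for the event.
  def redefine_callback(event, &block)
    @callbacks[event] = block
    self
  end

  # Runs the callback for the event, if any, and forgets it in the same step.
  def exec_callback_once(event, *args)
    callback = @callbacks.delete(event)
    callback.call(*args) if callback
  end

  def handle_select_ok(method)
    exec_callback_once(:confirm_select, method)
  end
end

calls = []
registry = OnceCallbacks.new
registry.redefine_callback(:confirm_select) { |method| calls << method }

registry.handle_select_ok(:select_ok) # callback runs
registry.handle_select_ok(:select_ok) # callback already removed, nothing happens
puts calls.length
```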
#increment_publisher_index! ⇒ Object
This method is executed after each message is published via Exchange#publish. Currently it just increments the publisher index by 1, so that published messages can be matched to their confirmations.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 84
def increment_publisher_index!
  @publisher_index += 1
end
#on_ack(nowait = false) {|basic_ack| ... } ⇒ self
Turn on confirmations for this channel and, if a block is given, register it as the callback for Basic.Ack from the broker.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 135
def on_ack(nowait = false, &block)
  self.use_publisher_confirmations! unless self.uses_publisher_confirmations?

  self.define_callback(:ack, &block) if block

  self
end
#on_nack(&block) ⇒ self
Register an error callback for Basic.Nack. It is called when one or more messages are rejected.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 148
def on_nack(&block)
  self.define_callback(:nack, &block) if block

  self
end
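Since #on_ack and #on_nack both return self, registrations can be chained. The following sketch shows that pattern with stand-ins only: `SketchChannel`, `FakeAck` and `FakeNack` are hypothetical, and the `delivery_tag` and `multiple` fields are assumed to mirror the corresponding fields of the AMQ::Protocol::Basic::Ack and Nack instances passed to the real handlers.

```ruby
# Stand-ins for the protocol method instances; only the fields used here.
FakeAck  = Struct.new(:delivery_tag, :multiple)
FakeNack = Struct.new(:delivery_tag, :multiple)

# Hypothetical channel that keeps a list of callbacks per event.
class SketchChannel
  def initialize
    @callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  def on_ack(&block)
    @callbacks[:ack] << block if block
    self
  end

  def on_nack(&block)
    @callbacks[:nack] << block if block
    self
  end

  # Each handler passes the protocol method object to every registered callback.
  def handle_basic_ack(method)
    @callbacks[:ack].each { |callback| callback.call(method) }
  end

  def handle_basic_nack(method)
    @callbacks[:nack].each { |callback| callback.call(method) }
  end
end

confirmed = []
rejected  = []

channel = SketchChannel.new
channel
  .on_ack  { |ack|  confirmed << ack.delivery_tag }
  .on_nack { |nack| rejected  << nack.delivery_tag }

# Simulate the broker confirming message 1 and rejecting message 2.
channel.handle_basic_ack(FakeAck.new(1, false))
channel.handle_basic_nack(FakeNack.new(2, false))
puts "confirmed=#{confirmed.inspect} rejected=#{rejected.inspect}"
```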
#reset_publisher_index! ⇒ Object
Resets the publisher index to 0.
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 74
def reset_publisher_index!
  @publisher_index = 0
end
#reset_state! ⇒ Object
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 191
def reset_state!
  super

  @uses_publisher_confirmations = false
end
#uses_publisher_confirmations? ⇒ Boolean
# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 119
def uses_publisher_confirmations?
  @uses_publisher_confirmations
end