Module: AMQ::Client::Async::Extensions::RabbitMQ::Confirm::ChannelMixin

Included in:
Channel
Defined in:
lib/amq/client/async/extensions/rabbitmq/confirm.rb

Instance Attribute Details

#publisher_index ⇒ Integer

The publisher index is the index of the last message published since confirmations were activated. It starts at 0 and is incremented by 1 every time a message is published. Both the client and the broker keep this count, so an acknowledged message can be matched to a published one via its delivery tag.

Returns:

  • (Integer)

    Current publisher index.



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 67

def publisher_index
  @publisher_index ||= 0
end
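
The matching described above can be sketched in plain Ruby. This is a minimal illustration, not part of amq-client: the ConfirmTracker class and its method names are hypothetical, and it assumes the broker's delivery tags for confirmations start at 1 and that a Basic.Ack with multiple set confirms every tag up to and including the given one.

```ruby
# Hypothetical helper (not part of amq-client): remembers each published
# payload under the publisher index it received at publish time.
class ConfirmTracker
  attr_reader :publisher_index, :pending

  def initialize
    @publisher_index = 0
    @pending = {}
  end

  # Mirrors increment_publisher_index!: bump the counter, then store the
  # payload under the new index. Returns the index assigned to the message.
  def record_publish(payload)
    @publisher_index += 1
    @pending[@publisher_index] = payload
    @publisher_index
  end

  # delivery_tag and multiple mirror the fields carried by Basic.Ack.
  # Returns the payloads that this acknowledgement confirms.
  def handle_ack(delivery_tag, multiple = false)
    confirmed =
      if multiple
        @pending.keys.select { |tag| tag <= delivery_tag }
      else
        [delivery_tag]
      end
    confirmed.map { |tag| @pending.delete(tag) }.compact
  end
end

tracker = ConfirmTracker.new
%w[a b c].each { |payload| tracker.record_publish(payload) }
tracker.handle_ack(2, true) # => ["a", "b"]
tracker.pending             # => {3=>"c"}
```

Keeping the pending messages keyed by index means a single acknowledgement with multiple set can release a whole batch in one pass.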

Instance Method Details

#confirm_select(nowait = false) {|method| ... } ⇒ self

Turns on confirmations for this channel and, if a block is given, registers it as the callback for Confirm.Select-Ok.

Parameters:

  • nowait (Boolean) (defaults to: false)

    Whether we expect Confirm.Select-Ok to be returned by the broker or not.

Yields:

  • (method)

    Callback which will be executed once we receive Confirm.Select-Ok.

Yield Parameters:

  • method (AMQ::Protocol::Confirm::SelectOk)

    Protocol method class instance.

Returns:

  • (self)

    self.

Raises:

  • (RuntimeError)

    Occurs when confirmations are already activated.

  • (ArgumentError)

    Occurs when nowait is true and block is given.

See Also:

  • #on_ack


# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 101

def confirm_select(nowait = false, &block)
  if nowait && block
    raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
  end

  @uses_publisher_confirmations = true
  reset_publisher_index!

  self.redefine_callback(:confirm_select, &block) unless nowait
  self.redefine_callback(:after_publish) do
    increment_publisher_index!
  end
  @connection.send_frame(Protocol::Confirm::Select.encode(@id, nowait))

  self
end
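
The control flow of the listing above can be tried without a broker. The following is a sketch only: SketchChannel, sent_frames and simulate_publish are hypothetical stand-ins, with an array replacing @connection.send_frame and a plain hash replacing the library's callback registry.

```ruby
# Hypothetical stand-in for a channel; only the confirm_select name and
# its guard are copied from the listing above.
class SketchChannel
  attr_reader :sent_frames, :publisher_index

  def initialize
    @callbacks = {}
    @sent_frames = [] # stands in for @connection.send_frame
    @publisher_index = 0
  end

  def confirm_select(nowait = false, &block)
    # With nowait the broker sends no Confirm.Select-Ok, so a callback
    # for it could never fire.
    if nowait && block
      raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
    end

    @publisher_index = 0
    @callbacks[:confirm_select] = block unless nowait
    @callbacks[:after_publish] = proc { @publisher_index += 1 }
    @sent_frames << [:confirm_select, nowait]

    self
  end

  # Simulates a publish completing: runs the after_publish hook if set.
  def simulate_publish
    hook = @callbacks[:after_publish]
    hook.call if hook
    self
  end
end

channel = SketchChannel.new
channel.confirm_select(true).simulate_publish.simulate_publish
channel.publisher_index # => 2
```

Returning self from both methods is what allows the chained calls in the last lines.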

#handle_basic_ack(method) ⇒ Object

Handler for Basic.Ack. By default, it just executes the hook registered via #on_ack with a single argument: a protocol method class instance (an instance of AMQ::Protocol::Basic::Ack).



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 175

def handle_basic_ack(method)
  self.exec_callback(:ack, method)
end

#handle_basic_nack(method) ⇒ Object

Handler for Basic.Nack. By default, it just executes the hook registered via #on_nack with a single argument: a protocol method class instance (an instance of AMQ::Protocol::Basic::Nack).



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 186

def handle_basic_nack(method)
  self.exec_callback(:nack, method)
end

#handle_select_ok(method) ⇒ Object

Handler for Confirm.Select-Ok. By default, it just executes the hook registered via #confirm_select with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Confirm::SelectOk), and then deletes the callback, since Confirm.Select is supposed to be sent just once.



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 165

def handle_select_ok(method)
  self.exec_callback_once(:confirm_select, method)
end
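
The run-then-delete behaviour described for this handler can be illustrated with a small registry. This is a sketch under assumptions: OneShotCallbacks is a hypothetical class, not the library's own callback module, and its return value (the number of callbacks run) is added purely for demonstration.

```ruby
# Hypothetical one-shot callback registry illustrating "execute, then delete".
class OneShotCallbacks
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  def define_callback(name, &block)
    @callbacks[name] << block
    self
  end

  # Runs every callback registered under name, then forgets them, so a
  # second call with the same name finds nothing to run.
  def exec_callback_once(name, *args)
    callbacks = @callbacks.delete(name) || []
    callbacks.each { |callback| callback.call(*args) }
    callbacks.size
  end
end

seen = []
registry = OneShotCallbacks.new
registry.define_callback(:confirm_select) { |method| seen << method }
registry.exec_callback_once(:confirm_select, :select_ok) # => 1
registry.exec_callback_once(:confirm_select, :select_ok) # => 0
seen                                                     # => [:select_ok]
```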

#increment_publisher_index! ⇒ Object

This method is executed after each message is published via Exchange#publish. Currently it just increments the publisher index by 1, so that confirmations can be matched to published messages.



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 84

def increment_publisher_index!
  @publisher_index += 1
end

#on_ack(nowait = false) {|basic_ack| ... } ⇒ self

Turns on confirmations for this channel and, if a block is given, registers it as the callback for Basic.Ack from the broker.

Parameters:

  • nowait (Boolean) (defaults to: false)

    Whether we expect Confirm.Select-Ok to be returned by the broker or not.

Yields:

  • (basic_ack)

    Callback which will be executed every time we receive Basic.Ack from the broker.

Yield Parameters:

  • basic_ack (AMQ::Protocol::Basic::Ack)

    Protocol method class instance.

Returns:

  • (self)

    self.

Raises:

  • (RuntimeError)

    Occurs when confirmations are already activated.

  • (RuntimeError)

    Occurs when nowait is true and block is given.



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 135

def on_ack(nowait = false, &block)
  self.use_publisher_confirmations! unless self.uses_publisher_confirmations?

  self.define_callback(:ack, &block) if block

  self
end

#on_nack(&block) ⇒ self

Registers an error callback for Basic.Nack. It is called when one or more messages are rejected.

Returns:

  • (self)

    self



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 148

def on_nack(&block)
  self.define_callback(:nack, &block) if block

  self
end
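
Because #on_ack and #on_nack both return self, the two registrations can be chained. The sketch below shows that pattern on a hypothetical AckNackChannel stand-in; its deliver method simulates the broker's reply, and for brevity the callbacks receive a bare delivery tag rather than a protocol method instance.

```ruby
# Hypothetical stand-in channel illustrating chained callback registration.
class AckNackChannel
  def initialize
    @callbacks = {}
  end

  def on_ack(&block)
    @callbacks[:ack] = block if block
    self
  end

  def on_nack(&block)
    @callbacks[:nack] = block if block
    self
  end

  # Simulates the broker sending Basic.Ack (:ack) or Basic.Nack (:nack).
  def deliver(kind, delivery_tag)
    callback = @callbacks[kind]
    callback.call(delivery_tag) if callback
    self
  end
end

confirmed = []
rejected = []

channel = AckNackChannel.new
channel.on_ack { |tag| confirmed << tag }.on_nack { |tag| rejected << tag }

channel.deliver(:ack, 1).deliver(:nack, 2)
confirmed # => [1]
rejected  # => [2]
```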

#reset_publisher_index! ⇒ Object

Resets the publisher index to 0.



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 74

def reset_publisher_index!
  @publisher_index = 0
end

#reset_state! ⇒ Object



# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 191

def reset_state!
  super

  @uses_publisher_confirmations = false
end

#uses_publisher_confirmations? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/amq/client/async/extensions/rabbitmq/confirm.rb', line 119

def uses_publisher_confirmations?
  @uses_publisher_confirmations
end