Class: AMQP::Consumer

Inherits:
Object
  • Object
show all
Extended by:
ProtocolMethodHandlers
Includes:
Callbacks
Defined in:
lib/amqp/consumer.rb

Overview

AMQP consumers are entities that handle messages delivered to them by an AMQP broker (a “push API”, as opposed to a “pull API”). Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). When a queue has multiple consumers, messages are distributed among them in a round-robin manner, subject to the channel-level prefetch setting.

Instance Attribute Summary collapse

Acknowledging & Rejecting Messages collapse

Error Handling & Recovery collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Constructor Details

#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer

Returns a new instance of Consumer.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/amqp/consumer.rb', line 50

# Builds a consumer for +queue+ on +channel+ and registers it with both.
#
# @param channel      [AMQP::Channel] channel this consumer uses; must not be nil
# @param queue        [AMQP::Queue]   queue messages are consumed from; must not be nil
# @param consumer_tag [String, nil]   consumer identifier; when nil, one is obtained
#                                     from the class-level tag generator
# @param exclusive    [Boolean]       stored and later passed to basic.consume
# @param no_ack       [Boolean]       stored and later passed to basic.consume
# @param arguments    [Hash]          custom subscription metadata
# @param no_local     [Boolean]       stored and later passed to basic.consume
# @raise [ArgumentError] if the channel, its connection, or the queue is nil
# @return [Consumer] a new instance of Consumer
def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block)
  @callbacks = {}

  # Validate each collaborator right before storing it, in the order
  # channel -> connection -> queue.
  raise ArgumentError, "channel is nil" unless channel
  @channel = channel

  connection = channel.connection
  raise ArgumentError, "connection is nil" unless connection
  @connection = connection

  raise ArgumentError, "queue is nil" unless queue
  @queue = queue

  @consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue)

  @exclusive = exclusive
  @no_ack    = no_ack
  @arguments = arguments
  @no_local  = no_local

  register_with_channel
  register_with_queue
end

Instance Attribute Details

#argumentsHash (readonly)

Returns Custom subscription metadata.

Returns:

  • (Hash)

    Custom subscription metadata



35
36
37
# File 'lib/amqp/consumer.rb', line 35

# Reader for the value stored in @arguments by the constructor.
#
# @return [Hash] Custom subscription metadata
def arguments
  @arguments
end

#channelAMQP::Channel (readonly)

Returns Channel this consumer uses.

Returns:



29
30
31
# File 'lib/amqp/consumer.rb', line 29

# Reader for the value stored in @channel.
#
# @return [AMQP::Channel] Channel this consumer uses
def channel
  @channel
end

#consumer_tagString (readonly)

Returns Consumer tag, unique consumer identifier.

Returns:

  • (String)

    Consumer tag, unique consumer identifier



33
34
35
# File 'lib/amqp/consumer.rb', line 33

# Reader for the value stored in @consumer_tag.
#
# @return [String] Consumer tag, unique consumer identifier
def consumer_tag
  @consumer_tag
end

#queueAMQP::Queue (readonly)

Returns Queue messages are consumed from.

Returns:



31
32
33
# File 'lib/amqp/consumer.rb', line 31

# Reader for the value stored in @queue.
#
# @return [AMQP::Queue] Queue messages are consumed from
def queue
  @queue
end

Class Method Details

.tag_generatorAMQP::ConsumerTagGenerator

Returns Consumer tag generator.

Returns:



39
40
41
# File 'lib/amqp/consumer.rb', line 39

# Returns the consumer tag generator, creating a default
# AMQP::ConsumerTagGenerator the first time none has been set.
#
# @return [AMQP::ConsumerTagGenerator] Consumer tag generator
def self.tag_generator
  return @tag_generator if @tag_generator

  @tag_generator = AMQP::ConsumerTagGenerator.new
end

.tag_generator=(generator) ⇒ AMQP::ConsumerTagGenerator

Returns Provided argument.

Parameters:

Returns:



45
46
47
# File 'lib/amqp/consumer.rb', line 45

# Replaces the consumer tag generator stored on the class.
#
# @param generator [AMQP::ConsumerTagGenerator] generator to store
# @return [AMQP::ConsumerTagGenerator] Provided argument
def self.tag_generator=(generator)
  @tag_generator = generator
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Consumer

Acknowledge a delivery tag.



200
201
202
203
204
# File 'lib/amqp/consumer.rb', line 200

# Acknowledges a delivery tag by delegating to the channel.
#
# @param delivery_tag [Object] delivery tag to acknowledge
# @return [Consumer] self
def acknowledge(delivery_tag)
  tap { @channel.acknowledge(delivery_tag) }
end

#auto_recoverObject

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



275
276
277
278
279
# File 'lib/amqp/consumer.rb', line 275

# Recovery entry point: runs the :before_recovery callback, resubscribes,
# then runs the :after_recovery callback, in that order.
#
# @return [Object] whatever the :after_recovery callback execution returns
def auto_recover
  exec_callback_yielding_self(:before_recovery)
  resubscribe
  exec_callback_yielding_self(:after_recovery)
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



255
256
257
# File 'lib/amqp/consumer.rb', line 255

# Sets the :before_recovery callback, replacing any previously defined one.
#
# @yield block stored as the :before_recovery callback
# @return [Object] whatever redefine_callback returns
def before_recovery(&block)
  redefine_callback(:before_recovery, &block)
end

#cancel(nowait = false, &block) ⇒ AMQP::Consumer

Returns self.

Returns:



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/amqp/consumer.rb', line 114

# Sends basic.cancel for this consumer once the channel is open and the
# queue is declared.
#
# @param nowait [Boolean] when false, the block is stored as the :cancel
#   callback and the consumer is queued on the channel as awaiting cancel-ok
# @yield stored as the :cancel callback (only when nowait is false)
# @return [AMQP::Consumer] self
def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      frame = AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait)
      @connection.send_frame(frame)

      unless nowait
        redefine_callback(:cancel, &block)
        @channel.consumers_awaiting_cancel_ok.push(self)
      end

      self
    end
  end

  self
end

#consume(nowait = false, &block) ⇒ AMQP::Consumer

Begin consuming messages from the queue

Returns:



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/amqp/consumer.rb', line 76

# Begins consuming messages from the queue: sends basic.consume once the
# channel is open and the queue is declared.
#
# @param nowait [Boolean] when false, the block is stored as the :consume
#   callback and the consumer is queued on the channel as awaiting consume-ok
# @yield stored as the :consume callback (only when nowait is false)
# @return [AMQP::Consumer] self
def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      frame = AMQ::Protocol::Basic::Consume.encode(
        @channel.id, @queue.name, @consumer_tag,
        @no_local, @no_ack, @exclusive, nowait, @arguments
      )
      @connection.send_frame(frame)

      unless nowait
        redefine_callback(:consume, &block)
        @channel.consumers_awaiting_consume_ok.push(self)
      end

      self
    end
  end

  self
end

#exclusive?Boolean

Returns true if this consumer is exclusive (other consumers for the same queue are not allowed).

Returns:

  • (Boolean)

    true if this consumer is exclusive (other consumers for the same queue are not allowed)



68
69
70
# File 'lib/amqp/consumer.rb', line 68

# Reports the exclusive flag as a strict boolean.
#
# @return [Boolean] true if this consumer is exclusive (other consumers for
#   the same queue are not allowed)
def exclusive?
  @exclusive ? true : false
end

#handle_cancel(basic_cancel) ⇒ Object

Runs the callbacks registered via #on_cancel, passing them the received basic.cancel frame.



187
188
189
# File 'lib/amqp/consumer.rb', line 187

# Runs the :scancel callbacks (those added through #on_cancel) with the
# given frame.
#
# @param basic_cancel [Object] the basic.cancel frame handed to the callbacks
# @return [Object] whatever exec_callback returns
def handle_cancel(basic_cancel)
  exec_callback(:scancel, basic_cancel)
end

#handle_cancel_ok(cancel_ok) ⇒ Object

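Runs the one-off :cancel callback, unregisters the consumer from its channel and queue, drops its references to them and clears its callbacks.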



301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/amqp/consumer.rb', line 301

# Finalizes a cancellation: fires the one-off :cancel callback, removes the
# consumer from its channel and queue, and releases everything it holds.
#
# @param cancel_ok [Object] the cancel-ok frame passed to the :cancel callback
# @return [Object] whatever the last clear_callbacks call returns
def handle_cancel_ok(cancel_ok)
  exec_callback_once(:cancel, cancel_ok)

  # Must run while @consumer_tag, @channel and @queue are still set.
  unregister_with_channel
  unregister_with_queue

  # Detach from the object graph so this consumer can be garbage-collected.
  @consumer_tag = nil
  @queue = @channel = @connection = nil

  clear_callbacks(:delivery)
  clear_callbacks(:consume)
  clear_callbacks(:cancel)
  clear_callbacks(:scancel)
end

#handle_consume_ok(consume_ok) ⇒ Object

Runs the one-off :consume callback with the received consume-ok frame.



297
298
299
# File 'lib/amqp/consumer.rb', line 297

# Fires the one-off :consume callback with the given frame.
#
# @param consume_ok [Object] the consume-ok frame handed to the callback
# @return [Object] whatever exec_callback_once returns
def handle_consume_ok(consume_ok)
  exec_callback_once(:consume, consume_ok)
end

#handle_delivery(basic_deliver, metadata, payload) ⇒ Object

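Runs the :delivery callbacks (those registered via #on_delivery) with the delivery frame, message metadata and payload.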



293
294
295
# File 'lib/amqp/consumer.rb', line 293

# Dispatches a delivered message to the :delivery callbacks.
#
# @param basic_deliver [Object] the basic.deliver frame
# @param metadata      [Object] message metadata, forwarded untouched
# @param payload       [Object] message body, forwarded untouched
# @return [Object] whatever exec_callback returns
def handle_delivery(basic_deliver, metadata, payload)
  exec_callback(:delivery, basic_deliver, metadata, payload)
end

#inspectString

Returns Readable representation of relevant object state.

Returns:

  • (String)

    Readable representation of relevant object state.



176
177
178
# File 'lib/amqp/consumer.rb', line 176

def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end

#on_cancel(&block) ⇒ Object



181
182
183
184
185
# File 'lib/amqp/consumer.rb', line 181

# Adds a :scancel callback; these are run by #handle_cancel.
#
# @yield block appended to the :scancel callbacks
# @return [Consumer] self
def on_cancel(&block)
  append_callback(:scancel, &block)
  self
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



239
240
241
# File 'lib/amqp/consumer.rb', line 239

# Sets the :after_connection_interruption callback, replacing any previously
# defined one.
#
# @yield block stored as the :after_connection_interruption callback
# @return [Object] whatever redefine_callback returns
def on_connection_interruption(&block)
  redefine_callback(:after_connection_interruption, &block)
end

#on_delivery(&block) ⇒ AMQP::Consumer

Register a block that will be used to handle delivered messages.

Returns:

See Also:



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/amqp/consumer.rb', line 152

# Registers a block that will be used to handle delivered messages.
#
# The block is wrapped in a shim that adapts the arguments to the block's
# arity:
#   * arity 1      - called with the payload only
#   * arity 2      - called with a Header and the payload
#   * anything else - called with a Header, the payload, and the consumer
#     tag, delivery tag, redelivered flag, exchange and routing key taken
#     from the basic.deliver frame
#
# @yield handler for delivered messages
# @return [AMQP::Consumer] self
def on_delivery(&block)
  # Several calling conventions are supported because older versions of this
  # gem are used in examples in at least 3 books published by O'Reilly. MK.
  delivery_shim = proc do |basic_deliver, headers, payload|
    arity = block.arity

    if arity == 1
      # The header is intentionally not built (and not decoded) here.
      block.call(payload)
    else
      header = Header.new(@channel, basic_deliver, headers.decode_payload)

      if arity == 2
        block.call(header, payload)
      else
        block.call(
          header,
          payload,
          basic_deliver.consumer_tag,
          basic_deliver.delivery_tag,
          basic_deliver.redelivered,
          basic_deliver.exchange,
          basic_deliver.routing_key
        )
      end
    end
  end

  append_callback(:delivery, &delivery_shim)
  self
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



225
226
227
# File 'lib/amqp/consumer.rb', line 225

# Sets the :after_recovery callback, replacing any previously defined one.
#
# @yield block stored as the :after_recovery callback
# @return [Object] whatever redefine_callback returns
def on_recovery(&block)
  redefine_callback(:after_recovery, &block)
end

#register_with_channelObject (protected)



354
355
356
# File 'lib/amqp/consumer.rb', line 354

# Stores this consumer in the channel's consumer registry under its
# consumer tag.
#
# @return [Consumer] self (the value that was stored)
def register_with_channel
  registry = @channel.consumers
  registry[@consumer_tag] = self
end

#register_with_queueObject (protected)

Stores this consumer in the queue's consumer registry under its consumer tag.



358
359
360
# File 'lib/amqp/consumer.rb', line 358

# Stores this consumer in the queue's consumer registry under its
# consumer tag.
#
# @return [Consumer] self (the value that was stored)
def register_with_queue
  registry = @queue.consumers
  registry[@consumer_tag] = self
end

#reject(delivery_tag, requeue = true) ⇒ Consumer

Returns self.



211
212
213
214
215
# File 'lib/amqp/consumer.rb', line 211

# Rejects a delivery tag by delegating to the channel.
#
# @param delivery_tag [Object]  delivery tag to reject
# @param requeue      [Boolean] forwarded to the channel as the requeue flag
# @return [Consumer] self
def reject(delivery_tag, requeue = true)
  tap { @channel.reject(delivery_tag, requeue) }
end

#resubscribe(&block) ⇒ AMQP::Consumer

Used by automatic recovery code.

Returns:



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/amqp/consumer.rb', line 96

# Used by automatic recovery code. Once the channel is open and the queue is
# declared, the consumer is re-registered with the channel under a freshly
# generated tag and basic.consume is sent again.
#
# @yield optional; when given it is stored as the :consume callback and the
#   frame is sent with nowait = false, otherwise nowait = true
# @return [AMQP::Consumer] self
def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      # Swap the channel registration over to a new consumer tag.
      unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      register_with_channel

      nowait = block.nil?
      frame = AMQ::Protocol::Basic::Consume.encode(
        @channel.id, @queue.name, @consumer_tag,
        @no_local, @no_ack, @exclusive, nowait, @arguments
      )
      @connection.send_frame(frame)
      redefine_callback(:consume, &block) if block

      self
    end
  end

  self
end

#subscribed?Boolean

Queue API compatibility.

Returns:

  • (Boolean)

    true if this consumer is active (subscribed for message delivery)



134
135
136
# File 'lib/amqp/consumer.rb', line 134

# Queue API compatibility.
#
# A consumer counts as subscribed when at least one :delivery callback is
# registered. The callbacks hash starts out empty, so a missing :delivery
# entry is treated as "not subscribed" instead of raising NoMethodError.
#
# @return [Boolean] true if this consumer is active (subscribed for message
#   delivery)
def subscribed?
  delivery_callbacks = @callbacks[:delivery]

  !(delivery_callbacks.nil? || delivery_callbacks.empty?)
end

#to_sObject



284
285
286
# File 'lib/amqp/consumer.rb', line 284

def to_s
  "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>"
end

#unregister_with_channelObject (protected)

Removes this consumer's tag from the channel's consumer registry.



362
363
364
# File 'lib/amqp/consumer.rb', line 362

# Removes the entry for this consumer's tag from the channel's consumer
# registry.
#
# @return [Object] whatever the registry's delete returns
def unregister_with_channel
  registry = @channel.consumers
  registry.delete(@consumer_tag)
end

#unregister_with_queueObject (protected)

Removes this consumer's tag from the queue's consumer registry.



366
367
368
# File 'lib/amqp/consumer.rb', line 366

# Removes the entry for this consumer's tag from the queue's consumer
# registry.
#
# @return [Object] whatever the registry's delete returns
def unregister_with_queue
  registry = @queue.consumers
  registry.delete(@consumer_tag)
end