Class: AMQP::Consumer

Inherits:
AMQ::Client::Async::Consumer
  • Object
Defined in:
lib/amqp/consumer.rb

Overview

AMQP consumers are entities that handle messages delivered to them by the AMQP broker (a “push API”, as opposed to a “pull API”). Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). When a queue has multiple consumers, messages are distributed among them in a round-robin manner, subject to the channel-level prefetch setting.
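The round-robin distribution described above can be pictured with a small plain-Ruby simulation. This is only a sketch: it does not use the amqp gem or a broker, it ignores prefetch limits and acknowledgements, and `StubConsumer` and `dispatch_round_robin` are hypothetical names introduced for illustration.

```ruby
# Plain-Ruby simulation; it does not talk to a broker or use the amqp gem.
# StubConsumer and dispatch_round_robin are hypothetical names.
StubConsumer = Struct.new(:tag, :received)

# Hand each message to the next consumer in turn, wrapping around at the end.
def dispatch_round_robin(messages, consumers)
  messages.each_with_index do |message, index|
    consumers[index % consumers.size].received << message
  end
  consumers
end

consumers = [StubConsumer.new("c1", []), StubConsumer.new("c2", [])]
dispatch_round_robin(%w[m1 m2 m3 m4 m5], consumers)

consumers.each { |c| puts "#{c.tag}: #{c.received.join(', ')}" }
```

With two consumers sharing one queue, the first receives the odd-numbered messages and the second the even-numbered ones. A real broker would additionally hold back deliveries to a consumer whose channel has reached its prefetch limit.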

Constructor Details

#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false) ⇒ Consumer

Returns a new instance of Consumer.



# File 'lib/amqp/consumer.rb', line 42

def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false)
  super(channel, queue, (consumer_tag || self.class.tag_generator.generate_for(queue)), exclusive, no_ack, arguments, no_local)
end
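The constructor falls back to a generated tag only when no `consumer_tag` is given. A minimal sketch of that `||` fallback follows, using stand-ins rather than the library's classes: `StubTagGenerator`, `StubQueue`, and `resolve_tag` are hypothetical, and the tag format is invented. Only the `generate_for(queue)` call is taken from the listing above.

```ruby
# Hypothetical stand-in for a tag generator; only generate_for(queue) comes
# from the listing above, the tag format here is invented.
class StubTagGenerator
  def initialize
    @counter = 0
  end

  def generate_for(queue)
    @counter += 1
    "#{queue.name}-#{@counter}"
  end
end

StubQueue = Struct.new(:name)

# Mirrors the `consumer_tag || generator.generate_for(queue)` expression.
def resolve_tag(consumer_tag, generator, queue)
  consumer_tag || generator.generate_for(queue)
end

generator = StubTagGenerator.new
queue = StubQueue.new("jobs")

explicit  = resolve_tag("my-tag", generator, queue) # explicit tag is kept
generated = resolve_tag(nil, generator, queue)      # nil triggers generation

puts explicit
puts generated
```

Because `||` short-circuits, the generator is not consulted at all when an explicit tag is supplied.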

Instance Attribute Details

#arguments ⇒ Hash (readonly)

Returns Custom subscription metadata.

Returns:

  • (Hash)

    Custom subscription metadata



# File 'lib/amqp/consumer.rb', line 27

def arguments
  @arguments
end

#channel ⇒ AMQP::Channel (readonly)

Returns Channel this consumer uses.

Returns:

  • (AMQP::Channel)

    Channel this consumer uses



# File 'lib/amqp/consumer.rb', line 21

def channel
  @channel
end

#consumer_tag ⇒ String (readonly)

Returns Consumer tag, unique consumer identifier.

Returns:

  • (String)

    Consumer tag, unique consumer identifier



# File 'lib/amqp/consumer.rb', line 25

def consumer_tag
  @consumer_tag
end

#queue ⇒ AMQP::Queue (readonly)

Returns Queue messages are consumed from.

Returns:

  • (AMQP::Queue)

    Queue messages are consumed from



# File 'lib/amqp/consumer.rb', line 23

def queue
  @queue
end

Class Method Details

.tag_generator ⇒ AMQ::Client::ConsumerTagGenerator

Returns Consumer tag generator.

Returns:

  • (AMQ::Client::ConsumerTagGenerator)

    Consumer tag generator



# File 'lib/amqp/consumer.rb', line 31

def self.tag_generator
  @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new
end

.tag_generator=(generator) ⇒ AMQ::Client::ConsumerTagGenerator

Returns Provided argument.

Parameters:

  • generator (AMQ::Client::ConsumerTagGenerator)

    Consumer tag generator that will be used by consumer instances

Returns:

  • (AMQ::Client::ConsumerTagGenerator)

    Provided argument



# File 'lib/amqp/consumer.rb', line 37

def self.tag_generator=(generator)
  @tag_generator = generator
end
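The reader/writer pair above keeps the generator in a class-level instance variable that is created lazily and can be swapped out. The sketch below reproduces that pattern in isolation. All class names (`SketchDefaultGenerator`, `PrefixedGenerator`, `SketchTagOwner`, `SketchQueue`) and tag formats are hypothetical, and whether the real library accepts any object that responds to `generate_for` is an assumption, not something this page confirms.

```ruby
SketchQueue = Struct.new(:name)

# Stand-in default generator; the tag format is invented for this sketch.
class SketchDefaultGenerator
  def generate_for(queue)
    "#{queue.name}-#{rand(1_000_000)}"
  end
end

# Hypothetical replacement that produces predictable, prefixed tags.
class PrefixedGenerator
  def initialize(prefix)
    @prefix = prefix
    @count = 0
  end

  def generate_for(queue)
    @count += 1
    "#{@prefix}.#{queue.name}.#{@count}"
  end
end

class SketchTagOwner
  # Class-level instance variable, created lazily on first read.
  def self.tag_generator
    @tag_generator ||= SketchDefaultGenerator.new
  end

  def self.tag_generator=(generator)
    @tag_generator = generator
  end
end

default = SketchTagOwner.tag_generator
same    = SketchTagOwner.tag_generator # memoized: the same object is returned

SketchTagOwner.tag_generator = PrefixedGenerator.new("billing")
tag = SketchTagOwner.tag_generator.generate_for(SketchQueue.new("invoices"))
puts tag
```

Since the generator lives on the class, replacing it affects every instance created afterwards that does not pass its own tag.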

Instance Method Details

#acknowledge(delivery_tag) ⇒ Consumer

Acknowledge a delivery tag.



# File 'lib/amqp/consumer.rb', line 143

def acknowledge(delivery_tag)
  super(delivery_tag)
end

#auto_recover ⇒ Object

Called by the associated connection object when the AMQP connection has been re-established (for example, after a network failure).



# File 'lib/amqp/consumer.rb', line 194

def auto_recover
  super
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after the TCP connection is recovered following a network failure but before the AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/consumer.rb', line 176

def before_recovery(&block)
  super(&block)
end

#cancel(nowait = false, &block) ⇒ AMQP::Consumer

Returns self.

Returns:

  • (AMQP::Consumer)

    self



# File 'lib/amqp/consumer.rb', line 83

def cancel(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end

#consume(nowait = false, &block) ⇒ AMQP::Consumer

Begin consuming messages from the queue. The request is deferred until the channel is open and the queue has been declared.

Returns:

  • (AMQP::Consumer)

    self



# File 'lib/amqp/consumer.rb', line 55

def consume(nowait = false, &block)
  @channel.once_open do
    @queue.once_declared do
      super(nowait, &block)
    end
  end

  self
end
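Both #cancel and #consume wrap their work in `@channel.once_open` and `@queue.once_declared` and return `self` immediately. The sketch below shows one way such "run now if ready, otherwise later" hooks can behave. `ReadyGate`, `once_ready`, and `ready!` are hypothetical, and the semantics assumed here are inferred from the method names only, not taken from the library's implementation.

```ruby
# Hypothetical stand-in for an entity that becomes ready asynchronously.
class ReadyGate
  def initialize
    @ready = false
    @pending = []
  end

  # Run the block now if ready, otherwise keep it for later.
  def once_ready(&block)
    if @ready
      block.call
    else
      @pending << block
    end
    self
  end

  # Mark the entity ready and run everything that was waiting, in order.
  def ready!
    @ready = true
    @pending.shift.call until @pending.empty?
    self
  end
end

channel = ReadyGate.new
queue = ReadyGate.new
log = []

# Nested the same way as in #consume: channel first, then queue.
channel.once_ready do
  queue.once_ready { log << :consume_sent }
end
log << :method_returned # the caller continues before anything is sent

channel.ready!
queue.ready!
p log
```

Under these assumptions the caller gets control back before the inner block runs, which is why the methods can return `self` for chaining even when the channel or queue is not yet usable.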

#exclusive? ⇒ Boolean

Returns true if this consumer is exclusive (other consumers for the same queue are not allowed).

Returns:

  • (Boolean)

    true if this consumer is exclusive (other consumers for the same queue are not allowed)



# File 'lib/amqp/consumer.rb', line 47

def exclusive?
  super
end

#inspect ⇒ String

Returns Readable representation of relevant object state.

Returns:

  • (String)

    Readable representation of relevant object state.



# File 'lib/amqp/consumer.rb', line 202

def inspect
  "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}"
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after the TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/consumer.rb', line 165

def on_connection_interruption(&block)
  super(&block)
end
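The "last one defined wins" rule for these callbacks can be illustrated with a single-slot registry. This is a plain-Ruby sketch under the assumption that each callback kind is stored under one fixed key. `SingleSlotCallbacks` and `fire_interruption` are hypothetical names, and the parent class may implement the rule differently.

```ruby
# Hypothetical registry that keeps at most one block per callback kind.
class SingleSlotCallbacks
  def initialize
    @callbacks = {}
  end

  # Storing under a fixed key means a later call overwrites the earlier block.
  def on_connection_interruption(&block)
    @callbacks[:interruption] = block
    self
  end

  # Invoke the stored block, if any.
  def fire_interruption
    callback = @callbacks[:interruption]
    callback ? callback.call : nil
  end
end

events = []
slot = SingleSlotCallbacks.new
slot.on_connection_interruption { events << :first }
slot.on_connection_interruption { events << :second }
slot.fire_interruption

p events
```

Only the second block runs, so any logic that must survive a later registration has to be combined into one block by the caller.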

#on_delivery(&block) ⇒ AMQP::Consumer

Register a block that will be used to handle delivered messages.

Returns:

  • (AMQP::Consumer)



# File 'lib/amqp/consumer.rb', line 115

def on_delivery(&block)
  # We have to maintain this multiple arities jazz
  # because older versions of this gem are used in examples in at least 3
  # books published by O'Reilly :(. MK.
  delivery_shim = Proc.new { |basic_deliver, headers, payload|
    case block.arity
    when 1 then
      block.call(payload)
    when 2 then
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload)
    else
      h = Header.new(@channel, basic_deliver, headers.decode_payload)
      block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key)
    end
  }

  super(&delivery_shim)
end
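The shim above chooses how to call the handler based on `block.arity`. The following plain-Ruby sketch isolates that dispatch so the three cases can be tried without a broker. `dispatch_by_arity` is a hypothetical helper, a Hash stands in for the `Header` object, and the `extras` array stands in for the delivery metadata.

```ruby
# Hypothetical helper showing the same `block.arity` dispatch in isolation.
# The real shim builds Header objects; here a plain Hash stands in.
def dispatch_by_arity(header, payload, extras, &block)
  case block.arity
  when 1 then block.call(payload)
  when 2 then block.call(header, payload)
  else        block.call(header, payload, *extras)
  end
end

header = { content_type: "text/plain" }
extras = ["tag-1", 7] # stand-ins for consumer tag and delivery tag

one  = dispatch_by_arity(header, "hi", extras) { |payload| payload.upcase }
two  = dispatch_by_arity(header, "hi", extras) { |h, payload| "#{h[:content_type]}:#{payload}" }
many = dispatch_by_arity(header, "hi", extras) { |_h, _payload, tag, delivery_tag| [tag, delivery_tag] }

p one
p two
p many
```

A one-argument block receives only the payload, a two-argument block receives the header and payload, and any other arity falls through to the full argument list.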

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when the AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



# File 'lib/amqp/consumer.rb', line 184

def on_recovery(&block)
  super(&block)
end

#reject(delivery_tag, requeue = true) ⇒ Consumer

Returns self.



# File 'lib/amqp/consumer.rb', line 152

def reject(delivery_tag, requeue = true)
  super(delivery_tag, requeue)
end

#resubscribe(&block) ⇒ AMQP::Consumer

Used by automatic recovery code.

Returns:

  • (AMQP::Consumer)

    self



# File 'lib/amqp/consumer.rb', line 68

def resubscribe(&block)
  @channel.once_open do
    @queue.once_declared do
      self.unregister_with_channel
      @consumer_tag = self.class.tag_generator.generate_for(@queue)
      self.register_with_channel

      super(&block)
    end
  end

  self
end
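The ordering inside #resubscribe (unregister, generate a new tag, register again) matters if the channel tracks consumers by tag. The sketch below assumes exactly that: a channel-level Hash keyed by consumer tag. `SketchChannel` and `SketchSubscriber` are hypothetical, and the real `register_with_channel` and `unregister_with_channel` may work differently.

```ruby
# Hypothetical channel that tracks its consumers by tag.
class SketchChannel
  attr_reader :consumers

  def initialize
    @consumers = {}
  end
end

class SketchSubscriber
  attr_reader :consumer_tag

  def initialize(channel, queue_name)
    @channel = channel
    @queue_name = queue_name
    @generation = 0
    @consumer_tag = next_tag
    register_with_channel
  end

  def resubscribe
    unregister_with_channel # must happen while the old tag is still set
    @consumer_tag = next_tag
    register_with_channel
    self
  end

  private

  # Invented tag format: queue name plus a per-subscriber counter.
  def next_tag
    @generation += 1
    "#{@queue_name}-#{@generation}"
  end

  def register_with_channel
    @channel.consumers[@consumer_tag] = self
  end

  def unregister_with_channel
    @channel.consumers.delete(@consumer_tag)
  end
end

channel = SketchChannel.new
subscriber = SketchSubscriber.new(channel, "orders")
old_tag = subscriber.consumer_tag
subscriber.resubscribe

puts old_tag
puts subscriber.consumer_tag
p channel.consumers.keys
```

If the tag were replaced before unregistering, the entry under the old tag would be left behind in the registry.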

#subscribed? ⇒ Boolean

Queue API compatibility.

Returns:

  • (Boolean)

    true if this consumer is active (subscribed for message delivery)



# File 'lib/amqp/consumer.rb', line 97

def subscribed?
  !@callbacks[:delivery].empty?
end