Class: Bunny::Channel

Inherits:
Object
Defined in:
lib/bunny/channel.rb

Overview

## Channels in RabbitMQ

To quote the AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

## Opening Channels

Channels can be opened either via `Bunny::Session#create_channel` (sufficient in the majority of cases) or by instantiating `Bunny::Channel` directly:

conn = Bunny.new
conn.start

ch   = conn.create_channel

This will automatically allocate a channel id.

## Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

ch = conn.create_channel
ch.close
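
Because closed channels cannot be reused, it can help to tie a channel's lifetime to a block. The sketch below uses a hypothetical helper name, `with_channel`, which is not part of Bunny. It relies only on `create_channel`, `#open?` and `#close`, so it works with any object that responds to those methods.

```ruby
# Hypothetical helper (not part of Bunny): opens a channel, yields it,
# and closes it afterwards if it is still open.
def with_channel(conn)
  ch = conn.create_channel
  yield ch
ensure
  # A channel-level exception may already have closed the channel,
  # so only close it when it still reports itself as open.
  ch.close if ch && ch.open?
end
```

The block's return value is passed through, and the channel is closed whether the block returns normally or raises.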

## Higher-level API

Bunny offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

### Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

### Exchange Operations In Higher-level API

## Channel QoS (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
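
As a minimal sketch of setting the prefetch level, the helper below bounds a requested value before passing it to `#prefetch`. The helper name `apply_prefetch` and the constant `MAX_PREFETCH` are assumptions made for illustration. The upper bound mirrors the 16-bit limit on the prefetch count in the protocol.

```ruby
# Upper bound for the prefetch count: an unsigned 16-bit value in the protocol.
MAX_PREFETCH = (2**16) - 1

# Hypothetical helper (not part of Bunny): clamps the requested value into
# the valid range and applies it to the channel via #prefetch.
def apply_prefetch(ch, count)
  bounded = [[count, 0].max, MAX_PREFETCH].min
  ch.prefetch(bounded)
  bounded
end
```

Clamping is only one possible policy. Raising on out-of-range input, as `#basic_qos` does, is equally reasonable.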

## Channel IDs

Channels are identified by their ids which are integers. Bunny takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

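There is a limit on the maximum number of channels per connection. Channel ids are 16-bit integers in the protocol, so the limit can be no higher than 65535, and the effective value (channel_max) is negotiated with the server. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is not a problem.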
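
To illustrate how ids can be allocated and released, here is a toy allocator. It is not Bunny's implementation, and the class name `ToyChannelIdAllocator` is hypothetical. It returns -1 when no id is free, which matches the `@id < 0` exhaustion check in the constructor source further down. Id 0 is skipped because the protocol reserves it for connection-level traffic.

```ruby
require "set"

# Toy allocator (illustrative only, not Bunny's implementation).
class ToyChannelIdAllocator
  def initialize(channel_max)
    @channel_max = channel_max
    @in_use = Set.new
  end

  # Returns the lowest free id in 1..channel_max, or -1 when exhausted.
  def allocate
    id = (1..@channel_max).find { |n| !@in_use.include?(n) }
    return -1 unless id

    @in_use << id
    id
  end

  # Makes the id available for reuse by a later channel.
  def release(id)
    @in_use.delete(id)
  end
end
```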

## Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With Bunny, channel-level exceptions are raised as Ruby exceptions (for example, NotFound) that provide access to the underlying `channel.close` method information.

Examples:

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue Bunny::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "bunny.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue Bunny::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end
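
Both examples read the same two fields from the exception, so the formatting can be factored out. The helper name `describe_channel_error` below is hypothetical. It assumes only that the exception responds to `#channel_close`, and that the result exposes `reply_code` and `reply_text` as in the examples above.

```ruby
# Hypothetical helper (not part of Bunny): formats a channel-level exception.
# Expects an exception that responds to #channel_close.
def describe_channel_error(error)
  close = error.channel_close
  "#{error.class.name}: #{close.reply_code} #{close.reply_text}"
end
```

Keep in mind that the channel that raised the exception is closed, so any follow-up work needs a new channel.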

Consumer and Message operations (basic.*)

MAX_PREFETCH_COUNT =
(2 ** 16) - 1

    prefetch_count is of type short in the protocol. MK.

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze
SHORTSTR_LIMIT =
255

Constructor Details

#initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) ⇒ Channel

Returns a new instance of Channel.

Parameters:

  • connection (Bunny::Session) (defaults to: nil)

    AMQP 0.9.1 connection

  • id (Integer) (defaults to: nil)

    Channel id, pass nil to make Bunny automatically allocate it

  • work_pool (Bunny::ConsumerWorkPool) (defaults to: ConsumerWorkPool.new(1))

    Thread pool for delivery processing, by default of size 1



# File 'lib/bunny/channel.rb', line 172

def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
  @connection = connection
  @logger     = connection.logger
  @id         = id || @connection.next_channel_id

  # channel allocator is exhausted
  if @id < 0
    msg = "Cannot open a channel: max number of channels on connection reached. Connection channel_max value: #{@connection.channel_max}"
    @logger.error(msg)

    raise msg
  else
    @logger.debug { "Allocated channel id: #{@id}" }
  end

  @status     = :opening

  @connection.register_channel(self)

  @queues     = Hash.new
  @exchanges  = Hash.new
  @consumers  = Hash.new
  @work_pool  = work_pool

  # synchronizes frameset delivery. MK.
  @publishing_mutex = @connection.mutex_impl.new
  @consumer_mutex   = @connection.mutex_impl.new

  @queue_mutex    = @connection.mutex_impl.new
  @exchange_mutex = @connection.mutex_impl.new

  @unconfirmed_set_mutex = @connection.mutex_impl.new

  self.reset_continuations

  # threads awaiting on continuations. Used to unblock
  # them when network connection goes down so that busy loops
  # that perform synchronous operations can work. MK.
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @next_publish_seq_no = 0
  @delivery_tag_offset = 0

  @recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0)
  @uncaught_exception_handler = Proc.new do |e, consumer|
    @logger.error "Uncaught exception from consumer #{consumer.to_s}: #{e.inspect} @ #{e.backtrace[0]}"
  end

  @cancel_consumers_before_closing = false
end
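
The default work pool size of 1 means deliveries on a channel are handled one at a time, in arrival order. The toy pool below illustrates that idea with Ruby's built-in `Queue` and `Thread`. It is a sketch, not Bunny's `ConsumerWorkPool`, and the class name `ToyWorkPool` is hypothetical.

```ruby
# Toy thread pool (illustrative only): jobs are taken from a shared FIFO
# queue by a fixed number of worker threads.
class ToyWorkPool
  def initialize(size)
    @queue = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Each worker runs jobs until it receives the :stop marker.
        while (job = @queue.pop) != :stop
          job.call
        end
      end
    end
  end

  def submit(&block)
    @queue << block
  end

  # Sends one :stop marker per worker, then waits for all of them to finish.
  def shutdown
    @threads.size.times { @queue << :stop }
    @threads.each(&:join)
  end
end
```

With a single worker, jobs finish in submission order. With more workers, throughput may improve but ordering is no longer guaranteed.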

Instance Attribute Details

#cancel_consumers_before_closing ⇒ Object (readonly)

Returns the value of attribute cancel_consumers_before_closing.



# File 'lib/bunny/channel.rb', line 164

def cancel_consumers_before_closing
  @cancel_consumers_before_closing
end

#connection ⇒ Bunny::Session (readonly)

Returns AMQP connection this channel was opened on.

Returns:

  • (Bunny::Session)

    AMQP connection this channel was opened on



# File 'lib/bunny/channel.rb', line 137

def connection
  @connection
end

#consumers ⇒ Hash<String, Bunny::Consumer> (readonly)

Returns Consumer instances declared on this channel.

Returns:

  • (Hash<String, Bunny::Consumer>)

    Consumer instances declared on this channel



# File 'lib/bunny/channel.rb', line 157

def consumers
  @consumers
end

#delivery_tag_offset ⇒ Integer (readonly)

Set to the current sequence index during automatic network failure recovery. This keeps the sequence monotonic for the user and hides the protocol-level reset.

Returns:

  • (Integer)

    Offset for the confirmations sequence index.



# File 'lib/bunny/channel.rb', line 147

def delivery_tag_offset
  @delivery_tag_offset
end

#exchanges ⇒ Hash<String, Bunny::Exchange> (readonly)

Returns Exchange instances declared on this channel.

Returns:

  • (Hash<String, Bunny::Exchange>)

    Exchange instances declared on this channel



# File 'lib/bunny/channel.rb', line 151

def exchanges
  @exchanges
end

#id ⇒ Integer

Returns Channel id.

Returns:

  • (Integer)

    Channel id



# File 'lib/bunny/channel.rb', line 135

def id
  @id
end

#nacked_set ⇒ Set<Integer> (readonly)

Returns Set of message indexes that have been nacked.

Returns:

  • (Set<Integer>)

    Set of message indexes that have been nacked



# File 'lib/bunny/channel.rb', line 155

def nacked_set
  @nacked_set
end

#next_publish_seq_no ⇒ Integer (readonly)

Returns Next publisher confirmations sequence index.

Returns:

  • (Integer)

    Next publisher confirmations sequence index



# File 'lib/bunny/channel.rb', line 143

def next_publish_seq_no
  @next_publish_seq_no
end

#prefetch_count ⇒ Integer (readonly)

Returns active basic.qos prefetch value.

Returns:

  • (Integer)

    active basic.qos prefetch value



# File 'lib/bunny/channel.rb', line 160

def prefetch_count
  @prefetch_count
end

#prefetch_global ⇒ Boolean (readonly)

Returns active basic.qos prefetch global mode.

Returns:

  • (Boolean)

    active basic.qos prefetch global mode (true: per-channel, false: per-consumer)



# File 'lib/bunny/channel.rb', line 162

def prefetch_global
  @prefetch_global
end

#queues ⇒ Hash<String, Bunny::Queue> (readonly)

Returns Queue instances declared on this channel.

Returns:

  • (Hash<String, Bunny::Queue>)

    Queue instances declared on this channel



# File 'lib/bunny/channel.rb', line 149

def queues
  @queues
end

#recoveries_counter ⇒ Object (readonly)

Returns the value of attribute recoveries_counter.



# File 'lib/bunny/channel.rb', line 225

def recoveries_counter
  @recoveries_counter
end

#status ⇒ Symbol (readonly)

Returns Channel status (:opening, :open, :closed).

Returns:

  • (Symbol)

    Channel status (:opening, :open, :closed)



# File 'lib/bunny/channel.rb', line 139

def status
  @status
end

#unconfirmed_set ⇒ Set<Integer> (readonly)

Returns Set of published message indexes that are currently unconfirmed.

Returns:

  • (Set<Integer>)

    Set of published message indexes that are currently unconfirmed



# File 'lib/bunny/channel.rb', line 153

def unconfirmed_set
  @unconfirmed_set
end

#work_pool ⇒ Bunny::ConsumerWorkPool (readonly)

Returns Thread pool delivered messages are dispatched to.

Returns:

  • (Bunny::ConsumerWorkPool)

    Thread pool delivered messages are dispatched to



# File 'lib/bunny/channel.rb', line 141

def work_pool
  @work_pool
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to acknowledge

  • multiple (Boolean) (defaults to: false)

    Should all unacknowledged messages up to this one be acknowledged as well?

# File 'lib/bunny/channel.rb', line 592

def ack(delivery_tag, multiple = false)
  basic_ack(delivery_tag.to_i, multiple)
end

#active ⇒ Boolean

Returns true if this channel is open.

Returns:

  • (Boolean)

    true if this channel is open



# File 'lib/bunny/channel.rb', line 301

def active
  open?
end

#add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block) ⇒ Object



# File 'lib/bunny/channel.rb', line 1759

def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block)
  @consumer_mutex.synchronize do
    c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
    c.on_delivery(&block) if block
    @consumers[consumer_tag] = c
  end
end

#any_consumers? ⇒ Boolean

Returns true if there are consumers on this channel.

Returns:

  • (Boolean)

    true if there are consumers on this channel



# File 'lib/bunny/channel.rb', line 1098

def any_consumers?
  @consumer_mutex.synchronize { @consumers.any? }
end

#basic_ack(delivery_tag, multiple = false) ⇒ NilClass

Acknowledges a delivery (message).

Examples:

Ack a message

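conn  = Bunny.new
conn.start

ch    = conn.create_channel
# a server-named, exclusive queue is used here for illustration
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # acknowledge the message
  ch.basic_ack(delivery_info.delivery_tag.to_i)
end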

Ack a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_ack(delivery_info.delivery_tag.to_i)

Ack multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# ack all fetched messages up to payload3
ch.basic_ack(delivery_info.delivery_tag.to_i, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be acknowledged?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 878

def basic_ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))

    nil
  end
end
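
The effect of the `multiple` flag can be modelled without a broker. The function below is a toy model with a hypothetical name: given the delivery tags still unacknowledged on a channel, it returns the tags a single ack would settle.

```ruby
# Toy model (illustrative only): returns the delivery tags that one ack
# would settle, given the tags that are still outstanding on the channel.
def tags_settled_by_ack(outstanding, delivery_tag, multiple = false)
  if multiple
    # multiple = true settles every outstanding tag up to and including this one
    outstanding.select { |tag| tag <= delivery_tag }
  else
    # multiple = false settles only the given tag
    outstanding.select { |tag| tag == delivery_tag }
  end
end
```

For example, with outstanding tags 1 through 4, acknowledging tag 3 with `multiple` set settles tags 1, 2 and 3, while the default settles only tag 3.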

#basic_cancel(consumer_tag, opts = {}) ⇒ AMQ::Protocol::Basic::CancelOk

Removes a consumer. Messages for this consumer will no longer be delivered. If the queue it was on is auto-deleted and this consumer was the last one, the queue will be deleted.

Parameters:

  • consumer_tag (String)

    Consumer tag (unique identifier) to cancel

  • opts (Hash) (defaults to: {})

    Options, see the options hash below

Options Hash (opts):

  • :no_wait (Boolean) — default: false

    if set to true, this method won’t receive a response and will immediately return nil

Returns:

  • (AMQ::Protocol::Basic::CancelOk)

    RabbitMQ response or nil, if the no_wait option is used

# File 'lib/bunny/channel.rb', line 1077

def basic_cancel(consumer_tag, opts = {})
  no_wait = opts.fetch(:no_wait, false)
  @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, no_wait))

  if no_wait
    @last_basic_cancel_ok = nil
  else
    with_continuation_timeout do
      @last_basic_cancel_ok = wait_on_continuations
    end
  end

  # reduces thread usage for channels that don't have any
  # consumers
  @work_pool.shutdown(true) unless self.any_consumers?

  @last_basic_cancel_ok
end
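
As a usage sketch, the helper below cancels every consumer registered on a channel on a best-effort basis, similar in spirit to what `#close` does when `cancel_consumers_before_closing?` is true. The helper name `cancel_all_consumers` is hypothetical. It uses only `#consumers` and `#basic_cancel`.

```ruby
# Hypothetical helper (not part of Bunny): cancels every consumer registered
# on the channel and returns the tags that were cancelled successfully.
def cancel_all_consumers(ch)
  # Snapshot the tags first: cancelling may modify the consumers hash.
  ch.consumers.keys.each_with_object([]) do |tag, cancelled|
    begin
      ch.basic_cancel(tag)
      cancelled << tag
    rescue StandardError
      # best effort: skip consumers that could not be cancelled
    end
  end
end
```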

#basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block) ⇒ AMQ::Protocol::Basic::ConsumeOk Also known as: consume

Registers a consumer for queue. Delivered messages will be handled with the block provided to this method.

Parameters:

  • queue (String, Bunny::Queue)

    Queue to consume from

  • consumer_tag (String) (defaults to: generate_consumer_tag)

    Consumer tag (unique identifier), generated by Bunny by default

  • no_ack (Boolean) (defaults to: false)

    (false) If true, delivered messages will be automatically acknowledged. If false, manual acknowledgements will be necessary.

  • exclusive (Boolean) (defaults to: false)

    (false) Should this consumer be exclusive?

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments that may be used by RabbitMQ extensions, etc

Returns:

  • (AMQ::Protocol::Basic::ConsumeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 965

def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  queue_name = if queue.respond_to?(:name)
                 queue.name
               else
                 queue
               end

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer_tag && consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      queue_name,
      consumer_tag,
      false,
      no_ack,
      exclusive,
      false,
      arguments))

  begin
    with_continuation_timeout do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)

  @last_basic_consume_ok
end
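
A minimal usage sketch with manual acknowledgements follows. The helper name `consume_and_settle` is hypothetical, and the block arguments (delivery info, properties, payload) are assumed to match those shown in the `subscribe` examples in this document. Each delivery is acknowledged after the caller's block succeeds and rejected without requeueing if it raises.

```ruby
# Hypothetical helper (not part of Bunny): registers a consumer that settles
# every delivery after handing its payload to the caller's block.
def consume_and_settle(ch, queue_name, consumer_tag)
  # no_ack = false selects manual acknowledgements
  ch.basic_consume(queue_name, consumer_tag, false) do |delivery_info, _properties, payload|
    tag = delivery_info.delivery_tag.to_i
    begin
      yield payload
      ch.basic_ack(tag)
    rescue StandardError
      # reject without requeueing to avoid an endless redelivery loop
      ch.basic_reject(tag, false)
    end
  end
end
```

Rejecting without requeueing drops the message (or dead-letters it, if the queue is configured that way). Whether that is acceptable depends on the application.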

#basic_consume_with(consumer) ⇒ AMQ::Protocol::Basic::ConsumeOk Also known as: consume_with

Registers a Bunny::Consumer instance as a consumer for its queue.

Parameters:

  • consumer (Bunny::Consumer)

    Consumer to register. It should already have queue name, consumer tag and other attributes set.

Returns:

  • (AMQ::Protocol::Basic::ConsumeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1021

def basic_consume_with(consumer)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer.consumer_tag && consumer.consumer_tag.strip != AMQ::Protocol::EMPTY_STRING
    register_consumer(consumer.consumer_tag, consumer)
  end

  @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@id,
      consumer.queue_name,
      consumer.consumer_tag,
      false,
      consumer.no_ack,
      consumer.exclusive,
      false,
      consumer.arguments))

  begin
    with_continuation_timeout do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  register_consumer(@last_basic_consume_ok.consumer_tag, consumer)

  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_consume_ok
end

#basic_get(queue, opts = {:manual_ack => true}) ⇒ Array

Synchronously fetches a message from the queue, if there are any. This method is for cases when the convenience of synchronous operations is more important than throughput.

Examples:

Using Bunny::Channel#basic_get with manual acknowledgements

conn = Bunny.new
conn.start
ch   = conn.create_channel
# here we assume the queue already exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :manual_ack => true)
ch.acknowledge(delivery_info.delivery_tag)

Parameters:

  • queue (String)

    Queue name

  • opts (Hash) (defaults to: {:manual_ack => true})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: true
    DEPRECATED. Use :manual_ack instead

  • :manual_ack (Boolean) — default: true

    Will this message be acknowledged manually?

Returns:

  • (Array)

    A triple of delivery info, message properties and message content

# File 'lib/bunny/channel.rb', line 705

def basic_get(queue, opts = {:manual_ack => true})
  raise_if_no_longer_open!

  unless opts[:ack].nil?
    warn "[DEPRECATION] `:ack` is deprecated.  Please use `:manual_ack` instead."
    opts[:manual_ack] = opts[:ack]
  end

  @connection.send_frame(AMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:manual_ack])))
  # this is a workaround for the edge case when basic_get is called in a tight loop
  # and network goes down we need to perform recovery. The problem is, basic_get will
  # keep blocking the thread that calls it without clear way to constantly unblock it
  # from the network activity loop (where recovery happens) with the current continuations
  # implementation (and even more correct and convenient ones, such as wait/notify, should
  # we implement them). So we return a triple of nils immediately which apps should be
  # able to handle anyway as "got no message, no need to act". MK.
  last_basic_get_response = if @connection.open?
                              begin
                                wait_on_basic_get_continuations
                              rescue Timeout::Error => e
                                raise_if_continuation_resulted_in_a_channel_error!
                                raise e
                              end
                            else
                              [nil, nil, nil]
                            end

  raise_if_continuation_resulted_in_a_channel_error!
  last_basic_get_response
end
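
Since `basic_get` can return a triple of nils, callers typically loop until the delivery info is nil. The sketch below drains up to `limit` messages and acknowledges each one. The helper name `drain_queue` and the `limit` parameter are assumptions, and the sketch assumes an empty queue also yields nil delivery info.

```ruby
# Hypothetical helper (not part of Bunny): fetches and acknowledges up to
# `limit` messages, returning their payloads in the order received.
def drain_queue(ch, queue_name, limit = 100)
  payloads = []
  limit.times do
    delivery_info, _properties, payload = ch.basic_get(queue_name, :manual_ack => true)
    # a nil delivery_info means there was nothing to fetch
    break if delivery_info.nil?

    payloads << payload
    ch.basic_ack(delivery_info.delivery_tag.to_i)
  end
  payloads
end
```

The bound guards against looping forever on a queue that is being filled faster than it is drained.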

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Examples:

Requeue a message

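conn  = Bunny.new
conn.start

ch    = conn.create_channel
# a server-named, exclusive queue is used here for illustration
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_nack(delivery_info.delivery_tag, false, true)
end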

Reject a message

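conn  = Bunny.new
conn.start

ch    = conn.create_channel
# a server-named, exclusive queue is used here for illustration
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message
  ch.basic_nack(delivery_info.delivery_tag)
end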

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_nack(delivery_info.delivery_tag, false, true)

Requeue multiple messages fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
# requeue all fetched messages up to payload3
ch.basic_nack(delivery_info.delivery_tag, true, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be rejected/requeued?

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 940

def basic_nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
                                                             delivery_tag,
                                                             multiple,
                                                             requeue))

    nil
  end
end

#basic_publish(payload, exchange, routing_key, opts = {}) ⇒ Bunny::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Parameters:

  • payload (String)

    Message payload. It will never be modified by Bunny or RabbitMQ in any way.

  • exchange (String)

    Exchange to publish to

  • routing_key (String)

    Routing key

  • opts (Hash) (defaults to: {})

    Publishing options

Options Hash (opts):

  • :persistent (Boolean)

    Should the message be persisted to disk? Treated as true when omitted

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. RabbitMQ only honors it for queues declared as priority queues; otherwise it is meaningful only to applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

  • (Bunny::Channel)

    Self

Raises:

  • (ArgumentError)

    If the routing key is longer than SHORTSTR_LIMIT (255) characters


# File 'lib/bunny/channel.rb', line 643

def basic_publish(payload, exchange, routing_key, opts = {})
  raise_if_no_longer_open!
  raise ArgumentError, "routing key cannot be longer than #{SHORTSTR_LIMIT} characters" if routing_key && routing_key.size > SHORTSTR_LIMIT

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  mode = if opts.fetch(:persistent, true)
           2
         else
           1
         end

  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0

  if @next_publish_seq_no > 0
    @unconfirmed_set_mutex.synchronize do
      @unconfirmed_set.add(@next_publish_seq_no)
      @next_publish_seq_no += 1
    end
  end

  frames = AMQ::Protocol::Basic::Publish.encode(@id,
    payload,
    opts,
    exchange_name,
    routing_key,
    opts[:mandatory],
    false,
    @connection.frame_max)
  @connection.send_frameset(frames, self)

  self
end
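
A short usage sketch: publishing a JSON document with an explicit content type. The helper name `publish_json` and the constant `ROUTING_KEY_LIMIT` are hypothetical. The limit repeats the SHORTSTR_LIMIT value listed above, so that an overly long routing key fails before anything is serialized.

```ruby
require "json"

# Same value as SHORTSTR_LIMIT in the constant summary above.
ROUTING_KEY_LIMIT = 255

# Hypothetical helper (not part of Bunny): serializes `data` to JSON and
# publishes it as a persistent message via #basic_publish.
def publish_json(ch, exchange_name, routing_key, data)
  if routing_key.size > ROUTING_KEY_LIMIT
    raise ArgumentError, "routing key is longer than #{ROUTING_KEY_LIMIT} characters"
  end

  ch.basic_publish(JSON.generate(data), exchange_name, routing_key,
                   :content_type => "application/json",
                   :persistent   => true)
end
```

Passing an empty string as the exchange name targets the default exchange, in which case the routing key is the queue name.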

#basic_qos(count, global = false) ⇒ AMQ::Protocol::Basic::QosOk Also known as: prefetch

Controls message delivery rate using basic.qos AMQP 0.9.1 method.

Parameters:

  • count (Integer)

    How many messages consumers on this channel can be given at a time (before they have to acknowledge or reject one of the earlier received messages). Must be between 0 and MAX_PREFETCH_COUNT (65535)

  • global (Boolean) (defaults to: false)

    Whether to use global mode for prefetch:

    • false: per-consumer

    • true: per-channel

    Note that the default value (false) has not changed. Earlier documentation described it as per-channel and unsupported by RabbitMQ; in RabbitMQ it means per-consumer and is supported (www.rabbitmq.com/consumer-prefetch.html).

Returns:

  • (AMQ::Protocol::Basic::QosOk)

    RabbitMQ response

Raises:

  • (ArgumentError)

# File 'lib/bunny/channel.rb', line 756

def basic_qos(count, global = false)
  raise ArgumentError.new("prefetch count must be a positive integer, given: #{count}") if count < 0
  raise ArgumentError.new("prefetch count must be no greater than #{MAX_PREFETCH_COUNT}, given: #{count}") if count > MAX_PREFETCH_COUNT
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, count, global))

  with_continuation_timeout do
    @last_basic_qos_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @prefetch_count  = count
  @prefetch_global = global

  @last_basic_qos_ok
end

#basic_recover(requeue) ⇒ AMQ::Protocol::Basic::RecoverOk

Redelivers unacknowledged messages.

Parameters:

  • requeue (Boolean)

    Should messages be requeued?

Returns:

  • (AMQ::Protocol::Basic::RecoverOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 780

def basic_recover(requeue)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
  with_continuation_timeout do
    @last_basic_recover_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_recover_ok
end

#basic_reject(delivery_tag, requeue = false) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

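conn  = Bunny.new
conn.start

ch    = conn.create_channel
# a server-named, exclusive queue is used here for illustration
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, true)
end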

Reject a message

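conn  = Bunny.new
conn.start

ch    = conn.create_channel
# a server-named, exclusive queue is used here for illustration
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |delivery_info, properties, payload|
  # reject the message
  ch.basic_reject(delivery_info.delivery_tag, false)
end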

Requeue a message fetched via basic.get

conn  = Bunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :manual_ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/bunny/channel.rb', line 830

def basic_reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    raise_if_no_longer_open!
    @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))

    nil
  end
end

#can_accept_queue_declare_ok?(method) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/channel.rb', line 1773

def can_accept_queue_declare_ok?(method)
  @pending_queue_declare_name == method.queue ||
    pending_server_named_queue_declaration?
end

#cancel_consumers_before_closing!Object



# File 'lib/bunny/channel.rb', line 325

def cancel_consumers_before_closing!
  @cancel_consumers_before_closing = true
end

#cancel_consumers_before_closing?Boolean

Returns:

  • (Boolean)



def cancel_consumers_before_closing?
  !!@cancel_consumers_before_closing
end

#channel_flow(active) ⇒ AMQ::Protocol::Channel::FlowOk

Note:

Recent RabbitMQ versions (e.g. 2.8.x, 3.x) will employ TCP/IP-level back pressure on publishers if they detect that consumers do not keep up with them.

Enables or disables message flow for the channel. When message flow is disabled, no new messages will be delivered to consumers on this channel. This is typically used by consumers that cannot keep up with the influx of messages.

Parameters:

  • active (Boolean)

    Should message flow be enabled (true) or disabled (false)?

Returns:

  • (AMQ::Protocol::Channel::FlowOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1448

def channel_flow(active)
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
  with_continuation_timeout do
    @last_channel_flow_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_channel_flow_ok
end

#channel_level_exception_after_operation_that_has_no_response?(method) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/channel.rb', line 1872

def channel_level_exception_after_operation_that_has_no_response?(method)
  method.reply_code == 406 && (method.reply_text =~ /unknown delivery tag/ || method.reply_text =~ /delivery acknowledgement on channel \d+ timed out/)
end
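
The predicate above picks out channel closures triggered by operations that have no response of their own (basic.ack, basic.reject, basic.nack). A minimal standalone sketch of the same check follows. `FakeClose` is a hypothetical stand-in for the channel.close method object, the reply texts are illustrative, and `match?` is used here so the result is a strict Boolean rather than a match index.

```ruby
# Hypothetical stand-in for a channel.close method; only the two fields
# the check reads are modelled.
FakeClose = Struct.new(:reply_code, :reply_text)

NO_RESPONSE_ERRORS = [
  /unknown delivery tag/,
  /delivery acknowledgement on channel \d+ timed out/
].freeze

# True only for a 406 (PRECONDITION_FAILED) whose text matches one of the
# patterns used by the method above.
def no_response_operation_error?(method)
  method.reply_code == 406 &&
    NO_RESPONSE_ERRORS.any? { |pattern| method.reply_text.match?(pattern) }
end

unknown_tag = FakeClose.new(406, "PRECONDITION_FAILED - unknown delivery tag 82")
timed_out   = FakeClose.new(406, "delivery acknowledgement on channel 3 timed out")
missing_q   = FakeClose.new(404, "NOT_FOUND - no queue 'q'")

puts no_response_operation_error?(unknown_tag)
puts no_response_operation_error?(timed_out)
puts no_response_operation_error?(missing_q)
```

In #handle_method, closures that satisfy this check are routed to the #on_error handler instead of being pushed onto the continuations queue.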

#client ⇒ Bunny::Session

Returns the connection this channel was opened on.

Returns:

  • (Bunny::Session)

# File 'lib/bunny/channel.rb', line 306

def client
  @connection
end

#close ⇒ Object

Closes the channel. Closed channels can no longer be used (this includes associated Queue, Exchange and Bunny::Consumer instances).

# File 'lib/bunny/channel.rb', line 252

def close
  # see bunny#528
  raise_if_no_longer_open!

  # This is a best-effort attempt to cancel all consumers before closing the channel.
  # Retries are extremely unlikely to succeed, and the channel itself is about to be closed,
  # so we don't bother retrying.
  if self.cancel_consumers_before_closing?
   # cancelling a consumer involves using the same mutex, so avoid holding the lock
    keys = @consumer_mutex.synchronize { @consumers.keys }
    keys.each do |ctag|
      begin
        self.basic_cancel(ctag)
      rescue Bunny::Exception
        # ignore
      rescue Bunny::ClientTimeout
        # ignore
      end
    end
  end

  @connection.close_channel(self)
  @status = :closed
  @work_pool.shutdown
  maybe_kill_consumer_work_pool!
end
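
The consumer cancellation loop in #close takes a snapshot of the consumer tags while holding the mutex, then cancels each consumer after the lock has been released. The sketch below reproduces that pattern in isolation. `ConsumerRegistry` and its methods are hypothetical names, and a plain `Mutex` is assumed purely for illustration.

```ruby
# Hypothetical registry that guards a hash of consumers with one mutex.
class ConsumerRegistry
  def initialize(consumers)
    @mutex     = Mutex.new
    @consumers = consumers.dup
  end

  # Takes the same mutex as cancel_all, so it must not be called while
  # cancel_all still holds the lock (a plain Mutex is not re-entrant).
  def cancel(tag)
    @mutex.synchronize { @consumers.delete(tag) }
  end

  def cancel_all
    # Snapshot the keys under the lock, then release it before cancelling.
    tags = @mutex.synchronize { @consumers.keys }
    tags.each do |tag|
      begin
        cancel(tag)
      rescue StandardError
        # best effort: ignore failures, as the close path above does
      end
    end
  end

  def size
    @mutex.synchronize { @consumers.size }
  end
end

registry = ConsumerRegistry.new("ctag-1" => :a, "ctag-2" => :b)
registry.cancel_all
puts registry.size
```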

#closed? ⇒ Boolean

Returns true if this channel is closed (manually or because of an exception), false otherwise.

Returns:

  • (Boolean)

    true if this channel is closed (manually or because of an exception), false otherwise



# File 'lib/bunny/channel.rb', line 287

def closed?
  @status == :closed
end

#configure(&block) ⇒ Object



# File 'lib/bunny/channel.rb', line 319

def configure(&block)
  block.call(self) if block_given?

  self
end

#confirm_select(callback = nil) ⇒ AMQ::Protocol::Confirm::SelectOk

Enables publisher confirms for the channel.

Returns:

  • (AMQ::Protocol::Confirm::SelectOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1537

def confirm_select(callback = nil)
  raise_if_no_longer_open!

  if @next_publish_seq_no == 0
    @confirms_continuations = new_continuation
    @unconfirmed_set        = Set.new
    @nacked_set             = Set.new
    @next_publish_seq_no    = 1
    @only_acks_received = true
  end

  @confirms_callback = callback

  @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
  with_continuation_timeout do
    @last_confirm_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @last_confirm_select_ok
end

#default_exchange ⇒ Object

Provides access to the default exchange.

# File 'lib/bunny/channel.rb', line 416

def default_exchange
  @default_exchange ||= Exchange.default(self)
end

#deregister_exchange(exchange) ⇒ Object



# File 'lib/bunny/channel.rb', line 2073

def deregister_exchange(exchange)
  @exchange_mutex.synchronize { @exchanges.delete(exchange.name) }
end

#deregister_queue(queue) ⇒ Object



# File 'lib/bunny/channel.rb', line 2053

def deregister_queue(queue)
  @queue_mutex.synchronize { @queues.delete(queue.name) }
end

#deregister_queue_named(name) ⇒ Object



# File 'lib/bunny/channel.rb', line 2058

def deregister_queue_named(name)
  @queue_mutex.synchronize { @queues.delete(name) }
end

#direct(name, opts = {}) ⇒ Bunny::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 373

def direct(name, opts = {})
  find_exchange(name) || Exchange.new(self, :direct, name, opts)
end
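
The exchange helpers (#direct, #fanout, #headers, #exchange) share a find-or-create shape: look the name up in the per-channel cache first and build a new object only on a miss. The following is a minimal sketch of that pattern, not Bunny's implementation. `EntityCache` is a hypothetical class, and it assumes that newly created objects end up registered in the cache (registration is not shown in the method above).

```ruby
# Hypothetical mutex-guarded cache keyed by entity name.
class EntityCache
  def initialize
    @mutex   = Mutex.new
    @entries = {}
  end

  def find(name)
    @mutex.synchronize { @entries[name] }
  end

  def register(name, entity)
    @mutex.synchronize { @entries[name] = entity }
  end

  # Returns the cached entity, or builds one with the block and caches it.
  def find_or_declare(name)
    find(name) || register(name, yield)
  end
end

cache        = EntityCache.new
declarations = 0

first  = cache.find_or_declare("events") { declarations += 1; Object.new }
second = cache.find_or_declare("events") { declarations += 1; Object.new }

puts first.equal?(second)
puts declarations
```

One consequence of this shape, visible in the method bodies above, is that options passed on a cache hit are not consulted, because the cached object is returned as is.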

#durable_queue(name, type = "classic", opts = {}) ⇒ Bunny::Queue

Declares a new client-named durable queue of the given type (classic by default). The queue is neither exclusive nor auto-deleted.

Parameters:

  • name (String)

    Queue name. Empty (server-generated) names are not supported by this method.

  • type (String) (defaults to: "classic")

    Queue type

  • opts (Hash) (defaults to: {})

    Queue properties and other options. Durability, exclusivity, auto-deletion options will be ignored.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

Returns:

  • (Bunny::Queue)

# File 'lib/bunny/channel.rb', line 516

def durable_queue(name, type = "classic", opts = {})
  throw ArgumentError.new("queue name must not be nil") if name.nil?
  throw ArgumentError.new("queue name must not be empty (server-named durable queues do not make sense)") if name.empty?

  final_opts = opts.merge({
    :type        => type,
    :durable     => true,
    # exclusive or auto-delete QQs do not make much sense
    :exclusive   => false,
    :auto_delete => false
  })
  q = find_queue(name) || Bunny::Queue.new(self, name, final_opts)

  register_queue(q)
end
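
Because the fixed settings are merged over the caller's options, any :durable, :exclusive or :auto_delete value passed in opts is overridden, while unrelated keys such as :arguments pass through. A small sketch of that merge, using a hypothetical helper name and illustrative argument values:

```ruby
# Hypothetical helper mirroring the merge performed by the method above.
def durable_queue_options(type, opts)
  opts.merge(
    type:        type,
    durable:     true,
    exclusive:   false,
    auto_delete: false
  )
end

caller_opts = { durable: false, exclusive: true, arguments: { "x-max-length" => 10 } }
final_opts  = durable_queue_options("quorum", caller_opts)

puts final_opts.inspect
```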

#exchange(name, opts = {}) ⇒ Bunny::Exchange

Declares an exchange of the type given by the :type option (:direct by default) or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or “x-consistent-hash”

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 434

def exchange(name, opts = {})
  find_exchange(name) || Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end

#exchange_bind(source, destination, opts = {}) ⇒ AMQ::Protocol::Exchange::BindOk

Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Exchange::BindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1359

def exchange_bind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Bind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  with_continuation_timeout do
    @last_exchange_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_bind_ok
end
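
The lower-level bind and unbind methods accept either a name or any object that responds to `name`, and normalise both to a string before encoding the frame. A standalone sketch of that normalisation, where `NamedEntity` and `entity_name` are hypothetical names used only for illustration:

```ruby
# Hypothetical stand-in for an exchange or queue object.
NamedEntity = Struct.new(:name)

# Returns entity.name when available, otherwise the value itself.
def entity_name(entity)
  entity.respond_to?(:name) ? entity.name : entity
end

puts entity_name(NamedEntity.new("logs.source"))
puts entity_name("logs.destination")
```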

#exchange_declare(name, type, opts = {}) ⇒ AMQ::Protocol::Exchange::DeclareOk

Declares an exchange using the exchange.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    The name of the exchange. Note that LF and CR characters will be stripped from the value.

  • type (String, Symbol)

    Exchange type, e.g. :fanout or :topic

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this exchange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) — default: false

    Should this exchange be deleted when it is no longer used?

  • passive (Boolean) — default: false

    If true, exchange will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • (AMQ::Protocol::Exchange::DeclareOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1296

def exchange_declare(name, type, opts = {})
  raise_if_no_longer_open!

  # strip trailing new line and carriage returns
  # just like RabbitMQ does
  safe_name = name.gsub(/[\r\n]/, "")
  @connection.send_frame(AMQ::Protocol::Exchange::Declare.encode(@id,
      safe_name,
      type.to_s,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:auto_delete, false),
      opts.fetch(:internal, false),
      opts.fetch(:no_wait, false),
      opts[:arguments]))
  with_continuation_timeout do
    @last_exchange_declare_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_declare_ok
end
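
The name clean-up used by the declare methods removes every CR and LF character in the name, not only trailing ones. A quick sketch of what the substitution does to a few illustrative inputs (`sanitize_name` is a hypothetical helper name):

```ruby
# Hypothetical helper wrapping the same substitution as the method above.
def sanitize_name(name)
  name.gsub(/[\r\n]/, "")
end

puts sanitize_name("events\r\n").inspect
puts sanitize_name("ev\nents").inspect
puts sanitize_name("events").inspect
```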

#exchange_delete(name, opts = {}) ⇒ AMQ::Protocol::Exchange::DeleteOk

Deletes an exchange using the exchange.delete AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used?

Returns:

  • (AMQ::Protocol::Exchange::DeleteOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1329

def exchange_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Exchange::Delete.encode(@id,
      name,
      opts[:if_unused],
      false))
  with_continuation_timeout do
    @last_exchange_delete_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_delete_ok
end

#exchange_unbind(source, destination, opts = {}) ⇒ AMQ::Protocol::Exchange::UnbindOk

Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Exchange::UnbindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1403

def exchange_unbind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(AMQ::Protocol::Exchange::Unbind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  with_continuation_timeout do
    @last_exchange_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_unbind_ok
end

#fanout(name, opts = {}) ⇒ Bunny::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 355

def fanout(name, opts = {})
  find_exchange(name) || Exchange.new(self, :fanout, name, opts)
end

#find_exchange(name) ⇒ Object



# File 'lib/bunny/channel.rb', line 2083

def find_exchange(name)
  @exchange_mutex.synchronize { @exchanges[name] }
end

#find_queue(name) ⇒ Object



# File 'lib/bunny/channel.rb', line 2068

def find_queue(name)
  @queue_mutex.synchronize { @queues[name] }
end

#flow(active) ⇒ Object

Flow control. When set to false, RabbitMQ will stop delivering messages on this channel.

Parameters:

  • active (Boolean)

    Should messages to consumers on this channel be delivered?



# File 'lib/bunny/channel.rb', line 555

def flow(active)
  channel_flow(active)
end

#frame_size ⇒ Object

# File 'lib/bunny/channel.rb', line 311

def frame_size
  @connection.frame_max
end

#generate_consumer_tag(name = "bunny") ⇒ String

Generates a unique string intended for use as a consumer tag.

Returns:

  • (String)

    Unique string.



# File 'lib/bunny/channel.rb', line 1588

def generate_consumer_tag(name = "bunny")
  t = Bunny::Timestamp.now
  "#{name}-#{t.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
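
The generated tag has three dash-separated parts: the given prefix, a timestamp in seconds multiplied by 1000, and a random integer. The sketch below mirrors that format in plain Ruby. It assumes `Time.now` as a stand-in for `Bunny::Timestamp.now`, and `sketch_consumer_tag` is a hypothetical name.

```ruby
# Standalone approximation of the tag format produced by the method above.
def sketch_consumer_tag(name = "bunny")
  t = Time.now # assumption: stands in for Bunny::Timestamp.now
  "#{name}-#{t.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

tag = sketch_consumer_tag("worker")
puts tag
```

Note that the middle part has second resolution (whole seconds times 1000), so uniqueness within the same second relies on the random suffix.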

#handle_ack_or_nack(delivery_tag_before_offset, multiple, nack) ⇒ Object

Handles delivery tag offset calculations to keep the delivery tag monotonic after a reset due to automatic network failure recovery. @unconfirmed_set contains indices that have already been offset.

# File 'lib/bunny/channel.rb', line 1917

def handle_ack_or_nack(delivery_tag_before_offset, multiple, nack)
  @unconfirmed_set_mutex.synchronize do
    delivery_tag          = delivery_tag_before_offset + @delivery_tag_offset
    confirmed_range_start = multiple ? @unconfirmed_set.min : delivery_tag
    confirmed_range_end   = delivery_tag
    confirmed_range       = (confirmed_range_start..confirmed_range_end)

    if nack
      @nacked_set.merge(@unconfirmed_set & confirmed_range)
    end

    @unconfirmed_set.subtract(confirmed_range)

    @only_acks_received = (@only_acks_received && !nack)

    @confirms_continuations.push(true) if @unconfirmed_set.empty?

    if @confirms_callback
      confirmed_range.each { |tag| @confirms_callback.call(tag, false, nack) }
    end
  end
end
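
The bookkeeping above can be followed with plain sets. This is a simplified standalone model, not the library code: `confirm!` is a hypothetical function, the tag values and offset are made up, and the continuation push and callback invocation are left out.

```ruby
require "set"

# Applies one ack/nack to the sets and reports whether everything
# outstanding has now been confirmed.
def confirm!(unconfirmed, nacked, tag_before_offset, offset, multiple:, nack:)
  tag         = tag_before_offset + offset
  range_start = multiple ? unconfirmed.min : tag
  confirmed   = (range_start..tag)

  nacked.merge(unconfirmed & confirmed) if nack
  unconfirmed.subtract(confirmed)
  unconfirmed.empty?
end

unconfirmed = Set.new([3, 4, 5, 6]) # tags already include the offset
nacked      = Set.new
offset      = 2                     # illustrative offset after one recovery

# Broker acks tag 3 with multiple=true: offset tag 5, so 3..5 are confirmed.
done_after_ack  = confirm!(unconfirmed, nacked, 3, offset, multiple: true,  nack: false)
# Broker nacks tag 4 on its own: offset tag 6 is recorded as nacked.
done_after_nack = confirm!(unconfirmed, nacked, 4, offset, multiple: false, nack: true)

puts done_after_ack
puts done_after_nack
puts nacked.to_a.inspect
```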

#handle_basic_get_empty(basic_get_empty) ⇒ Object



# File 'lib/bunny/channel.rb', line 1883

def handle_basic_get_empty(basic_get_empty)
  @basic_get_continuations.push([nil, nil, nil])
end

#handle_basic_get_ok(basic_get_ok, properties, content) ⇒ Object



# File 'lib/bunny/channel.rb', line 1877

def handle_basic_get_ok(basic_get_ok, properties, content)
  basic_get_ok.delivery_tag = VersionedDeliveryTag.new(basic_get_ok.delivery_tag, @recoveries_counter.get)
  @basic_get_continuations.push([basic_get_ok, properties, content])
end

#handle_basic_return(basic_return, properties, content) ⇒ Object



# File 'lib/bunny/channel.rb', line 1904

def handle_basic_return(basic_return, properties, content)
  x = find_exchange(basic_return.exchange)

  if x
    x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content)
  else
    @logger.warn "Exchange #{basic_return.exchange} is not in channel #{@id}'s cache! Dropping returned message!"
  end
end

#handle_frameset(basic_deliver, properties, content) ⇒ Object



# File 'lib/bunny/channel.rb', line 1888

def handle_frameset(basic_deliver, properties, content)
  consumer = @consumers[basic_deliver.consumer_tag]
  if consumer
    @work_pool.submit do
      begin
        consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
      rescue StandardError => e
        @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
      end
    end
  else
    @logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
  end
end

#handle_method(method) ⇒ Object



# File 'lib/bunny/channel.rb', line 1779

def handle_method(method)
  @logger.debug { "Channel#handle_frame on channel #{@id}: #{method.inspect}" }
  case method
  when AMQ::Protocol::Queue::DeclareOk then
    # safeguard against late arrivals of responses and
    # so on, see ruby-amqp/bunny#558
    if can_accept_queue_declare_ok?(method)
      @continuations.push(method)
    else
      if !pending_server_named_queue_declaration?
        # this response is for an outdated/overwritten
        # queue.declare, drop it
        @logger.warn "Received a queue.declare-ok response for a mismatching queue (#{method.queue} instead of #{@pending_queue_declare_name}) on channel #{@id}, possibly due to concurrent channel use or a timeout, ignoring it"
      end
    end
  when AMQ::Protocol::Queue::DeleteOk then
    @continuations.push(method)
  when AMQ::Protocol::Queue::PurgeOk then
    @continuations.push(method)
  when AMQ::Protocol::Queue::BindOk then
    @continuations.push(method)
  when AMQ::Protocol::Queue::UnbindOk then
    @continuations.push(method)
  when AMQ::Protocol::Exchange::BindOk then
    @continuations.push(method)
  when AMQ::Protocol::Exchange::UnbindOk then
    @continuations.push(method)
  when AMQ::Protocol::Exchange::DeclareOk then
    @continuations.push(method)
  when AMQ::Protocol::Exchange::DeleteOk then
    @continuations.push(method)
  when AMQ::Protocol::Basic::QosOk then
    @continuations.push(method)
  when AMQ::Protocol::Basic::RecoverOk then
    @continuations.push(method)
  when AMQ::Protocol::Channel::FlowOk then
    @continuations.push(method)
  when AMQ::Protocol::Basic::ConsumeOk then
    @continuations.push(method)
  when AMQ::Protocol::Basic::Cancel then
    if consumer = @consumers[method.consumer_tag]
      @work_pool.submit do
        begin
          if recovers_cancelled_consumers?
            consumer.handle_cancellation(method)
            @logger.info "Automatically recovering cancelled consumer #{consumer.consumer_tag} on queue #{consumer.queue_name}"

            consume_with(consumer)
          else
            @consumers.delete(method.consumer_tag)
            consumer.handle_cancellation(method)
          end
        rescue Exception => e
          @logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!"
          @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
        end
      end
    else
      @logger.warn "No consumer for tag #{method.consumer_tag} on channel #{@id}!"
    end
  when AMQ::Protocol::Basic::CancelOk then
    @continuations.push(method)
    unregister_consumer(method.consumer_tag)
  when AMQ::Protocol::Tx::SelectOk, AMQ::Protocol::Tx::CommitOk, AMQ::Protocol::Tx::RollbackOk then
    @continuations.push(method)
  when AMQ::Protocol::Tx::SelectOk then
    @continuations.push(method)
  when AMQ::Protocol::Confirm::SelectOk then
    @continuations.push(method)
  when AMQ::Protocol::Basic::Ack then
    handle_ack_or_nack(method.delivery_tag, method.multiple, false)
  when AMQ::Protocol::Basic::Nack then
    handle_ack_or_nack(method.delivery_tag, method.multiple, true)
  when AMQ::Protocol::Channel::Close then
    closed!
    @connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(@id))

    # basic.ack, basic.reject, basic.nack. MK.
    if channel_level_exception_after_operation_that_has_no_response?(method)
      @on_error.call(self, method) if @on_error
    else
      @last_channel_error = instantiate_channel_level_exception(method)
      @continuations.push(method)
    end

  when AMQ::Protocol::Channel::CloseOk then
    @continuations.push(method)
  else
    raise "Do not know how to handle #{method.inspect} in Bunny::Channel#handle_method"
  end
end
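
The queue.declare-ok branch above only accepts a response when it matches the declaration that is currently pending, or when the pending declaration is for a server-named queue (empty name), in which case the broker-generated name cannot be known in advance. A standalone sketch of that guard follows; `accept_declare_ok?` is a hypothetical function and the queue names are illustrative.

```ruby
# pending_name: name sent in the in-flight queue.declare (nil if none)
# response_queue: queue name carried by the queue.declare-ok response
def accept_declare_ok?(pending_name, response_queue)
  server_named_pending = !pending_name.nil? && pending_name.empty?
  pending_name == response_queue || server_named_pending
end

puts accept_declare_ok?("orders", "orders")
puts accept_declare_ok?("", "amq.gen-example")
puts accept_declare_ok?("orders", "invoices")
puts accept_declare_ok?(nil, "invoices")
```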

#headers(name, opts = {}) ⇒ Bunny::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (Bunny::Exchange)

# File 'lib/bunny/channel.rb', line 409

def headers(name, opts = {})
  find_exchange(name) || Exchange.new(self, :headers, name, opts)
end

#increment_recoveries_counter ⇒ Object

# File 'lib/bunny/channel.rb', line 1708

def increment_recoveries_counter
  @recoveries_counter.increment
end

#inspect ⇒ Object

# File 'lib/bunny/channel.rb', line 1730

def inspect
  to_s
end

#maybe_kill_consumer_work_pool! ⇒ Object

# File 'lib/bunny/channel.rb', line 2041

def maybe_kill_consumer_work_pool!
  if @work_pool && @work_pool.running?
    @work_pool.kill
  end
end

#maybe_pause_consumer_work_pool! ⇒ Object

# File 'lib/bunny/channel.rb', line 2036

def maybe_pause_consumer_work_pool!
  @work_pool.pause if @work_pool && @work_pool.running?
end

#maybe_start_consumer_work_pool! ⇒ Object

Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads that won’t do any real work for channels that do not register consumers (e.g. only used for publishing). MK.



# File 'lib/bunny/channel.rb', line 2029

def maybe_start_consumer_work_pool!
  if @work_pool && !@work_pool.running?
    @work_pool.start
  end
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be rejected as well?

  • requeue (Boolean) (defaults to: false)

    (false) Should this message be requeued instead of dropping it?

# File 'lib/bunny/channel.rb', line 607

def nack(delivery_tag, multiple = false, requeue = false)
  basic_nack(delivery_tag.to_i, multiple, requeue)
end

#number ⇒ Integer

Returns Channel id.

Returns:

  • (Integer)

    Channel id



# File 'lib/bunny/channel.rb', line 296

def number
  self.id
end

#on_error(&block) ⇒ Object

Defines a handler for errors that are not responses to a particular operation (e.g. basic.ack, basic.reject, basic.nack).

# File 'lib/bunny/channel.rb', line 1604

def on_error(&block)
  @on_error = block
end

#on_uncaught_exception(&block) ⇒ Object

Defines a handler for uncaught exceptions in consumers (e.g. delivered message handlers).



# File 'lib/bunny/channel.rb', line 1612

def on_uncaught_exception(&block)
  @uncaught_exception_handler = block
end

#open ⇒ Bunny::Channel

Opens the channel and resets its internal state.

Returns:

  • (Bunny::Channel)

# File 'lib/bunny/channel.rb', line 235

def open
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @connection.open_channel(self)
  # clear last channel error
  @last_channel_error = nil

  @status = :open

  self
end

#open? ⇒ Boolean

Returns true if this channel is open, false otherwise.

Returns:

  • (Boolean)

    true if this channel is open, false otherwise



# File 'lib/bunny/channel.rb', line 281

def open?
  @status == :open
end

#pending_server_named_queue_declaration? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/channel.rb', line 1768

def pending_server_named_queue_declaration?
  @pending_queue_declare_name && @pending_queue_declare_name.empty?
end

#queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Bunny::Queue

Declares a queue or looks it up in the per-channel cache.

Parameters:

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).

  • opts (Hash) (defaults to: {})

    Queue properties and other options

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

Returns:

  • (Bunny::Queue)

    Queue that was declared or looked up in the cache

# File 'lib/bunny/channel.rb', line 457

def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
  throw ArgumentError.new("queue name must not be nil") if name.nil?

  q = find_queue(name) || Bunny::Queue.new(self, name, opts)

  register_queue(q)
end

#queue_bind(name, exchange, opts = {}) ⇒ AMQ::Protocol::Queue::BindOk

Binds a queue to an exchange using the queue.bind AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Queue::BindOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1217

def queue_bind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Bind.encode(@id,
      name,
      exchange_name,
      (opts[:routing_key] || opts[:key]),
      false,
      opts[:arguments]))
  with_continuation_timeout do
    @last_queue_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_bind_ok
end
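
As the body above shows, #queue_bind reads the routing key from :routing_key and falls back to :key when the former is absent. A tiny sketch of that lookup, with `binding_routing_key` as a hypothetical helper name and made-up key values:

```ruby
# Mirrors the option lookup performed by the method above.
def binding_routing_key(opts)
  opts[:routing_key] || opts[:key]
end

puts binding_routing_key(routing_key: "orders.created").inspect
puts binding_routing_key(key: "orders.cancelled").inspect
puts binding_routing_key({}).inspect
```

When both keys are given, :routing_key wins. An empty hash yields nil, which is passed to the frame encoder unchanged.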

#queue_declare(name, opts = {}) ⇒ AMQ::Protocol::Queue::DeclareOk

Declares a queue using queue.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    The name of the queue or an empty string to let RabbitMQ generate a name. Note that LF and CR characters will be stripped from the value.

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • auto_delete (Boolean) — default: false

    Should this queue be deleted when the last consumer is cancelled?

  • exclusive (Boolean) — default: false

    Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed.

  • passive (Boolean) — default: false

    If true, queue will be checked for existence. If it does not exist, NotFound will be raised.

  • :arguments (Hash) — default: {}

    Optional queue arguments (x-arguments)

Returns:

  • (AMQ::Protocol::Queue::DeclareOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1125

def queue_declare(name, opts = {})
  raise_if_no_longer_open!

  Bunny::Queue.verify_type!(opts[:arguments]) if opts[:arguments]

  # strip trailing new line and carriage returns
  # just like RabbitMQ does
  safe_name = name.gsub(/[\r\n]/, "")
  @pending_queue_declare_name = safe_name
  @connection.send_frame(
    AMQ::Protocol::Queue::Declare.encode(@id,
      @pending_queue_declare_name,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:exclusive, false),
      opts.fetch(:auto_delete, false),
      false,
      opts[:arguments]))

  begin
    with_continuation_timeout do
      @last_queue_declare_ok = wait_on_continuations
    end
  ensure
    # clear pending continuation context if it belongs to us
    @pending_queue_declare_name = nil if @pending_queue_declare_name == safe_name
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_declare_ok
end

#queue_delete(name, opts = {}) ⇒ AMQ::Protocol::Queue::DeleteOk

Deletes a queue using the queue.delete AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

Returns:

  • (AMQ::Protocol::Queue::DeleteOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1168

def queue_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Delete.encode(@id,
      name,
      opts[:if_unused],
      opts[:if_empty],
      false))
  with_continuation_timeout do
    @last_queue_delete_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_delete_ok
end

#queue_purge(name, opts = {}) ⇒ AMQ::Protocol::Queue::PurgeOk

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

Returns:

  • (AMQ::Protocol::Queue::PurgeOk)

    RabbitMQ response

# File 'lib/bunny/channel.rb', line 1191

def queue_purge(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

  with_continuation_timeout do
    @last_queue_purge_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_purge_ok
end

#queue_unbind(name, exchange, opts = {}) ⇒ AMQ::Protocol::Queue::UnbindOk

Unbinds a queue from an exchange using the queue.unbind AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (AMQ::Protocol::Queue::UnbindOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1253

def queue_unbind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(AMQ::Protocol::Queue::Unbind.encode(@id,
      name,
      exchange_name,
      opts[:routing_key],
      opts[:arguments]))
  with_continuation_timeout do
    @last_queue_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_unbind_ok
end
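
The exchange argument may be either an exchange name or an object that responds to name. A minimal standalone sketch of that normalisation follows; `exchange_name_for` and `FakeExchange` are hypothetical names used only for illustration and are not part of Bunny.

```ruby
# Hypothetical stand-in for an exchange object; only #name matters here.
FakeExchange = Struct.new(:name)

# Returns the exchange name whether given a String or a name-bearing object.
def exchange_name_for(exchange)
  exchange.respond_to?(:name) ? exchange.name : exchange
end

from_string = exchange_name_for("logs.topic")
from_object = exchange_name_for(FakeExchange.new("logs.topic"))
```

Both calls yield the same string, which is why callers can pass whichever form they already hold.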

#quorum_queue(name, opts = {}) ⇒ Bunny::Queue

Declares a new client-named quorum queue.

Parameters:

  • name (String)

    Queue name. Empty (server-generated) names are not supported by this method.

  • opts (Hash) (defaults to: {})

    Queue properties and other options. Durability, exclusivity, auto-deletion options will be ignored.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

Returns:

  • (Bunny::Queue)



# File 'lib/bunny/channel.rb', line 476

def quorum_queue(name, opts = {})
  raise ArgumentError, "quorum queue name must not be nil" if name.nil?
  raise ArgumentError, "quorum queue name must not be empty (server-named QQs do not make sense)" if name.empty?

  durable_queue(name, Bunny::Queue::Types::QUORUM, opts)
end
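
The guard clauses reject nil and empty names before anything is sent to the broker. A minimal sketch of the same checks as a standalone helper follows; `ensure_client_named!` is a hypothetical name, and the messages are shortened for illustration.

```ruby
# Hypothetical helper mirroring the name checks; not part of Bunny's API.
def ensure_client_named!(name, kind)
  raise ArgumentError, "#{kind} name must not be nil" if name.nil?
  raise ArgumentError, "#{kind} name must not be empty" if name.empty?
  name
end

accepted = ensure_client_named!("orders.qq", "quorum queue")

# An empty name is rejected with an ArgumentError the caller can rescue.
rejected = begin
  ensure_client_named!("", "quorum queue")
rescue ArgumentError => e
  e.message
end
```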

#read_and_reset_only_acks_received ⇒ Object



# File 'lib/bunny/channel.rb', line 2000

def read_and_reset_only_acks_received
  @unconfirmed_set_mutex.synchronize do
    result = @only_acks_received
    @only_acks_received = true
    result
  end
end
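
The method reads a flag and resets it inside one critical section, so a caller sees whether any negative acknowledgement arrived since the previous read. A simplified model of that read-and-reset pattern follows; `AckTracker` and `nack!` are hypothetical names, not Bunny's implementation.

```ruby
# Simplified model of a read-and-reset flag; AckTracker is a hypothetical name.
class AckTracker
  def initialize
    @lock = Mutex.new
    @only_acks = true
  end

  # Record a negative acknowledgement.
  def nack!
    @lock.synchronize { @only_acks = false }
  end

  # Return the flag and reset it to true in one critical section.
  def read_and_reset
    @lock.synchronize do
      result = @only_acks
      @only_acks = true
      result
    end
  end
end

tracker = AckTracker.new
tracker.nack!
first  = tracker.read_and_reset
second = tracker.read_and_reset
```

Here `first` reflects the recorded nack, while `second` starts from a clean slate because the first read reset the flag.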

#read_next_frame(options = {}) ⇒ Object



# File 'lib/bunny/channel.rb', line 2048

def read_next_frame(options = {})
  @connection.read_next_frame(options)
end

#recover(ignored = true) ⇒ Object

Tells RabbitMQ to redeliver unacknowledged messages



# File 'lib/bunny/channel.rb', line 561

def recover(ignored = true)
  # RabbitMQ only supports basic.recover with requeue = true
  basic_recover(true)
end

#recover_cancelled_consumers! ⇒ Object



# File 'lib/bunny/channel.rb', line 1713

def recover_cancelled_consumers!
  @recover_cancelled_consumers = true
end

#recover_confirm_mode ⇒ Object

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature. Sets the delivery tag offset to the last publish sequence number used before the failure, because the protocol restarts the sequence after recovery.



# File 'lib/bunny/channel.rb', line 1653

def recover_confirm_mode
  if using_publisher_confirmations?
    @unconfirmed_set_mutex.synchronize do
      @unconfirmed_set.clear
      @delivery_tag_offset = @next_publish_seq_no - 1
    end
    confirm_select(@confirms_callback)
  end
end
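
The offset arithmetic can be illustrated with a small model. This is a sketch under the assumption that the broker numbers confirms from 1 again on the recovered channel and that the client adds the stored offset to each reported tag; `ConfirmSequence` and its methods are hypothetical names, not Bunny's implementation.

```ruby
# Simplified model; names are hypothetical and this is not Bunny's implementation.
class ConfirmSequence
  attr_reader :next_seq_no, :offset

  def initialize
    @next_seq_no = 1 # sequence number the next publish will use
    @offset = 0      # added to broker tags to recover client numbering
  end

  def publish
    seq = @next_seq_no
    @next_seq_no += 1
    seq
  end

  # Called after the channel is reopened: the broker's numbering starts over.
  def recover
    @offset = @next_seq_no - 1
  end

  # Translate a tag reported by the broker into the client-side sequence number.
  def client_seq_for(broker_tag)
    broker_tag + @offset
  end
end

seq = ConfirmSequence.new
3.times { seq.publish }        # client has used 1, 2 and 3
seq.recover                    # offset becomes 3
fourth = seq.publish           # client-side sequence number of the next message
mapped = seq.client_seq_for(1) # broker's first tag after recovery
```

In this model the first tag reported after recovery maps onto the first message published after recovery, so client-side numbering stays continuous.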

#recover_consumers ⇒ Object

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1696

def recover_consumers
  unless @consumers.empty?
    @work_pool = ConsumerWorkPool.new(@work_pool.size, @work_pool.abort_on_exception)
    @work_pool.start
  end

  @consumer_mutex.synchronize { @consumers.values }.each do |c|
    c.recover_from_network_failure
  end
end
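
The listing copies the consumer list while holding the mutex and iterates over the copy afterwards, so callbacks that register or unregister consumers do not run under the lock. A minimal sketch of that idiom follows; `ConsumerRegistry` and `each_snapshot` are hypothetical names used only for illustration.

```ruby
# Hypothetical registry illustrating "snapshot under lock, iterate outside it".
class ConsumerRegistry
  def initialize
    @lock = Mutex.new
    @consumers = {}
  end

  def register(tag, consumer)
    @lock.synchronize { @consumers[tag] = consumer }
  end

  def unregister(tag)
    @lock.synchronize { @consumers.delete(tag) }
  end

  # Copy the values while holding the lock, then yield without holding it.
  def each_snapshot
    @lock.synchronize { @consumers.values }.each { |c| yield c }
  end
end

registry = ConsumerRegistry.new
registry.register("ctag-1", :consumer_a)
registry.register("ctag-2", :consumer_b)

visited = []
registry.each_snapshot do |c|
  visited << c
  registry.unregister("ctag-2") # safe: the lock is not held during iteration
end
```

Because the iteration works on a copy, the entry removed mid-loop is still visited once; a non-reentrant Mutex held during the loop would instead raise on the nested `unregister` call.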

#recover_exchanges ⇒ Object

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1675

def recover_exchanges
  @exchange_mutex.synchronize { @exchanges.values }.each do |x|
    x.recover_from_network_failure
  end
end

#recover_from_network_failure ⇒ Object

Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1626

def recover_from_network_failure
  @logger.debug { "Recovering channel #{@id} after network failure" }
  release_all_continuations

  recover_prefetch_setting
  recover_confirm_mode
  recover_tx_mode
  recover_exchanges
  # this includes recovering bindings
  recover_queues
  recover_consumers
  increment_recoveries_counter
end

#recover_prefetch_setting ⇒ Object

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1644

def recover_prefetch_setting
  basic_qos(@prefetch_count, @prefetch_global) if @prefetch_count
end

#recover_queues ⇒ Object

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1685

def recover_queues
  @queue_mutex.synchronize { @queues.values }.each do |q|
    @logger.debug { "Recovering queue #{q.name}" }
    q.recover_from_network_failure
  end
end

#recover_tx_mode ⇒ Object

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.



# File 'lib/bunny/channel.rb', line 1667

def recover_tx_mode
  tx_select if @tx_mode
end

#recovers_cancelled_consumers? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/bunny/channel.rb', line 1718

def recovers_cancelled_consumers?
  !!@recover_cancelled_consumers
end

#register_consumer(consumer_tag, consumer) ⇒ Object



# File 'lib/bunny/channel.rb', line 1745

def register_consumer(consumer_tag, consumer)
  @consumer_mutex.synchronize do
    @consumers[consumer_tag] = consumer
  end
end

#register_exchange(exchange) ⇒ Object



# File 'lib/bunny/channel.rb', line 2078

def register_exchange(exchange)
  @exchange_mutex.synchronize { @exchanges[exchange.name] = exchange }
end

#register_queue(queue) ⇒ Object



# File 'lib/bunny/channel.rb', line 2063

def register_queue(queue)
  @queue_mutex.synchronize { @queues[queue.name] = queue }
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of being dropped?



# File 'lib/bunny/channel.rb', line 581

def reject(delivery_tag, requeue = false)
  basic_reject(delivery_tag.to_i, requeue)
end

#release_all_continuations ⇒ Object

Releases all continuations. Used by automatic network recovery.



# File 'lib/bunny/channel.rb', line 2011

def release_all_continuations
  @threads_waiting_on_confirms_continuations.each do |t|
    t.run
  end
  @threads_waiting_on_continuations.each do |t|
    t.run
  end
  @threads_waiting_on_basic_get_continuations.each do |t|
    t.run
  end

  self.reset_continuations
end

#stream(name, opts = {}) ⇒ Bunny::Queue

Declares a new client-named stream (that Bunny can use as if it were a queue). Note that Bunny still uses AMQP 0-9-1 to perform operations on this “queue”. To use stream-specific operations and to benefit from the efficiency and partitioning of the stream protocol, use a Ruby client for the RabbitMQ stream protocol.

Parameters:

  • name (String)

    Stream name. Empty (server-generated) names are not supported by this method.

  • opts (Hash) (defaults to: {})

    Queue properties and other options. Durability, exclusivity, auto-deletion options will be ignored.

Options Hash (opts):

  • :arguments (Hash) — default: {}

    Optional arguments (x-arguments)

Returns:

  • (Bunny::Queue)



# File 'lib/bunny/channel.rb', line 498

def stream(name, opts = {})
  raise ArgumentError, "stream name must not be nil" if name.nil?
  raise ArgumentError, "stream name must not be empty (server-named streams do not make sense)" if name.empty?

  durable_queue(name, Bunny::Queue::Types::STREAM, opts)
end

#synchronize(&block) ⇒ Object

Synchronizes given block using this channel’s mutex.



# File 'lib/bunny/channel.rb', line 1580

def synchronize(&block)
  @publishing_mutex.synchronize(&block)
end

#temporary_queue(opts = {}) ⇒ Bunny::Queue

Declares a new server-named queue that is automatically deleted when the connection is closed.

Returns:

  • (Bunny::Queue)



# File 'lib/bunny/channel.rb', line 538

def temporary_queue(opts = {})
  temporary_queue_opts = {
    :exclusive => true
  }
  queue("", opts.merge(temporary_queue_opts))
end
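
Because the forced options are merged over the caller's hash, a caller-supplied :exclusive value is overwritten while every other option passes through. A minimal sketch of just that merge follows; `temporary_queue_options` is a hypothetical helper, and the `x-expires` argument is only an illustrative value.

```ruby
# Hypothetical helper reproducing only the option merge from the listing.
def temporary_queue_options(opts = {})
  opts.merge(:exclusive => true)
end

# Other options are preserved alongside the forced flag.
with_args = temporary_queue_options(:arguments => { "x-expires" => 60_000 })

# A conflicting :exclusive value from the caller is overwritten.
overridden = temporary_queue_options(:exclusive => false)
```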

#to_s ⇒ String

Returns a brief human-readable representation of the channel.

Returns:

  • (String)

    Brief human-readable representation of the channel



# File 'lib/bunny/channel.rb', line 1726

def to_s
  "#<#{self.class.name}:#{object_id} @id=#{self.number} @connection=#{@connection.to_s} @open=#{open?}>"
end

#topic(name, opts = {}) ⇒ Bunny::Exchange

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (Bunny::Exchange)



# File 'lib/bunny/channel.rb', line 391

def topic(name, opts = {})
  find_exchange(name) || Exchange.new(self, :topic, name, opts)
end
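
The method body is a find-or-create lookup: a cached exchange wins, and a new one is built only on a miss. A simplified model of that idiom follows; `ExchangeCache` and `Entry` are hypothetical names and do not reproduce Bunny's classes or how they register themselves.

```ruby
# Hypothetical cache showing the find-or-create idiom; not Bunny's classes.
class ExchangeCache
  Entry = Struct.new(:type, :name, :opts)

  def initialize
    @exchanges = {}
  end

  def find(name)
    @exchanges[name]
  end

  # Returns the cached entry if present; otherwise builds and stores a new one.
  def topic(name, opts = {})
    find(name) || (@exchanges[name] = Entry.new(:topic, name, opts))
  end
end

cache  = ExchangeCache.new
first  = cache.topic("events", :durable => true)
second = cache.topic("events", :durable => false)
```

In this model the second call returns the object created by the first, so the options passed on a cache hit have no effect.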

#tx_commit ⇒ AMQ::Protocol::Tx::CommitOk

Commits current transaction

Returns:

  • (AMQ::Protocol::Tx::CommitOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1485

def tx_commit
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
  with_continuation_timeout do
    @last_tx_commit_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_commit_ok
end

#tx_rollback ⇒ AMQ::Protocol::Tx::RollbackOk

Rolls back current transaction

Returns:

  • (AMQ::Protocol::Tx::RollbackOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1500

def tx_rollback
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
  with_continuation_timeout do
    @last_tx_rollback_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_rollback_ok
end

#tx_select ⇒ AMQ::Protocol::Tx::SelectOk

Puts the channel into transaction mode (starts a transaction)

Returns:

  • (AMQ::Protocol::Tx::SelectOk)

    RabbitMQ response



# File 'lib/bunny/channel.rb', line 1469

def tx_select
  raise_if_no_longer_open!

  @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
  with_continuation_timeout do
    @last_tx_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @tx_mode = true

  @last_tx_select_ok
end

#unregister_consumer(consumer_tag) ⇒ Object



# File 'lib/bunny/channel.rb', line 1752

def unregister_consumer(consumer_tag)
  @consumer_mutex.synchronize do
    @consumers.delete(consumer_tag)
  end
end

#using_publisher_confirmations? ⇒ Boolean Also known as: using_publisher_confirms?

Returns true if this channel has Publisher Confirms enabled, false otherwise.

Returns:

  • (Boolean)

    true if this channel has Publisher Confirms enabled, false otherwise



# File 'lib/bunny/channel.rb', line 1525

def using_publisher_confirmations?
  @next_publish_seq_no > 0
end

#using_tx? ⇒ Boolean

Returns true if this channel has transactions enabled.

Returns:

  • (Boolean)

    true if this channel has transactions enabled



# File 'lib/bunny/channel.rb', line 1513

def using_tx?
  !!@tx_mode
end

#wait_for_confirms ⇒ Boolean

Blocks calling thread until confirms are received for all currently unacknowledged published messages. Returns immediately if there are no outstanding confirms.

Returns:

  • (Boolean)

    true if all messages were acknowledged positively since the last time this method was called, false otherwise



# File 'lib/bunny/channel.rb', line 1568

def wait_for_confirms
  wait_on_confirms_continuations
  read_and_reset_only_acks_received
end

#wait_on_basic_get_continuations ⇒ Object



# File 'lib/bunny/channel.rb', line 1959

def wait_on_basic_get_continuations
  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_basic_get_continuations << t

    begin
      @basic_get_continuations.poll(@connection.continuation_timeout)
    ensure
      @threads_waiting_on_basic_get_continuations.delete(t)
    end
  else
    connection.reader_loop.run_once until @basic_get_continuations.length > 0

    @basic_get_continuations.pop
  end
end

#wait_on_confirms_continuations ⇒ Object



# File 'lib/bunny/channel.rb', line 1977

def wait_on_confirms_continuations
  raise_if_no_longer_open!

  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_confirms_continuations << t

    begin
      while @unconfirmed_set_mutex.synchronize { !@unconfirmed_set.empty? }
        @confirms_continuations.poll(@connection.continuation_timeout)
      end
    ensure
      @threads_waiting_on_confirms_continuations.delete(t)
    end
  else
    unless @unconfirmed_set.empty?
      connection.reader_loop.run_once until @confirms_continuations.length > 0
      @confirms_continuations.pop
    end
  end
end

#wait_on_continuations ⇒ Object



# File 'lib/bunny/channel.rb', line 1941

def wait_on_continuations
  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_continuations << t

    begin
      @continuations.poll(@connection.continuation_timeout)
    ensure
      @threads_waiting_on_continuations.delete(t)
    end
  else
    connection.reader_loop.run_once until @continuations.length > 0

    @continuations.pop
  end
end
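
In the threaded branch the calling thread blocks on a queue until another thread delivers the broker's response or a timeout elapses. A minimal sketch of such a blocking poll follows, built on Mutex and ConditionVariable from Ruby's core library; `TinyContinuationQueue` is a hypothetical name, its timeout is expressed in seconds for simplicity, and it is not the queue class Bunny uses.

```ruby
require "timeout"

# Hypothetical blocking queue with a timed poll; not Bunny's implementation.
class TinyContinuationQueue
  def initialize
    @items = []
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  # Called by the thread that receives the response.
  def push(item)
    @lock.synchronize do
      @items << item
      @cond.signal
    end
  end

  # Waits up to timeout_s seconds for an item; raises Timeout::Error otherwise.
  def poll(timeout_s)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_s
    @lock.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, "no response within #{timeout_s}s" if remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @items.shift
    end
  end
end

q = TinyContinuationQueue.new
producer = Thread.new { sleep 0.05; q.push(:declare_ok) }
response = q.poll(1.0)
producer.join

# With nothing pushed, the poll gives up once the deadline passes.
timed_out = begin
  q.poll(0.05)
  false
rescue Timeout::Error
  true
end
```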

#wait_on_continuations_timeout ⇒ Object



# File 'lib/bunny/channel.rb', line 228

def wait_on_continuations_timeout
  @connection.transport_write_timeout
end

#with_continuation_timeout(&block) ⇒ Object



# File 'lib/bunny/channel.rb', line 1740

def with_continuation_timeout(&block)
  Bunny::Timeout.timeout(wait_on_continuations_timeout, ClientTimeout, &block)
end
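
The wrapper runs a block under a deadline and raises a dedicated error class when the deadline passes. A minimal sketch using Ruby's standard Timeout module follows, on the assumption that Bunny::Timeout behaves like it; `SketchClientTimeout` and `with_timeout_sketch` are hypothetical names.

```ruby
require "timeout"

# Hypothetical error class standing in for a client-side timeout.
class SketchClientTimeout < Timeout::Error; end

# Runs the block, raising SketchClientTimeout if it exceeds `seconds`.
def with_timeout_sketch(seconds, &block)
  Timeout.timeout(seconds, SketchClientTimeout, &block)
end

# A block that finishes in time returns its own value.
fast = with_timeout_sketch(1) { :ok }

# A block that overruns is interrupted with the custom error class.
slow = begin
  with_timeout_sketch(0.05) { sleep 1 }
rescue SketchClientTimeout
  :timed_out
end
```

Passing a specific error class lets callers rescue protocol-level timeouts separately from other failures.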