Class: GorgonBunny::Channel

Inherits:
Object
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/channel.rb

Overview

## Channels in RabbitMQ

To quote the AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

## Opening Channels

Channels can be opened either via `GorgonBunny::Session#create_channel` (sufficient in the majority of cases) or by instantiating `GorgonBunny::Channel` directly.

Either way, a channel id is allocated automatically unless one is passed explicitly.

## Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

## Higher-level API

GorgonBunny offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la the RabbitMQ Java client, Langohr and Pika).

### Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

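### Exchange Operations In Higher-level API

  • #direct and #exchange declare exchanges; #default_exchange provides access to the default exchange. The rest of the API is in Exchange.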

## Channel QoS (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
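The effect of a prefetch limit can be pictured with a toy model. The sketch below is not GorgonBunny code: `PrefetchWindow` and its methods are hypothetical names, and the model ignores the AMQP convention that a prefetch count of 0 means "no limit".

```ruby
# Toy model of a per-channel prefetch window (hypothetical, for illustration only).
class PrefetchWindow
  attr_reader :unacked

  def initialize(prefetch_count)
    @prefetch_count = prefetch_count
    @unacked        = []
  end

  # True while the consumer may be handed another message.
  def can_deliver?
    @unacked.size < @prefetch_count
  end

  # Records a delivery if the window has room; returns whether it was delivered.
  def deliver(tag)
    return false unless can_deliver?
    @unacked << tag
    true
  end

  # Acknowledging a delivery frees one slot in the window.
  def ack(tag)
    @unacked.delete(tag)
    nil
  end
end

window    = PrefetchWindow.new(2)
results   = [1, 2, 3].map { |tag| window.deliver(tag) } # third delivery is held back
window.ack(1)
after_ack = window.deliver(3)                           # now there is room again
```

With a window of 2, the third message is only handed over once an earlier one has been acknowledged (or rejected).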

## Channel IDs

Channels are identified by their ids which are integers. GorgonBunny takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection, usually 65535 (channel ids are 16-bit unsigned integers and id 0 is reserved for the connection itself). Allocating channels is very cheap on both client and server, so having tens, hundreds or even thousands of channels is not a problem.
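As a rough picture of what "allocating and releasing" ids involves, here is a minimal sketch. It is an assumption for illustration, not GorgonBunny's actual allocator: `ChannelIdAllocator`, `allocate` and `release` are hypothetical names, and the limit is passed in explicitly.

```ruby
require "set"

# Hypothetical id allocator; not GorgonBunny's implementation.
class ChannelIdAllocator
  def initialize(max_id)
    @max_id = max_id
    @in_use = Set.new
  end

  # Returns the lowest free id, starting at 1.
  def allocate
    id = (1..@max_id).find { |candidate| !@in_use.include?(candidate) }
    raise "no free channel ids" if id.nil?
    @in_use << id
    id
  end

  # Makes an id available again once its channel is closed.
  def release(id)
    @in_use.delete(id)
    nil
  end
end

allocator = ChannelIdAllocator.new(8)
first     = allocator.allocate
second    = allocator.allocate
allocator.release(first)
reused    = allocator.allocate # the released id is handed out again
```

Because ids are recycled when channels close, applications rarely need to care about the numeric value of an id.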

## Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With GorgonBunny, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying `channel.close` method information.

Examples:

Opening a channel via GorgonBunny::Session#create_channel

conn = GorgonBunny.new
conn.start

ch   = conn.create_channel

Closing a channel

ch  = conn.create_channel
ch.close

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue GorgonBunny::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "bunny.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue GorgonBunny::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

Constant Summary

DEFAULT_CONTENT_TYPE =
"application/octet-stream".freeze

Instance Attribute Summary

Backwards compatibility with 0.8.0

Higher-level API for exchange operations

Higher-level API for queue operations

QoS and Flow Control

Message acknowledgements

Consumer and Message operations (basic.*)

Queue operations (queue.*)

Exchange operations (exchange.*)

Flow control (channel.*)

Transactions (tx.*)

Publisher Confirms (confirm.*)

Misc

Network Failure Recovery

Instance Method Summary

Constructor Details

#initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1)) ⇒ Channel

Returns a new instance of Channel.

Parameters:

  • connection (GorgonBunny::Session) (defaults to: nil)

    AMQP 0.9.1 connection

  • id (Integer) (defaults to: nil)

    Channel id, pass nil to make GorgonBunny automatically allocate it

  • work_pool (GorgonBunny::ConsumerWorkPool) (defaults to: ConsumerWorkPool.new(1))

    Thread pool for delivery processing, by default of size 1



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 165

def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
  @connection = connection
  @logger     = connection.logger
  @id         = id || @connection.next_channel_id
  @status     = :opening

  @connection.register_channel(self)

  @queues     = Hash.new
  @exchanges  = Hash.new
  @consumers  = Hash.new
  @work_pool  = work_pool

  # synchronizes frameset delivery. MK.
  @publishing_mutex = @connection.mutex_impl.new
  @consumer_mutex   = @connection.mutex_impl.new

  @unconfirmed_set_mutex = @connection.mutex_impl.new

  self.reset_continuations

  # threads awaiting on continuations. Used to unblock
  # them when network connection goes down so that busy loops
  # that perform synchronous operations can work. MK.
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @next_publish_seq_no = 0

  @recoveries_counter = GorgonBunny::Concurrent::AtomicFixnum.new(0)
end

Instance Attribute Details

#connection ⇒ GorgonBunny::Session (readonly)

Returns AMQP connection this channel was opened on.

Returns:

  • (GorgonBunny::Session)

    AMQP connection this channel was opened on



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 142

def connection
  @connection
end

#consumers ⇒ Hash<String, GorgonBunny::Consumer> (readonly)

Returns Consumer instances declared on this channel.

Returns:

  • (Hash<String, GorgonBunny::Consumer>)

    Consumer instances declared on this channel



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 158

def consumers
  @consumers
end

#exchanges ⇒ Hash<String, GorgonBunny::Exchange> (readonly)

Returns Exchange instances declared on this channel.

Returns:

  • (Hash<String, GorgonBunny::Exchange>)

    Exchange instances declared on this channel



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 152

def exchanges
  @exchanges
end

#id ⇒ Integer

Returns Channel id.

Returns:

  • (Integer)

    Channel id



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 140

def id
  @id
end

#nacked_set ⇒ Set<Integer> (readonly)

Returns Set of message indexes that have been nacked.

Returns:

  • (Set<Integer>)

    Set of message indexes that have been nacked



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 156

def nacked_set
  @nacked_set
end

#next_publish_seq_no ⇒ Integer (readonly)

Returns Next publisher confirmations sequence index.

Returns:

  • (Integer)

    Next publisher confirmations sequence index



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 148

def next_publish_seq_no
  @next_publish_seq_no
end

#queues ⇒ Hash<String, GorgonBunny::Queue> (readonly)

Returns Queue instances declared on this channel.

Returns:

  • (Hash<String, GorgonBunny::Queue>)

    Queue instances declared on this channel



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 150

def queues
  @queues
end

#recoveries_counter ⇒ Object (readonly)

Returns the value of attribute recoveries_counter.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 198

def recoveries_counter
  @recoveries_counter
end

#status ⇒ Symbol (readonly)

Returns Channel status (:opening, :open, :closed).

Returns:

  • (Symbol)

    Channel status (:opening, :open, :closed)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 144

def status
  @status
end

#unconfirmed_set ⇒ Set<Integer> (readonly)

Returns Set of published message indexes that are currently unconfirmed.

Returns:

  • (Set<Integer>)

    Set of published message indexes that are currently unconfirmed



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 154

def unconfirmed_set
  @unconfirmed_set
end

#work_pool ⇒ GorgonBunny::ConsumerWorkPool (readonly)

Returns Thread pool delivered messages are dispatched to.

Returns:

  • (GorgonBunny::ConsumerWorkPool)

    Thread pool delivered messages are dispatched to



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 146

def work_pool
  @work_pool
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to acknowledge

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be acknowledged as well?

# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 459

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end

#active ⇒ Boolean

Returns true if this channel is open.

Returns:

  • (Boolean)

    true if this channel is open



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 254

def active
  open?
end

#add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1506

def add_consumer(queue, consumer_tag, no_ack, exclusive, arguments, &block)
  @consumer_mutex.synchronize do
    c = Consumer.new(self, queue, consumer_tag, no_ack, exclusive, arguments)
    c.on_delivery(&block) if block
    @consumers[consumer_tag] = c
  end
end

#any_consumers? ⇒ Boolean

Returns true if there are consumers on this channel.

Returns:

  • (Boolean)

    true if there are consumers on this channel



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 925

def any_consumers?
  @consumer_mutex.synchronize { @consumers.any? }
end

#basic_ack(delivery_tag, multiple) ⇒ NilClass

Acknowledges a delivery (message).

Examples:

Ack a message

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
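# q is assumed to be a queue previously declared on ch
q.subscribe do |delivery_info, properties, payload|
  # acknowledge the message (basic_ack takes the multiple flag explicitly)
  ch.basic_ack(delivery_info.delivery_tag, false)
end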

Ack a message fetched via basic.get

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
ch.basic_ack(delivery_info.delivery_tag, false)

Ack multiple messages fetched via basic.get

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
# ack all fetched messages up to payload3
ch.basic_ack(delivery_info.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean)

    Should all deliveries up to this one be acknowledged?

Returns:

  • (NilClass)

    nil

# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 720

def basic_ack(delivery_tag, multiple)
  raise_if_no_longer_open!
  @connection.send_frame(GorgonAMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))

  nil
end
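The difference between acknowledging a single delivery and acknowledging with multiple set to true can be sketched with plain arrays. This is a toy model of delivery-tag bookkeeping, not broker or GorgonBunny code; `apply_ack` is a hypothetical helper.

```ruby
# Returns the delivery tags that remain unacknowledged after an ack.
# With multiple = true, every tag up to and including delivery_tag is acknowledged.
def apply_ack(unacked_tags, delivery_tag, multiple)
  if multiple
    unacked_tags.reject { |tag| tag <= delivery_tag }
  else
    unacked_tags.reject { |tag| tag == delivery_tag }
  end
end

outstanding = [1, 2, 3, 4]
single = apply_ack(outstanding, 3, false) # only tag 3 is acknowledged
batch  = apply_ack(outstanding, 3, true)  # tags 1, 2 and 3 are acknowledged
```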

#basic_cancel(consumer_tag) ⇒ GorgonAMQ::Protocol::Basic::CancelOk

Removes a consumer. Messages for this consumer will no longer be delivered. If the queue it was on is auto-deleted and this consumer was the last one, the queue will be deleted.

Parameters:

  • consumer_tag (String)

    Consumer tag (unique identifier) to cancel

Returns:

  • (GorgonAMQ::Protocol::Basic::CancelOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 911

def basic_cancel(consumer_tag)
  @connection.send_frame(GorgonAMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_basic_cancel_ok = wait_on_continuations
  end

  maybe_kill_consumer_work_pool! unless any_consumers?

  @last_basic_cancel_ok
end

#basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block) ⇒ GorgonAMQ::Protocol::Basic::ConsumeOk Also known as: consume

Registers a consumer for queue. Delivered messages will be handled with the block provided to this method.

Parameters:

  • queue (String, GorgonBunny::Queue)

    Queue to consume from

  • consumer_tag (String) (defaults to: generate_consumer_tag)

    Consumer tag (unique identifier), generated by GorgonBunny by default

  • no_ack (Boolean) (defaults to: false)

    (false) If true, delivered messages will be automatically acknowledged. If false, manual acknowledgements will be necessary.

  • exclusive (Boolean) (defaults to: false)

    (false) Should this consumer be exclusive?

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments that may be used by RabbitMQ extensions, etc

Returns:

  • (GorgonAMQ::Protocol::Basic::ConsumeOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 803

def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  queue_name = if queue.respond_to?(:name)
                 queue.name
               else
                 queue
               end

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer_tag && consumer_tag.strip != GorgonAMQ::Protocol::EMPTY_STRING
    add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
  end

  @connection.send_frame(GorgonAMQ::Protocol::Basic::Consume.encode(@id,
      queue_name,
      consumer_tag,
      false,
      no_ack,
      exclusive,
      false,
      arguments))

  begin
    GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  add_consumer(queue_name, @last_basic_consume_ok.consumer_tag, no_ack, exclusive, arguments, &block)

  @last_basic_consume_ok
end

#basic_consume_with(consumer) ⇒ GorgonAMQ::Protocol::Basic::ConsumeOk Also known as: consume_with

Registers a GorgonBunny::Consumer instance as a consumer for its queue.

Parameters:

  • consumer (GorgonBunny::Consumer)

    Consumer to register. It should already have queue name, consumer tag and other attributes set.

Returns:

  • (GorgonAMQ::Protocol::Basic::ConsumeOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 859

def basic_consume_with(consumer)
  raise_if_no_longer_open!
  maybe_start_consumer_work_pool!

  # helps avoid race condition between basic.consume-ok and basic.deliver if there are messages
  # in the queue already. MK.
  if consumer.consumer_tag && consumer.consumer_tag.strip != GorgonAMQ::Protocol::EMPTY_STRING
    register_consumer(consumer.consumer_tag, consumer)
  end

  @connection.send_frame(GorgonAMQ::Protocol::Basic::Consume.encode(@id,
      consumer.queue_name,
      consumer.consumer_tag,
      false,
      consumer.no_ack,
      consumer.exclusive,
      false,
      consumer.arguments))

  begin
    GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
      @last_basic_consume_ok = wait_on_continuations
    end
  rescue Exception => e
    # if basic.consume-ok never arrives, unregister the proactively
    # registered consumer. MK.
    unregister_consumer(@last_basic_consume_ok.consumer_tag)

    raise e
  end

  # in case there is another exclusive consumer and we get a channel.close
  # response here. MK.
  raise_if_channel_close!(@last_basic_consume_ok)

  # covers server-generated consumer tags
  register_consumer(@last_basic_consume_ok.consumer_tag, consumer)

  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_consume_ok
end

#basic_get(queue, opts = {:ack => true}) ⇒ Array

Synchronously fetches a message from the queue, if one is available. This method is for cases when the convenience of synchronous operations is more important than throughput.

Examples:

Using GorgonBunny::Channel#basic_get with manual acknowledgements

conn = GorgonBunny.new
conn.start
ch   = conn.create_channel
# here we assume the queue already exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue1", :ack => true)
ch.acknowledge(delivery_info.delivery_tag)

Parameters:

  • queue (String)

    Queue name

  • opts (Hash) (defaults to: {:ack => true})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: true

    Will this message be acknowledged manually?

Returns:

  • (Array)

    A triple of delivery info, message properties and message content

# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 572

def basic_get(queue, opts = {:ack => true})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Basic::Get.encode(@id, queue, !(opts[:ack])))
  # this is a workaround for the edge case when basic_get is called in a tight loop
  # and network goes down we need to perform recovery. The problem is, basic_get will
  # keep blocking the thread that calls it without clear way to constantly unblock it
  # from the network activity loop (where recovery happens) with the current continuations
  # implementation (and even more correct and convenient ones, such as wait/notify, should
  # we implement them). So we return a triple of nils immediately which apps should be
  # able to handle anyway as "got no message, no need to act". MK.
  @last_basic_get_response = if @connection.open?
                               wait_on_basic_get_continuations
                             else
                               [nil, nil, nil]
                             end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_basic_get_response
end
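Two details of the listing above are easy to miss: the :ack option is inverted into the protocol's no-ack flag, and callers may receive a triple of nils. The sketch below isolates both using hypothetical helpers (`no_ack_flag`, `handle_response`) that do not exist in GorgonBunny.

```ruby
# Same inversion as in the listing: :ack => true means "manual acks", i.e. no_ack = false.
def no_ack_flag(opts = {:ack => true})
  !(opts[:ack])
end

# A caller-side guard for the "no message" case, where every element is nil.
def handle_response(response)
  delivery_info, _properties, payload = response
  return :no_message if delivery_info.nil?
  payload
end

default_flag  = no_ack_flag                  # manual acknowledgements
explicit_flag = no_ack_flag({:ack => false}) # automatic acknowledgements
omitted_flag  = no_ack_flag({})              # :ack missing, so nil is negated to true
empty_result  = handle_response([nil, nil, nil])
```

Note that passing an options hash without :ack replaces the default hash entirely, so the message is fetched in automatic acknowledgement mode.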

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Examples:

Requeue a message

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_nack(delivery_info.delivery_tag, false, true)
end

Reject a message

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # reject the message without requeueing it
  ch.basic_nack(delivery_info.delivery_tag)
end

Requeue a message fetched via basic.get

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
ch.basic_nack(delivery_info.delivery_tag, false, true)

Requeue multiple messages fetched via basic.get

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
_, _, payload1 = ch.basic_get("bunny.examples.queue3", :ack => true)
_, _, payload2 = ch.basic_get("bunny.examples.queue3", :ack => true)
delivery_info, properties, payload3 = ch.basic_get("bunny.examples.queue3", :ack => true)
# requeue all fetched messages up to payload3
ch.basic_nack(delivery_info.delivery_tag, true, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be rejected/requeued?

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 780

def basic_nack(delivery_tag, multiple = false, requeue = false)
  raise_if_no_longer_open!
  @connection.send_frame(GorgonAMQ::Protocol::Basic::Nack.encode(@id,
      delivery_tag,
      multiple,
      requeue))

  nil
end

#basic_publish(payload, exchange, routing_key, opts = {}) ⇒ GorgonBunny::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Parameters:

  • payload (String)

    Message payload. It will never be modified by GorgonBunny or RabbitMQ in any way.

  • exchange (String)

    Exchange to publish to

  • routing_key (String)

    Routing key

  • opts (Hash) (defaults to: {})

    Publishing options

Options Hash (opts):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

  • (GorgonBunny::Channel)

    self



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 514

def basic_publish(payload, exchange, routing_key, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  mode = if opts.fetch(:persistent, true)
           2
         else
           1
         end

  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0

  if @next_publish_seq_no > 0
    @unconfirmed_set.add(@next_publish_seq_no)
    @next_publish_seq_no += 1
  end

  frames = GorgonAMQ::Protocol::Basic::Publish.encode(@id,
    payload,
    opts,
    exchange_name,
    routing_key,
    opts[:mandatory],
    false,
    @connection.frame_max)
  @connection.send_frameset_without_timeout(frames, self)

  self
end
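The option defaulting in the listing above can be tried in isolation. The sketch below copies that logic into a hypothetical helper, `publish_defaults`; unlike the listing, which fills in defaults on the caller's hash, the helper works on a copy.

```ruby
DEFAULT_CONTENT_TYPE = "application/octet-stream".freeze

# Applies the same defaults as the listing, but to a copy of the options hash.
def publish_defaults(opts = {})
  opts = opts.dup
  mode = opts.fetch(:persistent, true) ? 2 : 1 # 2 = persistent, 1 = transient
  opts[:delivery_mode] ||= mode
  opts[:content_type]  ||= DEFAULT_CONTENT_TYPE
  opts[:priority]      ||= 0
  opts
end

default_opts   = publish_defaults
transient_opts = publish_defaults(:persistent => false)
explicit_opts  = publish_defaults(:persistent => false, :delivery_mode => 2)
```

Messages are persistent unless :persistent is false, and an explicit :delivery_mode takes precedence over :persistent because of the `||=` assignment.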

#basic_qos(prefetch_count, global = false) ⇒ GorgonAMQ::Protocol::Basic::QosOk

Controls message delivery rate using basic.qos AMQP 0.9.1 method.

Parameters:

  • prefetch_count (Integer)

    How many messages can consumers on this channel be given at a time (before they have to acknowledge or reject one of the earlier received messages)

  • global (Boolean) (defaults to: false)

    (false) Ignored, as it is not supported by RabbitMQ

Returns:

  • (GorgonAMQ::Protocol::Basic::QosOk)

Raises:

  • (ArgumentError)

    if prefetch_count is negative



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 602

def basic_qos(prefetch_count, global = false)
  raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))

  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_basic_qos_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @prefetch_count = prefetch_count

  @last_basic_qos_ok
end

#basic_recover(requeue) ⇒ GorgonAMQ::Protocol::Basic::RecoverOk

Redelivers unacknowledged messages.

Parameters:

  • requeue (Boolean)

    Should messages be requeued?

Returns:

  • (GorgonAMQ::Protocol::Basic::RecoverOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 623

def basic_recover(requeue)
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Basic::Recover.encode(@id, requeue))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_basic_recover_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_basic_recover_ok
end

#basic_reject(delivery_tag, requeue) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, true)
end

Reject a message

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # reject the message without requeueing it
  ch.basic_reject(delivery_info.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = GorgonBunny.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 673

def basic_reject(delivery_tag, requeue)
  raise_if_no_longer_open!
  @connection.send_frame(GorgonAMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))

  nil
end

#channel_flow(active) ⇒ GorgonAMQ::Protocol::Channel::FlowOk

Note:

Recent RabbitMQ versions (e.g. 2.8.x, 3.x) will employ TCP/IP-level back pressure on publishers if they detect that consumers do not keep up with them.

Enables or disables message flow for the channel. When message flow is disabled, no new messages will be delivered to consumers on this channel. This is typically used by consumers that cannot keep up with the influx of messages.

Returns:

  • (GorgonAMQ::Protocol::Channel::FlowOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1255

def channel_flow(active)
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Channel::Flow.encode(@id, active))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_channel_flow_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_channel_flow_ok
end

#channel_level_exception_after_operation_that_has_no_response?(method) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1590

def channel_level_exception_after_operation_that_has_no_response?(method)
  method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/
end
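The predicate above can be exercised with a stand-in object. In this sketch `FakeChannelClose` is a hypothetical Struct standing in for a channel.close frame, the reply texts are made-up samples, and `unknown_delivery_tag_error?` is a shortened hypothetical name for the same check.

```ruby
# Stand-in for a channel.close method frame (hypothetical, for illustration).
FakeChannelClose = Struct.new(:reply_code, :reply_text)

# Same check as the listing: a 406 whose text mentions an unknown delivery tag.
def unknown_delivery_tag_error?(method)
  method.reply_code == 406 && method.reply_text =~ /unknown delivery tag/
end

stale   = FakeChannelClose.new(406, "PRECONDITION_FAILED - unknown delivery tag 42")
missing = FakeChannelClose.new(404, "NOT_FOUND - no queue 'q' in vhost '/'")

stale_result   = unknown_delivery_tag_error?(stale)   # truthy: the match position
missing_result = unknown_delivery_tag_error?(missing) # false: reply code differs
```

Because `=~` returns a match position or nil, the result is truthy or falsy rather than strictly true or false.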

#client ⇒ GorgonBunny::Session

Returns Connection this channel was opened on.

Returns:

  • (GorgonBunny::Session)

    Connection this channel was opened on



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 259

def client
  @connection
end

#close ⇒ Object

Closes the channel. Closed channels can no longer be used (this includes associated Queue, Exchange and GorgonBunny::Consumer instances).



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 225

def close
  @connection.close_channel(self)
  closed!
  maybe_kill_consumer_work_pool!
end

#closed? ⇒ Boolean

Returns true if this channel is closed (manually or because of an exception), false otherwise.

Returns:

  • (Boolean)

    true if this channel is closed (manually or because of an exception), false otherwise



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 239

def closed?
  @status == :closed
end

#confirm_select(callback = nil) ⇒ GorgonAMQ::Protocol::Confirm::SelectOk

Enables publisher confirms for the channel.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1337

def confirm_select(callback=nil)
  raise_if_no_longer_open!

  if @next_publish_seq_no == 0
    @confirms_continuations = new_continuation
    @unconfirmed_set        = Set.new
    @nacked_set             = Set.new
    @next_publish_seq_no    = 1
  end

  @confirms_callback = callback

  @connection.send_frame(GorgonAMQ::Protocol::Confirm::Select.encode(@id, false))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_confirm_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!
  @last_confirm_select_ok
end
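How the sequence counter and the unconfirmed set evolve can be followed in a small model. The sketch below borrows only the bookkeeping visible in the #confirm_select and #basic_publish listings; `ConfirmTracker` is a hypothetical class, and `handle_ack` is an assumption about what processing a broker acknowledgement might look like.

```ruby
require "set"

# Hypothetical model of publisher-confirm bookkeeping; performs no network I/O.
class ConfirmTracker
  attr_reader :next_publish_seq_no, :unconfirmed_set

  def initialize
    @next_publish_seq_no = 0 # 0 means confirms are not enabled yet
    @unconfirmed_set     = Set.new
  end

  # Starts numbering publishes from 1, the first time it is called.
  def confirm_select
    @next_publish_seq_no = 1 if @next_publish_seq_no == 0
  end

  # Tracks a publish only when confirms are enabled.
  def publish
    if @next_publish_seq_no > 0
      @unconfirmed_set.add(@next_publish_seq_no)
      @next_publish_seq_no += 1
    end
    self
  end

  # Assumption: a broker ack for a sequence number removes it from the set.
  def handle_ack(delivery_tag)
    @unconfirmed_set.delete(delivery_tag)
    nil
  end
end

tracker = ConfirmTracker.new
tracker.publish         # confirms not enabled yet, so nothing is tracked
tracker.confirm_select
tracker.publish.publish # tracked as sequence numbers 1 and 2
tracker.handle_ack(1)
```

Publishes made before confirms are enabled are never tracked, which is why the counter starting at 0 doubles as an "off" switch.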

#default_exchange ⇒ Object

Provides access to the default exchange.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 352

def default_exchange
  self.direct(GorgonAMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

#deregister_exchange(exchange) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1772

def deregister_exchange(exchange)
  @exchanges.delete(exchange.name)
end

#deregister_queue(queue) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1752

def deregister_queue(queue)
  @queues.delete(queue.name)
end

#deregister_queue_named(name) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1757

def deregister_queue_named(name)
  @queues.delete(name)
end

#direct(name, opts = {}) ⇒ GorgonBunny::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (GorgonBunny::Exchange)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 309

def direct(name, opts = {})
  Exchange.new(self, :direct, name, opts)
end

#exchange(name, opts = {}) ⇒ GorgonBunny::Exchange

Declares an exchange of the given type (:direct by default) or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or "x-consistent-hash"

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (GorgonBunny::Exchange)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 370

def exchange(name, opts = {})
  Exchange.new(self, opts.fetch(:type, :direct), name, opts)
end

#exchange_bind(source, destination, opts = {}) ⇒ GorgonAMQ::Protocol::Exchange::BindOk

Binds an exchange to another exchange using the exchange.bind extension to AMQP 0.9.1 that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (GorgonAMQ::Protocol::Exchange::BindOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1166

def exchange_bind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(GorgonAMQ::Protocol::Exchange::Bind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_exchange_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_bind_ok
end
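
The `respond_to?(:name)` checks in the listing above mean that callers may pass either exchange objects or plain names. A minimal standalone sketch of that normalization follows; `FakeExchange` and `resolve_name` are hypothetical names used only for illustration and are not part of GorgonBunny.

```ruby
# Hypothetical stand-in for an exchange object; only #name matters here.
FakeExchange = Struct.new(:name)

# Returns the exchange name whether given an object that responds to #name
# or a plain string, mirroring the checks in #exchange_bind.
def resolve_name(exchange_or_name)
  if exchange_or_name.respond_to?(:name)
    exchange_or_name.name
  else
    exchange_or_name
  end
end

source      = resolve_name(FakeExchange.new("logs.source"))
destination = resolve_name("logs.archive")
puts "#{source} -> #{destination}"
```

Note that the listing passes the destination name before the source name when encoding the frame, so the argument order of the Ruby method and of the wire-level call differ.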

#exchange_declare(name, type, opts = {}) ⇒ GorgonAMQ::Protocol::Exchange::DeclareOk

Declares an exchange using the exchange.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • type (String, Symbol)

    Exchange type, e.g. :direct or :fanout

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this exchange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) — default: false

    Should this exchange be deleted when it is no longer used?

  • passive (Boolean) — default: false

    If true, exchange will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • (GorgonAMQ::Protocol::Exchange::DeclareOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1106

def exchange_declare(name, type, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Exchange::Declare.encode(@id,
      name,
      type.to_s,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:auto_delete, false),
      false,
      false,
      opts[:arguments]))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_exchange_declare_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_declare_ok
end
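
The boolean flags in the listing above are read with `Hash#fetch` and a `false` default, so any flag the caller omits is sent as `false`. A small sketch of that resolution, using a hypothetical `declare_flags` helper that is not part of GorgonBunny:

```ruby
# Hypothetical helper that resolves the boolean flags the same way
# #exchange_declare does: a missing key means false.
def declare_flags(opts = {})
  {
    :passive     => opts.fetch(:passive, false),
    :durable     => opts.fetch(:durable, false),
    :auto_delete => opts.fetch(:auto_delete, false)
  }
end

p declare_flags(:durable => true)
```

One consequence of using `fetch`: a key that is present with an explicit `nil` value is passed through as `nil`, not replaced by the default.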

#exchange_delete(name, opts = {}) ⇒ GorgonAMQ::Protocol::Exchange::DeleteOk

Deletes an exchange using the exchange.delete AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used?

Returns:

  • (GorgonAMQ::Protocol::Exchange::DeleteOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1136

def exchange_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Exchange::Delete.encode(@id,
      name,
      opts[:if_unused],
      false))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_exchange_delete_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_delete_ok
end

#exchange_unbind(source, destination, opts = {}) ⇒ GorgonAMQ::Protocol::Exchange::UnbindOk

Unbinds an exchange from another exchange using the exchange.unbind extension to AMQP 0.9.1 that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • destination (String)

    Destination exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (GorgonAMQ::Protocol::Exchange::UnbindOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1210

def exchange_unbind(source, destination, opts = {})
  raise_if_no_longer_open!

  source_name = if source.respond_to?(:name)
                  source.name
                else
                  source
                end

  destination_name = if destination.respond_to?(:name)
                       destination.name
                     else
                       destination
                     end

  @connection.send_frame(GorgonAMQ::Protocol::Exchange::Unbind.encode(@id,
      destination_name,
      source_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_exchange_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_exchange_unbind_ok
end

#fanout(name, opts = {}) ⇒ GorgonBunny::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (GorgonBunny::Exchange)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 291

def fanout(name, opts = {})
  Exchange.new(self, :fanout, name, opts)
end

#find_exchange(name) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1782

def find_exchange(name)
  @exchanges[name]
end

#find_queue(name) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1767

def find_queue(name)
  @queues[name]
end

#flow(active) ⇒ Object

Flow control. When set to false, RabbitMQ will stop delivering messages on this channel.

Parameters:

  • active (Boolean)

    Should messages to consumers on this channel be delivered?



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 420

def flow(active)
  channel_flow(active)
end

#frame_size ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 264

def frame_size
  @connection.frame_max
end

#generate_consumer_tag(name = "bunny") ⇒ String

Generates a unique string intended for use as a consumer tag.

Returns:

  • (String)

    Unique string.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1388

def generate_consumer_tag(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end
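
To make the tag format concrete, here is a standalone copy of the expression from the listing above. `sample_consumer_tag` is a hypothetical name; the body is the same string interpolation.

```ruby
# Standalone copy of the tag-building expression, for illustration only.
def sample_consumer_tag(name = "bunny")
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

tag = sample_consumer_tag("worker")
prefix, timestamp_ms, random_part = tag.split("-")
puts "prefix=#{prefix} timestamp=#{timestamp_ms} random=#{random_part}"
```

Because `Time.now.to_i` has one-second resolution, the middle component always ends in `000`; uniqueness within the same second therefore rests on the random suffix.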

#handle_ack_or_nack(delivery_tag, multiple, nack) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1629

def handle_ack_or_nack(delivery_tag, multiple, nack)
  if nack
    cloned_set = @unconfirmed_set.clone
    if multiple
      cloned_set.keep_if { |i| i <= delivery_tag }
      @nacked_set.merge(cloned_set)
    else
      @nacked_set.add(delivery_tag)
    end
  end

  if multiple
    @unconfirmed_set.delete_if { |i| i <= delivery_tag }
  else
    @unconfirmed_set.delete(delivery_tag)
  end

  @unconfirmed_set_mutex.synchronize do
    @only_acks_received = (@only_acks_received && !nack)

    @confirms_continuations.push(true) if @unconfirmed_set.empty?
    @confirms_callback.call(delivery_tag, multiple, nack) if @confirms_callback
  end
end
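
The set bookkeeping in the listing above can be followed more easily in isolation. The sketch below uses a hypothetical `ConfirmTracker` class (not part of GorgonBunny) that keeps only the set arithmetic and leaves out the mutex, the continuation queue and the callback.

```ruby
require "set"

# Hypothetical, simplified tracker that mirrors the set arithmetic in
# #handle_ack_or_nack without mutexes or callbacks.
class ConfirmTracker
  attr_reader :unconfirmed, :nacked

  def initialize(tags)
    @unconfirmed = Set.new(tags)
    @nacked      = Set.new
  end

  def handle(delivery_tag, multiple, nack)
    if nack
      if multiple
        # Every still-unconfirmed tag up to delivery_tag counts as nacked.
        @nacked.merge(@unconfirmed.select { |i| i <= delivery_tag })
      else
        @nacked.add(delivery_tag)
      end
    end

    if multiple
      @unconfirmed.delete_if { |i| i <= delivery_tag }
    else
      @unconfirmed.delete(delivery_tag)
    end
  end
end

tracker = ConfirmTracker.new([1, 2, 3, 4, 5])
tracker.handle(3, true, false)  # ack everything up to and including 3
tracker.handle(4, false, true)  # nack 4 only
p tracker.unconfirmed.to_a      # [5]
p tracker.nacked.to_a           # [4]
```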

#handle_basic_get_empty(basic_get_empty) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1601

def handle_basic_get_empty(basic_get_empty)
  @basic_get_continuations.push([nil, nil, nil])
end

#handle_basic_get_ok(basic_get_ok, properties, content) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1595

def handle_basic_get_ok(basic_get_ok, properties, content)
  basic_get_ok.delivery_tag = VersionedDeliveryTag.new(basic_get_ok.delivery_tag, @recoveries_counter.get)
  @basic_get_continuations.push([basic_get_ok, properties, content])
end
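
The listing above wraps the broker-assigned delivery tag together with the current value of the recoveries counter. This page does not show how that version is later compared, so the following is only a sketch under the assumption that a tag issued before the latest recovery is treated as stale. `SketchDeliveryTag` is a hypothetical stand-in, not the library's `VersionedDeliveryTag`.

```ruby
# Hypothetical stand-in for a versioned delivery tag: the broker-assigned
# integer plus the value of the recoveries counter at delivery time.
class SketchDeliveryTag
  attr_reader :tag, :version

  def initialize(tag, version)
    @tag     = tag
    @version = version
  end

  def to_i
    @tag
  end

  # Assumption: a tag issued before the most recent recovery no longer
  # refers to a delivery known on the re-opened channel.
  def stale?(current_version)
    @version < current_version
  end
end

recoveries = 0
tag = SketchDeliveryTag.new(7, recoveries)
recoveries += 1 # simulate one network recovery
puts tag.stale?(recoveries)
```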

#handle_basic_return(basic_return, properties, content) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1618

def handle_basic_return(basic_return, properties, content)
  x = find_exchange(basic_return.exchange)

  if x
    x.handle_return(ReturnInfo.new(basic_return), MessageProperties.new(properties), content)
  else
    @logger.warn "Exchange #{basic_return.exchange} is not in channel #{@id}'s cache! Dropping returned message!"
  end
end

#handle_frameset(basic_deliver, properties, content) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1606

def handle_frameset(basic_deliver, properties, content)
  consumer = @consumers[basic_deliver.consumer_tag]
  if consumer
    @work_pool.submit do
      consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
    end
  else
    @logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
  end
end
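
Delivery dispatch in the listing above is a lookup by consumer tag followed by a call. A minimal sketch of that lookup, with a hypothetical `dispatch` helper and plain lambdas standing in for consumers; the real method hands the call to the work pool and logs a warning for unknown tags.

```ruby
# Looks up the consumer for a tag and invokes it, or reports that the
# delivery has nowhere to go.
def dispatch(consumers, consumer_tag, payload)
  consumer = consumers[consumer_tag]
  if consumer
    consumer.call(payload)
  else
    "no consumer for tag #{consumer_tag}"
  end
end

# Hypothetical registry mapping consumer tags to callables.
consumers = {
  "worker-1" => ->(payload) { "worker-1 got #{payload}" }
}

puts dispatch(consumers, "worker-1", "hello")
puts dispatch(consumers, "worker-2", "hello")
```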

#handle_method(method) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1515

def handle_method(method)
  @logger.debug "Channel#handle_frame on channel #{@id}: #{method.inspect}"
  case method
  when GorgonAMQ::Protocol::Queue::DeclareOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Queue::DeleteOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Queue::PurgeOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Queue::BindOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Queue::UnbindOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Exchange::BindOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Exchange::UnbindOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Exchange::DeclareOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Exchange::DeleteOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Basic::QosOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Basic::RecoverOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Channel::FlowOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Basic::ConsumeOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Basic::Cancel then
    if consumer = @consumers[method.consumer_tag]
      @work_pool.submit do
        begin
          @consumers.delete(method.consumer_tag)
          consumer.handle_cancellation(method)
        rescue Exception => e
          @logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!"
        end
      end
    else
      @logger.warn "No consumer for tag #{method.consumer_tag} on channel #{@id}!"
    end
  when GorgonAMQ::Protocol::Basic::CancelOk then
    @continuations.push(method)
    unregister_consumer(method.consumer_tag)
  when GorgonAMQ::Protocol::Tx::SelectOk, GorgonAMQ::Protocol::Tx::CommitOk, GorgonAMQ::Protocol::Tx::RollbackOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Confirm::SelectOk then
    @continuations.push(method)
  when GorgonAMQ::Protocol::Basic::Ack then
    handle_ack_or_nack(method.delivery_tag, method.multiple, false)
  when GorgonAMQ::Protocol::Basic::Nack then
    handle_ack_or_nack(method.delivery_tag, method.multiple, true)
  when GorgonAMQ::Protocol::Channel::Close then
    closed!
    @connection.send_frame(GorgonAMQ::Protocol::Channel::CloseOk.encode(@id))

    # basic.ack, basic.reject, basic.nack. MK.
    if channel_level_exception_after_operation_that_has_no_response?(method)
      @on_error.call(self, method) if @on_error
    else
      @last_channel_error = instantiate_channel_level_exception(method)
      @continuations.push(method)
    end

  when GorgonAMQ::Protocol::Channel::CloseOk then
    @continuations.push(method)
  else
    raise "Do not know how to handle #{method.inspect} in GorgonBunny::Channel#handle_method"
  end
end

#headers(name, opts = {}) ⇒ GorgonBunny::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

  • (GorgonBunny::Exchange)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 345

def headers(name, opts = {})
  Exchange.new(self, :headers, name, opts)
end

#increment_recoveries_counter ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1474

def increment_recoveries_counter
  @recoveries_counter.increment
end

#maybe_kill_consumer_work_pool! ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1740

def maybe_kill_consumer_work_pool!
  if @work_pool && @work_pool.running?
    @work_pool.kill
  end
end

#maybe_pause_consumer_work_pool! ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1735

def maybe_pause_consumer_work_pool!
  @work_pool.pause if @work_pool && @work_pool.running?
end

#maybe_start_consumer_work_pool! ⇒ Object

Starts consumer work pool. Lazily called by #basic_consume to avoid creating new threads that won’t do any real work for channels that do not register consumers (e.g. only used for publishing). MK.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1728

def maybe_start_consumer_work_pool!
  if @work_pool && !@work_pool.running?
    @work_pool.start
  end
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • multiple (Boolean) (defaults to: false)

    Should all unacknowledged messages up to this one be rejected as well?

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of dropping it?




# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 476

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#number ⇒ Integer

Returns the channel id.

Returns:

  • (Integer)

    Channel id



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 249

def number
  self.id
end

#on_error(&block) ⇒ Object

Defines a handler for errors that are not responses to a particular operation (e.g. errors caused by basic.ack, basic.reject or basic.nack, which have no response).



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1403

def on_error(&block)
  @on_error = block
end

#open ⇒ GorgonBunny::Channel

Opens the channel and resets its internal state.

Returns:

  • (GorgonBunny::Channel)

    This channel (self)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 208

def open
  @threads_waiting_on_continuations           = Set.new
  @threads_waiting_on_confirms_continuations  = Set.new
  @threads_waiting_on_basic_get_continuations = Set.new

  @connection.open_channel(self)
  # clear last channel error
  @last_channel_error = nil

  @status = :open

  self
end

#open? ⇒ Boolean

Returns true if this channel is open, false otherwise.

Returns:

  • (Boolean)

    true if this channel is open, false otherwise



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 233

def open?
  @status == :open
end

#prefetch(prefetch_count) ⇒ Object

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages

Parameters:

  • prefetch_count (Integer)

    Prefetch (QoS setting) for this channel




# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 411

def prefetch(prefetch_count)
  self.basic_qos(prefetch_count, false)
end

#queue(name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ GorgonBunny::Queue

Declares a queue or looks it up in the per-channel cache.

Parameters:

  • name (String) (defaults to: GorgonAMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).

  • opts (Hash) (defaults to: {})

    Queue properties and other options

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • (GorgonBunny::Queue)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 393

def queue(name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {})
  q = find_queue(name) || GorgonBunny::Queue.new(self, name, opts)

  register_queue(q)
end
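
The listing above is a find-or-create lookup against the per-channel cache. The sketch below reproduces the pattern with a hypothetical `QueueCache` class and a `FakeQueue` struct, neither of which is part of GorgonBunny.

```ruby
# Hypothetical minimal cache showing the find-or-create pattern used by #queue.
class QueueCache
  FakeQueue = Struct.new(:name, :opts)

  def initialize
    @queues = {}
  end

  def queue(name, opts = {})
    q = @queues[name] || FakeQueue.new(name, opts)
    # Hash assignment evaluates to the assigned value, so q is returned.
    @queues[q.name] = q
  end
end

cache  = QueueCache.new
first  = cache.queue("tasks", :durable => true)
second = cache.queue("tasks")
puts first.equal?(second)
```

In this pattern, options passed on a later call for an already cached name are ignored, because the cached object is returned as is.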

#queue_bind(name, exchange, opts = {}) ⇒ GorgonAMQ::Protocol::Queue::BindOk

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (GorgonAMQ::Protocol::Queue::BindOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1029

def queue_bind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(GorgonAMQ::Protocol::Queue::Bind.encode(@id,
      name,
      exchange_name,
      opts[:routing_key],
      false,
      opts[:arguments]))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_queue_bind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_bind_ok
end

#queue_declare(name, opts = {}) ⇒ GorgonAMQ::Protocol::Queue::DeclareOk

Declares a queue using queue.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • durable (Boolean) — default: false

    Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • auto_delete (Boolean) — default: false

    Should this queue be deleted when the last consumer is cancelled?

  • exclusive (Boolean) — default: false

    Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed

  • passive (Boolean) — default: false

    If true, queue will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • (GorgonAMQ::Protocol::Queue::DeclareOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 951

def queue_declare(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Queue::Declare.encode(@id,
      name,
      opts.fetch(:passive, false),
      opts.fetch(:durable, false),
      opts.fetch(:exclusive, false),
      opts.fetch(:auto_delete, false),
      false,
      opts[:arguments]))
  @last_queue_declare_ok = wait_on_continuations

  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_declare_ok
end

#queue_delete(name, opts = {}) ⇒ GorgonAMQ::Protocol::Queue::DeleteOk

Deletes a queue using queue.delete AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

Returns:

  • (GorgonAMQ::Protocol::Queue::DeleteOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 980

def queue_delete(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Queue::Delete.encode(@id,
      name,
      opts[:if_unused],
      opts[:if_empty],
      false))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_queue_delete_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_delete_ok
end

#queue_purge(name, opts = {}) ⇒ GorgonAMQ::Protocol::Queue::PurgeOk

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

Returns:

  • (GorgonAMQ::Protocol::Queue::PurgeOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1003

def queue_purge(name, opts = {})
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Queue::Purge.encode(@id, name, false))

  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_queue_purge_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_queue_purge_ok
end

#queue_unbind(name, exchange, opts = {}) ⇒ GorgonAMQ::Protocol::Queue::UnbindOk

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (GorgonAMQ::Protocol::Queue::UnbindOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1065

def queue_unbind(name, exchange, opts = {})
  raise_if_no_longer_open!

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @connection.send_frame(GorgonAMQ::Protocol::Queue::Unbind.encode(@id,
      name,
      exchange_name,
      opts[:routing_key],
      opts[:arguments]))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_queue_unbind_ok = wait_on_continuations
  end

  raise_if_continuation_resulted_in_a_channel_error!
  @last_queue_unbind_ok
end

#read_next_frame(options = {}) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1747

def read_next_frame(options = {})
  @connection.read_next_frame(options)
end
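
One Ruby detail is worth keeping in mind when forwarding an options hash: `options = {}` written at a call site is an assignment expression, so the callee receives a fresh empty hash instead of the caller's options. A small standalone sketch with hypothetical method names:

```ruby
def inner(options = {})
  options
end

# Reassigns the parameter at the call site, discarding the caller's hash.
def forwards_nothing(options = {})
  inner(options = {})
end

# Passes the caller's hash through unchanged.
def forwards_options(options = {})
  inner(options)
end

p forwards_nothing(:timeout => 5)
p forwards_options(:timeout => 5)
```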

#read_write_timeout ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 201

def read_write_timeout
  @connection.read_write_timeout
end

#recover(ignored = true) ⇒ Object

Tells RabbitMQ to redeliver unacknowledged messages



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 426

def recover(ignored = true)
  # RabbitMQ only supports basic.recover with requeue = true
  basic_recover(true)
end

#recover_consumers ⇒ Object

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1463

def recover_consumers
  unless @consumers.empty?
    @work_pool = ConsumerWorkPool.new(@work_pool.size)
    @work_pool.start
  end
  @consumers.values.dup.each do |c|
    c.recover_from_network_failure
  end
end

#recover_exchanges ⇒ Object

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1442

def recover_exchanges
  @exchanges.values.dup.each do |x|
    x.recover_from_network_failure
  end
end

#recover_from_network_failure ⇒ Object

Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure Recovery feature.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1418

def recover_from_network_failure
  @logger.debug "Recovering channel #{@id} after network failure"
  release_all_continuations

  recover_prefetch_setting
  recover_exchanges
  # this includes recovering bindings
  recover_queues
  recover_consumers
  increment_recoveries_counter
end

#recover_prefetch_setting ⇒ Object

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1434

def recover_prefetch_setting
  basic_qos(@prefetch_count) if @prefetch_count
end

#recover_queues ⇒ Object

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1452

def recover_queues
  @queues.values.dup.each do |q|
    @logger.debug "Recovering queue #{q.name}"
    q.recover_from_network_failure
  end
end

#register_consumer(consumer_tag, consumer) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1492

def register_consumer(consumer_tag, consumer)
  @consumer_mutex.synchronize do
    @consumers[consumer_tag] = consumer
  end
end

#register_exchange(exchange) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1777

def register_exchange(exchange)
  @exchanges[exchange.name] = exchange
end

#register_queue(queue) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1762

def register_queue(queue)
  @queues[queue.name] = queue
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of dropping it?




# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 446

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end

#release_all_continuations ⇒ Object

Releases all continuations. Used by automatic network recovery.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1710

def release_all_continuations
  @threads_waiting_on_confirms_continuations.each do |t|
    t.run
  end
  @threads_waiting_on_continuations.each do |t|
    t.run
  end
  @threads_waiting_on_basic_get_continuations.each do |t|
    t.run
  end

  self.reset_continuations
end

#synchronize(&block) ⇒ Object

Synchronizes given block using this channel’s mutex.



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1380

def synchronize(&block)
  @publishing_mutex.synchronize(&block)
end

#to_s ⇒ String

Returns a brief human-readable representation of the channel.

Returns:

  • (String)

    Brief human-readable representation of the channel



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1482

def to_s
  "#<#{self.class.name}:#{object_id} @id=#{self.number} @connection=#{@connection.to_s}>"
end

#topic(name, opts = {}) ⇒ GorgonBunny::Exchange

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

  • (GorgonBunny::Exchange)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 327

def topic(name, opts = {})
  Exchange.new(self, :topic, name, opts)
end

#tx_commit ⇒ GorgonAMQ::Protocol::Tx::CommitOk

Commits the current transaction.

Returns:

  • (GorgonAMQ::Protocol::Tx::CommitOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1291

def tx_commit
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Tx::Commit.encode(@id))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_tx_commit_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_commit_ok
end

#tx_rollback ⇒ GorgonAMQ::Protocol::Tx::RollbackOk

Rolls back the current transaction.

Returns:

  • (GorgonAMQ::Protocol::Tx::RollbackOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1306

def tx_rollback
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Tx::Rollback.encode(@id))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_tx_rollback_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_rollback_ok
end

#tx_select ⇒ GorgonAMQ::Protocol::Tx::SelectOk

Puts the channel into transaction mode (starts a transaction).

Returns:

  • (GorgonAMQ::Protocol::Tx::SelectOk)



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1276

def tx_select
  raise_if_no_longer_open!

  @connection.send_frame(GorgonAMQ::Protocol::Tx::Select.encode(@id))
  GorgonBunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
    @last_tx_select_ok = wait_on_continuations
  end
  raise_if_continuation_resulted_in_a_channel_error!

  @last_tx_select_ok
end
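
The three transaction methods above (#tx_select, #tx_commit, #tx_rollback) are usually combined into a commit-or-roll-back pattern. The following is a minimal sketch of that pattern, not part of the library: `FakeChannel` is a hypothetical stand-in that only records which methods were called (so the example runs without a broker), and `publish_in_transaction` is a hypothetical helper. With a real channel, the block would publish messages.

```ruby
# Hypothetical stand-in for a channel: it only records which transaction
# methods were called, so the sketch runs without a broker.
class FakeChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def tx_select
    @calls << :tx_select
  end

  def tx_commit
    @calls << :tx_commit
  end

  def tx_rollback
    @calls << :tx_rollback
  end
end

# Hypothetical helper: starts a transaction, runs the block, and commits.
# If the block raises, the transaction is rolled back and the error re-raised.
def publish_in_transaction(channel)
  channel.tx_select
  begin
    yield channel
  rescue StandardError
    channel.tx_rollback
    raise
  end
  channel.tx_commit
end

ok = FakeChannel.new
publish_in_transaction(ok) { |_ch| :published }

failed = FakeChannel.new
begin
  publish_in_transaction(failed) { |_ch| raise "publish failed" }
rescue RuntimeError
  # The error surfaces to the caller after the rollback has been issued.
end
```

The commit is issued outside the `begin`/`rescue` so that a failing commit is not followed by a rollback attempt; whether that is the right choice for a given application is a design decision this sketch does not settle.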

#unregister_consumer(consumer_tag) ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1499

def unregister_consumer(consumer_tag)
  @consumer_mutex.synchronize do
    @consumers.delete(consumer_tag)
  end
end

#using_publisher_confirmations? ⇒ Boolean

Returns true if this channel has Publisher Confirms enabled, false otherwise.

Returns:

  • (Boolean)

    true if this channel has Publisher Confirms enabled, false otherwise



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1326

def using_publisher_confirmations?
  @next_publish_seq_no > 0
end

#wait_for_confirms ⇒ Boolean

Blocks the calling thread until confirms are received for all currently unacknowledged published messages.

Returns:

  • (Boolean)

    true if all messages were acknowledged positively, false otherwise



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1366

def wait_for_confirms
  @only_acks_received = true
  wait_on_confirms_continuations

  @only_acks_received
end
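
The listings for #using_publisher_confirmations? and #wait_for_confirms rely on two pieces of state: a publish sequence counter that is greater than zero once confirms are in use, and a flag that starts as true and reports whether only positive acknowledgements were seen. The sketch below models that bookkeeping in isolation. It is a simplified illustration, not the library's implementation: the class `ConfirmTracker` and the methods `enable_confirms`, `publish`, `handle_confirm` and `all_acked?` are hypothetical, and it assumes that enabling confirms sets the counter to 1 and that a negative acknowledgement flips the flag.

```ruby
# Hypothetical model of publisher-confirm bookkeeping (no broker involved).
class ConfirmTracker
  def initialize
    @next_publish_seq_no = 0   # assumption: 0 means confirms are not enabled
    @unconfirmed = []          # sequence numbers still awaiting a confirm
    @only_acks_received = true
  end

  # Assumption: enabling confirms starts the sequence at 1.
  def enable_confirms
    @next_publish_seq_no = 1 if @next_publish_seq_no.zero?
  end

  # Same check as in the listing above.
  def using_publisher_confirmations?
    @next_publish_seq_no > 0
  end

  # Records a publish and returns its sequence number (nil without confirms).
  def publish
    return nil unless using_publisher_confirmations?

    seq_no = @next_publish_seq_no
    @unconfirmed << seq_no
    @next_publish_seq_no += 1
    seq_no
  end

  # Called when the broker confirms a message; a nack clears the flag.
  def handle_confirm(seq_no, nack: false)
    @unconfirmed.delete(seq_no)
    @only_acks_received = false if nack
  end

  # True only if nothing is outstanding and no nack was seen.
  def all_acked?
    @unconfirmed.empty? && @only_acks_received
  end
end

tracker = ConfirmTracker.new
enabled_before = tracker.using_publisher_confirmations?

tracker.enable_confirms
first = tracker.publish
second = tracker.publish

tracker.handle_confirm(first)
tracker.handle_confirm(second, nack: true)
```

In this model a single negative acknowledgement is enough to make the overall result false, which matches the documented return value of #wait_for_confirms ("true if all messages were acknowledged positively, false otherwise").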

#wait_on_basic_get_continuations ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1673

def wait_on_basic_get_continuations
  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_basic_get_continuations << t

    begin
      @basic_get_continuations.poll(@connection.continuation_timeout)
    ensure
      @threads_waiting_on_basic_get_continuations.delete(t)
    end
  else
    connection.event_loop.run_once until @basic_get_continuations.length > 0

    @basic_get_continuations.pop
  end
end

#wait_on_confirms_continuations ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1691

def wait_on_confirms_continuations
  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_confirms_continuations << t

    begin
      @confirms_continuations.poll(@connection.continuation_timeout)
    ensure
      @threads_waiting_on_confirms_continuations.delete(t)
    end
  else
    connection.event_loop.run_once until @confirms_continuations.length > 0

    @confirms_continuations.pop
  end
end

#wait_on_continuations ⇒ Object



# File 'lib/gorgon_bunny/lib/gorgon_bunny/channel.rb', line 1655

def wait_on_continuations
  if @connection.threaded
    t = Thread.current
    @threads_waiting_on_continuations << t

    begin
      @continuations.poll(@connection.continuation_timeout)
    ensure
      @threads_waiting_on_continuations.delete(t)
    end
  else
    connection.reader_loop.run_once until @continuations.length > 0

    @continuations.pop
  end
end
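
In threaded mode, the three `wait_on_*` methods above share one shape: the calling thread blocks in `poll` with a timeout until another thread delivers the broker's response. The sketch below illustrates that hand-off with a hand-written queue. It is an assumption-laden illustration rather than the library's continuation queue: `SimpleContinuationQueue` is a hypothetical class, and the choices of a millisecond timeout and of raising `Timeout::Error` on expiry are assumptions made for the example.

```ruby
require "timeout"

# Hypothetical, simplified continuation queue: one thread pushes responses,
# and the caller blocks in #poll until one arrives or the timeout
# (in milliseconds) elapses.
class SimpleContinuationQueue
  def initialize
    @items = []
    @lock = Mutex.new
    @cond = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      @items.push(item)
      @cond.signal
    end
  end

  def poll(timeout_ms)
    deadline = now + timeout_ms / 1000.0
    @lock.synchronize do
      # Loop to guard against spurious wakeups.
      while @items.empty?
        remaining = deadline - now
        raise Timeout::Error, "no response within #{timeout_ms} ms" if remaining <= 0

        @cond.wait(@lock, remaining)
      end
      @items.shift
    end
  end

  def length
    @lock.synchronize { @items.length }
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = SimpleContinuationQueue.new

# Simulates an I/O thread delivering a broker response a moment later.
reader = Thread.new do
  sleep 0.05
  queue.push(:select_ok)
end

response = queue.poll(1_000)
reader.join

# With nothing left in the queue, a second poll runs into the timeout.
timed_out = begin
  queue.poll(50)
  false
rescue Timeout::Error
  true
end
```

The non-threaded branch in the listings takes a different route: instead of blocking on another thread, it drives the connection's loop with `run_once` until the queue is non-empty and then pops the result.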