Class: MarchHare::Channel

Inherits:
Object
Defined in:
lib/march_hare/channel.rb

Overview

Channels in RabbitMQ

To quote the AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening Channels

Channels can be opened either via MarchHare::Session#create_channel (sufficient in the majority of cases) or by instantiating MarchHare::Channel directly.

A channel id is allocated automatically.

Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

Higher-level API

MarchHare offers two sets of methods on Channel, known as the higher-level and lower-level APIs. The higher-level API mimics the amqp gem API, where exchanges and queues are objects (instances of Exchange and Queue, respectively). The lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

Exchange Operations In Higher-level API

  • #exchange, #direct, #fanout, #topic and #headers are used to declare exchanges, and #default_exchange provides access to the default exchange. The rest of the API is in Exchange.

Channel Qos (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.
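
The effect of a prefetch limit can be pictured with a toy model in plain Ruby. This is only a sketch: PrefetchWindow and its methods are hypothetical names that imitate the broker's bookkeeping, they are not part of MarchHare, and no connection is involved. A limit of 0 is treated as "no limit".

```ruby
# Toy model of a prefetch limit; the class and method names are hypothetical.
class PrefetchWindow
  attr_reader :unacked

  def initialize(prefetch, messages)
    @prefetch = prefetch
    @pending  = messages.dup
    @unacked  = []
  end

  # Delivers pending messages until the number of unacknowledged ones
  # reaches the limit. A prefetch of 0 is treated as "no limit".
  def deliver
    delivered = []
    while @pending.any? && (@prefetch.zero? || @unacked.size < @prefetch)
      msg = @pending.shift
      @unacked << msg
      delivered << msg
    end
    delivered
  end

  # Acknowledging (or rejecting) a message frees one slot in the window.
  def settle(msg)
    @unacked.delete(msg)
  end
end

window = PrefetchWindow.new(2, %w[a b c d])
first_batch  = window.deliver   # stops once two messages are outstanding
window.settle("a")
second_batch = window.deliver   # one slot was freed, so one more message arrives
puts first_batch.inspect, second_batch.inspect
```

With a real channel the limit would be set through #prefetch= (or #basic_qos) before consumers are added.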

Channel IDs

Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.
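
The allocate-and-release behaviour described above can be pictured with a small pool. This is an illustrative sketch in plain Ruby, not MarchHare's implementation: ChannelIdPool and its methods are hypothetical names, and the limit is passed in by hand rather than negotiated with the server.

```ruby
require "set"

# Hypothetical sketch of a channel id pool; not MarchHare's actual allocator.
class ChannelIdPool
  def initialize(max_ids)
    @max_ids = max_ids
    @in_use  = Set.new
  end

  # Returns the lowest free id (ids start at 1) or raises when none is left.
  def allocate
    id = (1..@max_ids).find { |n| !@in_use.include?(n) }
    raise "no free channel ids" if id.nil?
    @in_use << id
    id
  end

  # Returns an id to the pool so that a channel opened later can reuse it.
  def release(id)
    @in_use.delete(id)
  end
end

pool   = ChannelIdPool.new(3)
first  = pool.allocate
second = pool.allocate
pool.release(first)      # corresponds to closing the first channel
reused = pool.allocate   # the released id is handed out again
puts [first, second, reused].inspect
```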

Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying channel.close method information.

Examples:

Opening a channel

conn = MarchHare.connect

ch   = conn.create_channel

Closing a channel

ch  = conn.create_channel
ch.close

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue MarchHare::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "rubymarchhare.examples.recovery.q#{rand}"

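  # queue_declare takes positional arguments: name, durable, exclusive, auto_delete
  ch2.queue_declare(q, false, false, false)
  # redeclaring the queue with a different durability triggers 406 PRECONDITION_FAILED
  ch2.queue_declare(q, true, false, false)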
rescue MarchHare::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object


# File 'lib/march_hare/channel.rb', line 877

def method_missing(selector, *args)
  @delegate.__send__(selector, *args)
end
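
The delegation pattern in the listing above can be tried out in isolation. The sketch below is plain Ruby with a hypothetical Wrapper class around an Array. Unlike the listing, it also forwards blocks, falls back to super for unknown methods and defines respond_to_missing?, which are additions made for this illustration.

```ruby
# Hypothetical wrapper showing delegation through method_missing.
class Wrapper
  def initialize(delegate)
    @delegate = delegate
  end

  # Methods defined on the wrapper are found first and never reach method_missing.
  def size
    "wrapped size: #{@delegate.size}"
  end

  # Anything else is forwarded to the wrapped object.
  def method_missing(selector, *args, &block)
    if @delegate.respond_to?(selector)
      @delegate.__send__(selector, *args, &block)
    else
      super
    end
  end

  # Keeps respond_to? consistent with what method_missing forwards.
  def respond_to_missing?(selector, include_private = false)
    @delegate.respond_to?(selector, include_private) || super
  end
end

w = Wrapper.new([3, 1, 2])
puts w.size          # handled by the wrapper itself
puts w.sort.inspect  # forwarded to the Array
```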

Instance Attribute Details

#consumers ⇒ Array<MarchHare::Consumer> (readonly)


# File 'lib/march_hare/channel.rb', line 118

def consumers
  @consumers
end

#recoveries_counter ⇒ Object (readonly)

Returns the value of attribute recoveries_counter


# File 'lib/march_hare/channel.rb', line 290

def recoveries_counter
  @recoveries_counter
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.


# File 'lib/march_hare/channel.rb', line 665

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end
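
The multiple flag changes how many deliveries one acknowledgement settles: with multiple set to true, every outstanding delivery up to and including the given tag is acknowledged. The toy model below illustrates that rule in plain Ruby. UnackedDeliveries is a hypothetical class used only for this sketch and does not talk to a broker.

```ruby
# Toy model of delivery tag bookkeeping; names are hypothetical.
class UnackedDeliveries
  def initialize(tags)
    @tags = tags.sort
  end

  # With multiple = false only the given tag is settled.
  # With multiple = true every outstanding tag up to and including it is settled.
  def ack(delivery_tag, multiple = false)
    settled, @tags = @tags.partition do |t|
      multiple ? t <= delivery_tag : t == delivery_tag
    end
    settled
  end

  def outstanding
    @tags.dup
  end
end

deliveries = UnackedDeliveries.new([1, 2, 3, 4, 5])
single = deliveries.ack(2)        # settles only tag 2
batch  = deliveries.ack(4, true)  # settles tags 1, 3 and 4
puts single.inspect, batch.inspect, deliveries.outstanding.inspect
```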

#basic_ack(delivery_tag, multiple) ⇒ NilClass

Acknowledges one or more messages (deliveries).


# File 'lib/march_hare/channel.rb', line 751

def basic_ack(delivery_tag, multiple)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_ack(delivery_tag.to_i, multiple)
  end
end

#basic_consume(queue, auto_ack, consumer_tag = nil, consumer) ⇒ Object


# File 'lib/march_hare/channel.rb', line 617

def basic_consume(queue, auto_ack, consumer_tag = nil, consumer)
  consumer.auto_ack = auto_ack
  tag = converting_rjc_exceptions_to_ruby do
    if consumer_tag
      @delegate.basic_consume(queue, auto_ack, consumer_tag, consumer)
    else
      @delegate.basic_consume(queue, auto_ack, consumer)
    end
  end
  self.register_consumer(tag, consumer)

  tag
end

#basic_get(queue, auto_ack) ⇒ Object


# File 'lib/march_hare/channel.rb', line 611

def basic_get(queue, auto_ack)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_get(queue, auto_ack)
  end
end

#basic_nack(delivery_tag, multiple = false, requeue = false) ⇒ NilClass

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.


# File 'lib/march_hare/channel.rb', line 767

def basic_nack(delivery_tag, multiple = false, requeue = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#basic_publish(exchange, routing_key, mandatory, properties, body) ⇒ MarchHare::Channel

Publishes a message using basic.publish AMQP 0.9.1 method.

Options Hash (properties):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID


# File 'lib/march_hare/channel.rb', line 605

def basic_publish(exchange, routing_key, mandatory, properties, body)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
  end
end
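
A properties hash for #basic_publish is an ordinary Ruby Hash keyed by the option names listed above. The sketch below builds one and checks its keys against that list. The helper undocumented_properties is hypothetical and exists only for this illustration, and the list may not be exhaustive, so a key it reports is not necessarily invalid.

```ruby
# Property names taken from the options list above.
DOCUMENTED_PROPERTIES = [
  :persistent, :timestamp, :expiration, :type, :reply_to, :content_type,
  :content_encoding, :correlation_id, :priority, :message_id, :user_id, :app_id
].freeze

# Hypothetical helper: returns the keys that do not appear in the documented list.
def undocumented_properties(properties)
  (properties || {}).keys - DOCUMENTED_PROPERTIES
end

properties = {
  :persistent   => true,
  :content_type => "application/json",
  :type         => "order.created",
  :app_id       => "billing"
}

# A misspelled key is easy to miss by eye but easy to catch with a check.
with_typo = properties.merge(:content_typ => "text/plain")

puts undocumented_properties(properties).inspect
puts undocumented_properties(with_typo).inspect
```

The hash would then be passed as the properties argument of #basic_publish on an open channel.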

#basic_qos(prefetch_count) ⇒ Object


# File 'lib/march_hare/channel.rb', line 631

def basic_qos(prefetch_count)
  r = converting_rjc_exceptions_to_ruby do
    @delegate.basic_qos(prefetch_count)
  end
  @prefetch_count = prefetch_count

  r
end

#basic_recover(requeue = true) ⇒ Object

Redeliver unacknowledged messages


# File 'lib/march_hare/channel.rb', line 777

def basic_recover(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover(requeue)
  end
end

#basic_recover_async(requeue = true) ⇒ Object

Redeliver unacknowledged messages


# File 'lib/march_hare/channel.rb', line 787

def basic_recover_async(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover_async(requeue)
  end
end

#basic_reject(delivery_tag, requeue) ⇒ NilClass

Rejects or requeues a message.

Examples:

Requeue a message

conn  = MarchHare.connect

ch    = conn.create_channel
# a server-named, exclusive queue used only for this example
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |metadata, payload|
  # requeue the message
  ch.basic_reject(metadata.delivery_tag, true)
end

Reject a message

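conn  = MarchHare.connect

ch    = conn.create_channel
# a server-named, exclusive queue used only for this example
q     = ch.queue("", :exclusive => true)
q.subscribe(:manual_ack => true) do |metadata, payload|
  # reject the message without requeueing it
  ch.basic_reject(metadata.delivery_tag, false)
end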

Requeue a message fetched via basic.get

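conn  = MarchHare.connect

ch    = conn.create_channel
# we assume the queue exists and has messages
# auto_ack is false so that the delivery can still be rejected
response = ch.basic_get("bunny.examples.queue3", false)
# assumption: the response object of the underlying Java client exposes the tag via its envelope
ch.basic_reject(response.envelope.delivery_tag, true)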

# File 'lib/march_hare/channel.rb', line 738

def basic_reject(delivery_tag, requeue)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_reject(delivery_tag.to_i, requeue)
  end
end

#channel_number ⇒ Integer Also known as: id, number


# File 'lib/march_hare/channel.rb', line 148

def channel_number
  @delegate.channel_number
end

#close(code = 200, reason = "Goodbye") ⇒ Object

Closes the channel.

Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.


# File 'lib/march_hare/channel.rb', line 164

def close(code = 200, reason = "Goodbye")
  v = @delegate.close(code, reason)

  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end

  @connection.unregister_channel(self)

  v
end

#confirm_select ⇒ NilClass

Enables publisher confirms on the channel.


# File 'lib/march_hare/channel.rb', line 800

def confirm_select
  converting_rjc_exceptions_to_ruby do
    @confirm_mode = true
    @delegate.confirm_select
  end
end

#default_exchange ⇒ Object

Provides access to the default exchange


# File 'lib/march_hare/channel.rb', line 402

def default_exchange
  @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
end

#direct(name, opts = {}) ⇒ MarchHare::Exchange

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/march_hare/channel.rb', line 350

def direct(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#exchange(name, options = {}) ⇒ MarchHare::Exchange

Declares an exchange of the given type or looks it up in the cache of previously declared exchanges.

Options Hash (options):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or "x-consistent-hash"

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

# File 'lib/march_hare/channel.rb', line 308

def exchange(name, options={})
  dx = Exchange.new(self, name, options).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#exchange_bind(destination, source, routing_key, arguments = nil) ⇒ Object

Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)


# File 'lib/march_hare/channel.rb', line 434

def exchange_bind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_bind(destination, source, routing_key, arguments)
  end
end

#exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil) ⇒ Object

Declares an exchange using the exchange.declare AMQP 0.9.1 method.


# File 'lib/march_hare/channel.rb', line 417

def exchange_declare(name, type, durable = false, auto_delete = false, internal = false, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_declare(name, type, durable, auto_delete, internal, arguments)
  end
end

#exchange_unbind(destination, source, routing_key, arguments = nil) ⇒ Object

Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)


# File 'lib/march_hare/channel.rb', line 451

def exchange_unbind(destination, source, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.exchange_unbind(destination, source, routing_key, arguments)
  end
end

#fanout(name, opts = {}) ⇒ MarchHare::Exchange

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/march_hare/channel.rb', line 329

def fanout(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#headers(name, opts = {}) ⇒ MarchHare::Exchange

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

# File 'lib/march_hare/channel.rb', line 392

def headers(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.


# File 'lib/march_hare/channel.rb', line 695

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

#next_publisher_seq_no ⇒ Object


# File 'lib/march_hare/channel.rb', line 831

def next_publisher_seq_no
  @delegate.next_publisher_seq_no
end

#on_confirm(&block) ⇒ Object

Defines a publisher confirm handler


# File 'lib/march_hare/channel.rb', line 871

def on_confirm(&block)
  ch = BlockConfirmListener.from(block)
  self.add_confirm_listener(ch)
  @confirm_hooks << ch
end

#on_return(&block) ⇒ Object

Defines a returned message handler.


# File 'lib/march_hare/channel.rb', line 865

def on_return(&block)
  self.add_return_listener(BlockReturnListener.from(block))
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.


# File 'lib/march_hare/channel.rb', line 179

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)

  @shutdown_hooks << sh
  @delegate.add_shutdown_listener(sh)

  sh
end

#open? ⇒ Boolean


# File 'lib/march_hare/channel.rb', line 155

def open?
  @delegate.open?
end

#prefetch ⇒ Integer


# File 'lib/march_hare/channel.rb', line 655

def prefetch
  @prefetch_count || 0
end

#prefetch=(n) ⇒ Object

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages


# File 'lib/march_hare/channel.rb', line 650

def prefetch=(n)
  basic_qos(n)
end

#qos(options = {}) ⇒ Object


# File 'lib/march_hare/channel.rb', line 640

def qos(options={})
  basic_qos(options.fetch(:prefetch_count, 0))
end

#queue(name, options = {}) ⇒ MarchHare::Queue

Declares a queue or looks it up in the per-channel cache.

Options Hash (options):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (can only be used by this connection, removed when the connection is closed)?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/march_hare/channel.rb', line 475

def queue(name, options={})
  dq = Queue.new(self, name, options).tap do |q|
    q.declare!
  end

  self.register_queue(dq)
end

#queue_bind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method


# File 'lib/march_hare/channel.rb', line 542

def queue_bind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_bind(queue, exchange, routing_key, arguments)
  end
end

#queue_declare(name, durable, exclusive, auto_delete, arguments = {}) ⇒ Object

Declares a queue using queue.declare AMQP 0.9.1 method.


# File 'lib/march_hare/channel.rb', line 498

def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
  end
end

#queue_declare_passive(name) ⇒ Object

Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.


# File 'lib/march_hare/channel.rb', line 510

def queue_declare_passive(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare_passive(name)
  end
end

#queue_delete(name, if_empty = false, if_unused = false) ⇒ Object

Deletes a queue using queue.delete AMQP 0.9.1 method


# File 'lib/march_hare/channel.rb', line 525

def queue_delete(name, if_empty = false, if_unused = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_delete(name, if_empty, if_unused)
  end
end

#queue_purge(name) ⇒ Object

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.


# File 'lib/march_hare/channel.rb', line 571

def queue_purge(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_purge(name)
  end
end

#queue_unbind(queue, exchange, routing_key, arguments = nil) ⇒ Object

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method


# File 'lib/march_hare/channel.rb', line 559

def queue_unbind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_unbind(queue, exchange, routing_key, arguments)
  end
end

#recover_confirm_mode ⇒ Object

Recovers publisher confirms mode. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 234

def recover_confirm_mode
  confirm_select if defined?(@confirm_mode) && @confirm_mode
end

#recover_consumers ⇒ Object

Recovers consumers. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 273

def recover_consumers
  @consumers.values.each do |c|
    begin
      self.unregister_consumer(c.consumer_tag)
      c.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering consumer #{c.consumer_tag}"
    end
  end
end

#recover_exchanges ⇒ Object

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 247

def recover_exchanges
  @exchanges.values.each do |x|
    begin
      x.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering exchange #{x.name}"
    end
  end
end

#recover_prefetch_setting ⇒ Object

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 228

def recover_prefetch_setting
  basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
end

#recover_queues ⇒ Object

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 260

def recover_queues
  @queues.values.each do |q|
    begin
      q.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering queue #{q.name}"
    end
  end
end
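
The recovery loops above share one pattern: each entity is recovered inside its own begin/rescue so that a single failure does not stop the remaining entities from being recovered. The sketch below reproduces that pattern in plain Ruby. Entity and recover_all are hypothetical names, and unlike the listings the sketch rescues StandardError, includes the error message in the log line and returns the names that failed.

```ruby
# Hypothetical entity that may fail to recover.
Entity = Struct.new(:name, :healthy) do
  def recover_from_network_failure
    raise IOError, "cannot redeclare #{name}" unless healthy
    :recovered
  end
end

# Recovers each entity independently and reports which ones failed.
def recover_all(entities, err = $stderr)
  failed = []
  entities.each do |entity|
    begin
      entity.recover_from_network_failure
    rescue StandardError => e
      # One failing entity is logged and skipped; the loop carries on.
      err.puts "Caught exception when recovering #{entity.name}: #{e.message}"
      failed << entity.name
    end
  end
  failed
end

entities = [Entity.new("q1", true), Entity.new("q2", false), Entity.new("q3", true)]
failed = recover_all(entities)
puts failed.inspect
```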

#recover_tx_mode ⇒ Object

Recovers transaction mode. Used by the Automatic Network Failure Recovery feature.


# File 'lib/march_hare/channel.rb', line 240

def recover_tx_mode
  tx_select if defined?(@tx_mode) && @tx_mode
end
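
The recover_* methods for prefetch, confirm mode and transaction mode follow a record-and-replay pattern: a setting is stored in an instance variable when it is first applied and is re-applied during recovery only if it was ever set. The sketch below shows the idea in plain Ruby. ReplayingChannel is a hypothetical stand-in that records "sent" commands in an array instead of talking to a broker.

```ruby
# Hypothetical stand-in for a channel that records settings and replays them.
class ReplayingChannel
  attr_reader :sent

  def initialize
    @sent = [] # commands "sent" to the broker, in order
  end

  def basic_qos(prefetch_count)
    @sent << [:basic_qos, prefetch_count]
    @prefetch_count = prefetch_count
  end

  def confirm_select
    @confirm_mode = true
    @sent << [:confirm_select]
  end

  # Replays only what was enabled before; settings never touched are skipped.
  def recover
    basic_qos(@prefetch_count) if defined?(@prefetch_count) && @prefetch_count
    confirm_select if defined?(@confirm_mode) && @confirm_mode
  end
end

fresh = ReplayingChannel.new
fresh.recover                 # nothing was configured, so nothing is replayed

configured = ReplayingChannel.new
configured.basic_qos(10)
configured.sent.clear         # pretend the connection was lost and reopened
configured.recover
puts fresh.sent.inspect, configured.sent.inspect
```

The defined? check means an instance variable that was never assigned is skipped without being read.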

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.


# File 'lib/march_hare/channel.rb', line 680

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end

#session ⇒ MarchHare::Session Also known as: client, connection


# File 'lib/march_hare/channel.rb', line 141

def session
  @connection
end

#topic(name, opts = {}) ⇒ MarchHare::Exchange

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

# File 'lib/march_hare/channel.rb', line 371

def topic(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

#tx_commit ⇒ Object

Commits a transaction


# File 'lib/march_hare/channel.rb', line 850

def tx_commit
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_commit
  end
end

#tx_rollback ⇒ Object

Rolls back a transaction


# File 'lib/march_hare/channel.rb', line 857

def tx_rollback
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_rollback
  end
end

#tx_select ⇒ Object

Enables transactions on the channel


# File 'lib/march_hare/channel.rb', line 836

def tx_select
  converting_rjc_exceptions_to_ruby do
    @tx_mode = true
    @delegate.tx_select
  end
end

#using_publisher_confirms? ⇒ Boolean Also known as: uses_publisher_confirms?


# File 'lib/march_hare/channel.rb', line 808

def using_publisher_confirms?
  !!@confirm_mode
end

#using_tx? ⇒ Boolean Also known as: uses_tx?


# File 'lib/march_hare/channel.rb', line 844

def using_tx?
  !!@tx_mode
end

#wait_for_confirms(timeout = nil) ⇒ Boolean

Waits until all outstanding publisher confirms arrive.

Takes an optional timeout in milliseconds. Raises an exception if the timeout has elapsed.


# File 'lib/march_hare/channel.rb', line 821

def wait_for_confirms(timeout = nil)
  if timeout
    converting_rjc_exceptions_to_ruby do
      @delegate.wait_for_confirms(timeout)
    end
  else
    @delegate.wait_for_confirms
  end
end