Class: AMQP::Channel

Inherits:
AMQ::Client::Channel
  • Object
show all
Defined in:
lib/amqp/channel.rb

Overview

What are AMQP channels

To quote AMQP 0.9.1 specification:

AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening a channel

Channels are opened asynchronously. There are two ways to do it: using a callback or pseudo-synchronous mode.

Unless your application needs multiple channels, this approach is recommended. Alternatively, AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open, however, it can be used as if it was a synchronous, blocking method:

Even though in the example above channel isn’t immediately open, it is safe to declare exchanges using it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on channel being open will be enqueued and executed in a FIFO manner when broker confirms channel opening. Note, however, that this “pseudo-synchronous mode” is easy to abuse and introduce race conditions AMQP gem cannot resolve for you. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact.

Key methods

Key methods of Channel class are

refer to documentation for those methods for usage examples.

Channel provides a number of convenience methods that instantiate queues and exchanges of various types associated with this channel:

Error handling

It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error:

When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application needs.

Closing a channel

Channels are opened when objects is instantiated and closed using #close method when application no longer needs it.

RabbitMQ extensions.

AMQP gem supports several RabbitMQ extensions taht extend Channel functionality. Learn more in VendorSpecificExtensions

Examples:

Opening a channel with a callback

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    # when this block is executed, channel is open and ready for use
  end
end

Instantiating a channel that will be open eventually

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  channel  = AMQP::Channel.new(client)
  exchange = channel.default_exchange

  # ...
end

Queue declaration with incompatible attributes results in a channel-level exception

AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
  AMQP::Channel.new do |channel, open_ok|
    puts "Channel ##{channel.id} is now open!"

    channel.on_error do |ch, close|
      puts "Handling channel-level exception"

      connection.close {
        EM.stop { exit }
      }
    end

    EventMachine.add_timer(0.4) do
      # these two definitions result in a race condition. For sake of this example,
      # however, it does not matter. Whatever definition succeeds first, 2nd one will
      # cause a channel-level exception (because attributes are not identical)
      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
        puts "#{queue.name} is ready to go"
      end

      AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
        puts "#{queue.name} is ready to go"
      end
    end
  end
end

Closing a channel your application no longer needs

# this assumes EventMachine reactor is running
AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
  AMQP::Channel.new(client) do |channel, open_ok|
    channel.close do |close_ok|
      # when this block is executed, channel is successfully closed
    end
  end
end

See Also:

Instance Attribute Summary (collapse)

Declaring exchanges (collapse)

Declaring queues (collapse)

Channel lifecycle (collapse)

QoS and flow handling (collapse)

Message acknowledgements (collapse)

Transactions (collapse)

Error handling (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Channel) initialize(connection = nil, id = self.class.next_channel_id, options = {}) {|channel, open_ok| ... }

A new instance of Channel

Examples:

Instantiating a channel for default connection (accessible as AMQP.connection)

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel, open_ok|
    # channel is ready: set up your messaging flow by creating exchanges,
    # queues, binding them together and so on.
  end
end

Instantiating a channel for explicitly given connection

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel, open_ok|
    # ...
  end
end

Instantiating a channel with a :prefetch option

AMQP.connect do |connection|
  AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok|
    # ...
  end
end

Parameters:

  • connection (AMQP::Session) (defaults to: nil)

    Connection to open this channel on. If not given, default AMQP connection (accessible via AMQP.connection) will be used.

  • id (Integer) (defaults to: self.class.next_channel_id)

    Channel id. Must not be greater than max channel id client and broker negotiated on during connection setup. Almost always the right thing to do is to let AMQP gem pick channel identifier for you. If you want to get next channel id, use next_channel_id (it is thread-safe).

  • options (Hash) (defaults to: {})

    A hash of options

Options Hash (options):

  • :prefetch (Boolean) — default: nil

    Specifies number of messages to prefetch. Channel-specific. See #prefetch.

  • :auto_recovery (Boolean) — default: nil

    Turns on automatic network failure recovery mode for this channel.

Yields:

  • (channel, open_ok)

    Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.

Yield Parameters:

  • channel (Channel)

    Channel that is successfully open

  • open_ok (AMQP::Protocol::Channel::OpenOk)

    AMQP channel.open-ok) instance

See Also:



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/amqp/channel.rb', line 204

def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  @connection = connection || AMQP.connection || AMQP.start
  # this means 2nd argument is options
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = self.class.next_channel_id
  end

  super(@connection, id, options)

  @rpcs                       = Hash.new
  # we need this deferrable to mimic what AMQP gem 0.7 does to enable
  # the following (pseudo-synchronous) style of programming some people use in their
  # existing codebases:
  #
  # connection = AMQP.connect
  # channel    = AMQP::Channel.new(connection)
  # queue      = AMQP::Queue.new(channel)
  #
  # ...
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

  # only send channel.open when connection is actually open. Makes it possible to
  # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        case block.arity
        when 1 then block.call(ch)
        else block.call(ch, open_ok)
        end # case
      end # if

      self.prefetch(options[:prefetch], false) if options[:prefetch]
    end # self.open
  end # @connection.on_open
end

Instance Attribute Details

- (Boolean) auto_recovery

True if this channel is in automatic recovery mode

Returns:

  • (Boolean)

    true if this channel is in automatic recovery mode

See Also:



250
251
252
# File 'lib/amqp/channel.rb', line 250

def auto_recovery
  @auto_recovery
end

- (Connection) connection (readonly) Also known as: conn

AMQP connection this channel is part of

Returns:

  • (Connection)


152
153
154
# File 'lib/amqp/channel.rb', line 152

def connection
  @connection
end

- (Symbol) status (readonly)

Status of this channel (one of: :opening, :closing, :open, :closed)

Returns:

  • (Symbol)


157
158
159
# File 'lib/amqp/channel.rb', line 157

def status
  @status
end

Class Method Details

+ (Object) method_missing(meth, *args, &blk)

Deprecated.

Allows for calls to all MQ instance methods. This implicitly calls AMQP::Channel.new so that a new channel is allocated for subsequent operations.



1264
1265
1266
# File 'lib/amqp/channel.rb', line 1264

def self.method_missing(meth, *args, &blk)
  self.default.__send__(meth, *args, &blk)
end

+ (Fixnum) next_channel_id

Returns next available channel id. This method is thread safe.

Returns:

  • (Fixnum)

See Also:



1185
1186
1187
1188
1189
1190
1191
# File 'lib/amqp/channel.rb', line 1185

def self.next_channel_id
  channel_id_mutex.synchronize do
    self.initialize_channel_id_allocator

    @int_allocator.allocate
  end
end

+ (Object) on_error(&block)

Deprecated.

Defines a global callback to be run on channel-level exception across all channels. Consider using Channel#on_error instead. This method is here for sake of backwards compatibility with 0.6.x and 0.7.x releases.

See Also:



1080
1081
1082
# File 'lib/amqp/channel.rb', line 1080

def self.on_error(&block)
  self.error(&block)
end

+ (Object) release_channel_id(i)

Releases previously allocated channel id. This method is thread safe.

Parameters:

  • Channel (Fixnum)

    id to release

See Also:



1199
1200
1201
1202
1203
1204
1205
# File 'lib/amqp/channel.rb', line 1199

def self.release_channel_id(i)
  channel_id_mutex.synchronize do
    self.initialize_channel_id_allocator

    @int_allocator.release(i)
  end
end

+ (Object) reset_channel_id_allocator

Resets channel allocator. This method is thread safe.



1211
1212
1213
1214
1215
1216
1217
# File 'lib/amqp/channel.rb', line 1211

def self.reset_channel_id_allocator
  channel_id_mutex.synchronize do
    initialize_channel_id_allocator

    @int_allocator.reset
  end
end

Instance Method Details

- (Object) acknowledge(delivery_tag, multiple = false)

Acknowledge one or all messages on the channel.



1000
1001
1002
# File 'lib/amqp/channel.rb', line 1000

def acknowledge(delivery_tag, multiple = false)
  super(delivery_tag, multiple)
end

- (Object) auto_recover

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/amqp/channel.rb', line 261

def auto_recover
  return unless auto_recovering?

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end

- (Boolean) auto_recovering?

True if this channel uses automatic recovery mode

Returns:

  • (Boolean)

    true if this channel uses automatic recovery mode



253
254
255
# File 'lib/amqp/channel.rb', line 253

def auto_recovering?
  @auto_recovery
end

- (Object) close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)

Closes AMQP channel.



937
938
939
940
941
# File 'lib/amqp/channel.rb', line 937

def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  r = super(reply_code, reply_text, class_id, method_id, &block)

  r
end

- (Exchange) default_exchange

Returns exchange object with the same name as default (aka unnamed) exchange. Default exchange is a direct exchange and automatically routes messages to queues when routing key matches queue name exactly. This feature is known as “automatic binding” (of queues to default exchange).

Use default exchange when you want to route messages directly to specific queues (queue names are known, you don’t mind this kind of coupling between applications).

Examples:

Using default exchange to publish messages to queues with known names

AMQP.start(:host => 'localhost') do |connection|
  ch        = AMQP::Channel.new(connection)

  queue1    = ch.queue("queue1").subscribe do |payload|
    puts "[#{queue1.name}] => #{payload}"
  end
  queue2    = ch.queue("queue2").subscribe do |payload|
    puts "[#{queue2.name}] => #{payload}"
  end
  queue3    = ch.queue("queue3").subscribe do |payload|
    puts "[#{queue3.name}] => #{payload}"
  end
  queues    = [queue1, queue2, queue3]

  # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
  exchange = AMQP::Exchange.default
  EM.add_periodic_timer(1) do
    q = queues.sample

    exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
  end
end

Returns:

See Also:



431
432
433
# File 'lib/amqp/channel.rb', line 431

def default_exchange
  @default_exchange ||= Exchange.default(self)
end

- (Exchange) direct(name = 'amq.direct', opts = {}, &block)

Defines, intializes and returns a direct Exchange instance.

Learn more about direct exchanges in Exchange class documentation.

Examples:

Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)

# an exchange application A will be using to publish updates
# to some search index
exchange = channel.direct("index.updates")

# In the same (or different) process declare a queue that broker will
# generate name for, bind it to aforementioned exchange using method chaining
queue    = channel.queue("").
                   # queue will be receiving messages that were published with
                   # :routing_key attribute value of "search.index.updates"
                   bind(exchange, :routing_key => "search.index.updates").
                   # register a callback that will be run when messages arrive
                   subscribe { |header, message| puts("Received #{message}") }

# now publish a new document contents for indexing,
# message will be delivered to the queue we declared and bound on the line above
exchange.publish(document.content, :routing_key => "search.index.updates")

Instantiating a direct exchange using #direct with a callback

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel|
    channel.direct("email.replies_listener") do |exchange, declare_ok|
      # by now exchange is ready and waiting
    end
  end
end

Parameters:

  • name (String) (defaults to: 'amq.direct')

    (amq.direct) Exchange name.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: default false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Returns:

Raises:

  • (AMQP::Error)

    Raised when exchange is redeclared with parameters different from original declaration.

  • (AMQP::Error)

    Raised when exchange is declared with :passive => true and the exchange does not exist.

See Also:



378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/amqp/channel.rb', line 378

def direct(name = 'amq.direct', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:direct, name, opts, block)

    validate_parameters_match!(exchange, extended_opts)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :direct, name, opts, &block))
  end
end

- (Exchange) fanout(name = 'amq.fanout', opts = {}, &block)

Defines, intializes and returns a fanout Exchange instance.

Learn more about fanout exchanges in Exchange class documentation.

Examples:

Using fanout exchange to deliver messages to multiple consumers

# open up a channel
# declare a fanout exchange
# declare 3 queues, binds them
# publish a message

Parameters:

  • name (String) (defaults to: 'amq.fanout')

    (amq.fanout) Exchange name.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: default false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Returns:

Raises:

  • (AMQP::Error)

    Raised when exchange is redeclared with parameters different from original declaration.

  • (AMQP::Error)

    Raised when exchange is declared with :passive => true and the exchange does not exist.

See Also:



486
487
488
489
490
491
492
493
494
495
496
497
# File 'lib/amqp/channel.rb', line 486

def fanout(name = 'amq.fanout', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:fanout, name, opts, block)

    validate_parameters_match!(exchange, extended_opts)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :fanout, name, opts, &block))
  end
end

- (Object) flow(active = false, &block)

Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow­control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.

Parameters:

  • Desired (Boolean)

    flow state.

See Also:



960
961
962
# File 'lib/amqp/channel.rb', line 960

def flow(active = false, &block)
  super(active, &block)
end

- (Boolean) flow_is_active?

True if flow in this channel is active (messages will be delivered to consumers that use this channel).

Returns:

  • (Boolean)

    True if flow in this channel is active (messages will be delivered to consumers that use this channel).



967
968
969
# File 'lib/amqp/channel.rb', line 967

def flow_is_active?
  @flow_is_active
end

- (Exchange) headers(name = 'amq.match', opts = {}, &block)

Defines, intializes and returns a headers Exchange instance.

Learn more about headers exchanges in Exchange class documentation.

Examples:

Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)

puts "=> Headers routing example"
puts
AMQP.start do |connection|
  channel   = AMQP::Channel.new(connection)
  channel.on_error do |ch, channel_close|
    puts "A channel-level exception: #{channel_close.inspect}"
  end

  exchange = channel.headers("amq.match", :durable => true)

  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |, payload|
    puts "[linux/x64] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |, payload|
    puts "[linux/x32] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |, payload|
    puts "[linux] Got a message: #{payload}"
  end
  channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |, payload|
    puts "[macosx|octocore] Got a message: #{payload}"
  end

  EventMachine.add_timer(0.5) do
    exchange.publish "For linux/x64",   :headers => { :arch => "x64", :os => 'linux' }
    exchange.publish "For linux/x32",   :headers => { :arch => "x32", :os => 'linux' }
    exchange.publish "For linux",       :headers => { :os => 'linux'  }
    exchange.publish "For OS X",        :headers => { :os => 'macosx' }
    exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
    exchange.publish "For ocotocore",   :headers => { :cores => 8  }
  end

  show_stopper = Proc.new do
    $stdout.puts "Stopping..."
    connection.close {
      EventMachine.stop { exit }
    }
  end

  Signal.trap "INT", show_stopper
  EventMachine.add_timer(2, show_stopper)
end

Parameters:

  • name (String) (defaults to: 'amq.match')

    (amq.match) Exchange name.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: default false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Returns:

Raises:

  • (AMQP::Error)

    Raised when exchange is redeclared with parameters different from original declaration.

  • (AMQP::Error)

    Raised when exchange is declared with :passive => true and the exchange does not exist.

See Also:



708
709
710
711
712
713
714
715
716
717
718
719
# File 'lib/amqp/channel.rb', line 708

def headers(name = 'amq.match', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:headers, name, opts, block)

    validate_parameters_match!(exchange, extended_opts)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :headers, name, opts, &block))
  end
end

- (Object) on_error(&block)

Defines a callback that will be executed when channel is closed after channel-level exception.



1069
1070
1071
# File 'lib/amqp/channel.rb', line 1069

def on_error(&block)
  super(&block)
end

- (Object) once_open(&block) Also known as: once_opened

Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker). If you need to delay an operation till the moment channel is open, this method is what you are looking for.

Multiple callbacks are supported. If when this moment is called, channel is already open, block is executed immediately.



929
930
931
# File 'lib/amqp/channel.rb', line 929

def once_open(&block)
  @channel_is_open_deferrable.callback(&block)
end

- (Object) open(&block)

Note:

Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.

Opens AMQP channel.



911
912
913
# File 'lib/amqp/channel.rb', line 911

def open(&block)
  super(&block)
end

- (Boolean) open?

True if channel is not closed.

Returns:

  • (Boolean)

    true if channel is not closed.



917
918
919
# File 'lib/amqp/channel.rb', line 917

def open?
  self.status == :opened || self.status == :opening
end

- (Channel) prefetch(count, global = false, &block)

Self

Parameters:

  • Message (Fixnum)

    count

  • global (Boolean) (defaults to: false)

    (false)

Returns:



979
980
981
982
983
984
985
986
# File 'lib/amqp/channel.rb', line 979

def prefetch(count, global = false, &block)
  self.once_open do
    # RabbitMQ as of 2.3.1 does not support prefetch_size.
    self.qos(0, count, global, &block)
  end

  self
end

- (Queue) queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... }

Declares and returns a Queue instance associated with this channel. See Queue class documentation for more information about queues.

To make broker generate queue name for you (a classic example is exclusive queues that are only used for a short period of time), pass empty string as name value. Then queue will get it’s name as soon as broker’s response (queue.declare-ok) arrives. Note that in this case, block is required.

Like for exchanges, queue names starting with ‘amq.’ cannot be modified and should not be used by applications.

Examples:

Declaring a queue in a mail delivery app using Channel#queue without a block

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using direct exchange and routing key = "mail.delivery"
    queue = ch.queue("mail.delivery", :durable => true)
    queue.subscribe do |headers, payload|
      # ...
    end
  end
end

Declaring a server-named exclusive queue that receives all messages related to events, using a block.

AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |ch|
    # message producers will be able to send messages to this queue
    # using amq.topic exchange with routing keys that begin with "events"
    ch.queue("", :exclusive => true) do |queue|
      queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload|
        # ...
      end
    end
  end
end

Parameters:

  • name (String) (defaults to: AMQ::Protocol::EMPTY_STRING)

    Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory). See Queue class documentation for discussion of queue lifecycles and when use of server-named queues is optimal.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM). Any remaining messages in the queue will be purged when the queue is deleted regardless of the message’s persistence setting.

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :exclusive (Boolean) — default: false

    Exclusive queues may only be used by a single connection. Exclusivity also implies that queue is automatically deleted when connection is closed. Only one consumer is allowed to remove messages from exclusive queue.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Yields:

  • (queue, declare_ok)

    Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.

Yield Parameters:

  • queue (Queue)

    Queue that is successfully declared and is ready to be used.

  • declare_ok (AMQP::Protocol::Queue::DeclareOk)

    AMQP queue.declare-ok) instance.

Returns:

Raises:

  • (AMQP::Error)

    Raised when queue is redeclared with parameters different from original declaration.

  • (AMQP::Error)

    Raised when queue is declared with :passive => true and the queue does not exist.

  • (AMQP::Error)

    Raised when queue is declared with :exclusive => true and queue with that name already exist.

See Also:



809
810
811
812
813
814
815
816
817
818
819
820
821
822
# File 'lib/amqp/channel.rb', line 809

def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
  raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

  if name && !name.empty? && (queue = find_queue(name))
    extended_opts = Queue.add_default_options(name, opts, block)

    validate_parameters_match!(queue, extended_opts)

    block.call(queue) if block
    queue
  else
    self.queue!(name, opts, &block)
  end
end

- (Queue) queue!(name, opts = {}, &block)

Same as #queue but when queue with the same name already exists in this channel object’s cache, this method will replace existing queue with a newly defined one. Consider using #queue instead.

Returns:

See Also:



832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
# File 'lib/amqp/channel.rb', line 832

def queue!(name, opts = {}, &block)
  queue = if block.nil?
            Queue.new(self, name, opts)
          else
            shim = Proc.new { |q, method|
      if block.arity == 1
        block.call(q)
      else
        queue = find_queue(method.queue)
        block.call(queue, method.consumer_count, method.message_count)
      end
    }
            Queue.new(self, name, opts, &shim)
          end

  register_queue(queue)
end

- (Channel) recover(requeue = true, &block)

Note:

RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.

Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.



1023
1024
1025
# File 'lib/amqp/channel.rb', line 1023

def recover(requeue = true, &block)
  super(requeue, &block)
end

- (Object) reject(delivery_tag, requeue = true)

Reject a message with given delivery tag.



1010
1011
1012
# File 'lib/amqp/channel.rb', line 1010

def reject(delivery_tag, requeue = true)
  super(delivery_tag, requeue)
end

- (Object) reuse

Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens itself with this new id, releasing the old id after the new one is allocated.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/amqp/channel.rb', line 280

def reuse
  old_id = @id
  # must release after we allocate a new id, otherwise we will end up
  # with the same value. MK.
  @id    = self.class.next_channel_id
  self.class.release_channel_id(old_id)

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end

- (RPC) rpc(name, obj = nil)

Instantiates and returns an RPC instance associated with this channel.

The optional object may be a class name, module name or object instance. When given a class or module name, the object is instantiated during this setup. The passed queue is automatically subscribed to so it passes all messages (and their arguments) to the object.

Marshalling and unmarshalling the objects is handled internally. This marshalling is subject to the same restrictions as defined in the [http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard library.

When the optional object is not passed, the returned rpc reference is used to send messages and arguments to the queue. See class="caps">RPC#method_missing which does all of the heavy lifting with the proxy. Some client elsewhere must call this method with the optional block so that there is a valid destination. Failure to do so will just enqueue marshalled messages that are never consumed.

Examples:

Use of RPC

# TODO

Parameters:

  • Queue (String, Queue)

    to be used by RPC server.

Returns:

  • (RPC)


888
889
890
# File 'lib/amqp/channel.rb', line 888

def rpc(name, obj = nil)
  RPC.new(self, name, obj)
end

- (Object) rpcs

Returns a hash of all rpc proxy objects.

Most of the time, this method is not called by application code.



899
900
901
# File 'lib/amqp/channel.rb', line 899

def rpcs
  @rpcs.values
end

- (Exchange) topic(name = 'amq.topic', opts = {}, &block)

Defines, intializes and returns a topic Exchange instance.

Learn more about topic exchanges in Exchange class documentation.

Examples:

Using topic exchange to deliver relevant news updates

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
    puts "A new dev post: '#{payload}'"
  end
  channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
    puts "A new post about Ruby: '#{payload}'"
  end

  # Let's publish some data.
  exchange.publish "Ruby post",     :routing_key => "technology.dev.ruby"
  exchange.publish "Erlang post",   :routing_key => "technology.dev.erlang"
  exchange.publish "Sinatra post",  :routing_key => "technology.web.ruby"
  exchange.publish "Jewelery post", :routing_key => "jewelery.ruby"
end

Using topic exchange to deliver geographically-relevant data

AMQP.connect do |connection|
  channel  = AMQP::Channel.new(connection)
  exchange = channel.topic("pub/sub")

  # Subscribers.
  channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
    puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
    puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
    puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
    puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
    puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
  end
  channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
    puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
  end

  exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
    publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
    publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
    publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
    publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
    publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
    publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
    publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
    publish("Rome update",             :routing_key => "europe.italy.roma").
    publish("Paris update",            :routing_key => "europe.france.paris")
end

Parameters:

  • name (String) (defaults to: 'amq.topic')

    (amq.topic) Exchange name.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :passive (Boolean) — default: false

    If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable (Boolean) — default: false

    If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges and their bindings are recreated upon a server restart (information about them is persisted). Non-durable (transient) exchanges do not survive if/when a server restarts (information about them is stored exclusively in RAM).

  • :auto_delete (Boolean) — default: false

    If set, the exchange is deleted when all queues have finished using it. The server waits for a short period of time before determining the exchange is unused to give time to the client code to bind a queue to it.

  • :internal (Boolean) — default: default false

    If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications. This is a RabbitMQ-specific extension.

  • :nowait (Boolean) — default: true

    If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

Returns:

Raises:

  • (AMQP::Error)

    Raised when exchange is redeclared with parameters different from original declaration.

  • (AMQP::Error)

    Raised when exchange is declared with :passive => true and the exchange does not exist.

See Also:



602
603
604
605
606
607
608
609
610
611
612
613
# File 'lib/amqp/channel.rb', line 602

def topic(name = 'amq.topic', opts = {}, &block)
  if exchange = find_exchange(name)
    extended_opts = Exchange.add_default_options(:topic, name, opts, block)

    validate_parameters_match!(exchange, extended_opts)

    block.call(exchange) if block
    exchange
  else
    register_exchange(Exchange.new(self, :topic, name, opts, &block))
  end
end

- (Object) tx_commit(&block)

Commits AMQP transaction.



1045
1046
1047
# File 'lib/amqp/channel.rb', line 1045

def tx_commit(&block)
  super(&block)
end

- (Object) tx_rollback(&block)

Rolls AMQP transaction back.



1052
1053
1054
# File 'lib/amqp/channel.rb', line 1052

def tx_rollback(&block)
  super(&block)
end

- (Object) tx_select(&block)

Sets the channel to use standard transactions. One must use this method at least once on a channel before using #tx_tommit or tx_rollback methods.



1038
1039
1040
# File 'lib/amqp/channel.rb', line 1038

def tx_select(&block)
  super(&block)
end