Method: AMQP::Channel#initialize

Defined in:
lib/amqp/channel.rb

#initialize(connection = nil, id = nil, options = {}) {|channel, open_ok| ... } ⇒ Channel

Returns a new instance of Channel.

Examples:

Instantiating a channel for default connection (accessible as AMQP.connection)


AMQP.start do |connection|
  # No connection argument: the channel falls back to AMQP.connection.
  AMQP::Channel.new do |channel, open_ok|
    # channel is ready: set up your messaging flow by creating exchanges,
    # queues, binding them together and so on.
  end
end

Instantiating a channel for explicitly given connection


AMQP.connect do |connection|
  AMQP::Channel.new(connection) do |channel, open_ok|
    # ...
  end
end

Instantiating a channel with a :prefetch option


AMQP.connect do |connection|
  AMQP::Channel.new(connection, :prefetch => 5) do |channel, open_ok|
    # ...
  end
end
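
The :prefetch example above passes the options Hash in the position normally taken by id. A minimal sketch of that argument handling follows, assuming a hypothetical helper named normalize_channel_args (not part of the amqp gem). It returns nil for the id where the real constructor would ask the connection for the next free channel id.

```ruby
# Hypothetical helper, for illustration only: mirrors how the constructor
# treats a Hash given as the second argument.
def normalize_channel_args(id = nil, options = {})
  if id.kind_of?(Hash)
    # The second argument is really the options Hash.
    options = options.merge(id)
    id      = nil # the real constructor picks the next channel id here
  end
  [id, options]
end

id, options = normalize_channel_args(:prefetch => 5)
# id is nil, options is { :prefetch => 5 }

id, options = normalize_channel_args(7, :auto_recovery => true)
# id is 7, options is { :auto_recovery => true }
```

Either call style therefore ends up with the same options Hash; only the explicit id differs.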

Parameters:

  • connection (AMQP::Session) (defaults to: nil)

    Connection to open this channel on. If not given, the default AMQP connection (accessible via AMQP.connection) is used.

  • id (Integer) (defaults to: nil)

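    Channel id. Must not be greater than the maximum channel id that client and broker negotiated during connection setup. Almost always the right thing to do is to let the AMQP gem pick the channel identifier for you. If a Hash is passed in this position, it is treated as options and the id is picked automatically.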

  • options (Hash) (defaults to: {})

    A hash of options
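
The bound on id can be sketched in isolation. validate_channel_id! below is a hypothetical helper, not a gem API. It follows the check in the constructor source shown later on this page, including the 65536 fallback and the special case where a channel_max of 0 skips the check.

```ruby
# Hypothetical helper, for illustration only: same range check the
# constructor performs on the channel id.
def validate_channel_id!(id, channel_max = 65536)
  # A channel_max of 0 means no specific limit was negotiated.
  if channel_max != 0 && !(0..channel_max).include?(id)
    raise ArgumentError, "Max channel for the connection is #{channel_max}, given: #{id}"
  end
  id
end

validate_channel_id!(1)          # within range, returns 1
validate_channel_id!(70_000, 0)  # no limit, returns 70000

begin
  validate_channel_id!(70_000)   # above the 65536 fallback
rescue ArgumentError => e
  warn e.message
end
```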

Options Hash (options):

  • :prefetch (Integer) — default: nil

    Specifies number of messages to prefetch. Channel-specific. See #prefetch.

  • :auto_recovery (Boolean) — default: nil

    Turns on automatic network failure recovery mode for this channel. When the option is not given, the channel inherits the connection's auto-recovery setting.
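
How the :auto_recovery value is resolved can be sketched as a small merge. effective_auto_recovery is a hypothetical helper, and connection_default stands in for the connection's own auto-recovery flag; neither name comes from the gem.

```ruby
# Hypothetical helper, for illustration only: the connection-level flag is
# the default, an explicit option overrides it, and the result is coerced
# to a strict boolean.
def effective_auto_recovery(connection_default, options = {})
  merged = { :auto_recovery => connection_default }.merge(options)
  !!merged[:auto_recovery]
end

effective_auto_recovery(true)                          # inherits the connection setting
effective_auto_recovery(true, :auto_recovery => false) # explicit option wins
effective_auto_recovery(true, :auto_recovery => nil)   # explicit nil also overrides
```

Note that an explicit nil is still an explicit value in this sketch: it replaces the connection default and is coerced to false.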

Yields:

  • (channel, open_ok)

    Yields the open channel instance and the AMQP method (channel.open-ok) instance. The latter is optional: a block that takes a single argument receives only the channel.

Yield Parameters:

  • channel (Channel)

    The channel, once it has been successfully opened

  • open_ok (AMQP::Protocol::Channel::OpenOk)

    AMQP method (channel.open-ok) instance
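
The optional second yield parameter relies on a block arity check. A minimal sketch, assuming a hypothetical deliver_open method and using symbols in place of real channel and open-ok objects:

```ruby
# Hypothetical stand-in for the callback dispatch: a one-argument block
# gets only the channel, any other block gets both values.
def deliver_open(channel, open_ok, &block)
  return unless block

  case block.arity
  when 1 then block.call(channel)
  else block.call(channel, open_ok)
  end
end

received = []
deliver_open(:channel, :open_ok) { |ch| received << [ch] }
deliver_open(:channel, :open_ok) { |ch, ok| received << [ch, ok] }
# received is [[:channel], [:channel, :open_ok]]
```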


# File 'lib/amqp/channel.rb', line 233

def initialize(connection = nil, id = nil, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  @connection = connection || AMQP.connection || AMQP.start
  # this means 2nd argument is options
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = @connection.next_channel_id
  end

  super(@connection)

  @id        = id || @connection.next_channel_id
  @exchanges = Hash.new
  @queues    = Hash.new
  @consumers = Hash.new
  @options       = { :auto_recovery => @connection.auto_recovering? }.merge(options)
  @auto_recovery = (!!@options[:auto_recovery])

  # we must synchronize frameset delivery. MK.
  @mutex     = Mutex.new

  reset_state!

  # 65536 is here for cases when the channel is opened without passing a callback in,
  # otherwise channel_max would be nil and that causes a lot of needless headaches.
  # Let's just have this default. MK.
  channel_max = if @connection.open?
                  @connection.channel_max || 65536
                else
                  65536
                end

  if channel_max != 0 && !(0..channel_max).include?(@id)
    raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
  end

  # we need this deferrable to mimic what AMQP gem 0.7 does to enable
  # the following (pseudo-synchronous) style of programming some people use in their
  # existing codebases:
  #
  # connection = AMQP.connect
  # channel    = AMQP::Channel.new(connection)
  # queue      = AMQP::Queue.new(channel)
  #
  # ...
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQP::Deferrable.new

  @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

  # only send channel.open when connection is actually open. Makes it possible to
  # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        case block.arity
        when 1 then block.call(ch)
        else block.call(ch, open_ok)
        end # case
      end # if

      self.prefetch(@options[:prefetch], false) if @options[:prefetch]
    end # self.open
  end # @connection.on_connection
end