Class: AMQP::Channel
- Inherits:
-
Object
- Object
- AMQP::Channel
- Extended by:
- ProtocolMethodHandlers, RegisterEntityMixin
- Includes:
- Entity
- Defined in:
- lib/amqp/channel.rb
Overview
What are AMQP channels
To quote AMQP 0.9.1 specification:
AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.
Opening a channel
Channels are opened asynchronously. There are two ways to do it: using a callback or pseudo-synchronous mode.
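The callback style can be sketched roughly as follows. This is an illustration, not an excerpt from the gem's documentation: it assumes the amqp gem is installed and a broker is reachable on localhost, and the helper name `open_channel_with_callback` is made up for this example. Nothing touches the network until the helper is called.

```ruby
# Hypothetical helper; assumes the amqp gem and a broker on localhost.
def open_channel_with_callback
  require "amqp"

  EventMachine.run do
    AMQP.connect(:host => "localhost") do |connection|
      # The block runs once the broker confirms that the channel is open.
      AMQP::Channel.new(connection) do |channel, open_ok|
        puts "Channel ##{channel.id} is now open"

        # Close the connection and stop the reactor so the helper returns.
        connection.close { EventMachine.stop }
      end
    end
  end
end
```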
Unless your application needs multiple channels, this approach is recommended. Alternatively, AMQP::Channel can be instantiated without a block. The returned channel is not immediately open; however, it can be used as if opening it were a synchronous, blocking operation:
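A minimal sketch of the block-less style, under the same assumptions (amqp gem installed, broker on localhost). The helper name and the exchange name `sketch.events` are hypothetical.

```ruby
# Hypothetical helper; assumes the amqp gem and a broker on localhost.
def declare_exchange_before_channel_opens
  require "amqp"

  EventMachine.run do
    connection = AMQP.connect(:host => "localhost")
    channel    = AMQP::Channel.new(connection) # not open yet

    # The channel is still opening here; the declaration is deferred
    # until the broker confirms that the channel is open.
    exchange = channel.fanout("sketch.events", :auto_delete => true)

    channel.once_open do
      puts "Channel is open, #{exchange.name} can now be used"
      connection.close { EventMachine.stop }
    end
  end
end
```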
Even though in the example above the channel isn’t immediately open, it is safe to declare exchanges using it. Exchange declaration will be delayed until after the channel is open. The same applies to queue declaration and other operations on exchanges and queues. Library methods that rely on the channel being open are enqueued and executed in FIFO order once the broker confirms channel opening. Note, however, that this “pseudo-synchronous mode” is easy to abuse, and it can introduce race conditions that the AMQP gem cannot resolve for you. AMQP is an inherently asynchronous protocol, and the AMQP gem embraces this fact.
Key methods
Key methods of the Channel class are listed in the summaries below; refer to the documentation for those methods for usage examples.
Channel provides a number of convenience methods that instantiate queues and exchanges of various types associated with this channel, including #queue, #default_exchange, #direct, #fanout, #headers and #topic.
Error handling
It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error:
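One way such an errback might look, as a sketch only. It assumes the amqp gem and a broker on localhost; the helper name and the queue name `sketch.missing.queue` are hypothetical. A passive declaration of a queue that does not exist is used here to provoke a channel-level exception.

```ruby
# Hypothetical helper; assumes the amqp gem and a broker on localhost.
def report_channel_errors
  require "amqp"

  EventMachine.run do
    AMQP.connect(:host => "localhost") do |connection|
      AMQP::Channel.new(connection) do |channel, open_ok|
        # Runs after the broker has closed the channel with an exception.
        channel.on_error do |ch, channel_close|
          puts "Channel-level exception: #{channel_close.reply_code} #{channel_close.reply_text}"
          connection.close { EventMachine.stop }
        end

        # Passive declaration of a queue that is assumed not to exist.
        channel.queue("sketch.missing.queue", :passive => true)
      end
    end
  end
end
```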
When a channel-level exception is indicated by the broker and the errback defined using #on_error is run, the channel is already closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from channel-level exceptions is to open a new channel and re-instantiate the queues, exchanges and bindings your application needs.
Closing a channel
A channel is opened when the object is instantiated and closed with the #close method when the application no longer needs it.
RabbitMQ extensions
The AMQP gem supports several RabbitMQ extensions that extend Channel functionality. Learn more in VendorSpecificExtensions.
Constant Summary
- DEFAULT_REPLY_TEXT = "Goodbye".freeze
- RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery, :error].freeze
Constants included from Openable
Instance Attribute Summary
-
#auto_recovery ⇒ Boolean
True if this channel is in automatic recovery mode.
-
#connection ⇒ AMQP::Connection
readonly
AMQP connection this channel belongs to.
-
#consumers_awaiting_cancel_ok ⇒ Object
readonly
Returns the value of attribute consumers_awaiting_cancel_ok.
-
#consumers_awaiting_consume_ok ⇒ Object
readonly
Returns the value of attribute consumers_awaiting_consume_ok.
-
#exchanges_awaiting_bind_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_bind_ok.
-
#exchanges_awaiting_declare_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_declare_ok.
-
#exchanges_awaiting_delete_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_delete_ok.
-
#exchanges_awaiting_unbind_ok ⇒ Object
readonly
Returns the value of attribute exchanges_awaiting_unbind_ok.
-
#flow_is_active ⇒ Object
Returns the value of attribute flow_is_active.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#publisher_index ⇒ Integer
Publisher index is the index of the last message published since confirmations were activated, starting at 0.
-
#queues_awaiting_bind_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_bind_ok.
-
#queues_awaiting_declare_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_declare_ok.
-
#queues_awaiting_delete_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_delete_ok.
-
#queues_awaiting_get_response ⇒ Object
readonly
Returns the value of attribute queues_awaiting_get_response.
-
#queues_awaiting_purge_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_purge_ok.
-
#queues_awaiting_unbind_ok ⇒ Object
readonly
Returns the value of attribute queues_awaiting_unbind_ok.
-
#status ⇒ Symbol
readonly
Status of this channel (one of: :opening, :closing, :open, :closed).
Attributes included from Entity
Declaring exchanges
-
#default_exchange ⇒ Exchange
Returns exchange object with the same name as default (aka unnamed) exchange.
-
#direct(name = 'amq.direct', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a direct Exchange instance.
-
#fanout(name = 'amq.fanout', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a fanout Exchange instance.
-
#headers(name = 'amq.match', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a headers Exchange instance.
-
#topic(name = 'amq.topic', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a topic Exchange instance.
Declaring queues
-
#queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... } ⇒ Queue
Declares and returns a Queue instance associated with this channel.
-
#queue!(name, opts = {}, &block) ⇒ Queue
Same as #queue but when queue with the same name already exists in this channel object’s cache, this method will replace existing queue with a newly defined one.
Channel lifecycle
-
#close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) ⇒ Object
Closes AMQP channel.
- #closing? ⇒ Boolean
-
#once_open(&block) ⇒ Object
(also: #once_opened)
Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker).
-
#open(&block) ⇒ Object
(also: #reopen)
Opens AMQP channel.
-
#open? ⇒ Boolean
True if channel is not closed.
QoS and flow handling
-
#flow(active = false, &block) ⇒ Object
Asks the peer to pause or restart the flow of content data sent to a consumer.
-
#flow_is_active? ⇒ Boolean
True if flow in this channel is active (messages will be delivered to consumers that use this channel).
-
#prefetch(count, global = false, &block) ⇒ Channel
Self.
-
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
Requests a specific quality of service.
Message acknowledgements
-
#acknowledge(delivery_tag, multiple = false) ⇒ Object
Acknowledge one or all messages on the channel.
-
#recover(requeue = true, &block) ⇒ Channel
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
-
#reject(delivery_tag, requeue = true, multi = false) ⇒ Object
Reject a message with given delivery tag.
Transactions
-
#tx_commit(&block) ⇒ Object
Commits AMQP transaction.
-
#tx_rollback(&block) ⇒ Object
Rolls AMQP transaction back.
-
#tx_select(&block) ⇒ Object
Sets the channel to use standard transactions.
Error handling
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_error(&block) ⇒ Object
Defines a callback that will be executed when channel is closed after channel-level exception.
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed after AMQP connection has recovered after a network failure.
Publisher Confirms
Instance Method Summary
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#auto_recovering? ⇒ Boolean
True if this channel uses automatic recovery mode.
-
#conn ⇒ Connection
AMQP connection this channel is part of.
- #consumers ⇒ Hash<String, Consumer>
-
#exchanges ⇒ Hash<Exchange>
Collection of exchanges that were declared on this channel.
-
#find_exchange(name) ⇒ AMQP::Exchange
Finds exchange in the exchanges cache on this channel by name.
-
#handle_basic_ack(method) ⇒ Object
Handler for Basic.Ack.
-
#handle_basic_nack(method) ⇒ Object
Handler for Basic.Nack.
-
#handle_select_ok(method) ⇒ Object
Handler for Confirm.Select-Ok.
-
#increment_publisher_index! ⇒ Object
This method is executed after publishing of each message via Exchange#publish.
-
#initialize(connection = nil, id = nil, options = {}) {|channel, open_ok| ... } ⇒ Channel
constructor
A new instance of Channel.
-
#on_ack(nowait = false) {|basick_ack| ... } ⇒ self
Turn on confirmations for this channel and, if given, register callback for basic.ack from the broker.
-
#on_nack(&block) ⇒ self
Register error callback for Basic.Nack.
-
#register_exchange(exchange) ⇒ Object
Implementation.
-
#reset_publisher_index! ⇒ Object
Resets publisher index to 0.
-
#reuse ⇒ Object
Can be used to recover channels from channel-level exceptions.
-
#synchronize(&block) ⇒ Object
Synchronizes given block using this channel’s mutex.
- #uses_publisher_confirmations? ⇒ Boolean
Methods included from RegisterEntityMixin
Methods included from ProtocolMethodHandlers
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(connection = nil, id = nil, options = {}) {|channel, open_ok| ... } ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/amqp/channel.rb', line 233
    def initialize(connection = nil, id = nil, options = {}, &block)
      raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

      @connection = connection || AMQP.connection || AMQP.start

      # this means 2nd argument is options
      if id.kind_of?(Hash)
        options = options.merge(id)
        id = @connection.next_channel_id
      end

      super(@connection)

      @id = id || @connection.next_channel_id
      @exchanges = Hash.new
      @queues = Hash.new
      @consumers = Hash.new
      @options = { :auto_recovery => @connection.auto_recovering? }.merge(options)
      @auto_recovery = (!!@options[:auto_recovery])

      # we must synchronize frameset delivery. MK.
      @mutex = Mutex.new

      reset_state!

      # 65536 is here for cases when channel is opened without passing a callback in,
      # otherwise channel_max would be nil and it causes a lot of needless headaches.
      # lets just have this default. MK.
      channel_max = if @connection.open?
                      @connection.channel_max || 65536
                    else
                      65536
                    end

      if channel_max != 0 && !(0..channel_max).include?(@id)
        raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
      end

      # we need this deferrable to mimic what AMQP gem 0.7 does to enable
      # the following (pseudo-synchronous) style of programming some people use in their
      # existing codebases:
      #
      # connection = AMQP.connect
      # channel = AMQP::Channel.new(connection)
      # queue = AMQP::Queue.new(channel)
      #
      # ...
      #
      # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
      @channel_is_open_deferrable = AMQP::Deferrable.new

      @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

      # only send channel.open when connection is actually open. Makes it possible to
      # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
      @connection.on_connection do
        self.open do |ch, open_ok|
          @channel_is_open_deferrable.succeed

          if block
            case block.arity
            when 1 then block.call(ch)
            else block.call(ch, open_ok)
            end # case
          end # if

          self.prefetch(@options[:prefetch], false) if @options[:prefetch]
        end # self.open
      end # @connection.on_open
    end
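The constructor hands the user's block either one or two arguments depending on the block's arity. The dispatch can be tried in isolation with the toy function below; `dispatch_open_callback` and the symbols passed to it are stand-ins invented for this sketch, not part of the gem.

```ruby
# Toy stand-in for the arity dispatch at the end of Channel#initialize.
def dispatch_open_callback(channel, open_ok, &block)
  return unless block

  case block.arity
  when 1 then block.call(channel)          # block declared one parameter
  else        block.call(channel, open_ok) # any other arity gets both values
  end
end

one_arg  = dispatch_open_callback(:channel, :open_ok) { |ch| [ch] }
two_args = dispatch_open_callback(:channel, :open_ok) { |ch, ok| [ch, ok] }
```

A block written as `{ |channel| ... }` therefore never sees the open-ok method, while `{ |channel, open_ok| ... }` receives both.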
Instance Attribute Details
#auto_recovery ⇒ Boolean
Returns true if this channel is in automatic recovery mode.
    # File 'lib/amqp/channel.rb', line 305
    def auto_recovery
      @auto_recovery
    end
#connection ⇒ AMQP::Connection (readonly)
AMQP connection this channel belongs to.
    # File 'lib/amqp/channel.rb', line 164
    def connection
      @connection
    end
#consumers_awaiting_cancel_ok ⇒ Object (readonly)
Returns the value of attribute consumers_awaiting_cancel_ok.
    # File 'lib/amqp/channel.rb', line 177
    def consumers_awaiting_cancel_ok
      @consumers_awaiting_cancel_ok
    end
#consumers_awaiting_consume_ok ⇒ Object (readonly)
Returns the value of attribute consumers_awaiting_consume_ok.
    # File 'lib/amqp/channel.rb', line 177
    def consumers_awaiting_consume_ok
      @consumers_awaiting_consume_ok
    end
#exchanges_awaiting_bind_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_bind_ok.
    # File 'lib/amqp/channel.rb', line 175
    def exchanges_awaiting_bind_ok
      @exchanges_awaiting_bind_ok
    end
#exchanges_awaiting_declare_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_declare_ok.
    # File 'lib/amqp/channel.rb', line 175
    def exchanges_awaiting_declare_ok
      @exchanges_awaiting_declare_ok
    end
#exchanges_awaiting_delete_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_delete_ok.
    # File 'lib/amqp/channel.rb', line 175
    def exchanges_awaiting_delete_ok
      @exchanges_awaiting_delete_ok
    end
#exchanges_awaiting_unbind_ok ⇒ Object (readonly)
Returns the value of attribute exchanges_awaiting_unbind_ok.
    # File 'lib/amqp/channel.rb', line 175
    def exchanges_awaiting_unbind_ok
      @exchanges_awaiting_unbind_ok
    end
#flow_is_active ⇒ Object
Returns the value of attribute flow_is_active.
    # File 'lib/amqp/channel.rb', line 179
    def flow_is_active
      @flow_is_active
    end
#id ⇒ Object (readonly)
Returns the value of attribute id.
    # File 'lib/amqp/channel.rb', line 173
    def id
      @id
    end
#publisher_index ⇒ Integer
Publisher index is the index of the last message published since confirmations were activated, starting at 0. It is incremented by 1 every time a message is published. This is done on both client and server, hence acknowledged messages can be matched via their delivery-tag.
    # File 'lib/amqp/channel.rb', line 1285
    def publisher_index
      @publisher_index ||= 0
    end
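Because the client and the broker count published messages in lockstep, an application can keep its own table of unconfirmed messages keyed by that index. The class below is a self-contained toy model of this bookkeeping; `ConfirmTracker` and its methods are hypothetical names, and the handling of the `multiple` flag is an assumption about how acks would be interpreted, not code taken from the gem.

```ruby
# Toy model: match broker acks to published messages by delivery tag.
class ConfirmTracker
  attr_reader :publisher_index, :unconfirmed

  def initialize
    @publisher_index = 0 # nothing published yet
    @unconfirmed = {}    # delivery tag => payload
  end

  # Call right after publishing; mirrors increment_publisher_index!.
  def published(payload)
    @publisher_index += 1
    @unconfirmed[@publisher_index] = payload
    @publisher_index
  end

  # Call with an ack's delivery tag and multiple flag.
  # Returns the payloads that the ack confirmed.
  def acked(delivery_tag, multiple = false)
    tags = multiple ? @unconfirmed.keys.select { |t| t <= delivery_tag } : [delivery_tag]
    tags.map { |t| @unconfirmed.delete(t) }.compact
  end
end

tracker = ConfirmTracker.new
%w[a b c d].each { |payload| tracker.published(payload) }
confirmed = tracker.acked(3, true) # one ack covering tags 1 through 3
```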
#queues_awaiting_bind_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_bind_ok.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_bind_ok
      @queues_awaiting_bind_ok
    end
#queues_awaiting_declare_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_declare_ok.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_declare_ok
      @queues_awaiting_declare_ok
    end
#queues_awaiting_delete_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_delete_ok.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_delete_ok
      @queues_awaiting_delete_ok
    end
#queues_awaiting_get_response ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_get_response.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_get_response
      @queues_awaiting_get_response
    end
#queues_awaiting_purge_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_purge_ok.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_purge_ok
      @queues_awaiting_purge_ok
    end
#queues_awaiting_unbind_ok ⇒ Object (readonly)
Returns the value of attribute queues_awaiting_unbind_ok.
    # File 'lib/amqp/channel.rb', line 176
    def queues_awaiting_unbind_ok
      @queues_awaiting_unbind_ok
    end
#status ⇒ Symbol (readonly)
Status of this channel (one of: :opening, :closing, :open, :closed)
    # File 'lib/amqp/channel.rb', line 169
    def status
      @status
    end
Instance Method Details
#acknowledge(delivery_tag, multiple = false) ⇒ Object
Acknowledge one or all messages on the channel.
    # File 'lib/amqp/channel.rb', line 1021
    def acknowledge(delivery_tag, multiple = false)
      @connection.send_frame(AMQ::Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple))

      self
    end
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
    # File 'lib/amqp/channel.rb', line 316
    def auto_recover
      return unless auto_recovering?

      @channel_is_open_deferrable.fail
      @channel_is_open_deferrable = AMQP::Deferrable.new

      self.open do
        @channel_is_open_deferrable.succeed

        # re-establish prefetch
        self.prefetch(@options[:prefetch], false) if @options[:prefetch]

        # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
        @exchanges.each { |name, e| e.auto_recover }
        @queues.each { |name, q| q.auto_recover }
      end
    end
#auto_recovering? ⇒ Boolean
Returns true if this channel uses automatic recovery mode.
    # File 'lib/amqp/channel.rb', line 308
    def auto_recovering?
      @auto_recovery
    end
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection has recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
    # File 'lib/amqp/channel.rb', line 1243
    def before_recovery(&block)
      self.redefine_callback(:before_recovery, &block)
    end
#close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block) ⇒ Object
Closes AMQP channel.
    # File 'lib/amqp/channel.rb', line 950
    def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
      self.once_open do
        self.status = :closing
        @connection.send_frame(AMQ::Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id))

        self.redefine_callback :close, &block
      end
    end
#closing? ⇒ Boolean
    # File 'lib/amqp/channel.rb', line 943
    def closing?
      self.status == :closing
    end
#confirm_select(nowait = false, &block) ⇒ Object
    # File 'lib/amqp/channel.rb', line 1119
    def confirm_select(nowait = false, &block)
      self.once_open do
        if nowait && block
          raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense"
        end

        @uses_publisher_confirmations = true
        reset_publisher_index!

        self.redefine_callback(:confirm_select, &block) unless nowait
        self.redefine_callback(:after_publish) do
          increment_publisher_index!
        end

        @connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, nowait))

        self
      end
    end
#conn ⇒ Connection
AMQP connection this channel is part of
    # File 'lib/amqp/channel.rb', line 165
    def connection
      @connection
    end
#consumers ⇒ Hash<String, Consumer>
    # File 'lib/amqp/channel.rb', line 1183
    def consumers
      @consumers
    end
#default_exchange ⇒ Exchange
Returns exchange object with the same name as default (aka unnamed) exchange. Default exchange is a direct exchange and automatically routes messages to queues when routing key matches queue name exactly. This feature is known as “automatic binding” (of queues to default exchange).
Use default exchange when you want to route messages directly to specific queues (queue names are known, you don’t mind this kind of coupling between applications).
    # File 'lib/amqp/channel.rb', line 491
    def default_exchange
      @default_exchange ||= Exchange.default(self)
    end
#direct(name = 'amq.direct', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a direct Exchange instance.
Learn more about direct exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 438
    def direct(name = 'amq.direct', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:direct, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :direct, name, opts, &block))
      end
    end
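The listing follows a find-or-register pattern: an exchange already known to the channel is returned from the cache after its options are checked, otherwise a new one is created and registered. The toy class below models only that control flow. `ExchangeCache` is a hypothetical name, exchanges are plain hashes, and the strict equality check with `ArgumentError` is a simplification standing in for the gem's own parameter validation.

```ruby
# Toy model of the per-channel exchange cache.
class ExchangeCache
  def initialize
    @exchanges = {}
  end

  def find_exchange(name)
    @exchanges[name]
  end

  def register_exchange(exchange)
    raise ArgumentError, "argument is nil!" if exchange.nil?

    @exchanges[exchange[:name]] = exchange
  end

  def direct(name = "amq.direct", opts = {})
    if (exchange = find_exchange(name))
      # Simplified check: the real library compares selected attributes only.
      unless exchange[:opts] == opts
        raise ArgumentError, "#{name} is already declared with different options"
      end

      exchange
    else
      register_exchange(:type => :direct, :name => name, :opts => opts)
    end
  end
end

cache  = ExchangeCache.new
first  = cache.direct("sketch.direct", :durable => true)
second = cache.direct("sketch.direct", :durable => true) # served from the cache
```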
#exchanges ⇒ Hash<Exchange>
Returns the collection of exchanges that were declared on this channel.
    # File 'lib/amqp/channel.rb', line 1189
    def exchanges
      @exchanges
    end
#fanout(name = 'amq.fanout', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a fanout Exchange instance.
Learn more about fanout exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 541
    def fanout(name = 'amq.fanout', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:fanout, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :fanout, name, opts, &block))
      end
    end
#find_exchange(name) ⇒ AMQP::Exchange
Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if it was previously instantiated on this channel.
    # File 'lib/amqp/channel.rb', line 1394
    def find_exchange(name)
      @exchanges[name]
    end
#flow(active = false, &block) ⇒ Object
Asks the peer to pause or restart the flow of content data sent to a consumer. This is a simple flow control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. It does not affect contents returned to Queue#get callers.
    # File 'lib/amqp/channel.rb', line 976
    def flow(active = false, &block)
      @connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))

      self.redefine_callback :flow, &block
      self
    end
#flow_is_active? ⇒ Boolean
Returns true if flow in this channel is active (messages will be delivered to consumers that use this channel).
    # File 'lib/amqp/channel.rb', line 986
    def flow_is_active?
      @flow_is_active
    end
#handle_basic_ack(method) ⇒ Object
Handler for Basic.Ack. By default, it just executes the hook registered via the #on_ack method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Ack).
    # File 'lib/amqp/channel.rb', line 1361
    def handle_basic_ack(method)
      self.exec_callback(:ack, method)
    end
#handle_basic_nack(method) ⇒ Object
Handler for Basic.Nack. By default, it just executes the hook registered via the #on_nack method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Basic::Nack).
    # File 'lib/amqp/channel.rb', line 1372
    def handle_basic_nack(method)
      self.exec_callback(:nack, method)
    end
#handle_select_ok(method) ⇒ Object
Handler for Confirm.Select-Ok. By default, it just executes the hook registered via the #confirm_select method with a single argument, a protocol method class instance (an instance of AMQ::Protocol::Confirm::SelectOk), and then deletes the callback, since Confirm.Select is supposed to be sent just once.
    # File 'lib/amqp/channel.rb', line 1351
    def handle_select_ok(method)
      self.exec_callback_once(:confirm_select, method)
    end
#headers(name = 'amq.match', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a headers Exchange instance.
Learn more about headers exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 754
    def headers(name = 'amq.match', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:headers, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :headers, name, opts, &block))
      end
    end
#increment_publisher_index! ⇒ Object
This method is executed after publishing of each message via Exchange#publish. Currently it just increments the publisher index by 1, so that messages can be matched to acknowledgements.
    # File 'lib/amqp/channel.rb', line 1302
    def increment_publisher_index!
      @publisher_index += 1
    end
#on_ack(nowait = false) {|basick_ack| ... } ⇒ self
Turn on confirmations for this channel and, if given, register callback for basic.ack from the broker.
    # File 'lib/amqp/channel.rb', line 1323
    def on_ack(nowait = false, &block)
      self.define_callback(:ack, &block) if block

      self
    end
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
    # File 'lib/amqp/channel.rb', line 1232
    def on_connection_interruption(&block)
      self.redefine_callback(:after_connection_interruption, &block)
    end
#on_error(&block) ⇒ Object
Defines a callback that will be executed when channel is closed after channel-level exception.
    # File 'lib/amqp/channel.rb', line 1110
    def on_error(&block)
      self.define_callback(:error, &block)
    end
#on_nack(&block) ⇒ self
Register error callback for Basic.Nack. It’s called when message(s) is rejected.
    # File 'lib/amqp/channel.rb', line 1334
    def on_nack(&block)
      self.define_callback(:nack, &block) if block

      self
    end
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed after AMQP connection has recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).
    # File 'lib/amqp/channel.rb', line 1261
    def on_recovery(&block)
      self.redefine_callback(:after_recovery, &block)
    end
#once_open(&block) ⇒ Object Also known as: once_opened
Takes a block that will be deferred till the moment when channel is considered open (channel.open-ok is received from the broker). If you need to delay an operation till the moment channel is open, this method is what you are looking for.
Multiple callbacks are supported. If the channel is already open when this method is called, the block is executed immediately.
    # File 'lib/amqp/channel.rb', line 932
    def once_open(&block)
      @channel_is_open_deferrable.callback do
        # guards against cases when deferred operations
        # don't complete before the channel is closed
        block.call if open?
      end
    end
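The behaviour described above (queue blocks while the channel is opening, run them in order once it opens, run later blocks immediately) can be modelled without a broker. The class below is a toy sketch: `OpenGate` and `opened!` are hypothetical names used for illustration, and the gem itself relies on a deferrable rather than this array.

```ruby
# Toy model of deferred, FIFO execution of blocks once a channel opens.
class OpenGate
  def initialize
    @open = false
    @pending = []
  end

  def once_open(&block)
    if @open
      block.call        # already open: run right away
    else
      @pending << block # still opening: remember for later
    end
    self
  end

  # Simulates the arrival of channel.open-ok from the broker.
  def opened!
    @open = true
    @pending.shift.call until @pending.empty?
  end
end

log  = []
gate = OpenGate.new
gate.once_open { log << :declare_exchange }
gate.once_open { log << :declare_queue }
before_open = log.dup # nothing has run yet
gate.opened!
gate.once_open { log << :publish } # runs immediately
```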
#open(&block) ⇒ Object Also known as: reopen
Opens AMQP channel.
Note: instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
    # File 'lib/amqp/channel.rb', line 908
    def open(&block)
      @connection.send_frame(AMQ::Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING))
      @connection.channels[@id] = self
      self.status = :opening

      self.redefine_callback :open, &block
    end
#open? ⇒ Boolean
Returns true if channel is not closed.
    # File 'lib/amqp/channel.rb', line 920
    def open?
      self.status == :opened || self.status == :opening
    end
#prefetch(count, global = false, &block) ⇒ Channel
Returns self.
    # File 'lib/amqp/channel.rb', line 998
    def prefetch(count, global = false, &block)
      self.once_open do
        # RabbitMQ does not support prefetch_size.
        self.qos(0, count, global, &block)

        @options[:prefetch] = count
      end

      self
    end
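A typical use of #prefetch is to limit a consumer to one unacknowledged message at a time and acknowledge each delivery explicitly. The sketch below assumes the amqp gem and a broker on localhost; the helper name and the queue name `sketch.tasks` are hypothetical. Once called, it keeps consuming until the process is interrupted.

```ruby
# Hypothetical helper; assumes the amqp gem and a broker on localhost.
def consume_one_at_a_time
  require "amqp"

  EventMachine.run do
    AMQP.connect(:host => "localhost") do |connection|
      channel = AMQP::Channel.new(connection)

      # Ask the broker to deliver at most one unacknowledged message.
      channel.prefetch(1)

      queue = channel.queue("sketch.tasks", :auto_delete => true)
      queue.subscribe(:ack => true) do |metadata, payload|
        puts "Processing #{payload}"

        # Acknowledge only this delivery (multiple = false).
        channel.acknowledge(metadata.delivery_tag, false)
      end
    end
  end
end
```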
#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object
RabbitMQ as of 2.3.1 does not support prefetch_size.
Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection.
    # File 'lib/amqp/channel.rb', line 1214
    def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
      @connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global))

      self.redefine_callback :qos, &block
      self
    end
#queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}) {|queue, declare_ok| ... } ⇒ Queue
Declares and returns a Queue instance associated with this channel. See Queue class documentation for more information about queues.
To make the broker generate a queue name for you (a classic example is exclusive queues that are only used for a short period of time), pass an empty string as the name value. The queue will then get its name as soon as the broker’s response (queue.declare-ok) arrives. Note that in this case, a block is required.
Like for exchanges, queue names starting with ‘amq.’ cannot be modified and should not be used by applications.
    # File 'lib/amqp/channel.rb', line 849
    def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
      raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

      if name && !name.empty? && (queue = find_queue(name))
        extended_opts = Queue.(name, opts, block)

        validate_parameters_match!(queue, extended_opts, :queue)

        block.call(queue) if block
        queue
      else
        self.queue!(name, opts, &block)
      end
    end
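Declaring a server-named queue might look like the sketch below. It assumes the amqp gem and a broker on localhost, and the helper name is hypothetical. The block takes a single parameter, so it receives only the queue, whose name is known by the time the block runs.

```ruby
# Hypothetical helper; assumes the amqp gem and a broker on localhost.
def declare_server_named_queue
  require "amqp"

  EventMachine.run do
    AMQP.connect(:host => "localhost") do |connection|
      channel = AMQP::Channel.new(connection)

      # An empty name asks the broker to generate one.
      channel.queue("", :exclusive => true) do |queue|
        puts "Broker named the queue #{queue.name}"
        connection.close { EventMachine.stop }
      end
    end
  end
end
```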
#queue!(name, opts = {}, &block) ⇒ Queue
    # File 'lib/amqp/channel.rb', line 872
    def queue!(name, opts = {}, &block)
      queue = if block.nil?
                Queue.new(self, name, opts)
              else
                shim = Proc.new { |q, method|
                  if block.arity == 1
                    block.call(q)
                  else
                    queue = find_queue(method.queue)
                    block.call(queue, method.consumer_count, method.)
                  end
                }
                Queue.new(self, name, opts, &shim)
              end

      register_queue(queue)
    end
#recover(requeue = true, &block) ⇒ Channel
RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
Notifies AMQ broker that consumer has recovered and unacknowledged messages need to be redelivered.
    # File 'lib/amqp/channel.rb', line 1052
    def recover(requeue = true, &block)
      @connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))

      self.redefine_callback :recover, &block
      self
    end
#register_exchange(exchange) ⇒ Object
Implementation
    # File 'lib/amqp/channel.rb', line 1382
    def register_exchange(exchange)
      raise ArgumentError, "argument is nil!" if exchange.nil?

      @exchanges[exchange.name] = exchange
    end
#reject(delivery_tag, requeue = true, multi = false) ⇒ Object
Reject a message with given delivery tag.
    # File 'lib/amqp/channel.rb', line 1033
    def reject(delivery_tag, requeue = true, multi = false)
      if multi
        @connection.send_frame(AMQ::Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue))
      else
        @connection.send_frame(AMQ::Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue))
      end

      self
    end
#reset_publisher_index! ⇒ Object
Resets publisher index to 0
    # File 'lib/amqp/channel.rb', line 1292
    def reset_publisher_index!
      @publisher_index = 0
    end
#reuse ⇒ Object
Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens itself with this new id, releasing the old id after the new one is allocated.
This includes recovery of known exchanges, queues and bindings, exactly the same way as when the client recovers from a network failure.
    # File 'lib/amqp/channel.rb', line 341
    def reuse
      old_id = @id
      # must release after we allocate a new id, otherwise we will end up
      # with the same value. MK.
      @id = @connection.next_channel_id
      @connection.release_channel_id(old_id)

      @channel_is_open_deferrable.fail
      @channel_is_open_deferrable = AMQP::Deferrable.new

      self.open do
        @channel_is_open_deferrable.succeed

        # re-establish prefetch
        self.prefetch(@options[:prefetch], false) if @options[:prefetch]

        # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
        @exchanges.each { |name, e| e.auto_recover }
        @queues.each { |name, q| q.auto_recover }
      end
    end
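The comment in the listing about releasing the old id only after a new one is allocated can be illustrated with a toy allocator. `ChannelIdAllocator` is a hypothetical class, and its lowest-free-id policy is an assumption made for this sketch; the connection's real allocator may work differently.

```ruby
# Toy allocator that always hands out the lowest id not currently in use.
class ChannelIdAllocator
  def initialize
    @in_use = {}
  end

  def next_channel_id
    id = 1
    id += 1 while @in_use[id]
    @in_use[id] = true
    id
  end

  def release_channel_id(id)
    @in_use.delete(id)
  end
end

# Order used by #reuse: allocate first, release afterwards.
allocator = ChannelIdAllocator.new
old_id = allocator.next_channel_id
new_id = allocator.next_channel_id
allocator.release_channel_id(old_id)

# Reversed order: releasing first hands the same id straight back.
reversed = ChannelIdAllocator.new
stale_id = reversed.next_channel_id
reversed.release_channel_id(stale_id)
same_id = reversed.next_channel_id
```

Under this allocation policy, only the allocate-then-release order is guaranteed to move the channel to a different id than the one the broker has just closed.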
#synchronize(&block) ⇒ Object
Synchronizes given block using this channel’s mutex.
    # File 'lib/amqp/channel.rb', line 1203
    def synchronize(&block)
      @mutex.synchronize(&block)
    end
#topic(name = 'amq.topic', opts = {}, &block) ⇒ Exchange
Defines, initializes and returns a topic Exchange instance.
Learn more about topic exchanges in Exchange class documentation.
    # File 'lib/amqp/channel.rb', line 652
    def topic(name = 'amq.topic', opts = {}, &block)
      if exchange = find_exchange(name)
        extended_opts = Exchange.(:topic, name, opts, block)

        validate_parameters_match!(exchange, extended_opts, :exchange)

        block.call(exchange) if block
        exchange
      else
        register_exchange(Exchange.new(self, :topic, name, opts, &block))
      end
    end
#tx_commit(&block) ⇒ Object
Commits AMQP transaction.
    # File 'lib/amqp/channel.rb', line 1080
    def tx_commit(&block)
      @connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))

      self.redefine_callback :tx_commit, &block
      self
    end
#tx_rollback(&block) ⇒ Object
Rolls AMQP transaction back.
    # File 'lib/amqp/channel.rb', line 1090
    def tx_rollback(&block)
      @connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))

      self.redefine_callback :tx_rollback, &block
      self
    end
#tx_select(&block) ⇒ Object
Sets the channel to use standard transactions. This method must be called at least once on a channel before using the #tx_commit or #tx_rollback methods.
    # File 'lib/amqp/channel.rb', line 1070
    def tx_select(&block)
      @connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))

      self.redefine_callback :tx_select, &block
      self
    end
#uses_publisher_confirmations? ⇒ Boolean
    # File 'lib/amqp/channel.rb', line 1307
    def uses_publisher_confirmations?
      @uses_publisher_confirmations
    end