Class: AMQP::Client
- Inherits:
-
Object
- Object
- AMQP::Client
- Defined in:
- lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/frame_bytes.rb
Overview
AMQP 0-9-1 Client
Defined Under Namespace
Classes: Connection, Error, Exchange, Message, Properties, Queue, ReturnMessage
Constant Summary collapse
- VERSION = "1.1.7"
Version of the client library
Connect and disconnect collapse
-
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection.
-
#start ⇒ self
Opens an AMQP connection using the high-level API. The first connection attempt is made immediately; once it has succeeded, the client automatically tries to reconnect whenever the connection is lost.
-
#stop ⇒ nil
Close the currently open connection.
High level objects collapse
-
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object.
-
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue.
Publish collapse
-
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation.
-
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation.
-
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes.
Queue actions collapse
-
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue.
-
#purge(queue) ⇒ nil
Purge a queue.
-
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue.
-
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Exchange actions collapse
-
#delete_exchange(name) ⇒ nil
Delete an exchange.
-
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange.
-
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange.
Instance Method Summary collapse
- #initialize(uri = "", **options) ⇒ Client constructor
Constructor Details
#initialize(uri = "", **options) ⇒ Client
26 27 28 29 30 31 32 33 |
# File 'lib/amqp/client.rb', line 26
# Create a client. No connection is opened here; only state is set up.
#
# @param uri [String] stored as-is and later passed to Connection.new by #connect
# @param options [Hash] keyword options, stored and later splatted into
#   Connection.new by #connect (#start also reads :reconnect_interval from them)
def initialize(uri = "", **options)
  @uri = uri
  # Keep the caller's options in their own Hash; they must not share an
  # object with the queue cache below.
  @options = options
  @queues = {}
  @exchanges = {}
  @subscriptions = Set.new
  # Capacity of one: holds at most a single connection at a time.
  @connq = SizedQueue.new(1)
end
Instance Method Details
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange
199 200 201 202 203 |
# File 'lib/amqp/client.rb', line 199
# Bind a queue to an exchange.
#
# All arguments are passed through unchanged to `queue_bind` on channel 1
# of the connection yielded by `with_connection`.
#
# @param queue the queue to bind (presumably its name)
# @param exchange the exchange to bind to (presumably its name)
# @param binding_key the binding key
# @param arguments [Hash] extra arguments forwarded as `arguments:`
def bind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_bind(queue, exchange, binding_key, arguments: arguments)
  end
end
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection
42 43 44 |
# File 'lib/amqp/client.rb', line 42
# Establish and return a new AMQP connection.
#
# @param read_loop_thread [Boolean] forwarded to Connection.new as `read_loop_thread:`
# @return [Connection] the object built by Connection.new
def connect(read_loop_thread: true)
  # The stored options are splatted last, so a :read_loop_thread key in
  # them would take precedence over the argument.
  connection_options = { read_loop_thread: read_loop_thread, **@options }
  Connection.new(@uri, **connection_options)
end
#delete_exchange(name) ⇒ nil
Delete an exchange
269 270 271 272 273 274 275 |
# File 'lib/amqp/client.rb', line 269
# Delete an exchange.
#
# Calls `exchange_delete` on channel 1, then removes the entry cached
# under `name` in @exchanges.
#
# @param name the exchange to delete (also the cache key)
def delete_exchange(name)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_delete(name)
    @exchanges.delete(name)
    nil # the block always evaluates to nil
  end
end
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue
231 232 233 234 235 236 237 |
# File 'lib/amqp/client.rb', line 231
# Delete a queue.
#
# Calls `queue_delete` on channel 1, then removes the entry cached under
# `name` in @queues. The block evaluates to whatever `queue_delete`
# returned (presumably the number of messages that were in the queue).
#
# @param name the queue to delete (also the cache key)
# @param if_unused [Boolean] forwarded as `if_unused:`
# @param if_empty [Boolean] forwarded as `if_empty:`
def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |connection|
    connection.channel(1)
              .queue_delete(name, if_unused: if_unused, if_empty: if_empty)
              .tap { @queues.delete(name) }
  end
end
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object
124 125 126 127 128 129 130 131 132 |
# File 'lib/amqp/client.rb', line 124
# Declare an exchange and return a high level Exchange object.
#
# Exchange objects are cached by name in @exchanges. When `name` is already
# cached, the cached object is returned and nothing is declared.
#
# @param name the exchange name (also the cache key)
# @param type the exchange type, forwarded to `exchange_declare`
# @param durable [Boolean] forwarded as `durable:`
# @param auto_delete [Boolean] forwarded as `auto_delete:`
# @param internal [Boolean] forwarded as `internal:`
# @param arguments [Hash] forwarded as `arguments:`
# @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
  return @exchanges[name] if @exchanges.key?(name)

  with_connection do |connection|
    connection.channel(1).exchange_declare(
      name, type,
      durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments
    )
  end
  @exchanges[name] = Exchange.new(self, name)
end
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange
248 249 250 251 252 |
# File 'lib/amqp/client.rb', line 248
# Bind an exchange to an exchange.
#
# All arguments are passed through unchanged to `exchange_bind` on channel 1.
#
# @param destination the exchange being bound
# @param source the exchange it is bound to
# @param binding_key the binding key
# @param arguments [Hash] forwarded as `arguments:`
def exchange_bind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_bind(destination, source, binding_key, arguments: arguments)
  end
end
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange
260 261 262 263 264 |
# File 'lib/amqp/client.rb', line 260
# Unbind an exchange from an exchange.
#
# All arguments are passed through unchanged to `exchange_unbind` on channel 1.
#
# @param destination the exchange being unbound
# @param source the exchange it is unbound from
# @param binding_key the binding key
# @param arguments [Hash] forwarded as `arguments:`
def exchange_unbind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_unbind(destination, source, binding_key, arguments: arguments)
  end
end
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation
142 143 144 145 146 147 |
# File 'lib/amqp/client.rb', line 142
# Publish a (persistent) message and wait for confirmation.
#
# Publishes via `basic_publish_confirm` on channel 1.
#
# @param body the message body
# @param exchange the exchange to publish to
# @param routing_key the routing key
# @param properties [Hash] message properties; `delivery_mode: 2` is used
#   unless the caller supplies its own `delivery_mode`
def publish(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # Caller-supplied keys come last, so they win over the default.
    properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish_confirm(body, exchange, routing_key, **properties)
  end
end
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation
154 155 156 157 158 159 |
# File 'lib/amqp/client.rb', line 154
# Publish a (persistent) message but don't wait for a confirmation.
#
# Publishes via `basic_publish` on channel 1.
#
# @param body the message body
# @param exchange the exchange to publish to
# @param routing_key the routing key
# @param properties [Hash] message properties; `delivery_mode: 2` is used
#   unless the caller supplies its own `delivery_mode`
def publish_and_forget(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # Caller-supplied keys come last, so they win over the default.
    properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish(body, exchange, routing_key, **properties)
  end
end
#purge(queue) ⇒ nil
Purge a queue
220 221 222 223 224 |
# File 'lib/amqp/client.rb', line 220
# Purge a queue.
#
# Calls `queue_purge` on channel 1.
#
# @param queue the queue to purge (presumably its name)
def purge(queue)
  with_connection { |connection| connection.channel(1).queue_purge(queue) }
end
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue
107 108 109 110 111 112 113 114 115 116 |
# File 'lib/amqp/client.rb', line 107
# Declare a queue and return a high level Queue object.
#
# Queue objects are cached by name in @queues. When `name` is already
# cached, the cached object is returned and nothing is declared.
#
# @param name the queue name; must not be empty
# @param durable [Boolean] forwarded as `durable:`
# @param auto_delete [Boolean] forwarded as `auto_delete:`
# @param arguments [Hash] forwarded as `arguments:`
# @return [Queue]
# @raise [ArgumentError] when `name` is empty
def queue(name, durable: true, auto_delete: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
  return @queues[name] if @queues.key?(name)

  with_connection do |connection|
    connection.channel(1).queue_declare(
      name,
      durable: durable, auto_delete: auto_delete, arguments: arguments
    )
  end
  @queues[name] = Queue.new(self, name)
end
#start ⇒ self
Opens an AMQP connection using the high-level API. The first connection attempt is made immediately; once it has succeeded, the client automatically tries to reconnect whenever the connection is lost.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/amqp/client.rb', line 52
# Open an AMQP connection using the high level API and keep it alive.
#
# The first `connect` call is evaluated in the calling thread (it is the
# argument to Thread.new), so a failure there propagates to the caller.
# After that a background thread runs the connection's read loop and, each
# time the read loop returns or an Error is rescued, connects again until
# @stopped is set.
#
# @return [self]
def start
  @stopped = false
  Thread.new(connect(read_loop_thread: false)) do |conn|
    # Raising an unhandled exception is a bug. Scope the flag to this thread
    # rather than flipping the process-wide Thread.abort_on_exception, which
    # would change how every thread of the host application behaves.
    Thread.current.abort_on_exception = true
    loop do
      break if @stopped

      conn ||= connect(read_loop_thread: false)

      Thread.new do
        # Same per-thread scoping for the restore thread.
        Thread.current.abort_on_exception = true
        # restore connection in another thread, read_loop have to run
        conn.channel(1) # reserve channel 1 for publishes
        @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk|
          ch = conn.channel
          ch.basic_qos(prefetch)
          ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk)
        end
        # Hand the ready connection over through the connection queue.
        @connq << conn
      end
      conn.read_loop # blocks until connection is closed, then reconnect
    rescue Error => e
      warn "AMQP-Client reconnect error: #{e.inspect}"
      sleep @options[:reconnect_interval] || 1
    ensure
      # Force a fresh connect on the next iteration.
      conn = nil
    end
  end
  self
end
#stop ⇒ nil
Close the currently open connection
83 84 85 86 87 88 89 90 |
# File 'lib/amqp/client.rb', line 83
# Close the currently open connection.
#
# Does nothing when the client is already stopped or was never started.
#
# @return [nil]
def stop
  # @stopped stays nil until #start assigns it. In the visible code only
  # #start feeds @connq, so popping on a never-started client would block
  # forever; treat that state like "already stopped".
  return if @stopped.nil? || @stopped

  @stopped = true
  # Blocks until a connection is available in the queue, then closes it.
  @connq.pop.close
  nil
end
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue
181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/amqp/client.rb', line 181
# Consume messages from a queue.
#
# The subscription parameters (including the block) are recorded in
# @subscriptions before the consumer is set up. The consumer itself gets
# its own channel: `basic_qos(prefetch)` followed by `basic_consume`.
#
# @param queue the queue to consume from (presumably its name)
# @param no_ack [Boolean] forwarded as `no_ack:`
# @param prefetch [Integer] passed to `basic_qos`
# @param worker_threads [Integer] forwarded as `worker_threads:`; must be > 0
# @param arguments [Hash] forwarded as `arguments:`
# @yield forwarded to `basic_consume` as its block
# @raise [ArgumentError] when `worker_threads` is zero or negative
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
  raise ArgumentError, "worker_threads have to be > 0" unless worker_threads.positive?

  subscription = [queue, no_ack, prefetch, worker_threads, arguments, blk]
  @subscriptions << subscription

  with_connection do |connection|
    channel = connection.channel
    channel.basic_qos(prefetch)
    channel.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments, &blk)
  end
end
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange
211 212 213 214 215 |
# File 'lib/amqp/client.rb', line 211
# Unbind a queue from an exchange.
#
# All arguments are passed through unchanged to `queue_unbind` on channel 1.
#
# @param queue the queue to unbind (presumably its name)
# @param exchange the exchange to unbind from (presumably its name)
# @param binding_key the binding key
# @param arguments [Hash] forwarded as `arguments:`
def unbind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_unbind(queue, exchange, binding_key, arguments: arguments)
  end
end
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes
163 164 165 166 167 |
# File 'lib/amqp/client.rb', line 163
# Wait for unconfirmed publishes.
#
# Delegates to `wait_for_confirms` on channel 1 of the connection yielded
# by `with_connection`.
def wait_for_confirms
  with_connection { |connection| connection.channel(1).wait_for_confirms }
end