Class: AMQP::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/frame_bytes.rb

Overview

AMQP 0-9-1 Client

See Also:

Defined Under Namespace

Classes: Connection, Error, Exchange, Message, Properties, Queue, ReturnMessage

Constant Summary collapse

VERSION =

Version of the client library

"1.1.7"

Connect and disconnect collapse

High level objects collapse

Publish collapse

Queue actions collapse

Exchange actions collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = "", **options) ⇒ Client

Create a new Client object. This won’t establish a connection yet; use #connect or #start for that.

Parameters:

  • uri (String) (defaults to: "")

    URL in the format amqp://username:password@hostname/vhost, use amqps:// for an encrypted connection

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (String) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • heartbeat (Integer) — default: 0

    Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size, the smallest of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client will be allowed to have open. Maximum allowed is 65_536. The smallest of the client’s and the broker’s value will be used.



26
27
28
29
30
31
32
33
# File 'lib/amqp/client.rb', line 26

# Builds a client without opening any connection; call #connect or #start
# to actually connect.
#
# @param uri [String] broker URL, e.g. amqp://username:password@hostname/vhost
# @param options [Hash] connection options, stored as given and passed on by #connect
def initialize(uri = "", **options)
  @uri, @options = uri, options
  # Caches and bookkeeping used by the high level API
  @queues, @exchanges = {}, {}
  @subscriptions = Set.new
  # Bounded queue with room for a single connection
  @connq = SizedQueue.new(1)
end

Instance Method Details

#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil

Bind a queue to an exchange

Parameters:

  • queue (String)

    Name of the queue to bind

  • exchange (String)

    Name of the exchange to bind to

  • binding_key (String)

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for header exchanges)

Returns:

  • (nil)


199
200
201
202
203
# File 'lib/amqp/client.rb', line 199

# Bind a queue to an exchange, using channel 1 of the current connection.
#
# @param queue [String] name of the queue to bind
# @param exchange [String] name of the exchange to bind to
# @param binding_key [String] binding key for the new binding
# @param arguments [Hash] binding arguments, forwarded unchanged
# @return [nil]
def bind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_bind(queue, exchange, binding_key, arguments: arguments)
  end
end

#connect(read_loop_thread: true) ⇒ Connection

Establishes and returns a new AMQP connection

Examples:

connection = AMQP::Client.new("amqps://server.rmq.cloudamqp.com", connection_name: "My connection").connect

Returns:

See Also:



42
43
44
# File 'lib/amqp/client.rb', line 42

# Establishes and returns a new AMQP connection built from the stored URI and options.
#
# @param read_loop_thread [Boolean] forwarded to Connection.new; an entry with the
#   same key in the stored options takes precedence over this argument
# @return [Connection]
def connect(read_loop_thread: true)
  connection_options = { read_loop_thread: read_loop_thread }.merge(@options)
  Connection.new(@uri, **connection_options)
end

#delete_exchange(name) ⇒ nil

Delete an exchange

Parameters:

  • name (String)

    Name of the exchange

Returns:

  • (nil)


269
270
271
272
273
274
275
# File 'lib/amqp/client.rb', line 269

# Delete an exchange on the broker and drop it from the local exchange cache.
#
# @param name [String] name of the exchange
# @return [nil]
def delete_exchange(name)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_delete(name)
    # Forget the cached Exchange object only after the broker call went through
    @exchanges.delete(name)
    nil
  end
end

#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer

Delete a queue

Parameters:

  • name (String)

    Name of the queue

  • if_unused (Boolean) (defaults to: false)

    Only delete if the queue doesn’t have consumers, raises a ChannelClosed error otherwise

  • if_empty (Boolean) (defaults to: false)

    Only delete if the queue is empty, raises a ChannelClosed error otherwise

Returns:

  • (Integer)

    Number of messages in the queue when deleted



231
232
233
234
235
236
237
# File 'lib/amqp/client.rb', line 231

# Delete a queue on the broker and drop it from the local queue cache.
#
# @param name [String] name of the queue
# @param if_unused [Boolean] forwarded to queue_delete
# @param if_empty [Boolean] forwarded to queue_delete
# @return [Integer] number of messages in the queue when deleted
def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |connection|
    connection
      .channel(1)
      .queue_delete(name, if_unused: if_unused, if_empty: if_empty)
      .tap { @queues.delete(name) } # forget the cached Queue once the broker call returned
  end
end

#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange

Declare an exchange and return a high level Exchange object

Examples:

amqp = AMQP::Client.new.start
x = amqp.exchange("my.hash.exchange", "x-consistent-hash")
x.publish("body", "routing-key")

Returns:



124
125
126
127
128
129
130
131
132
# File 'lib/amqp/client.rb', line 124

# Declare an exchange and return a high level Exchange object.
# An Exchange already cached under +name+ is returned as-is, without
# declaring it again.
#
# @param name [String] name of the exchange
# @param type [String] exchange type, forwarded to exchange_declare
# @param durable [Boolean] forwarded to exchange_declare
# @param auto_delete [Boolean] forwarded to exchange_declare
# @param internal [Boolean] forwarded to exchange_declare
# @param arguments [Hash] forwarded to exchange_declare
# @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
  return @exchanges[name] if @exchanges.key?(name)

  declare_options = { durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments }
  with_connection do |connection|
    connection.channel(1).exchange_declare(name, type, **declare_options)
  end
  @exchanges[name] = Exchange.new(self, name)
end

#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil

Bind an exchange to an exchange

Parameters:

  • destination (String)

    Name of the exchange to bind

  • source (String)

    Name of the exchange to bind to

  • binding_key (String)

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for header exchanges)

Returns:

  • (nil)


248
249
250
251
252
# File 'lib/amqp/client.rb', line 248

# Bind an exchange to another exchange, using channel 1 of the current connection.
#
# @param destination [String] name of the exchange to bind
# @param source [String] name of the exchange to bind to
# @param binding_key [String] binding key for the new binding
# @param arguments [Hash] binding arguments, forwarded unchanged
# @return [nil]
def exchange_bind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_bind(destination, source, binding_key, arguments: arguments)
  end
end

#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil

Unbind an exchange from an exchange

Parameters:

  • destination (String)

    Name of the exchange to unbind

  • source (String)

    Name of the exchange to unbind from

  • binding_key (String)

    Binding key which the exchange is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


260
261
262
263
264
# File 'lib/amqp/client.rb', line 260

# Unbind an exchange from another exchange, using channel 1 of the current connection.
#
# @param destination [String] name of the exchange to unbind
# @param source [String] name of the exchange to unbind from
# @param binding_key [String] binding key of the binding being removed
# @param arguments [Hash] arguments matching the binding being removed
# @return [nil]
def exchange_unbind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_unbind(destination, source, binding_key, arguments: arguments)
  end
end

#publish(body, exchange, routing_key, **properties) ⇒ Boolean

Publish a (persistent) message and wait for confirmation

Parameters:

  • body (String)

    The body, can be a string or a byte array

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String)

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (Integer)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Boolean)

    True if the message was successfully published



142
143
144
145
146
147
# File 'lib/amqp/client.rb', line 142

# Publish a message and wait for the broker's confirmation.
# Messages are persistent (delivery_mode 2) unless the caller passes its own
# delivery_mode.
#
# @param body [String] the message body
# @param exchange [String] name of the exchange to publish to
# @param routing_key [String] routing key for the message
# @param properties [Hash] message properties, forwarded as keyword arguments
# @return [Boolean] result of basic_publish_confirm
def publish(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # Caller-supplied properties win over the persistent default
    message_properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish_confirm(body, exchange, routing_key, **message_properties)
  end
end

#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil

Publish a (persistent) message but don’t wait for a confirmation

Parameters:

  • body (String)

    The body, can be a string or a byte array

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String)

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (Integer)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (nil)


154
155
156
157
158
159
# File 'lib/amqp/client.rb', line 154

# Publish a message without waiting for a confirmation.
# Messages are persistent (delivery_mode 2) unless the caller passes its own
# delivery_mode.
#
# @param body [String] the message body
# @param exchange [String] name of the exchange to publish to
# @param routing_key [String] routing key for the message
# @param properties [Hash] message properties, forwarded as keyword arguments
# @return [nil]
def publish_and_forget(body, exchange, routing_key, **properties)
  with_connection do |connection|
    # Caller-supplied properties win over the persistent default
    message_properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish(body, exchange, routing_key, **message_properties)
  end
end

#purge(queue) ⇒ nil

Purge a queue

Parameters:

  • queue (String)

    Name of the queue

Returns:

  • (nil)


220
221
222
223
224
# File 'lib/amqp/client.rb', line 220

# Purge a queue, using channel 1 of the current connection.
#
# @param queue [String] name of the queue
# @return [nil]
def purge(queue)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_purge(queue)
  end
end

#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue

Declare a queue

Examples:

amqp = AMQP::Client.new.start
q = amqp.queue("foobar")
q.publish("body")

Parameters:

  • name (String)

    Name of the queue

  • durable (Boolean) (defaults to: true)

    If true the queue will survive broker restarts, messages in the queue will only survive if they are published as persistent

  • auto_delete (Boolean) (defaults to: false)

    If true the queue will be deleted when the last consumer stops consuming (it won’t be deleted until at least one consumer has consumed from it)

  • arguments (Hash) (defaults to: {})

    Custom arguments, such as queue-ttl etc.

Returns:

Raises:

  • (ArgumentError)


107
108
109
110
111
112
113
114
115
116
# File 'lib/amqp/client.rb', line 107

# Declare a queue and return a high level Queue object.
# A Queue already cached under +name+ is returned as-is, without declaring
# it again.
#
# @param name [String] name of the queue, must not be empty
# @param durable [Boolean] forwarded to queue_declare
# @param auto_delete [Boolean] forwarded to queue_declare
# @param arguments [Hash] forwarded to queue_declare
# @return [Queue]
# @raise [ArgumentError] if name is empty
def queue(name, durable: true, auto_delete: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
  return @queues[name] if @queues.key?(name)

  declare_options = { durable: durable, auto_delete: auto_delete, arguments: arguments }
  with_connection do |connection|
    connection.channel(1).queue_declare(name, **declare_options)
  end
  @queues[name] = Queue.new(self, name)
end

#start ⇒ self

Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first

Examples:

amqp = AMQP::Client.new("amqps://server.rmq.cloudamqp.com")
amqp.start
amqp.queue("foobar")

Returns:

  • (self)


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/amqp/client.rb', line 52

# Opens an AMQP connection using the high level API.
# The first connection is created by #connect in the calling thread, so an
# exception raised there propagates out of #start. A background thread then
# runs the connection's read loop and connects again each time that loop
# returns, until @stopped is set.
#
# @return [self]
def start
  @stopped = false
  # The argument expression is evaluated here, before the new thread starts
  Thread.new(connect(read_loop_thread: false)) do |conn|
    # NOTE(review): this is the class-level setter, which applies to every thread
    # in the process and not only to this one; confirm that is intended.
    Thread.abort_on_exception = true # Raising an unhandled exception is a bug
    loop do
      break if @stopped

      # conn was reset to nil by the ensure clause on every pass after the first
      conn ||= connect(read_loop_thread: false)
      # NOTE(review): this thread reads the same local `conn` that the ensure clause
      # below resets to nil; if read_loop returns before this thread is done it may
      # see nil. TODO confirm whether that can happen in practice.
      Thread.new do
        # restore connection in another thread, read_loop have to run
        conn.channel(1) # reserve channel 1 for publishes
        # Re-create one channel and consumer per recorded subscription
        @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk|
          ch = conn.channel
          ch.basic_qos(prefetch)
          ch.basic_consume(queue_name, no_ack: no_ack, worker_threads: wt, arguments: args, &blk)
        end
        # Make the ready connection available to whoever pops from @connq
        @connq << conn
      end
      conn.read_loop # blocks until connection is closed, then reconnect
    rescue Error => e
      # Log the failure and pause before the next pass; the pause is 1 second
      # unless the :reconnect_interval option is set
      warn "AMQP-Client reconnect error: #{e.inspect}"
      sleep @options[:reconnect_interval] || 1
    ensure
      # Drop the old connection so the next pass connects again
      conn = nil
    end
  end
  self
end

#stop ⇒ nil

Close the currently open connection

Returns:

  • (nil)


83
84
85
86
87
88
89
90
# File 'lib/amqp/client.rb', line 83

# Close the currently open connection and signal the reconnect loop to end.
# Returns immediately when the client was never started or is already stopped.
# Without the never-started guard, the pop below would wait forever: in the
# visible code only the thread created by #start pushes onto @connq.
#
# @return [nil]
def stop
  # @stopped is nil until #start has run, false while running, true once stopped
  return if @stopped.nil? || @stopped

  @stopped = true
  # Blocks until the connection is available, i.e. not in use by another caller
  @connq.pop.close
  nil
end

#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?

Consume messages from a queue

Parameters:

  • queue (String)

    Name of the queue to subscribe to

  • no_ack (Boolean) (defaults to: false)

    When false messages have to be manually acknowledged (or rejected) (default: false)

  • prefetch (Integer) (defaults to: 1)

    Specify how many messages to prefetch for consumers when no_ack is false (default: 1)

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages (default: 1)

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (Array<(String, Array<Thread>)>)

    Returns consumer_tag and an array of worker threads

  • (nil)

Raises:

  • (ArgumentError)


181
182
183
184
185
186
187
188
189
190
191
# File 'lib/amqp/client.rb', line 181

# Consume messages from a queue.
# The subscription is recorded in @subscriptions before the consumer is set up,
# which is what #start iterates over when it restores consumers.
#
# @param queue [String] name of the queue to subscribe to
# @param no_ack [Boolean] forwarded to basic_consume
# @param prefetch [Integer] passed to basic_qos on the consumer's channel
# @param worker_threads [Integer] forwarded to basic_consume, must be > 0
# @param arguments [Hash] custom arguments to the consumer
# @yield [Message] delivered message from the queue
# @return [Array<(String, Array<Thread>)>, nil] result of basic_consume
# @raise [ArgumentError] if worker_threads is not greater than zero
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
  raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0

  subscription = [queue, no_ack, prefetch, worker_threads, arguments, blk]
  @subscriptions << subscription

  with_connection do |connection|
    # Each consumer gets a channel of its own
    channel = connection.channel
    channel.basic_qos(prefetch)
    channel.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads, arguments: arguments, &blk)
  end
end

#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil

Unbind a queue from an exchange

Parameters:

  • queue (String)

    Name of the queue to unbind

  • exchange (String)

    Name of the exchange to unbind from

  • binding_key (String)

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


211
212
213
214
215
# File 'lib/amqp/client.rb', line 211

# Unbind a queue from an exchange, using channel 1 of the current connection.
#
# @param queue [String] name of the queue to unbind
# @param exchange [String] name of the exchange to unbind from
# @param binding_key [String] binding key of the binding being removed
# @param arguments [Hash] arguments matching the binding being removed
# @return [nil]
def unbind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_unbind(queue, exchange, binding_key, arguments: arguments)
  end
end

#wait_for_confirms ⇒ Boolean

Wait for unconfirmed publishes

Returns:

  • (Boolean)

    True if successful, false if any message negatively acknowledged



163
164
165
166
167
# File 'lib/amqp/client.rb', line 163

# Wait for unconfirmed publishes on channel 1 of the current connection.
#
# @return [Boolean] true if successful, false if any message was negatively acknowledged
def wait_for_confirms
  with_connection do |connection|
    channel = connection.channel(1)
    channel.wait_for_confirms
  end
end