Class: Isimud::BunnyClient
Overview
Interface for Bunny RabbitMQ client
Constant Summary collapse
- DEFAULT_URL =
'amqp://guest:guest@localhost'
- CHANNEL_KEY =
:'isimud.bunny_client.channel'
Instance Attribute Summary collapse
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
-
#bind(queue_name, exchange_name, *routing_keys) {|payload| ... } ⇒ Bunny::Consumer
Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages.
-
#channel ⇒ Bunny::Channel
Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists and is currently open.
-
#close ⇒ Object
Close the AMQP connection and clear it from the instance.
-
#connected? ⇒ Boolean?
Determine if a Bunny connection is currently established to the AMQP server.
-
#connection ⇒ Bunny::Session
(also: #connect)
Establish a connection to the AMQP server, or return the current connection if one already exists.
-
#create_queue(queue_name, exchange_name, options = {}) ⇒ Bunny::Queue
Find or create a named queue and bind it to the specified exchange.
-
#delete_queue(queue_name) ⇒ AMQ::Protocol::Queue::DeleteOk
Permanently delete the queue from the AMQP server.
-
#find_queue(queue_name, options = {durable: true}) ⇒ Object
Look up a queue by name, or create it if it does not already exist.
-
#initialize(_url = nil, _bunny_options = {}) ⇒ BunnyClient
constructor
Initialize a new BunnyClient instance.
-
#publish(exchange, routing_key, payload, options = {}) ⇒ Object
Publish a message to the specified exchange, which is declared as a durable, topic exchange.
-
#reconnect ⇒ Bunny::Session
Close and reopen the AMQP connection.
-
#reset ⇒ Object
Reset this client by closing all channels for the connection.
-
#subscribe(queue, options = {}) {|payload| ... } ⇒ Object
Subscribe to messages on the Bunny queue.
Methods inherited from Client
#on_exception, #run_exception_handlers
Methods included from Logging
Constructor Details
#initialize(_url = nil, _bunny_options = {}) ⇒ BunnyClient
Initialize a new BunnyClient instance. Note that a connection is not established until any other method is called
18 19 20 21 22 23 24 |
# File 'lib/isimud/bunny_client.rb', line 18 def initialize(_url = nil, = {}) log "Isimud::BunnyClient.initialize: options = #{.inspect}" @url = _url || DEFAULT_URL @url.symbolize_keys! if @url.respond_to?(:symbolize_keys!) @bunny_options = .symbolize_keys @bunny_options[:logger] = Isimud.logger end |
Instance Attribute Details
#url ⇒ Object (readonly)
Returns the value of attribute url.
11 12 13 |
# File 'lib/isimud/bunny_client.rb', line 11 def url @url end |
Instance Method Details
#bind(queue_name, exchange_name, *routing_keys) {|payload| ... } ⇒ Bunny::Consumer
Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages. If a block is provided, it will be called by the consumer each time a message is received.
35 36 37 38 39 40 |
# File 'lib/isimud/bunny_client.rb', line 35 def bind(queue_name, exchange_name, *routing_keys, &block) queue = create_queue(queue_name, exchange_name, queue_options: {durable: true}, routing_keys: routing_keys) subscribe(queue, &block) if block_given? end |
#channel ⇒ Bunny::Channel
Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists
and is currently open. New channels are created with publisher confirms enabled. Messages will be prefetched
according to Isimud.prefetch_count when declared.
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/isimud/bunny_client.rb', line 114 def channel if (channel = Thread.current[CHANNEL_KEY]).try(:open?) channel else new_channel = connection.channel new_channel.confirm_select new_channel.prefetch(Isimud.prefetch_count) if Isimud.prefetch_count Thread.current[CHANNEL_KEY] = new_channel end end |
#close ⇒ Object
Close the AMQP connection and clear it from the instance.
139 140 141 142 143 |
# File 'lib/isimud/bunny_client.rb', line 139 def close connection.close ensure @connection = nil end |
#connected? ⇒ Boolean?
Determine if a Bunny connection is currently established to the AMQP server. but is closed or closing, or nil if no connection has been established.
133 134 135 |
# File 'lib/isimud/bunny_client.rb', line 133 def connected? @connection && @connection.open? end |
#connection ⇒ Bunny::Session Also known as: connect
Establish a connection to the AMQP server, or return the current connection if one already exists
102 103 104 |
# File 'lib/isimud/bunny_client.rb', line 102 def connection @connection ||= ::Bunny.new(url, @bunny_options).tap(&:start) end |
#create_queue(queue_name, exchange_name, options = {}) ⇒ Bunny::Queue
Find or create a named queue and bind it to the specified exchange
52 53 54 55 56 57 58 59 |
# File 'lib/isimud/bunny_client.rb', line 52 def create_queue(queue_name, exchange_name, = {}) = [:queue_options] || {durable: true} routing_keys = [:routing_keys] || [] log "Isimud::BunnyClient: create_queue #{queue_name}: queue_options=#{.inspect}" queue = find_queue(queue_name, ) bind_routing_keys(queue, exchange_name, routing_keys) if routing_keys.any? queue end |
#delete_queue(queue_name) ⇒ AMQ::Protocol::Queue::DeleteOk
Permanently delete the queue from the AMQP server. Any messages present in the queue will be discarded.
96 97 98 |
# File 'lib/isimud/bunny_client.rb', line 96 def delete_queue(queue_name) channel.queue_delete(queue_name) end |
#find_queue(queue_name, options = {durable: true}) ⇒ Object
Look up a queue by name, or create it if it does not already exist.
167 168 169 |
# File 'lib/isimud/bunny_client.rb', line 167 def find_queue(queue_name, = {durable: true}) channel.queue(queue_name, ) end |
#publish(exchange, routing_key, payload, options = {}) ⇒ Object
Publish a message to the specified exchange, which is declared as a durable, topic exchange. Note that message
is always persisted.
154 155 156 157 |
# File 'lib/isimud/bunny_client.rb', line 154 def publish(exchange, routing_key, payload, = {}) log "Isimud::BunnyClient#publish: exchange=#{exchange} routing_key=#{routing_key}", :debug channel.topic(exchange, durable: true).publish(payload, .merge(routing_key: routing_key, persistent: true)) end |
#reconnect ⇒ Bunny::Session
Close and reopen the AMQP connection
161 162 163 164 |
# File 'lib/isimud/bunny_client.rb', line 161 def reconnect close connect end |
#reset ⇒ Object
Reset this client by closing all channels for the connection.
126 127 128 |
# File 'lib/isimud/bunny_client.rb', line 126 def reset connection.close_all_channels end |
#subscribe(queue, options = {}) {|payload| ... } ⇒ Object
Subscribe to messages on the Bunny queue. The provided block will be called each time a message is received.
The message will be acknowledged and deleted from the queue unless an exception is raised from the block.
In the case that an uncaught exception is raised, the message is rejected, and any declared exception handlers
will be called.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/isimud/bunny_client.rb', line 69 def subscribe(queue, = {}, &block) queue.subscribe(.merge(manual_ack: true)) do |delivery_info, properties, payload| current_channel = delivery_info.channel begin log "Isimud: queue #{queue.name} received #{properties[:message_id]} routing_key: #{delivery_info.routing_key}", :debug Thread.current['isimud_queue_name'] = queue.name Thread.current['isimud_delivery_info'] = delivery_info Thread.current['isimud_properties'] = properties block.call(payload) if current_channel.open? log "Isimud: queue #{queue.name} finished with #{properties[:message_id]}, acknowledging", :debug current_channel.ack(delivery_info.delivery_tag) else log "Isimud: queue #{queue.name} unable to acknowledge #{properties[:message_id]}", :warn end rescue => e log("Isimud: queue #{queue.name} error processing #{properties[:message_id]} payload #{payload.inspect}: #{e.class.name} #{e.}\n #{e.backtrace.join("\n ")}", :warn) retry_status = run_exception_handlers(e) log "Isimud: rejecting #{properties[:message_id]} requeue=#{retry_status}", :warn current_channel.open? && current_channel.reject(delivery_info.delivery_tag, retry_status) end end end |