Class: Isimud::BunnyClient

Inherits:
Client
  • Object
Defined in:
lib/isimud/bunny_client.rb

Overview

Interface for Bunny RabbitMQ client

Constant Summary

DEFAULT_URL = 'amqp://guest:guest@localhost'
CHANNEL_KEY = :'isimud.bunny_client.channel'

Instance Attribute Summary

Instance Method Summary

Methods inherited from Client

#on_exception, #run_exception_handlers

Methods included from Logging

#log, #logger

Constructor Details

#initialize(_url = nil, _bunny_options = {}) ⇒ BunnyClient

Initialize a new BunnyClient instance. Note that no connection is established until a method that needs one (for example #connection, #channel, or #publish) is called.

Parameters:

  • _url (String, Hash) (defaults to: nil)

    Server URL or options hash

  • _bunny_options (Hash) (defaults to: {})

    optional Bunny connection options

See Also:

  • Bunny.new for connection options


# File 'lib/isimud/bunny_client.rb', line 18

def initialize(_url = nil, _bunny_options = {})
  log "Isimud::BunnyClient.initialize: options = #{_bunny_options.inspect}"
  @url = _url || DEFAULT_URL
  @url.symbolize_keys! if @url.respond_to?(:symbolize_keys!)
  @bunny_options = _bunny_options.symbolize_keys
  @bunny_options[:logger] = Isimud.logger
end

Instance Attribute Details

#url ⇒ Object (readonly)

Returns the value of attribute url.



# File 'lib/isimud/bunny_client.rb', line 11

def url
  @url
end

Instance Method Details

#bind(queue_name, exchange_name, *routing_keys) {|payload| ... } ⇒ Bunny::Consumer

Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages. If a block is provided, it will be called by the consumer each time a message is received.

Parameters:

  • queue_name (String)

    name of the queue

  • exchange_name (String)

    name of the AMQP exchange. Note that existing exchanges must be declared as Topic exchanges; otherwise, an error will occur

  • routing_keys (Array<String>)

    list of routing keys to be bound to the queue for the specified exchange.

Yield Parameters:

  • payload (String)

    message text

Returns:

  • (Bunny::Consumer)

    Bunny consumer interface, or nil if no block is given



# File 'lib/isimud/bunny_client.rb', line 35

def bind(queue_name, exchange_name, *routing_keys, &block)
  queue = create_queue(queue_name, exchange_name,
                       queue_options: {durable: true},
                       routing_keys:  routing_keys)
  subscribe(queue, &block) if block_given?
end

#channel ⇒ Bunny::Channel

Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists and is currently open. New channels are created with publisher confirms enabled. If Isimud.prefetch_count is set, it is applied as the prefetch limit when the channel is created.

Returns:

  • (Bunny::Channel)

    channel instance.



# File 'lib/isimud/bunny_client.rb', line 114

def channel
  if (channel = Thread.current[CHANNEL_KEY]).try(:open?)
    channel
  else
    new_channel = connection.channel
    new_channel.confirm_select
    new_channel.prefetch(Isimud.prefetch_count) if Isimud.prefetch_count
    Thread.current[CHANNEL_KEY] = new_channel
  end
end
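
The per-thread caching described above can be sketched without a broker. This is a minimal illustration, not the library's implementation: FakeChannel, FakeConnection, and current_channel are hypothetical stand-ins for Bunny::Channel, Bunny::Session, and #channel, and the sketch leaves out publisher confirms and prefetch.

```ruby
# Stand-ins for Bunny::Channel and Bunny::Session (hypothetical, for illustration only).
class FakeChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

class FakeConnection
  attr_reader :channels_created

  def initialize
    @channels_created = 0
  end

  # Each call hands out a brand-new channel, as a real connection would.
  def channel
    @channels_created += 1
    FakeChannel.new
  end
end

CHANNEL_KEY = :'isimud.bunny_client.channel'

# Return this thread's open channel, or create and cache a new one.
def current_channel(connection)
  cached = Thread.current[CHANNEL_KEY]
  return cached if cached && cached.open?

  Thread.current[CHANNEL_KEY] = connection.channel
end

connection = FakeConnection.new
first      = current_channel(connection)
same       = current_channel(connection)                       # reused within the same thread
other      = Thread.new { current_channel(connection) }.value  # another thread gets its own channel
first.close
reopened   = current_channel(connection)                       # a closed channel is replaced
```

Because the cache lives in Thread.current, each consumer or publisher thread works on its own channel and never shares one with another thread.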

#close ⇒ Object

Close the AMQP connection and clear it from the instance.

Returns:

  • nil



# File 'lib/isimud/bunny_client.rb', line 139

def close
  connection.close
ensure
  @connection = nil
end
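
The ensure clause means the stored connection is cleared even when closing raises. A minimal sketch of that behaviour, using hypothetical stand-in classes (FailingConnection and ClientSketch are not part of Isimud or Bunny, and the sketch reads the instance variable directly rather than going through the lazy #connection accessor):

```ruby
# Hypothetical connection whose #close always fails.
class FailingConnection
  def close
    raise IOError, 'socket already closed'
  end
end

# Hypothetical client that mirrors the close/ensure structure shown above.
class ClientSketch
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def close
    @connection.close
  ensure
    @connection = nil # runs whether or not #close raised
  end
end

client = ClientSketch.new(FailingConnection.new)
error = begin
  client.close
  nil
rescue IOError => e
  e
end
```

The exception still propagates to the caller, but the client no longer holds a reference to the broken connection.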

#connected? ⇒ Boolean?

Determine if a Bunny connection is currently established to the AMQP server.

Returns:

  • (Boolean, nil)

    true if a connection was established and is active or starting, false if a connection exists but is closed or closing, or nil if no connection has been established



# File 'lib/isimud/bunny_client.rb', line 133

def connected?
  @connection && @connection.open?
end
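
The three possible return values follow from the short-circuiting && in the method body. A minimal sketch, where FakeSession and connection_state are hypothetical names standing in for a Bunny session and for #connected?:

```ruby
# Hypothetical stand-in for a Bunny session; only #open? matters here.
FakeSession = Struct.new(:opened) do
  def open?
    opened
  end
end

# Mirrors the expression `@connection && @connection.open?`.
def connection_state(connection)
  connection && connection.open?
end

never_connected = connection_state(nil)                     # nil: no connection object yet
closed          = connection_state(FakeSession.new(false))  # false: connection exists but is not open
active          = connection_state(FakeSession.new(true))   # true: connection is open
```

Callers that only need a yes/no answer can treat nil and false alike, since both are falsy in Ruby.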

#connection ⇒ Bunny::Session

Also known as: connect

Establish a connection to the AMQP server, or return the current connection if one already exists.

Returns:

  • (Bunny::Session)


# File 'lib/isimud/bunny_client.rb', line 102

def connection
  @connection ||= ::Bunny.new(url, @bunny_options).tap(&:start)
end

#create_queue(queue_name, exchange_name, options = {}) ⇒ Bunny::Queue

Find or create a named queue and bind it to the specified exchange

Parameters:

  • queue_name (String)

    name of the queue

  • exchange_name (String)

    name of the AMQP exchange. Note that pre-existing exchanges must be declared as Topic exchanges; otherwise, an error will occur

  • options (Hash) (defaults to: {})

    queue and binding options

Options Hash (options):

  • :queue_options (Hash) (defaults to: {durable: true})

    queue declaration options; see Bunny::Channel#queue

  • :routing_keys (Array<String>) (defaults to: [])

    routing keys to be bound to the queue. Use "*" to match exactly one word in a routing key, and "#" to match zero or more words.

Returns:

  • (Bunny::Queue)

    Bunny queue



# File 'lib/isimud/bunny_client.rb', line 52

def create_queue(queue_name, exchange_name, options = {})
  queue_options = options[:queue_options] || {durable: true}
  routing_keys  = options[:routing_keys] || []
  log "Isimud::BunnyClient: create_queue #{queue_name}: queue_options=#{queue_options.inspect}"
  queue = find_queue(queue_name, queue_options)
  bind_routing_keys(queue, exchange_name, routing_keys) if routing_keys.any?
  queue
end
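
To make the wildcard rules for routing keys concrete, here is a small pure-Ruby matcher. It is only a sketch of the topic-exchange semantics, assuming dot-separated words; the actual matching is done by the broker, and topic_match? and match_words are hypothetical helper names.

```ruby
# Returns true if a dot-separated routing key matches a topic binding pattern.
# "*" matches exactly one word; "#" matches zero or more words.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  # An exhausted pattern matches only an exhausted routing key.
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # Let "#" consume 0, 1, 2, ... words and see whether the rest still matches.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end
```

For example, a queue bound with "user.*.complete" would receive "user.goal.complete" but not "user.goal.step.complete", while a binding of "user.#" would receive both.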

#delete_queue(queue_name) ⇒ AMQ::Protocol::Queue::DeleteOk

Permanently delete the queue from the AMQP server. Any messages present in the queue will be discarded.

Parameters:

  • queue_name (String)

    queue name

Returns:

  • (AMQ::Protocol::Queue::DeleteOk)

    RabbitMQ response



# File 'lib/isimud/bunny_client.rb', line 96

def delete_queue(queue_name)
  channel.queue_delete(queue_name)
end

#find_queue(queue_name, options = {durable: true}) ⇒ Object

Look up a queue by name, or create it if it does not already exist.



# File 'lib/isimud/bunny_client.rb', line 167

def find_queue(queue_name, options = {durable: true})
  channel.queue(queue_name, options)
end

#publish(exchange, routing_key, payload, options = {}) ⇒ Object

Publish a message to the specified exchange, which is declared as a durable topic exchange. Note that the message is always published as persistent.

Parameters:

  • exchange (String)

    AMQP exchange name

  • routing_key (String)

    message routing key. This should always be in the form of words separated by dots e.g. “user.goal.complete”

  • payload (String)

    message payload

  • options (Hash) (defaults to: {})

    additional message options

# File 'lib/isimud/bunny_client.rb', line 154

def publish(exchange, routing_key, payload, options = {})
  log "Isimud::BunnyClient#publish: exchange=#{exchange} routing_key=#{routing_key}", :debug
  channel.topic(exchange, durable: true).publish(payload, options.merge(routing_key: routing_key, persistent: true))
end
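
Because routing_key and persistent are merged in last, they take precedence over anything the caller passes under the same symbol keys. A minimal sketch of that precedence using plain hashes; publish_options is a hypothetical helper that mirrors the merge in the method body, and the option values are made up for illustration.

```ruby
# Mirrors `options.merge(routing_key: routing_key, persistent: true)` from #publish.
def publish_options(routing_key, options = {})
  options.merge(routing_key: routing_key, persistent: true)
end

caller_options = { persistent: false, message_id: 'abc-123', routing_key: 'ignored' }
merged = publish_options('user.goal.complete', caller_options)
# merged keeps :message_id, but :persistent and :routing_key come from the method.
```

Hash#merge returns a new hash, so the caller's options hash is left unchanged. The override applies to symbol keys only; a string key such as 'persistent' would be carried through as a separate entry.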

#reconnect ⇒ Bunny::Session

Close and reopen the AMQP connection.

Returns:

  • (Bunny::Session)


# File 'lib/isimud/bunny_client.rb', line 161

def reconnect
  close
  connect
end

#reset ⇒ Object

Reset this client by closing all channels for the connection.



# File 'lib/isimud/bunny_client.rb', line 126

def reset
  connection.close_all_channels
end

#subscribe(queue, options = {}) {|payload| ... } ⇒ Object

Subscribe to messages on the Bunny queue. The provided block will be called each time a message is received.

The message will be acknowledged and removed from the queue unless an exception is raised from the block. If an uncaught exception is raised, any declared exception handlers are called and the message is rejected; the value returned by the handlers determines whether it is requeued.

Parameters:

  • queue (Bunny::Queue)

    Bunny queue

  • options (Hash) (defaults to: {})

    subscription options; see Bunny::Queue#subscribe. Manual acknowledgement (manual_ack: true) is always enabled.

Yield Parameters:

  • payload (String)

    message text



# File 'lib/isimud/bunny_client.rb', line 69

def subscribe(queue, options = {}, &block)
  queue.subscribe(options.merge(manual_ack: true)) do |delivery_info, properties, payload|
    current_channel = delivery_info.channel
    begin
      log "Isimud: queue #{queue.name} received #{properties[:message_id]} routing_key: #{delivery_info.routing_key}", :debug
      Thread.current['isimud_queue_name']    = queue.name
      Thread.current['isimud_delivery_info'] = delivery_info
      Thread.current['isimud_properties']    = properties
      block.call(payload)
      if current_channel.open?
        log "Isimud: queue #{queue.name} finished with #{properties[:message_id]}, acknowledging", :debug
        current_channel.ack(delivery_info.delivery_tag)
      else
        log "Isimud: queue #{queue.name} unable to acknowledge #{properties[:message_id]}", :warn
      end
    rescue => e
      log("Isimud: queue #{queue.name} error processing #{properties[:message_id]} payload #{payload.inspect}: #{e.class.name} #{e.message}\n  #{e.backtrace.join("\n  ")}", :warn)
      retry_status = run_exception_handlers(e)
      log "Isimud: rejecting #{properties[:message_id]} requeue=#{retry_status}", :warn
      current_channel.open? && current_channel.reject(delivery_info.delivery_tag, retry_status)
    end
  end
end
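
The acknowledge-or-reject flow can be sketched in isolation. This is a simplified illustration under stated assumptions: RecordingChannel and handle_delivery are hypothetical, the channel is always open, and the requeue_on_error argument stands in for the value that run_exception_handlers would return.

```ruby
# Hypothetical stand-in that records what a channel was asked to do.
class RecordingChannel
  attr_reader :events

  def initialize
    @events = []
  end

  def open?
    true
  end

  def ack(delivery_tag)
    @events << [:ack, delivery_tag]
  end

  def reject(delivery_tag, requeue)
    @events << [:reject, delivery_tag, requeue]
  end
end

# Process one delivery: acknowledge on success, reject if the block raises.
def handle_delivery(channel, delivery_tag, payload, requeue_on_error: false)
  yield payload
  channel.ack(delivery_tag) if channel.open?
rescue StandardError
  channel.reject(delivery_tag, requeue_on_error) if channel.open?
end

channel = RecordingChannel.new
handle_delivery(channel, 1, 'ok')  { |payload| payload.upcase }
handle_delivery(channel, 2, 'bad') { |_payload| raise ArgumentError, 'cannot process' }
handle_delivery(channel, 3, 'bad', requeue_on_error: true) { |_payload| raise 'try again' }
```

After these three calls the channel has recorded one acknowledgement, one rejection without requeue, and one rejection with requeue, in that order.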