Class: Startback::Event::Bus::Bunny::Async

Inherits:
Object
  • Object
show all
Includes:
Support::Robustness
Defined in:
lib/startback/event/bus/bunny/async.rb

Overview

Asynchronous implementation of the bus abstraction, on top of RabbitMQ and using the ‘bunny’ gem (you need to include it in your Gemfile yourself: it is NOT a startback official dependency).

This bus implementation emits events by dumping them to RabbitMQ using the event type as exchange name. Listeners may use the `processor` parameter to specify the queue name; otherwise a default “main” queue is used.

Examples:

# Connects to RabbitMQ using all default options
#
# Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
# connection URL if present.
Startback::Event::Bus::Bunny::Async.new

# Connects to RabbitMQ using a specific URL
Startback::Event::Bus::Bunny::Async.new("amqp://rabbituser:rabbitpass@192.168.17.17")
Startback::Event::Bus::Bunny::Async.new(url: "amqp://rabbituser:rabbitpass@192.168.17.17")

# Connects to RabbitMQ using specific connection options. See Bunny's own
# documentation
Startback::Event::Bus::Bunny::Async.new({
  connection_options: {
    host: "192.168.17.17"
  }
})

Constant Summary collapse

# Key under which the channel is cached in `Thread.current`.
# Frozen so the shared constant cannot be mutated in place.
CHANNEL_KEY =
'Startback::Bus::Bunny::Async::ChannelKey'.freeze
# Defaults merged with the options given to the constructor.
#
# The hash is frozen (shallowly) so that one bus instance cannot alter the
# defaults seen by the others; `Hash#merge` still returns a fresh, mutable
# hash. The environment variable is read once, when this constant is
# evaluated.
DEFAULT_OPTIONS =
{
  # (optional) The URL to use for connecting to RabbitMQ.
  url: ENV['STARTBACK_BUS_BUNNY_ASYNC_URL'],

  # (optional) The options hash to pass to ::Bunny constructor
  connection_options: nil,

  # (optional) The options to use for the emitter/listener fanout
  fanout_options: {},

  # (optional) The options to use for the listener queue
  queue_options: {},

  # (optional) Default event factory to use, if any
  event_factory: nil,

  # (optional) A default context to use for general logging
  context: nil,

  # (optional) Size of consumer pool
  consumer_pool_size: 1,

  # (optional) Whether the program must be aborted on consumption
  # error
  abort_on_exception: true,

  # (optional) Whether connection occurs immediately,
  # or on demand later
  autoconnect: false
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Support::Robustness

#log, #monitor, #stop_errors, #try_max_times

Constructor Details

#initialize(options = {}) ⇒ Async

Creates a bus instance, using the various options provided to fine-tune behavior.



74
75
76
77
78
# File 'lib/startback/event/bus/bunny/async.rb', line 74

# Builds a bus from the given options, layered over DEFAULT_OPTIONS.
#
# @param options [Hash, String] option overrides; a String is taken as the
#   value of the `:url` option
def initialize(options = {})
  overrides = options.is_a?(String) ? { url: options } : options
  @options = DEFAULT_OPTIONS.merge(overrides)
  # Connect right away only when asked to; otherwise #connect must be called.
  connect if @options[:autoconnect]
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



79
80
81
# File 'lib/startback/event/bus/bunny/async.rb', line 79

# Returns the options stored in `@options`.
#
# @return [Object] the current value of `@options`
def options
  @options
end

Instance Method Details

#channel ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/startback/event/bus/bunny/async.rb', line 104

# Returns the channel cached in `Thread.current` under CHANNEL_KEY, creating
# one on the bus connection when none is cached or the cached one is no
# longer open.
#
# @return [Object] the channel returned by `@bunny.create_channel`
# @raise [Startback::Errors::Error] when no connection has been set up yet
def channel
  unless @bunny
    raise Startback::Errors::Error, "Please connect your bus first, or use autoconnect: true"
  end

  cached = Thread.current[CHANNEL_KEY]
  # A cached channel can be closed behind our back (channel-level protocol
  # error, or a disconnect issued elsewhere). Handing it out again would
  # make every later call fail, so it is replaced instead.
  return cached if cached && cached.open?

  Thread.current[CHANNEL_KEY] = @bunny.create_channel(
    nil,
    consumer_pool_size, # consumer_pool_size
    abort_on_exception? # consumer_pool_abort_on_exception
  )
end

#connect ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/startback/event/bus/bunny/async.rb', line 81

# (Re)connects the bus: drops any current connection, then builds and starts
# a new ::Bunny session through `try_max_times(10)`.
#
# The connection target is the `:connection_options` option when set, and
# the `:url` option otherwise.
def connect
  disconnect
  target = options[:connection_options] || options[:url]
  try_max_times(10) do
    @bunny = ::Bunny.new(target)
    @bunny.start
    # Open this thread's channel eagerly.
    channel
    # NOTE(review): op_data is the raw connection target; when it is a URL
    # it may embed credentials -- confirm the logger redacts them.
    log_entry = { op: "#{self.class.name}#connect", op_data: target }
    log(:info, log_entry, options[:context])
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/startback/event/bus/bunny/async.rb', line 100

# Tells whether the bus currently holds a live connection.
#
# @return [Boolean] false when no connection has been set up; otherwise
#   whatever the connection's own `connected?` reports
def connected?
  # Answer false rather than nil so the predicate honours its Boolean contract.
  return false unless @bunny

  @bunny.connected?
end

#disconnect ⇒ Object



92
93
94
95
96
97
98
# File 'lib/startback/event/bus/bunny/async.rb', line 92

# Closes the channel cached for the current thread (if any), then the
# connection (if any).
#
# The cached channel is always forgotten, even when closing it raises, so a
# broken channel is never handed out again. An error raised while closing
# the channel still propagates, and the connection is then left untouched.
#
# @return [Object, nil] the result of closing the connection, or nil when
#   there is no connection
def disconnect
  # Named `cached` so the local does not shadow the #channel method.
  cached = Thread.current[CHANNEL_KEY]
  begin
    cached.close if cached
  ensure
    Thread.current[CHANNEL_KEY] = nil
  end
  @bunny.close if @bunny
end

#emit(event) ⇒ Object



116
117
118
119
120
121
# File 'lib/startback/event/bus/bunny/async.rb', line 116

# Publishes the event's JSON form on the fanout exchange named after the
# event type, under the protection of `stop_errors`.
#
# @param event [#type, #context, #to_json] the event to publish
# @return [Object] whatever `stop_errors` returns
def emit(event)
  stop_errors(self, "emit", event.context) do
    channel
      .fanout(event.type.to_s, fanout_options)
      .publish(event.to_json)
  end
end

#listen(type, processor = nil, listener = nil, &bl) ⇒ Object

Raises:

  • (ArgumentError)


123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/startback/event/bus/bunny/async.rb', line 123

# Subscribes a listener to events of the given type.
#
# A queue named after `processor` ("main" when omitted) is bound to the
# fanout exchange named after `type`; each delivered message body is passed
# to the listener under the protection of `stop_errors`.
#
# @param type [#to_s] event type, used as exchange name
# @param processor [#to_s, nil] queue name, "main" when nil
# @param listener [#call, nil] callable receiving the message body; takes
#   precedence over the block when both are given
# @yield [body] the message body, when no `listener` is given
# @return [Object] whatever the queue's `subscribe` returns
# @raise [ArgumentError] when neither a listener nor a block is given
def listen(type, processor = nil, listener = nil, &bl)
  handler = listener || bl
  raise ArgumentError, "A listener must be provided" unless handler

  exchange = channel.fanout(type.to_s, fanout_options)
  bound_queue = channel.queue((processor || "main").to_s, queue_options)
  bound_queue.bind(exchange)
  bound_queue.subscribe do |_delivery_info, _properties, payload|
    stop_errors(self, "listen") { handler.call(payload) }
  end
end