Class: Startback::Event::Bus::Bunny::Async
- Inherits:
- Object
  - Object
    - Startback::Event::Bus::Bunny::Async
- Includes:
- Support::Robustness
- Defined in:
- lib/startback/event/bus/bunny/async.rb
Overview
Asynchronous implementation of the bus abstraction, on top of RabbitMQ and using the ‘bunny’ gem (you need to include it in your Gemfile yourself: it is NOT a startback official dependency).
This bus implementation emits events by dumping them to RabbitMQ, using the event type as the exchange name. Listeners may use the ‘processor’ parameter to specify the queue name; otherwise a default “main” queue is used.
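The routing described above can be pictured with a small in-memory model. This is only a sketch and not the startback or Bunny API: `ToyBroker`, `bind` and the event names are hypothetical, and it assumes the usual RabbitMQ fanout behavior, where the exchange copies each message to every queue bound to it.

```ruby
# Hypothetical in-memory model of the routing; not the startback or Bunny API.
class ToyBroker
  attr_reader :queues

  def initialize
    @queues   = Hash.new { |h, name| h[name] = [] } # queue name => delivered bodies
    @bindings = Hash.new { |h, name| h[name] = [] } # exchange name => bound queue names
  end

  # The event type names the exchange; the processor names the queue,
  # falling back to "main" when none is given.
  def bind(type, processor = nil)
    queue_name = (processor || "main").to_s
    @bindings[type.to_s] |= [queue_name]
    queue_name
  end

  # A fanout exchange copies the body to every queue bound to it.
  def emit(type, body)
    @bindings[type.to_s].each { |queue_name| @queues[queue_name] << body }
  end
end

broker = ToyBroker.new
broker.bind("UserCreated")            # default "main" queue
broker.bind("UserCreated", :mailer)   # dedicated "mailer" queue
broker.emit("UserCreated", '{"id":1}')
broker.emit("OrderPaid", '{"id":2}')  # nothing bound to this exchange in the model
```

In this model both "main" and "mailer" end up with one copy of the first message. Note that queue names are not scoped by event type, so listeners that rely on the default "main" queue for several event types share a single queue.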
Examples:
# Connects to RabbitMQ using all default options
#
# Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
# connection URL if present.
Startback::Event::Bus::Bunny::Async.new

# Connects to RabbitMQ using a specific URL
Startback::Event::Bus::Bunny::Async.new("amqp://rabbituser:[email protected]")
Startback::Event::Bus::Bunny::Async.new(url: "amqp://rabbituser:[email protected]")

# Connects to RabbitMQ using specific connection options. See Bunny's own
# documentation
Startback::Event::Bus::Bunny::Async.new({
  connection_options: {
    host: "192.168.17.17"
  }
})
Constant Summary
- CHANNEL_KEY =
'Startback::Bus::Bunny::Async::ChannelKey'
- DEFAULT_OPTIONS =
{
  # (optional) The URL to use for connecting to RabbitMQ.
  url: ENV['STARTBACK_BUS_BUNNY_ASYNC_URL'],

  # (optional) The options hash to pass to the ::Bunny constructor
  connection_options: nil,

  # (optional) The options to use for the emitter/listener fanout
  fanout_options: {},

  # (optional) The options to use for the listener queue
  queue_options: {},

  # (optional) Default event factory to use, if any
  event_factory: nil,

  # (optional) A default context to use for general logging
  context: nil,

  # (optional) Size of consumer pool
  consumer_pool_size: 1,

  # (optional) Whether the program must be aborted on consumption
  # error
  abort_on_exception: true,

  # (optional) Whether connection occurs immediately,
  # or on demand later
  autoconnect: false
}
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
Instance Method Summary
- #channel ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
- #emit(event) ⇒ Object
- #initialize(options = {}) ⇒ Async (constructor)
  Creates a bus instance, using the various options provided to fine-tune behavior.
- #listen(type, processor = nil, listener = nil, &bl) ⇒ Object
Methods included from Support::Robustness
#log, #monitor, #stop_errors, #try_max_times
Constructor Details
#initialize(options = {}) ⇒ Async
Creates a bus instance, using the various options provided to fine-tune behavior.
# File 'lib/startback/event/bus/bunny/async.rb', line 74
def initialize(options = {})
  # A bare String is treated as the connection URL
  options = { url: options } if options.is_a?(String)
  @options = DEFAULT_OPTIONS.merge(options)
  connect if @options[:autoconnect]
end
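The option handling in the constructor can be tried in isolation. The following is a minimal sketch, assuming a trimmed-down copy of the defaults: `SKETCH_DEFAULTS`, `normalize_options` and the `amqp://localhost` URL are hypothetical names used for illustration only.

```ruby
# Trimmed-down, hypothetical copy of the defaults; only three keys are kept.
SKETCH_DEFAULTS = {
  url: ENV['STARTBACK_BUS_BUNNY_ASYNC_URL'],
  consumer_pool_size: 1,
  autoconnect: false
}.freeze

# Same normalization as the constructor: a bare String is treated as the URL,
# then the result is merged over the defaults.
def normalize_options(options = {})
  options = { url: options } if options.is_a?(String)
  SKETCH_DEFAULTS.merge(options)
end

from_string = normalize_options("amqp://localhost")
from_hash   = normalize_options(url: "amqp://localhost", consumer_pool_size: 4)

puts from_string[:consumer_pool_size] # default is kept
puts from_hash[:consumer_pool_size]   # caller's value wins
```

Because `Hash#merge` is shallow, a nested hash such as `fanout_options` supplied by the caller replaces the default nested hash as a whole rather than being combined with it key by key.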
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
# File 'lib/startback/event/bus/bunny/async.rb', line 79
def options
  @options
end
Instance Method Details
#channel ⇒ Object
# File 'lib/startback/event/bus/bunny/async.rb', line 104
def channel
  unless @bunny
    raise Startback::Errors::Error, "Please connect your bus first, or use autoconnect: true"
  end

  # One channel per thread, created lazily and cached under CHANNEL_KEY
  Thread.current[CHANNEL_KEY] ||= @bunny.create_channel(
    nil,
    consumer_pool_size,  # consumer_pool_size
    abort_on_exception?  # consumer_pool_abort_on_exception
  )
end
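The per-thread caching used by `#channel` can be illustrated without RabbitMQ. This is a sketch only: `FakeConnection`, `channel_for` and `SKETCH_CHANNEL_KEY` are hypothetical stand-ins, and the fake `create_channel` takes no arguments, unlike the real call above.

```ruby
SKETCH_CHANNEL_KEY = 'Sketch::ChannelKey' # hypothetical key, for illustration

# Stand-in for a connection; every create_channel call returns a new object.
class FakeConnection
  def create_channel
    Object.new
  end
end

# Lazily creates one channel per thread and reuses it on later calls.
def channel_for(connection)
  Thread.current[SKETCH_CHANNEL_KEY] ||= connection.create_channel
end

connection  = FakeConnection.new
main_first  = channel_for(connection)
main_second = channel_for(connection)
other       = Thread.new { channel_for(connection) }.value

puts main_first.equal?(main_second) # same thread: cached channel is reused
puts main_first.equal?(other)       # other thread: a distinct channel
```

Keeping one channel per thread avoids sharing a channel between threads. Note that in Ruby `Thread#[]` storage is fiber-local, so code running in a separate fiber would also get its own entry.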
#connect ⇒ Object
# File 'lib/startback/event/bus/bunny/async.rb', line 81
def connect
  disconnect
  # Explicit connection options take precedence over the URL
  conn = options[:connection_options] || options[:url]
  try_max_times(10) do
    @bunny = ::Bunny.new(conn)
    @bunny.start
    channel # make sure we already create the channel
    log(:info, {op: "#{self.class.name}#connect", op_data: conn}, options[:context])
  end
end
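`#connect` wraps the connection attempt in `try_max_times(10)`, which comes from Support::Robustness and whose source is not shown on this page. The sketch below shows one plausible shape for such a retry helper; `retry_up_to` and its `wait:` parameter are hypothetical, and the real helper may differ, for instance in how long it waits between attempts or which errors it rescues.

```ruby
# Hypothetical retry helper; NOT the actual Support::Robustness#try_max_times.
def retry_up_to(max_attempts, wait: 0)
  attempts = 0
  begin
    attempts += 1
    yield(attempts)
  rescue StandardError
    # Give up and re-raise the last error once the attempts are exhausted
    raise if attempts >= max_attempts
    sleep(wait)
    retry
  end
end

calls = 0
result = retry_up_to(10) do |attempt|
  calls += 1
  raise "broker not ready" if attempt < 3 # simulate two failed attempts
  :connected
end

puts result # the block's value from the first successful attempt
puts calls  # 3
```

A retry loop of this kind helps when the application starts before the broker is ready to accept connections.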
#connected? ⇒ Boolean
# File 'lib/startback/event/bus/bunny/async.rb', line 100
def connected?
  @bunny && @bunny.connected?
end
#disconnect ⇒ Object
# File 'lib/startback/event/bus/bunny/async.rb', line 92
def disconnect
  # Close and forget the channel cached for the current thread, if any
  if channel = Thread.current[CHANNEL_KEY]
    channel.close
    Thread.current[CHANNEL_KEY] = nil
  end
  @bunny.close if @bunny
end
#emit(event) ⇒ Object
# File 'lib/startback/event/bus/bunny/async.rb', line 116
def emit(event)
  stop_errors(self, "emit", event.context) do
    # The event type is used as the fanout exchange name
    fanout = channel.fanout(event.type.to_s, fanout_options)
    fanout.publish(event.to_json)
  end
end
#listen(type, processor = nil, listener = nil, &bl) ⇒ Object
# File 'lib/startback/event/bus/bunny/async.rb', line 123
def listen(type, processor = nil, listener = nil, &bl)
  raise ArgumentError, "A listener must be provided" unless listener || bl

  fanout = channel.fanout(type.to_s, fanout_options)
  # The processor names the queue; "main" is the default
  queue = channel.queue((processor || "main").to_s, queue_options)
  queue.bind(fanout)
  queue.subscribe do |delivery_info, properties, body|
    stop_errors(self, "listen") do
      (listener || bl).call(body)
    end
  end
end
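As the code above shows, the listener is called with the raw message body, which is whatever `#emit` published via `event.to_json`. The sketch below illustrates that dispatch rule outside of RabbitMQ: `dispatch` is a hypothetical helper, and the payload shape is an assumption, since the actual structure of `event.to_json` is not documented on this page.

```ruby
require 'json'

# Hypothetical helper mirroring the dispatch rule of #listen: an explicit
# listener wins over a block, and one of the two is mandatory.
def dispatch(body, listener = nil, &bl)
  raise ArgumentError, "A listener must be provided" unless listener || bl
  (listener || bl).call(body)
end

# Assumed payload shape, for illustration only.
body = JSON.generate({ "type" => "UserCreated", "data" => { "id" => 12 } })

received = []

# Any object responding to #call works as a listener...
dispatch(body, ->(raw) { received << JSON.parse(raw) })

# ...and so does a plain block.
dispatch(body) { |raw| received << JSON.parse(raw)["type"] }

puts received.inspect
```

Since the body arrives as a String, a listener that needs structured data has to parse it itself, as done with `JSON.parse` here.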