Class: Basquiat::Adapters::RabbitMq
- Defined in:
- lib/basquiat/adapters/rabbitmq_adapter.rb,
lib/basquiat/adapters/rabbitmq/events.rb,
lib/basquiat/adapters/rabbitmq/message.rb,
lib/basquiat/adapters/rabbitmq/session.rb,
lib/basquiat/adapters/rabbitmq/connection.rb,
lib/basquiat/adapters/rabbitmq/configuration.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/base_strategy.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/dead_lettering.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/auto_acknowledge.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/delayed_delivery.rb,
lib/basquiat/adapters/rabbitmq/requeue_strategies/basic_acknowledge.rb
Overview
The RabbitMQ adapter for Basquiat
Defined Under Namespace
Classes: AutoAcknowledge, BaseStrategy, BasicAcknowledge, Configuration, Connection, DeadLettering, DelayedDelivery, Events, Message, Session
Instance Attribute Summary
-
#procs ⇒ Object
readonly
Returns the value of attribute procs.
Instance Method Summary
-
#base_options ⇒ Object
Because the RabbitMQ configuration options are extensive and their interactions with the requeue strategies are somewhat convoluted, the adapter uses a Configuration object to handle them all.
-
#initialize(procs: Events.new) ⇒ RabbitMq
constructor
Initializes the superclass using an Events object as the procs instance variable.
-
#listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc) ⇒ Object
Binds the queues and starts the event loop.
- #process_message(message, rescue_proc) ⇒ Object
-
#publish(event, message, props: {}) ⇒ Object
Publishes the event to the exchange configured.
-
#reset_connection ⇒ Object
(also: #disconnect)
Resets the connection to RabbitMQ.
-
#session ⇒ Session
Lazily initializes and returns the session.
-
#session_pool ⇒ ConnectionPool<Session>
Lazily initializes and returns the session pool.
-
#strategy ⇒ BaseStrategy
Lazily initializes the requeue strategy configured for the adapter.
-
#subscribe_to(event_name, proc) ⇒ Object
Adds the subscription and registers the proc for the event.
Methods inherited from Base
#adapter_options, #default_options, register_strategy, #strategies, strategies, strategy
Constructor Details
Instance Attribute Details
#procs ⇒ Object (readonly)
Returns the value of attribute procs.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 12
def procs
  @procs
end
Instance Method Details
#base_options ⇒ Object
Because the RabbitMQ configuration options are extensive and their interactions with the requeue strategies are somewhat convoluted, the adapter uses a Configuration object to handle them all.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 29
def base_options
  @configuration ||= Configuration.new
  @configuration.(Basquiat.configuration.)
end
#listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc) ⇒ Object
Binds the queues and starts the event loop.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 56
def listen(block: true, rescue_proc: Basquiat.configuration.rescue_proc)
  procs.keys.each { |key| session.bind_queue(key) }
  session.subscribe(block: block) do |message|
    strategy.run(message) do
      process_message(message, rescue_proc)
    end
  end
end
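In the listen loop, each message is passed to the requeue strategy's run method together with a block that does the actual handling. The following is a minimal sketch of that wrapping shape only. `AckAfterStrategy` and `SketchMessage` are hypothetical names invented for illustration and are not part of Basquiat; the real strategies may acknowledge, reject, or requeue differently.

```ruby
# Hypothetical stand-ins; none of these names come from Basquiat.
SketchMessage = Struct.new(:routing_key, :payload, :acked)

class AckAfterStrategy
  # Yields to the handler block, then marks the message as acknowledged.
  def run(message)
    yield
    message.acked = true
  end
end

handled  = []
strategy = AckAfterStrategy.new
message  = SketchMessage.new('user.created', 'hello', false)

# The handler runs inside the strategy, so the strategy decides what
# happens before and after it.
strategy.run(message) { handled << message.routing_key }
```

Keeping the handler inside a block lets a strategy decide what happens around it without the handler knowing which strategy is in use.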
#process_message(message, rescue_proc) ⇒ Object
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 65
def process_message(message, rescue_proc)
  procs[message.routing_key].call(message)
rescue StandardError => ex
  rescue_proc.call(ex, message)
end
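The method above looks up a handler by routing key and forwards any StandardError to the rescue proc. A self-contained sketch of that dispatch pattern might look like the following. It assumes a plain Hash as the handler registry and a hypothetical `DemoMessage` struct; Basquiat's own Events object may match routing keys differently.

```ruby
# Hypothetical stand-ins for illustration; not Basquiat classes.
DemoMessage = Struct.new(:routing_key, :payload)

# Calls the handler registered for the routing key; any StandardError
# raised by the handler is handed to rescue_proc with the message.
def dispatch(procs, rescue_proc, message)
  procs[message.routing_key].call(message)
rescue StandardError => e
  rescue_proc.call(e, message)
end

errors = []
procs = {
  'order.paid'   => ->(msg) { msg.payload.upcase },
  'order.failed' => ->(_msg) { raise ArgumentError, 'boom' }
}
rescue_proc = ->(exception, msg) { errors << [exception.class, msg.routing_key] }

result = dispatch(procs, rescue_proc, DemoMessage.new('order.paid', 'ok'))
dispatch(procs, rescue_proc, DemoMessage.new('order.failed', 'nope'))
```

In this sketch the first call returns the handler's value, while the second records the exception class and routing key instead of propagating the error.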
#publish(event, message, props: {}) ⇒ Object
Publishes the event to the exchange configured.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 45
def publish(event, message, props: {})
  if [:publisher][:session_pool]
    session_pool.with { |session| session.publish(event, message, props) }
  else
    session.publish(event, message, props)
  end
  disconnect unless [:publisher][:persistent]
end
#reset_connection ⇒ Object Also known as: disconnect
Resets the connection to RabbitMQ.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 72
def reset_connection
  connection.disconnect
  @connection = nil
  @session = nil
  @session_pool = nil
  @strategy = nil
end
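Resetting works because the session, session pool, and strategy are memoized with `||=`: setting the instance variables back to nil forces the next access to rebuild them. The sketch below illustrates that pattern in isolation. `LazyHolder` and its methods are hypothetical and stand in for the adapter; no RabbitMQ connection is involved.

```ruby
# Hypothetical class; it only mirrors the `||=` and reset-to-nil pattern.
class LazyHolder
  attr_reader :builds

  def initialize
    @builds = 0
  end

  # Builds the resource on first access and reuses it afterwards.
  def session
    @session ||= build_session
  end

  # Dropping the reference forces the next #session call to rebuild.
  def reset
    @session = nil
  end

  private

  # Counts how many times a new resource was created.
  def build_session
    @builds += 1
    Object.new
  end
end

holder = LazyHolder.new
first  = holder.session
same   = holder.session
holder.reset
second = holder.session
```

Here `first` and `same` are the same object, while `second` is a fresh one created after the reset.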
#session ⇒ Session
Lazily initializes and returns the session.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 90
def session
  @session ||= Session.new(connection.create_channel, @configuration.)
end
#session_pool ⇒ ConnectionPool<Session>
Lazily initializes and returns the session pool.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 96
def session_pool
  @session_pool ||= ConnectionPool.new(size: [:publisher][:session_pool].fetch(:size, 1),
                                       timeout: [:publisher][:session_pool].fetch(:timeout, 5)) do
    Session.new(connection.create_channel, @configuration.)
  end
end
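The two `fetch` calls give the pool a size of 1 and a timeout of 5 when the session pool settings omit them. A small sketch of how those defaults resolve, using a hypothetical `pool_settings` helper and plain hashes rather than the adapter's real configuration:

```ruby
# Hypothetical helper: resolves pool settings the same way the fetch
# calls in session_pool do, falling back to 1 and 5.
def pool_settings(pool_options)
  {
    size:    pool_options.fetch(:size, 1),
    timeout: pool_options.fetch(:timeout, 5)
  }
end

defaults = pool_settings({})
custom   = pool_settings({ size: 10 })
```

With an empty hash both defaults apply; supplying only `:size` overrides the size and leaves the timeout at its default.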
#strategy ⇒ BaseStrategy
Lazily initializes the requeue strategy configured for the adapter.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 84
def strategy
  @strategy ||= @configuration.strategy.new(session)
end
#subscribe_to(event_name, proc) ⇒ Object
Adds the subscription and registers the proc for the event.
# File 'lib/basquiat/adapters/rabbitmq_adapter.rb', line 37
def subscribe_to(event_name, proc)
  procs[event_name] = proc
end