Class: Cuniculus::PubWorker
- Inherits:
-
Object
- Object
- Cuniculus::PubWorker
- Defined in:
- lib/cuniculus/pub_worker.rb
Overview
Each PubWorker maintains a background thread in a loop, fetching jobs reaching its job queue and publishing the payloads to RabbitMQ. They are not instantiated directly, but are rather created and managed by a Dispatcher.
Instance Method Summary collapse
-
#alive? ⇒ Boolean
Whether the background thread is running.
-
#initialize(config, job_queue, dispatcher_chan) ⇒ PubWorker
constructor
A new instance of PubWorker.
-
#start!(conn) ⇒ Object
Declares exchanges, and starts a background thread that consumes and publishes messages.
Constructor Details
#initialize(config, job_queue, dispatcher_chan) ⇒ PubWorker
Returns a new instance of PubWorker.
10 11 12 13 14 15 16 |
# File 'lib/cuniculus/pub_worker.rb', line 10 def initialize(config, job_queue, dispatcher_chan) @config = config @job_queue = job_queue @dispatcher_chan = dispatcher_chan @mutex = Mutex.new @thread = nil end |
Instance Method Details
#alive? ⇒ Boolean
Whether the background thread is running.
40 41 42 |
# File 'lib/cuniculus/pub_worker.rb', line 40 def alive? @thread&.alive? || false end |
#start!(conn) ⇒ Object
Declares exchanges, and starts a background thread that consumes and publishes messages.
If the connection to RabbitMQ it receives is not established, or if it fails to declare the exchanges, the background thread is not started and a message is sent to the dispatcher channel with the current timestamp. The dispatcher is then responsible for trying to set the connection up again and starting each of its workers.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/cuniculus/pub_worker.rb', line 26 def start!(conn) return @dispatcher_chan << Cuniculus.mark_time unless conn.open? @channel = sync { conn.create_channel } @x = sync { @channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) } @dlx = sync { @channel.fanout(Cuniculus::CUNICULUS_DLX_EXCHANGE, { durable: true }) } @thread = Thread.new { run } rescue Bunny::Exception @dispatcher_chan << Cuniculus.mark_time end |