Class: Cuniculus::Dispatcher
- Inherits: Object
  - Object
    - Cuniculus::Dispatcher
- Defined in: lib/cuniculus/dispatcher.rb
Overview
The dispatcher forwards jobs to a worker pool, which publishes them to RabbitMQ. It holds a RabbitMQ session. When one of its workers reports a network exception, it tries to reestablish the connection and restarts the pool.
The dispatcher's background thread, which monitors for connection errors, is started when a Worker enqueues the first job.
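The worker side of this arrangement can be sketched with plain Ruby queues. This is a minimal illustration, not the PubWorker implementation: the `publish` lambda, the `:bad` job, and the use of `IOError` as a stand-in for a network error are all assumptions made for the example. What a real worker does after reporting a failure is not covered here.

```ruby
job_queue = Queue.new        # jobs waiting to be published
dispatcher_chan = Queue.new  # failure reports sent back to the dispatcher

# Hypothetical publish step; it fails for one job to simulate a network error.
publish = lambda do |job|
  raise IOError, "connection lost" if job == :bad
  job
end

published = []
worker = Thread.new do
  loop do
    job = job_queue.pop
    break if job == :shutdown
    begin
      published << publish.call(job)
    rescue IOError
      # Report when the failure happened, so the dispatcher can decide
      # whether a reconnect is still needed.
      dispatcher_chan << Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end
end

[:a, :bad, :b, :shutdown].each { |job| job_queue << job }
worker.join
```

After the run, `published` holds the two jobs that succeeded and `dispatcher_chan` holds one failure timestamp.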
Constant Summary
- ENFORCED_CONN_OPTS =
    {
      threaded: false, # No need for a reader thread, since this connection is only used for publishing
      automatically_recover: false,
      logger: ::Logger.new(IO::NULL)
    }.freeze
- RECOVERABLE_ERRORS =
    [AMQ::Protocol::Error, ::Bunny::Exception, Errno::ECONNRESET].freeze
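The source of #start! merges ENFORCED_CONN_OPTS over the user's RabbitMQ options, so the enforced values take precedence. The sketch below shows that precedence with `Hash#merge`; the `user_opts` hash and its host name are made up for illustration.

```ruby
require "logger"

# Same keys as ENFORCED_CONN_OPTS, held in a local variable for this sketch.
enforced = { threaded: false, automatically_recover: false, logger: Logger.new(IO::NULL) }.freeze

# Hypothetical user-supplied options, one of which conflicts with an enforced key.
user_opts = { host: "rabbitmq.example", automatically_recover: true }

# Keys in the argument to merge override keys in the receiver.
effective = user_opts.merge(enforced)
```

Here `effective[:automatically_recover]` is `false` even though the user asked for `true`, while unrelated keys such as `:host` pass through unchanged.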
Instance Attribute Summary
- #dispatcher_chan ⇒ Object (readonly): Returns the value of attribute dispatcher_chan.
- #job_queue ⇒ Object (readonly): Returns the value of attribute job_queue.
- #reconnect_attempts ⇒ Object (readonly): Returns the value of attribute reconnect_attempts.
- #reconnect_delay ⇒ Object (readonly): Returns the value of attribute reconnect_delay.
- #reconnect_delay_max ⇒ Object (readonly): Returns the value of attribute reconnect_delay_max.
- #shutdown_grace_period ⇒ Object (readonly): Returns the value of attribute shutdown_grace_period.
Instance Method Summary
- #alive? ⇒ Boolean: Whether the dispatcher's background thread is running.
- #describe(log_level = Logger::DEBUG) ⇒ Object
- #initialize(config) ⇒ Dispatcher (constructor): Instantiates a dispatcher using the passed Config.
- #recover_from_net_error ⇒ void: Starts the connection to RabbitMQ, then starts the workers' background threads.
- #shutdown ⇒ void: Shuts down the workers, giving them time to conclude outstanding tasks.
- #start! ⇒ Object: Starts a thread responsible for reestablishing lost RabbitMQ connections and restarting PubWorkers.
Constructor Details
#initialize(config) ⇒ Dispatcher
Instantiates a dispatcher using the passed Config.
    # File 'lib/cuniculus/dispatcher.rb', line 27
    def initialize(config)
      @config = config
      @conn = nil
      @job_queue = Queue.new
      @dispatcher_chan = Queue.new
      @shutdown = false
      @workers = config.pub_pool_size.times.map do |i|
        Cuniculus::PubWorker.new(config, @job_queue, @dispatcher_chan)
      end
      @reconnect_attempts = config.pub_reconnect_attempts
      @reconnect_delay = config.pub_reconnect_delay
      @reconnect_delay_max = config.pub_reconnect_delay_max
      @shutdown_grace_period = config.pub_shutdown_grace_period
      @thread = nil
      @shutdown = false
    end
Instance Attribute Details
#dispatcher_chan ⇒ Object (readonly)
Returns the value of attribute dispatcher_chan.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def dispatcher_chan
      @dispatcher_chan
    end
#job_queue ⇒ Object (readonly)
Returns the value of attribute job_queue.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def job_queue
      @job_queue
    end
#reconnect_attempts ⇒ Object (readonly)
Returns the value of attribute reconnect_attempts.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def reconnect_attempts
      @reconnect_attempts
    end
#reconnect_delay ⇒ Object (readonly)
Returns the value of attribute reconnect_delay.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def reconnect_delay
      @reconnect_delay
    end
#reconnect_delay_max ⇒ Object (readonly)
Returns the value of attribute reconnect_delay_max.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def reconnect_delay_max
      @reconnect_delay_max
    end
#shutdown_grace_period ⇒ Object (readonly)
Returns the value of attribute shutdown_grace_period.
    # File 'lib/cuniculus/dispatcher.rb', line 22
    def shutdown_grace_period
      @shutdown_grace_period
    end
Instance Method Details
#alive? ⇒ Boolean
Whether the dispatcher's background thread is running.
    # File 'lib/cuniculus/dispatcher.rb', line 85
    def alive?
      @thread&.alive? || false
    end
#describe(log_level = Logger::DEBUG) ⇒ Object
    # File 'lib/cuniculus/dispatcher.rb', line 44
    def describe(log_level = Logger::DEBUG)
      Cuniculus.logger.info @thread&.backtrace
      @workers.each do |w|
        Cuniculus.logger.log(log_level, w.instance_variable_get(:@thread)&.backtrace)
      end
    end
#recover_from_net_error ⇒ void
This method returns an undefined value.
Starts the connection to RabbitMQ, then starts the workers' background threads.
If it fails to connect, it keeps retrying for a number of attempts defined by Config#pub_reconnect_attempts. For unlimited retries, set this value to `:infinite`.
The time between reconnect attempts follows an exponential backoff formula:
    t = delay * 2^(n-1)
where n is the attempt number, and delay is defined by Config#pub_reconnect_delay.
If Config#pub_reconnect_delay_max is defined, it works as a cap for the above time.
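As a rough illustration of the formula above, the following sketch computes the wait before each attempt. The method name `backoff_delay` and its keyword arguments are hypothetical and not part of Cuniculus. Attempts are numbered from 1, as in the formula, and the cap is applied only when `delay_max` is given.

```ruby
# Hypothetical helper, not part of Cuniculus: wait time before attempt
# number `attempt` (1-based), optionally capped at `delay_max`.
def backoff_delay(attempt, delay:, delay_max: nil)
  t = delay * 2**(attempt - 1)
  delay_max ? [t, delay_max].min : t
end

# With a base delay of 2 seconds and a cap of 30 seconds:
delays = (1..6).map { |n| backoff_delay(n, delay: 2, delay_max: 30) }
# delays == [2, 4, 8, 16, 30, 30]
```

The delay doubles on each attempt until it reaches the cap, after which every further attempt waits the capped time.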
    # File 'lib/cuniculus/dispatcher.rb', line 104
    def recover_from_net_error
      attempt = 0
      begin
        @conn.start
        Cuniculus.logger.info("Connection established")

        @workers.each { |w| w.start!(@conn) }
      rescue *RECOVERABLE_ERRORS => ex
        handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::RMQConnectionError))
        sleep_time = @shutdown ? 1 : [(reconnect_delay * 2**(attempt-1)), reconnect_delay_max].min
        sleep sleep_time
        attempt += 1

        retry if @shutdown && attempt <= reconnect_delay_max
        retry if reconnect_attempts == :infinite || attempt <= reconnect_attempts
      end
    end
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the workers, giving them time to conclude outstanding tasks.
Shutdown is forced after Config#pub_shutdown_grace_period seconds.
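The grace-period idea can be sketched as a wait loop bounded by a monotonic clock. This is a simplified illustration, not the method's actual code: `wait_for_drain` and its `poll_interval` parameter are hypothetical, and `mark_time` here is only assumed to behave like `Cuniculus.mark_time`.

```ruby
# Monotonic clock reading in seconds; assumed to play the role of Cuniculus.mark_time.
def mark_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: blocks until the queue is empty or the grace period
# has elapsed. Returns true if the queue was drained in time.
def wait_for_drain(queue, grace_period, poll_interval: 0.05)
  t0 = mark_time
  until queue.empty?
    return false if mark_time - t0 > grace_period
    sleep poll_interval
  end
  true
end

# A queue with an active consumer drains well within the grace period.
jobs = Queue.new
3.times { |i| jobs << i }
consumer = Thread.new { 3.times { jobs.pop; sleep 0.01 } }
drained = wait_for_drain(jobs, 2)
consumer.join

# A queue nobody consumes hits the deadline instead.
stuck = Queue.new
stuck << :never_consumed
timed_out = !wait_for_drain(stuck, 0.2)
```

Using a monotonic clock rather than wall-clock time keeps the deadline unaffected by system clock adjustments.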
    # File 'lib/cuniculus/dispatcher.rb', line 127
    def shutdown
      Cuniculus.logger.info("Cuniculus: Shutting down dispatcher")
      @shutdown = true
      alive_size = @workers.size
      shutdown_t0 = Cuniculus.mark_time

      sleep 1 until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || @job_queue.empty?

      until Cuniculus.mark_time - shutdown_t0 > shutdown_grace_period || (alive_size = @workers.select(&:alive?).size) == 0
        sleep 1
        alive_size.times { @job_queue << :shutdown }
      end

      @dispatcher_chan << :shutdown
      alive_size = @workers.select(&:alive?).size
      return unless alive_size > 0

      Cuniculus.logger.warn("Cuniculus: Forcing shutdown with #{alive_size} workers remaining")
      describe
    end
#start! ⇒ Object
Starts a thread responsible for reestablishing lost RabbitMQ connections and restarting PubWorkers.
It keeps track of the last time it had to reconnect, in case it receives outdated messages of failed connections from workers.
PubWorkers communicate with it through its `dispatcher_chan` queue. Depending on the content fetched from the dispatcher channel, it takes different actions:
- When a :shutdown message is received, it waits until current jobs are finished (up to the configured `shutdown_grace_period`) and stops its background thread.
- When a timestamp is received that is not larger than the last reconnect timestamp, the message is ignored.
- When the timestamp is larger than the last reconnect timestamp, it tries to reestablish the connection to RabbitMQ and restarts its workers.
Note that the first time the dispatcher is started, it sends a message to its own background thread with a timestamp to trigger the first connection.
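The message handling described above can be sketched with a plain `Queue` and a thread. This is a minimal stand-in, not the dispatcher itself: the reconnect step is replaced by appending to a `reconnects` array, and `mark_time` is only assumed to behave like `Cuniculus.mark_time`.

```ruby
# Monotonic clock reading in seconds; assumed to play the role of Cuniculus.mark_time.
def mark_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

chan = Queue.new   # plays the role of dispatcher_chan
reconnects = []    # records which messages triggered a (simulated) reconnect

monitor = Thread.new do
  last_connect_time = 0
  loop do
    msg = chan.pop
    break if msg == :shutdown
    next unless msg > last_connect_time  # stale report: already reconnected since then

    # A real dispatcher would reconnect and restart its workers here.
    last_connect_time = mark_time
    reconnects << msg
  end
end

t1 = mark_time
chan << t1                          # first message triggers the initial connection
sleep 0.01 while reconnects.empty?  # wait until the monitor has "connected"
chan << t1                          # outdated report from before the reconnect: ignored
sleep 0.01
chan << mark_time                   # fresh failure report: triggers another reconnect
chan << :shutdown
monitor.join
```

Only the first and third timestamps lead to a reconnect. The second is older than the last reconnect and is dropped, which is how duplicate failure reports from several workers collapse into a single recovery.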
    # File 'lib/cuniculus/dispatcher.rb', line 65
    def start!
      return if @shutdown || @thread&.alive?
      @thread = Thread.new do
        last_connect_time = 0
        loop do
          disconnect_time = @dispatcher_chan.pop
          break if disconnect_time == :shutdown
          if disconnect_time > last_connect_time
            recover_from_net_error
            last_connect_time = Cuniculus.mark_time
          end
        end
      end
      @conn = ::Bunny.new(@config.rabbitmq_opts.merge(ENFORCED_CONN_OPTS).merge(session_error_handler: @thread))
      @dispatcher_chan << Cuniculus.mark_time
    end