Class: Sqewer::Worker
- Inherits:
-
Object
- Object
- Sqewer::Worker
- Defined in:
- lib/sqewer/worker.rb
Overview
A massively threaded worker engine
Constant Summary collapse
- DEFAULT_NUM_THREADS =
4
- SLEEP_SECONDS_ON_EMPTY_QUEUE =
1
- THROTTLE_FACTOR =
2
Instance Attribute Summary collapse
-
#connection ⇒ Sqewer::Connection
readonly
The connection for sending and receiving messages.
-
#execution_context_class ⇒ Class
readonly
The class to use when instantiating the execution context.
-
#logger ⇒ Logger
readonly
The logger used for job execution.
-
#middleware_stack ⇒ Sqewer::MiddlewareStack
readonly
The stack used when executing the job.
-
#num_threads ⇒ Fixnum
readonly
The number of worker threads set up for this Worker.
-
#serializer ⇒ Sqewer::Serializer
readonly
The serializer for unmarshalling and marshalling.
-
#state ⇒ Symbol
readonly
The current state of this Worker.
-
#submitter_class ⇒ Class
readonly
The class used to create the Submitter used by jobs to spawn other jobs.
-
#threads ⇒ Array<Thread>
readonly
All the currently running threads of the Worker.
Class Method Summary collapse
-
.default ⇒ Sqewer::Worker
Returns a Worker instance, configured based on the default components.
Instance Method Summary collapse
-
#debug_thread_information! ⇒ Object
Prints the status and the backtraces of all controlled threads to the logger.
-
#initialize(connection: Sqewer::Connection.default, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, middleware_stack: Sqewer::MiddlewareStack.default, logger: Logger.new($stderr), num_threads: DEFAULT_NUM_THREADS) ⇒ Worker
constructor
Creates a new Worker.
-
#kill ⇒ Object
Performs a hard shutdown by killing all the threads.
-
#start ⇒ void
Start listening on the queue, spin up a number of consumer threads that will execute the jobs.
-
#stop ⇒ true
Attempts to softly stop the running consumers and the producer.
Constructor Details
#initialize(connection: Sqewer::Connection.default, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, middleware_stack: Sqewer::MiddlewareStack.default, logger: Logger.new($stderr), num_threads: DEFAULT_NUM_THREADS) ⇒ Worker
Creates a new Worker. The Worker, unlike it is in the Rails tradition, is only responsible for the actual processing of jobs, and not for the job arguments.
the worker for each job execution)
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/sqewer/worker.rb', line 57 def initialize(connection: Sqewer::Connection.default, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, middleware_stack: Sqewer::MiddlewareStack.default, logger: Logger.new($stderr), num_threads: DEFAULT_NUM_THREADS) @logger = logger @connection = connection @serializer = serializer @middleware_stack = middleware_stack @execution_context_class = execution_context_class @submitter_class = submitter_class @num_threads = num_threads @threads = [] raise ArgumentError, "num_threads must be > 0" unless num_threads > 0 @execution_counter = Sqewer::AtomicCounter.new @state = Sqewer::StateLock.new end |
Instance Attribute Details
#connection ⇒ Sqewer::Connection (readonly)
Returns The connection for sending and receiving messages.
16 17 18 |
# File 'lib/sqewer/worker.rb', line 16 def connection @connection end |
#execution_context_class ⇒ Class (readonly)
Returns The class to use when instantiating the execution context.
25 26 27 |
# File 'lib/sqewer/worker.rb', line 25 def execution_context_class @execution_context_class end |
#logger ⇒ Logger (readonly)
Returns The logger used for job execution.
13 14 15 |
# File 'lib/sqewer/worker.rb', line 13 def logger @logger end |
#middleware_stack ⇒ Sqewer::MiddlewareStack (readonly)
Returns The stack used when executing the job.
22 23 24 |
# File 'lib/sqewer/worker.rb', line 22 def middleware_stack @middleware_stack end |
#num_threads ⇒ Fixnum (readonly)
Returns the number of worker threads set up for this Worker.
34 35 36 |
# File 'lib/sqewer/worker.rb', line 34 def num_threads @num_threads end |
#serializer ⇒ Sqewer::Serializer (readonly)
Returns The serializer for unmarshalling and marshalling.
19 20 21 |
# File 'lib/sqewer/worker.rb', line 19 def serializer @serializer end |
#state ⇒ Symbol (readonly)
Returns the current state of this Worker.
37 38 39 |
# File 'lib/sqewer/worker.rb', line 37 def state @state end |
#submitter_class ⇒ Class (readonly)
Returns The class used to create the Submitter used by jobs to spawn other jobs.
28 29 30 |
# File 'lib/sqewer/worker.rb', line 28 def submitter_class @submitter_class end |
#threads ⇒ Array<Thread> (readonly)
Returns all the currently running threads of the Worker.
31 32 33 |
# File 'lib/sqewer/worker.rb', line 31 def threads @threads end |
Class Method Details
.default ⇒ Sqewer::Worker
Returns a Worker instance, configured based on the default components
42 43 44 |
# File 'lib/sqewer/worker.rb', line 42 def self.default new end |
Instance Method Details
#debug_thread_information! ⇒ Object
Prints the status and the backtraces of all controlled threads to the logger
187 188 189 190 191 192 |
# File 'lib/sqewer/worker.rb', line 187 def debug_thread_information! @threads.each do | t | @logger.debug { t.inspect } @logger.debug { t.backtrace } end end |
#kill ⇒ Object
Performs a hard shutdown by killing all the threads
178 179 180 181 182 183 184 |
# File 'lib/sqewer/worker.rb', line 178 def kill @state.transition! :stopping @logger.info { '[worker] Killing (unclean shutdown), will kill all threads'} @threads.map(&:kill) @logger.info { '[worker] Stopped'} @state.transition! :stopped end |
#start ⇒ void
This method returns an undefined value.
Start listening on the queue, spin up a number of consumer threads that will execute the jobs.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/sqewer/worker.rb', line 86 def start @state.transition! :starting @logger.info { '[worker] Starting with %d consumer threads' % @num_threads } @execution_queue = Queue.new # Ensure that unhandled exceptions inside threads make the worker fail, # to avoid silent failures with no consumer threads running. Thread.abort_on_exception = true consumers = (1..@num_threads).each_with_index.map do |_, index| Thread.new do Thread.current[:role] = :consumer Thread.current[:id] = index loop { take_and_execute } end end # Create the provider thread. When the execution queue is exhausted, # grab new messages and place them on the local queue. owning_worker = self # self won't be self anymore in the thread provider = Thread.new do Thread.current[:role] = :provider loop do begin break if stopping? if queue_has_capacity? = @connection. if .any? .each {|m| @execution_queue << m } @logger.debug { "[worker] Received and buffered %d messages" % .length } if .any? else @logger.debug { "[worker] No messages received" } sleep SLEEP_SECONDS_ON_EMPTY_QUEUE end else @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length } sleep SLEEP_SECONDS_ON_EMPTY_QUEUE end rescue StandardError => e @logger.fatal "Exiting because message receiving thread died. Exception causing this: #{e.inspect}" owning_worker.stop # allow any queues and/or running jobs to complete end end end # Register the provider separately for the situation where it hangs in `receive_messages` and doesn't # terminate from within it's own run loop. @provider_thread = provider @threads = consumers + [provider] # If any of our threads are already dead, it means there is some misconfiguration and startup failed if @threads.any?{|t| !t.alive? } @threads.map(&:kill) @state.transition! :failed @logger.fatal { '[worker] Failed to start (one or more threads died on startup)' } else @state.transition! :running @logger.info { '[worker] Started, %d consumer threads' % consumers.length } end end |
#stop ⇒ true
Attempts to softly stop the running consumers and the producer. Once the call is made, all the threads will stop after the local cache of messages is emptied. This is to ensure that message drops do not happen just because the worker is about to be terminated.
The call will block until all the threads of the worker are terminated
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/sqewer/worker.rb', line 156 def stop @state.transition! :stopping @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain. Killing the provider thread now.' } @provider_thread.kill loop do n_live = @threads.select(&:alive?).length break if n_live.zero? n_dead = @threads.length - n_live @logger.info { '[worker] Staged shutdown, %d threads alive, %d have quit, %d jobs in local cache' % [n_live, n_dead, @execution_queue.length] } sleep 2 end @threads.map(&:join) @logger.info { '[worker] Stopped'} @state.transition! :stopped true end |