Class: Sqewer::Worker

Inherits:
Object
Defined in:
lib/sqewer/worker.rb

Overview

A massively threaded worker engine

Constant Summary

DEFAULT_NUM_THREADS = 4
SLEEP_SECONDS_ON_EMPTY_QUEUE = 1
THROTTLE_FACTOR = 2

Constructor Details

#initialize(connection: Sqewer::Connection.default, serializer: Sqewer::Serializer.default, execution_context_class: Sqewer::ExecutionContext, submitter_class: Sqewer::Submitter, middleware_stack: Sqewer::MiddlewareStack.default, logger: Logger.new($stderr), num_threads: DEFAULT_NUM_THREADS) ⇒ Worker

Creates a new Worker. Unlike workers in the Rails tradition, this Worker is responsible only for the actual processing of jobs, not for the job arguments.

Parameters:

  • connection (Sqewer::Connection) (defaults to: Sqewer::Connection.default)

    the object that handles polling and submitting

  • serializer (#serialize, #unserialize) (defaults to: Sqewer::Serializer.default)

    the serializer/unserializer for the jobs

  • execution_context_class (Class) (defaults to: Sqewer::ExecutionContext)

    the class for the execution context (will be instantiated by the worker for each job execution)

  • submitter_class (Class) (defaults to: Sqewer::Submitter)

    the class used for submitting jobs (will be instantiated by the worker for each job execution)

  • middleware_stack (Sqewer::MiddlewareStack) (defaults to: Sqewer::MiddlewareStack.default)

    the middleware stack that is going to be used

  • logger (Logger) (defaults to: Logger.new($stderr))

    the logger to log execution to and to pass to the jobs

  • num_threads (Fixnum) (defaults to: DEFAULT_NUM_THREADS)

    how many worker threads to spawn

Raises:

  • (ArgumentError)

    if num_threads is not greater than 0


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sqewer/worker.rb', line 57

def initialize(connection: Sqewer::Connection.default,
    serializer: Sqewer::Serializer.default,
    execution_context_class: Sqewer::ExecutionContext,
    submitter_class: Sqewer::Submitter,
    middleware_stack: Sqewer::MiddlewareStack.default,
    logger: Logger.new($stderr),
    num_threads: DEFAULT_NUM_THREADS)

  @logger = logger
  @connection = connection
  @serializer = serializer
  @middleware_stack = middleware_stack
  @execution_context_class = execution_context_class
  @submitter_class = submitter_class
  @num_threads = num_threads

  @threads = []

  raise ArgumentError, "num_threads must be > 0" unless num_threads > 0

  @execution_counter = Sqewer::AtomicCounter.new

  @state = Sqewer::StateLock.new
end
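
As a rough illustration of the constructor contract described above (keyword arguments with defaults, and an ArgumentError for a non-positive thread count), here is a minimal stand-alone sketch. MiniWorker is a hypothetical stand-in written for this example and is not part of Sqewer; only the num_threads guard mirrors the listing above.

```ruby
require 'logger'

# Hypothetical stand-in for illustration only; not the Sqewer::Worker class.
class MiniWorker
  DEFAULT_NUM_THREADS = 4

  attr_reader :logger, :num_threads

  def initialize(logger: Logger.new($stderr), num_threads: DEFAULT_NUM_THREADS)
    # Same guard as in the listing above: reject zero or negative thread counts.
    raise ArgumentError, "num_threads must be > 0" unless num_threads > 0

    @logger = logger
    @num_threads = num_threads
  end
end

# Hypothetical helper: reports whether a given thread count is rejected.
def rejects_thread_count?(count)
  MiniWorker.new(num_threads: count)
  false
rescue ArgumentError
  true
end
```

Calling MiniWorker.new with no arguments picks up every default, while MiniWorker.new(num_threads: 0) raises before the object can be used.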

Instance Attribute Details

#connection ⇒ Sqewer::Connection (readonly)

Returns the connection for sending and receiving messages.

Returns:

  • (Sqewer::Connection)

    The connection for sending and receiving messages



# File 'lib/sqewer/worker.rb', line 16

def connection
  @connection
end

#execution_context_class ⇒ Class (readonly)

Returns the class to use when instantiating the execution context.

Returns:

  • (Class)

    The class to use when instantiating the execution context



# File 'lib/sqewer/worker.rb', line 25

def execution_context_class
  @execution_context_class
end

#logger ⇒ Logger (readonly)

Returns the logger used for job execution.

Returns:

  • (Logger)

    The logger used for job execution



# File 'lib/sqewer/worker.rb', line 13

def logger
  @logger
end

#middleware_stack ⇒ Sqewer::MiddlewareStack (readonly)

Returns the stack used when executing the job.

Returns:

  • (Sqewer::MiddlewareStack)

    The stack used when executing the job



# File 'lib/sqewer/worker.rb', line 22

def middleware_stack
  @middleware_stack
end

#num_threads ⇒ Fixnum (readonly)

Returns the number of worker threads set up for this Worker.

Returns:

  • (Fixnum)

    the number of worker threads set up for this Worker



# File 'lib/sqewer/worker.rb', line 34

def num_threads
  @num_threads
end

#serializer ⇒ Sqewer::Serializer (readonly)

Returns the serializer for unmarshalling and marshalling.

Returns:

  • (Sqewer::Serializer)

    The serializer for unmarshalling and marshalling



# File 'lib/sqewer/worker.rb', line 19

def serializer
  @serializer
end

#state ⇒ Symbol (readonly)

Returns the current state of this Worker.

Returns:

  • (Symbol)

    the current state of this Worker



# File 'lib/sqewer/worker.rb', line 37

def state
  @state
end

#submitter_class ⇒ Class (readonly)

Returns the class used to create the Submitter used by jobs to spawn other jobs.

Returns:

  • (Class)

    The class used to create the Submitter used by jobs to spawn other jobs



# File 'lib/sqewer/worker.rb', line 28

def submitter_class
  @submitter_class
end

#threads ⇒ Array<Thread> (readonly)

Returns all the currently running threads of the Worker.

Returns:

  • (Array<Thread>)

    all the currently running threads of the Worker



# File 'lib/sqewer/worker.rb', line 31

def threads
  @threads
end

Class Method Details

.default ⇒ Sqewer::Worker

Returns a Worker instance configured with the default components.

Returns:

  • (Sqewer::Worker)



# File 'lib/sqewer/worker.rb', line 42

def self.default
  new
end

Instance Method Details

#debug_thread_information! ⇒ Object

Prints the status and the backtraces of all controlled threads to the logger.



# File 'lib/sqewer/worker.rb', line 187

def debug_thread_information!
  @threads.each do | t |
    @logger.debug { t.inspect }
    @logger.debug { t.backtrace }
  end
end
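
To show what this kind of thread dump looks like in isolation, here is a small stand-alone sketch. The helper names log_thread_information and demo_thread_log are hypothetical and written for this example only; the sketch relies solely on Ruby's standard Thread#inspect, Thread#backtrace and the block form of Logger#debug.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper for illustration: log the inspect string and the
# backtrace of each thread, using Logger's lazily evaluated block form.
def log_thread_information(threads, logger)
  threads.each do |t|
    logger.debug { t.inspect }
    logger.debug { t.backtrace.inspect }
  end
end

# Hypothetical demo: park one thread, dump its state, return the log text.
def demo_thread_log
  output = StringIO.new
  logger = Logger.new(output)
  logger.level = Logger::DEBUG

  ready = Queue.new
  sleeper = Thread.new do
    ready << :started
    sleep # park the thread so it has a backtrace to report
  end
  ready.pop # wait until the thread body is actually running

  log_thread_information([sleeper], logger)
  sleeper.kill
  sleeper.join
  output.string
end
```

Because the messages are passed as blocks, they are only built when the logger's level allows debug output.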

#kill ⇒ Object

Performs a hard shutdown by killing all the threads.



# File 'lib/sqewer/worker.rb', line 178

def kill
  @state.transition! :stopping
  @logger.info { '[worker] Killing (unclean shutdown), will kill all threads'}
  @threads.map(&:kill)
  @logger.info { '[worker] Stopped'}
  @state.transition! :stopped
end
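
The consequence of a hard shutdown can be seen in a small stand-alone sketch: threads that are killed outright never get to the work still waiting in a local queue. The helper hard_kill_demo is hypothetical and does not use any Sqewer class.

```ruby
# Hypothetical illustration of a hard shutdown: the threads are killed
# outright, so work still waiting in the queue is never processed.
def hard_kill_demo
  queue = Queue.new
  threads = Array.new(2) { Thread.new { sleep } } # these never touch the queue
  3.times { |i| queue << i }

  threads.map(&:kill)
  threads.each(&:join) # a kill may not take effect immediately; join waits for exit
  { alive: threads.count(&:alive?), unprocessed: queue.size }
end
```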

#start ⇒ void

This method returns an undefined value.

Starts listening on the queue and spins up a number of consumer threads that will execute the jobs. The method takes no arguments; the number of consumer threads comes from the num_threads value given to the constructor.



# File 'lib/sqewer/worker.rb', line 86

def start
  @state.transition! :starting

  @logger.info { '[worker] Starting with %d consumer threads' % @num_threads }
  @execution_queue = Queue.new

  # Ensure that unhandled exceptions inside threads make the worker fail,
  # to avoid silent failures with no consumer threads running.
  Thread.abort_on_exception = true

  consumers = (1..@num_threads).each_with_index.map do |_, index|
    Thread.new do
      Thread.current[:role] = :consumer
      Thread.current[:id] = index
      loop { take_and_execute }
    end
  end

  # Create the provider thread. When the execution queue is exhausted,
  # grab new messages and place them on the local queue.
  owning_worker = self # self won't be self anymore in the thread
  provider = Thread.new do
    Thread.current[:role] = :provider
    loop do
      begin
        break if stopping?

        if queue_has_capacity?
          messages = @connection.receive_messages
          if messages.any?
            messages.each {|m| @execution_queue << m }
            @logger.debug { "[worker] Received and buffered %d messages" % messages.length }
          else
            @logger.debug { "[worker] No messages received" }
            sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
          end
        else
          @logger.debug { "[worker] Cache is full (%d items), postponing receive" % @execution_queue.length }
          sleep SLEEP_SECONDS_ON_EMPTY_QUEUE
        end
      rescue StandardError => e
        @logger.fatal "Exiting because message receiving thread died. Exception causing this: #{e.inspect}"
        owning_worker.stop # allow any queues and/or running jobs to complete
      end
    end
  end

  # Register the provider separately for the situation where it hangs in `receive_messages` and doesn't
  # terminate from within its own run loop.
  @provider_thread = provider
  @threads = consumers + [provider]

  # If any of our threads are already dead, it means there is some misconfiguration and startup failed
  if @threads.any?{|t| !t.alive? }
    @threads.map(&:kill)
    @state.transition! :failed
    @logger.fatal { '[worker] Failed to start (one or more threads died on startup)' }
  else
    @state.transition! :running
    @logger.info { '[worker] Started, %d consumer threads' % consumers.length }
  end
end
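
The provider/consumer arrangement in the listing above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not Sqewer's implementation: run_provider_and_consumers is a hypothetical helper, the "messages" are plain strings, processing is just upcasing, and nil is used as a stop marker so the example terminates.

```ruby
# Hypothetical helper for illustration; the names are not part of Sqewer.
# One provider thread buffers messages on a local Queue, and a fixed number
# of consumer threads pop and process them.
def run_provider_and_consumers(messages, num_consumers: 2)
  execution_queue = Queue.new
  results = Queue.new # thread-safe collection of processed messages

  consumers = Array.new(num_consumers) do |index|
    Thread.new do
      Thread.current[:role] = :consumer
      Thread.current[:id] = index
      # Queue#pop blocks until a message arrives; nil is used here as a
      # stop marker (an assumption of this sketch).
      while (message = execution_queue.pop)
        results << message.upcase
      end
    end
  end

  provider = Thread.new do
    Thread.current[:role] = :provider
    messages.each { |m| execution_queue << m }
    # One stop marker per consumer so that every consumer thread exits.
    num_consumers.times { execution_queue << nil }
  end

  (consumers + [provider]).each(&:join)
  Array.new(results.size) { results.pop }.sort
end
```

The local Queue decouples fetching from execution: the provider can buffer a batch while the consumers are still busy with earlier messages.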

#stop ⇒ true

Attempts a soft stop of the running consumers and the provider. Once the call is made, the provider thread is killed and the remaining threads stop after the local cache of messages has been emptied. This ensures that messages are not dropped just because the worker is about to be terminated.

The call blocks until all the threads of the worker have terminated.

Returns:

  • (true)


# File 'lib/sqewer/worker.rb', line 156

def stop
  @state.transition! :stopping
  @logger.info { '[worker] Stopping (clean shutdown), will wait for local cache to drain. Killing the provider thread now.' }
  @provider_thread.kill
  loop do
    n_live = @threads.select(&:alive?).length
    break if n_live.zero?

    n_dead = @threads.length - n_live
    @logger.info { '[worker] Staged shutdown, %d threads alive, %d have quit, %d jobs in local cache' %
      [n_live, n_dead, @execution_queue.length] }

    sleep 2
  end

  @threads.map(&:join)
  @logger.info { '[worker] Stopped'}
  @state.transition! :stopped
  true
end
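
A staged shutdown of this kind can be sketched in isolation as follows. DrainingPool is a hypothetical class written for this example. How Sqewer's consumers decide to exit is not shown in the listing above, so the sketch assumes that a consumer quits once a stopping flag is set and the local queue is empty.

```ruby
# Hypothetical sketch of a staged shutdown; not Sqewer's actual implementation.
class DrainingPool
  attr_reader :processed

  def initialize(jobs, num_consumers: 2)
    @queue = Queue.new
    jobs.each { |job| @queue << job }
    @processed = Queue.new
    @stopping = false

    # Stand-in provider that would normally keep fetching messages.
    @provider = Thread.new { sleep }

    @consumers = Array.new(num_consumers) do
      Thread.new do
        loop do
          begin
            # A non-blocking pop raises ThreadError when the queue is empty.
            @processed << @queue.pop(true)
          rescue ThreadError
            break if @stopping # assumption: consumers quit once drained
            sleep 0.01
          end
        end
      end
    end
  end

  def stop
    @stopping = true
    @provider.kill # no new messages are buffered from here on
    threads = @consumers + [@provider]
    sleep 0.01 while threads.any?(&:alive?) # wait for the local cache to drain
    threads.each(&:join)
    true
  end
end
```

In this sketch, every job that was already buffered when stop is called still gets processed before the call returns.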