Class: Workhorse::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/workhorse/worker.rb

Constant Summary collapse

LOG_LEVELS =
%i[fatal error warn info debug].freeze
SHUTDOWN_SIGNALS =
%w[TERM INT].freeze
LOG_REOPEN_SIGNAL =
'HUP'.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker

Instantiates a new worker. The worker is not automatically started.

Parameters:

  • queues (Array) (defaults to: [])

    The queues you want this worker to process. If an empty array is given, any queues will be processed. Queues need to be specified as a symbol. To also process jobs without a queue, supply ‘nil` within the array.

  • pool_size (Integer) (defaults to: nil)

    The number of jobs that will be processed simultaneously. If this parameter is not given, it will be set to the number of given queues + 1.

  • polling_interval (Integer) (defaults to: 300)

    Interval in seconds the database will be polled for new jobs. Set this as high as possible to avoid unnecessary database load. Defaults to 5 minutes.

  • auto_terminate (Boolean) (defaults to: true)

    Whether to automatically shut down the worker properly on INT and TERM signals.

  • quiet (Boolean) (defaults to: true)

    If this is set to ‘false`, the worker will also log to STDOUT.

  • instant_repolling (Boolean) (defaults to: false)

    If this is set to ‘true`, the worker immediately re-polls for new jobs when a job execution has finished.

  • logger (Logger) (defaults to: nil)

    An optional logger the worker will append to. This can be any instance of ruby’s ‘Logger` but is commonly set to `Rails.logger`.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/workhorse/worker.rb', line 50

def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil)
  @queues = queues
  @pool_size = pool_size || (queues.size + 1)
  @polling_interval = polling_interval
  @auto_terminate = auto_terminate
  @state = :initialized
  @quiet = quiet

  @mutex = Mutex.new
  @pool = Pool.new(@pool_size)
  @poller = Workhorse::Poller.new(self, proc { check_memory })
  @logger = logger

  unless (@polling_interval / 0.1).round(2).modulo(1).zero?
    fail 'Polling interval must be a multiple of 0.1.'
  end

  if instant_repolling
    @pool.on_idle { @poller.instant_repoll! }
  end
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/workhorse/worker.rb', line 12

def logger
  @logger
end

#mutexObject (readonly)

Returns the value of attribute mutex.



11
12
13
# File 'lib/workhorse/worker.rb', line 11

def mutex
  @mutex
end

#pollerObject (readonly)

Returns the value of attribute poller.



13
14
15
# File 'lib/workhorse/worker.rb', line 13

def poller
  @poller
end

#polling_intervalObject (readonly)

Returns the value of attribute polling_interval.



10
11
12
# File 'lib/workhorse/worker.rb', line 10

def polling_interval
  @polling_interval
end

#pool_sizeObject (readonly)

Returns the value of attribute pool_size.



9
10
11
# File 'lib/workhorse/worker.rb', line 9

def pool_size
  @pool_size
end

#queuesObject (readonly)

Returns the value of attribute queues.



7
8
9
# File 'lib/workhorse/worker.rb', line 7

def queues
  @queues
end

#stateObject (readonly)

Returns the value of attribute state.



8
9
10
# File 'lib/workhorse/worker.rb', line 8

def state
  @state
end

Class Method Details

.shutdown_file_for(pid) ⇒ Object



24
25
26
27
# File 'lib/workhorse/worker.rb', line 24

def self.shutdown_file_for(pid)
  return nil unless defined?(Rails)
  Rails.root.join('tmp', 'pids', "workhorse.#{pid}.shutdown")
end

.start_and_wait(**args) ⇒ Object

Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).



17
18
19
20
21
# File 'lib/workhorse/worker.rb', line 17

def self.start_and_wait(**args)
  worker = new(**args)
  worker.start
  worker.wait
end

Instance Method Details

#assert_state!(state) ⇒ Object



107
108
109
# File 'lib/workhorse/worker.rb', line 107

def assert_state!(state)
  fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state
end

#hostnameObject



88
89
90
# File 'lib/workhorse/worker.rb', line 88

def hostname
  @hostname ||= Socket.gethostname
end

#idObject



80
81
82
# File 'lib/workhorse/worker.rb', line 80

def id
  @id ||= "#{hostname}.#{pid}.#{SecureRandom.hex(3)}"
end

#idleObject



140
141
142
# File 'lib/workhorse/worker.rb', line 140

def idle
  @pool.idle
end

#log(text, level = :info) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/workhorse/worker.rb', line 72

def log(text, level = :info)
  text = "[Job worker #{id}] #{text}"
  puts text unless @quiet
  return unless logger
  fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level)
  logger.send(level, text.strip)
end

#perform(db_job_id) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/workhorse/worker.rb', line 144

def perform(db_job_id)
  begin # rubocop:disable Style/RedundantBegin
    mutex.synchronize do
      assert_state! :running
      log "Posting job #{db_job_id} to thread pool"

      @pool.post do
        begin # rubocop:disable Style/RedundantBegin
          Workhorse::Performer.new(db_job_id, self).perform
        rescue Exception => e
          log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
          Workhorse.on_exception.call(e)
        end
      end
    end
  rescue Exception => e
    Workhorse.on_exception.call(e)
  end
end

#pidObject



84
85
86
# File 'lib/workhorse/worker.rb', line 84

def pid
  @pid ||= Process.pid
end

#shutdownObject

Shuts down worker and DB poller. Jobs currently beeing processed are properly finished before this method returns. Subsequent calls to this method are ignored.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/workhorse/worker.rb', line 114

def shutdown
  # This is safe to be checked outside of the mutex as 'shutdown' is the
  # final state this worker can be in.
  return if @state == :shutdown

  mutex.synchronize do
    assert_state! :running

    log 'Shutting down'
    @state = :shutdown

    @poller.shutdown
    @pool.shutdown
    log 'Shut down'
  end
end

#startObject

Starts the worker. This call is not blocking - call #wait for this purpose.



94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/workhorse/worker.rb', line 94

def start
  mutex.synchronize do
    assert_state! :initialized
    log 'Starting up'
    @state = :running
    @poller.start
    log 'Started up'

    trap_termination if @auto_terminate
    trap_log_reopen
  end
end

#waitObject

Waits until the worker is shut down. This only happens if shutdown gets called - either by another thread or by enabling ‘auto_terminate` and receiving a respective signal. Use this method to let worker run undefinitely.



135
136
137
138
# File 'lib/workhorse/worker.rb', line 135

def wait
  @poller.wait
  @pool.wait
end