Class: Delayed::Pool

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/delayed/pool.rb

Constant Summary collapse

SIGNALS =
%i[INT TERM QUIT].freeze
POOL_SLEEP_PERIOD =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

logger, #logger, #say

Constructor Details

#initialize(*args) ⇒ Pool

Returns a new instance of Pool.



15
16
17
18
19
20
21
22
23
24
# File 'lib/delayed/pool.rb', line 15

def initialize(*args)
  if args.first.is_a?(Hash)
    @config = args.first
  else
    warn "Calling Delayed::Pool.new directly is deprecated. Use `Delayed::CLI.new.run()` instead."
  end
  @workers = {}
  @signal_queue = []
  @self_pipe = IO.pipe
end

Instance Attribute Details

#workersObject (readonly)

Returns the value of attribute workers.



13
14
15
# File 'lib/delayed/pool.rb', line 13

def workers
  @workers
end

Instance Method Details

#runObject



26
27
28
29
# File 'lib/delayed/pool.rb', line 26

def run
  warn "Delayed::Pool#run is deprecated and will be removed. Use `Delayed::CLI.new.run()` instead."
  Delayed::CLI.new.run
end

#startObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/delayed/pool.rb', line 31

def start
  say "Started job master", :info
  SIGNALS.each do |sig|
    trap(sig) do
      @signal_queue << sig
      wake_up
    end
  end
  $0 = procname
  # fork to handle unlocking (to prevent polluting the parent with worker objects)
  unlock_pid = fork_with_reconnects do
    unlock_orphaned_jobs
  end
  Process.wait unlock_pid

  spawn_periodic_auditor
  spawn_abandoned_job_cleanup
  spawn_all_workers
  say "Workers spawned"
  join
  say "Shutting down"
  stop
  reap_all_children
rescue Exception => e # rubocop:disable Lint/RescueException
  say "Job master died with error: #{e.inspect}\n#{e.backtrace.join("\n")}", :fatal
  raise
end