Class: Sisyphus::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/sisyphus/master.rb

Constant Summary collapse

IO_TIMEOUT =
10
HANDLED_SIGNALS =
[:INT, :TTIN, :TTOU]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job, options = {}) ⇒ Master

Returns a new instance of Master.



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/sisyphus/master.rb', line 14

def initialize(job, options = {})
  self.number_of_workers = options.fetch :workers, 0
  @logger = options.fetch(:logger) { NullLogger.new }
  @execution_strategy = options.fetch(:execution_strategy) { ForkingExecutionStrategy }
  @workers = []
  @job = job

  self_reader, self_writer = IO.pipe
  @selfpipe = { reader: self_reader, writer: self_writer }

  Thread.main[:signal_queue] = []
end

Instance Attribute Details

#jobObject (readonly)

Returns the value of attribute job.



12
13
14
# File 'lib/sisyphus/master.rb', line 12

def job
  @job
end

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/sisyphus/master.rb', line 12

def logger
  @logger
end

#number_of_workersObject

Returns the value of attribute number_of_workers.



12
13
14
# File 'lib/sisyphus/master.rb', line 12

def number_of_workers
  @number_of_workers
end

Instance Method Details

#spawn_workerObject



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/sisyphus/master.rb', line 37

def spawn_worker
  reader, writer = IO.pipe
  if wpid = fork
    writer.close
    workers << { pid: wpid, reader: reader }
  else
    reader.close
    self.process_name = "Worker #{Process.pid}"
    worker = create_worker(writer)
    start_worker worker
  end
end

#startObject



27
28
29
30
31
32
33
34
35
# File 'lib/sisyphus/master.rb', line 27

def start
  trap_signals
  number_of_workers.times do
    spawn_worker
    sleep rand(1000).fdiv(1000)
  end
  puts "Sisyphus::Master started with PID: #{Process.pid}"
  watch_for_output
end

#start_worker(worker) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/sisyphus/master.rb', line 50

def start_worker(worker)
  worker.setup
  worker.start
rescue Exception => e
  worker.error_handler.call
  logger.warn(process_name) { e }
  exit! 0
end

#stop_allObject



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/sisyphus/master.rb', line 65

def stop_all
  workers.each do |worker|
    stop_worker worker.fetch(:pid)
  end
  begin
    Timeout.timeout(30) do
      watch_for_shutdown while worker_count > 0
    end
  rescue e
    p "Timeout reached:", e
  end
end

#stop_worker(wpid) ⇒ Object



59
60
61
62
63
# File 'lib/sisyphus/master.rb', line 59

def stop_worker(wpid)
  if workers.find { |w| w.fetch(:pid) == wpid }
    Process.kill 'INT', wpid rescue Errno::ESRCH # Ignore if the process is already gone
  end
end

#worker_countObject



78
79
80
# File 'lib/sisyphus/master.rb', line 78

def worker_count
  workers.length
end