Class: Sisyphus::Master
- Inherits:
-
Object
- Object
- Sisyphus::Master
- Defined in:
- lib/sisyphus/master.rb
Constant Summary collapse
- IO_TIMEOUT =
10
- HANDLED_SIGNALS =
[:INT, :TTIN, :TTOU]
Instance Attribute Summary collapse
-
#job ⇒ Object
readonly
Returns the value of attribute job.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#number_of_workers ⇒ Object
readonly
Returns the value of attribute number_of_workers.
Instance Method Summary collapse
-
#initialize(job, options = {}) ⇒ Master
constructor
A new instance of Master.
- #spawn_worker ⇒ Object
- #start ⇒ Object
- #start_worker(worker) ⇒ Object
- #stop_all ⇒ Object
- #stop_worker(wpid) ⇒ Object
- #worker_count ⇒ Object
Constructor Details
#initialize(job, options = {}) ⇒ Master
Returns a new instance of Master.
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/sisyphus/master.rb', line 14 def initialize(job, = {}) self.number_of_workers = .fetch :workers, 0 @logger = .fetch(:logger) { NullLogger.new } @execution_strategy = .fetch(:execution_strategy) { ForkingExecutionStrategy } @workers = [] @job = job self_reader, self_writer = IO.pipe @selfpipe = { reader: self_reader, writer: self_writer } Thread.main[:signal_queue] = [] end |
Instance Attribute Details
#job ⇒ Object (readonly)
Returns the value of attribute job.
12 13 14 |
# File 'lib/sisyphus/master.rb', line 12 def job @job end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/sisyphus/master.rb', line 12 def logger @logger end |
#number_of_workers ⇒ Object
Returns the value of attribute number_of_workers.
12 13 14 |
# File 'lib/sisyphus/master.rb', line 12 def number_of_workers @number_of_workers end |
Instance Method Details
#spawn_worker ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/sisyphus/master.rb', line 37 def spawn_worker reader, writer = IO.pipe if wpid = fork writer.close workers << { pid: wpid, reader: reader } else reader.close self.process_name = "Worker #{Process.pid}" worker = create_worker(writer) start_worker worker end end |
#start ⇒ Object
27 28 29 30 31 32 33 34 35 |
# File 'lib/sisyphus/master.rb', line 27 def start trap_signals number_of_workers.times do spawn_worker sleep rand(1000).fdiv(1000) end puts "Sisyphus::Master started with PID: #{Process.pid}" watch_for_output end |
#start_worker(worker) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/sisyphus/master.rb', line 50 def start_worker(worker) worker.setup worker.start rescue Exception => e worker.error_handler.call logger.warn(process_name) { e } exit! 0 end |
#stop_all ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/sisyphus/master.rb', line 65 def stop_all workers.each do |worker| stop_worker worker.fetch(:pid) end begin Timeout.timeout(30) do watch_for_shutdown while worker_count > 0 end rescue e p "Timeout reached:", e end end |
#stop_worker(wpid) ⇒ Object
59 60 61 62 63 |
# File 'lib/sisyphus/master.rb', line 59 def stop_worker(wpid) if workers.find { |w| w.fetch(:pid) == wpid } Process.kill 'INT', wpid rescue Errno::ESRCH # Ignore if the process is already gone end end |
#worker_count ⇒ Object
78 79 80 |
# File 'lib/sisyphus/master.rb', line 78 def worker_count workers.length end |