Class: Hydra::Worker
- Inherits: Object
- Includes: Messages::Worker
- Defined in: lib/hydra/worker.rb
Overview
Hydra class responsible for dispatching runners and communicating with the master.
The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.
The general convention is to have one Worker per machine on a distributed network.
Instance Attribute Summary
- #runners ⇒ Object (readonly)
  Returns the value of attribute runners.
Instance Method Summary
- #delegate_file(message) ⇒ Object
  When the master sends a file down to the worker, it hits this method.
- #initialize(opts = {}) ⇒ Worker (constructor)
  Create a new worker.
- #load_worker_initializer ⇒ Object
- #relay_results(message, runner) ⇒ Object
  When a runner finishes, it sends the results up to the worker.
- #request_file(message, runner) ⇒ Object
  When a runner wants a file, it hits this method with a message.
- #shutdown ⇒ Object
  When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.
Constructor Details
#initialize(opts = {}) ⇒ Worker
Create a new worker.
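- io: The IO object to use to communicate with the master. Required; the constructor raises "No IO Object" without it.
- runners: The number of runners to launch; defaults to 1. (The original comment names this option num_runners, but the method body reads :runners.)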
    # File 'lib/hydra/worker.rb', line 17
    def initialize(opts = {})
      @verbose = opts.fetch(:verbose) { false }
      @io = opts.fetch(:io) { raise "No IO Object" }
      @runners = []
      @listeners = []
      @options = opts.fetch(:options) { "" }
      load_worker_initializer
      @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil })
      @runner_event_listeners.select{|l| l.is_a? String}.each do |l|
        @runner_event_listeners.delete_at(@runner_event_listeners.index(l))
        listener = Array.wrap eval(l)
        @runner_event_listeners << listener[0] if listener[0].is_a?(Hydra::RunnerListener::Abstract)
      end
      @runner_log_file = opts.fetch(:runner_log_file) { nil }
      boot_runners(opts.fetch(:runners) { 1 })
      @io.write(Hydra::Messages::Worker::WorkerBegin.new)
      @runners.each{|r| Process.wait r[:pid] }
    end
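The constructor resolves each option with Hash#fetch and a block default, so a missing :io fails immediately while the other options fall back quietly. The sketch below reproduces only that lookup pattern in isolation. resolve_worker_options is a hypothetical helper, not part of Hydra, and StringIO stands in for the connection to the master.

```ruby
require 'stringio'

# Hypothetical helper that mirrors the option lookups in Worker#initialize.
# It is not part of Hydra; it only illustrates Hash#fetch with block defaults.
def resolve_worker_options(opts = {})
  {
    verbose:         opts.fetch(:verbose) { false },
    io:              opts.fetch(:io) { raise "No IO Object" },
    options:         opts.fetch(:options) { "" },
    runner_log_file: opts.fetch(:runner_log_file) { nil },
    runners:         opts.fetch(:runners) { 1 }
  }
end

resolved = resolve_worker_options(io: StringIO.new, runners: 4)
puts resolved[:runners]   # 4
puts resolved[:verbose]   # false

begin
  resolve_worker_options(runners: 2)
rescue RuntimeError => e
  puts e.message          # No IO Object
end
```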
Instance Attribute Details
#runners ⇒ Object (readonly)
Returns the value of attribute runners.
    # File 'lib/hydra/worker.rb', line 13
    def runners
      @runners
    end
Instance Method Details
#delegate_file(message) ⇒ Object
When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.
    # File 'lib/hydra/worker.rb', line 65
    def delegate_file(message)
      runner = idle_runner
      runner[:idle] = false
      runner[:io].write(RunFile.new(eval(message.serialize)))
    end
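idle_runner is not documented on this page. As a rough sketch, assuming each runner is a hash with :idle and :io keys (as the method body suggests) and that selecting a runner means taking the first idle one, delegation could look like the following. pick_idle_runner and delegate are hypothetical names, and StringIO stands in for the pipe to each runner.

```ruby
require 'stringio'

# Hypothetical stand-in for the undocumented idle_runner helper:
# return the first runner whose :idle flag is set, or nil if none is free.
def pick_idle_runner(runners)
  runners.detect { |r| r[:idle] }
end

# Mark the chosen runner busy and hand it the file name.
# The real worker writes a RunFile message instead of a plain string.
def delegate(runners, file)
  runner = pick_idle_runner(runners)
  return nil unless runner
  runner[:idle] = false
  runner[:io].write("#{file}\n")
  runner
end

runners = [
  { idle: false, io: StringIO.new },
  { idle: true,  io: StringIO.new }
]

chosen = delegate(runners, "test/unit/user_test.rb")
puts chosen.equal?(runners[1])   # true
puts runners[1][:idle]           # false
puts runners[1][:io].string      # test/unit/user_test.rb
```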
#load_worker_initializer ⇒ Object
    # File 'lib/hydra/worker.rb', line 45
    def load_worker_initializer
      if File.exist?('./hydra_worker_init.rb')
        trace('Requiring hydra_worker_init.rb')
        require './hydra_worker_init'
      else
        trace('hydra_worker_init.rb not present')
      end
    end
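The same optional-require pattern can be tried outside Hydra. This sketch uses only the standard library. load_optional_initializer is a hypothetical helper that takes the directory as an argument instead of relying on the current working directory, and the global variable is only there to show that the file was executed.

```ruby
require 'tmpdir'

# Hypothetical helper: require hydra_worker_init.rb from `dir` if it exists.
# Returns true when the file was found, false otherwise.
def load_optional_initializer(dir)
  path = File.join(dir, 'hydra_worker_init.rb')
  return false unless File.exist?(path)
  require path
  true
end

Dir.mktmpdir do |dir|
  puts load_optional_initializer(dir)   # false, nothing to load yet

  File.write(File.join(dir, 'hydra_worker_init.rb'), "$hydra_worker_ready = true\n")
  puts load_optional_initializer(dir)   # true
  puts $hydra_worker_ready              # true
end
```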
#relay_results(message, runner) ⇒ Object
When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.
    # File 'lib/hydra/worker.rb', line 73
    def relay_results(message, runner)
      runner[:idle] = true
      @io.write(Results.new(eval(message.serialize)))
    end
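relay_results rebuilds the message by passing its serialized form through eval before wrapping it in a worker-level Results object. The sketch below shows how such a round trip can work, assuming serialize returns the text of a Ruby hash literal; that is an assumption about Hydra's message format, not something this page states. SketchMessage, RunnerResults, and WorkerResults are hypothetical stand-in classes.

```ruby
# Hypothetical stand-in for a Hydra message. It assumes that a message
# serializes to the text of a Ruby hash literal, which eval turns back
# into a Hash that a different message class can be built from.
class SketchMessage
  attr_reader :attrs

  def initialize(attrs = {})
    @attrs = attrs
  end

  def serialize
    @attrs.inspect
  end
end

# Stand-ins for a runner-level and a worker-level results message.
class RunnerResults < SketchMessage; end
class WorkerResults < SketchMessage; end

from_runner = RunnerResults.new(file: "test/unit/user_test.rb", output: ".")

# Re-wrap the payload in the worker-level class before sending it upward.
for_master = WorkerResults.new(eval(from_runner.serialize))

puts for_master.class                        # WorkerResults
puts for_master.attrs == from_runner.attrs   # true
```

Because eval executes whatever text it receives, a round trip like this is only safe when every process on the channel is trusted.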
#request_file(message, runner) ⇒ Object
When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.
    # File 'lib/hydra/worker.rb', line 58
    def request_file(message, runner)
      @io.write(RequestFile.new)
      runner[:idle] = true
    end
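Taken together, request_file, delegate_file, and relay_results toggle a runner's :idle flag. A minimal sketch of that lifecycle follows, using a hypothetical SketchWorker that records outgoing traffic as symbols instead of writing Hydra message objects. The :inbox key is also an assumption made for the example.

```ruby
# Hypothetical stand-in that tracks only the :idle flag and the
# direction of each message. It is not Hydra's Worker class.
class SketchWorker
  attr_reader :to_master

  def initialize
    @to_master = []
  end

  # Runner asks for work: forward the request and mark the runner free.
  def request_file(runner)
    @to_master << :request_file
    runner[:idle] = true
  end

  # Master sends a file: the runner becomes busy.
  def delegate_file(runner, file)
    runner[:idle] = false
    runner[:inbox] << file
  end

  # Runner reports results: it is free again and the results go upward.
  def relay_results(runner)
    runner[:idle] = true
    @to_master << :results
  end
end

worker = SketchWorker.new
runner = { idle: false, inbox: [] }

worker.request_file(runner)
puts runner[:idle]              # true
worker.delegate_file(runner, "spec/models/user_spec.rb")
puts runner[:idle]              # false
worker.relay_results(runner)
puts runner[:idle]              # true
puts worker.to_master.inspect   # [:request_file, :results]
```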
#shutdown ⇒ Object
When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.
    # File 'lib/hydra/worker.rb', line 80
    def shutdown
      @running = false
      trace "Notifying #{@runners.size} Runners of Shutdown"
      @runners.each do |r|
        trace "Sending Shutdown to Runner"
        trace "\t#{r.inspect}"
        r[:io].write(Shutdown.new)
      end
      Thread.exit
    end
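The broadcast step can be sketched without Hydra by writing a marker to every runner pipe. broadcast_shutdown is a hypothetical helper, StringIO stands in for each runner's IO, and the string "shutdown" replaces the real Shutdown message.

```ruby
require 'stringio'

# Hypothetical helper: send a shutdown marker to every runner and
# return how many runners were notified.
def broadcast_shutdown(runners)
  runners.each { |r| r[:io].write("shutdown\n") }
  runners.size
end

runners = Array.new(3) { { idle: true, io: StringIO.new } }
puts broadcast_shutdown(runners)                           # 3
puts runners.all? { |r| r[:io].string == "shutdown\n" }    # true
```

The real method also calls Thread.exit, which terminates the current thread; the sketch leaves that out so it can run inline.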