Class: Hydra::Worker
- Inherits:
- Object
- Includes:
- Messages::Worker
- Defined in:
- lib/hydra/worker.rb
Overview
Hydra class responsible for dispatching runners and communicating with the master.
The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.
The general convention is to have one Worker per machine on a distributed network.
Instance Attribute Summary
-
#runners ⇒ Object
readonly
Returns the value of attribute runners.
Instance Method Summary
-
#delegate_file(message) ⇒ Object
When the master sends a file down to the worker, it hits this method.
-
#initialize(opts = {}) ⇒ Worker
constructor
Create a new worker.
- #load_worker_initializer ⇒ Object
-
#relay_results(message, runner) ⇒ Object
When a runner finishes, it sends the results up to the worker.
-
#request_file(message, runner) ⇒ Object
When a runner wants a file, it hits this method with a message.
-
#shutdown ⇒ Object
When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.
Constructor Details
#initialize(opts = {}) ⇒ Worker
Create a new worker.
- io: The IO object to use to communicate with the master (required; the constructor raises "No IO Object" without it)
- runners: The number of runners to launch (read from opts[:runners]; defaults to 1)
# File 'lib/hydra/worker.rb', line 17
def initialize(opts = {})
  @verbose = opts.fetch(:verbose) { false }
  @io = opts.fetch(:io) { raise "No IO Object" }
  @runners = []
  @listeners = []
  load_worker_initializer

  # Listeners passed as strings are evaluated, and the result is kept
  # only if it is a Hydra::RunnerListener::Abstract.
  @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil })
  @runner_event_listeners.select{|l| l.is_a? String}.each do |l|
    @runner_event_listeners.delete_at(@runner_event_listeners.index(l))
    listener = eval(l)
    @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract)
  end
  @runner_log_file = opts.fetch(:runner_log_file) { nil }

  boot_runners(opts.fetch(:runners) { 1 })
  @io.write(Hydra::Messages::Worker::WorkerBegin.new)

  # Block until every runner process has exited.
  @runners.each{|r| Process.wait r[:pid] }
end
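The constructor reads each option with Hash#fetch and a block, so a missing key falls back to the block's value, and a missing :io raises. The following is a minimal sketch of that pattern in isolation. The helper name read_worker_opts is hypothetical and not part of Hydra, and a StringIO stands in for the real connection to the master.

```ruby
require 'stringio'

# Hypothetical helper (not part of Hydra) that mirrors how the
# constructor reads its options with Hash#fetch and a block.
def read_worker_opts(opts = {})
  {
    verbose: opts.fetch(:verbose) { false },
    io:      opts.fetch(:io) { raise "No IO Object" },
    runners: opts.fetch(:runners) { 1 }
  }
end

# Only :io is supplied, so :verbose and :runners use the block values.
with_defaults = read_worker_opts(io: StringIO.new)

# Leaving out :io runs the block passed to fetch, which raises.
begin
  read_worker_opts(runners: 4)
rescue RuntimeError => e
  missing_io_error = e.message
end
```

Using a block rather than a second argument to fetch means the fallback (including the raise) is evaluated only when the key is actually absent.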
Instance Attribute Details
#runners ⇒ Object (readonly)
Returns the value of attribute runners.
# File 'lib/hydra/worker.rb', line 13
def runners
  @runners
end
Instance Method Details
#delegate_file(message) ⇒ Object
When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.
# File 'lib/hydra/worker.rb', line 61
def delegate_file(message)
  runner = idle_runner
  runner[:idle] = false
  runner[:io].write(RunFile.new(eval(message.serialize)))
end
#load_worker_initializer ⇒ Object
# File 'lib/hydra/worker.rb', line 41
def load_worker_initializer
  if File.exist?('./hydra_worker_init.rb')
    trace('Requiring hydra_worker_init.rb')
    require 'hydra_worker_init'
  else
    trace('hydra_worker_init.rb not present')
  end
end
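The method looks for hydra_worker_init.rb in the current working directory and then calls require with the bare name. Since Ruby 1.9.2 the current directory is no longer on $LOAD_PATH by default, so the bare require succeeds only if something else has added it. The sketch below shows one possible variant that resolves the file to an absolute path first. The method name load_initializer and the :absent / :loaded return values are hypothetical and are not Hydra's code.

```ruby
require 'tmpdir'

# Hypothetical variant (not Hydra's code): resolve the initializer
# against the current working directory so the require does not
# depend on "." being in $LOAD_PATH.
def load_initializer(name = 'hydra_worker_init')
  path = File.expand_path("#{name}.rb", Dir.pwd)
  return :absent unless File.exist?(path)
  require path
  :loaded
end

results = []
Dir.mktmpdir do |dir|
  Dir.chdir(dir) do
    # No initializer file exists yet in the temporary directory.
    results << load_initializer
    # Create one that sets a global so its execution can be observed.
    File.write('hydra_worker_init.rb', "$hydra_worker_init_ran = true\n")
    results << load_initializer
  end
end
```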
#relay_results(message, runner) ⇒ Object
When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.
# File 'lib/hydra/worker.rb', line 69
def relay_results(message, runner)
  runner[:idle] = true
  @io.write(Results.new(eval(message.serialize)))
end
#request_file(message, runner) ⇒ Object
When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.
# File 'lib/hydra/worker.rb', line 54
def request_file(message, runner)
  @io.write(RequestFile.new)
  runner[:idle] = true
end
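Taken together, request_file, delegate_file and relay_results toggle a runner's :idle flag: a runner is idle once it has asked for a file, busy once a file has been delegated to it, and idle again after its results are relayed. The sketch below traces that cycle with plain hashes and StringIO objects in place of real pipes. SketchWorker and the string messages are stand-ins, and the idle_runner shown here (first runner whose :idle flag is set) is an assumption, since its implementation is not listed on this page.

```ruby
require 'stringio'

# Stand-in for Hydra::Worker that keeps only the idle-flag bookkeeping.
# Messages are plain strings here; Hydra uses message objects.
class SketchWorker
  attr_reader :runners, :master_io

  def initialize(runner_count)
    @master_io = StringIO.new
    @runners = Array.new(runner_count) { { io: StringIO.new, idle: false } }
  end

  # Runner asks for work: forward the request upward, mark the runner idle.
  def request_file(runner)
    @master_io.puts 'RequestFile'
    runner[:idle] = true
  end

  # Master sends a file: hand it to an idle runner and mark that runner busy.
  def delegate_file(file)
    runner = idle_runner
    runner[:idle] = false
    runner[:io].puts "RunFile #{file}"
  end

  # Runner reports results: mark it idle and pass the results upward.
  def relay_results(output, runner)
    runner[:idle] = true
    @master_io.puts "Results #{output}"
  end

  private

  # Assumed behaviour: pick the first runner whose :idle flag is set.
  def idle_runner
    @runners.find { |r| r[:idle] }
  end
end

worker = SketchWorker.new(2)
first = worker.runners.first
worker.request_file(first)
worker.delegate_file('spec/a_spec.rb')
busy_after_delegate = !first[:idle]
worker.relay_results('ok', first)
```

In this sketch a file is only ever delegated after some runner has requested one, which is why delegate_file can assume an idle runner exists.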
#shutdown ⇒ Object
When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.
# File 'lib/hydra/worker.rb', line 76
def shutdown
  @running = false
  trace "Notifying #{@runners.size} Runners of Shutdown"
  @runners.each do |r|
    trace "Sending Shutdown to Runner"
    trace "\t#{r.inspect}"
    r[:io].write(Shutdown.new)
  end
  Thread.exit
end
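Shutdown is a broadcast: every runner's IO receives a Shutdown message, whether or not that runner is idle. A minimal sketch of the loop follows, using StringIO objects and a string marker in place of Hydra's pipes and Shutdown message. The helper broadcast_shutdown is hypothetical. The sketch leaves out Thread.exit, which in the real method terminates the calling thread.

```ruby
require 'stringio'

# Hypothetical helper: write a shutdown marker to every runner's IO,
# whether or not the runner is idle, and report how many were notified.
def broadcast_shutdown(runners)
  runners.each { |r| r[:io].puts 'Shutdown' }
  runners.size
end

runners = [
  { io: StringIO.new, idle: true },
  { io: StringIO.new, idle: false }
]
notified = broadcast_shutdown(runners)
```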