Class: Hydra::Worker

Inherits:
Object
Includes:
Messages::Worker
Defined in:
lib/hydra/worker.rb

Overview

Hydra class responsible for dispatching runners and communicating with the master.

The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.

The general convention is to have one Worker per machine on a distributed network.

Constructor Details

#initialize(opts = {}) ⇒ Worker

Create a new worker.

  • io: The IO object to use to communicate with the master

  • runners: The number of runners to launch (defaults to 1)



# File 'lib/hydra/worker.rb', line 17

def initialize(opts = {})
  @verbose = opts.fetch(:verbose) { false }
  @io = opts.fetch(:io) { raise "No IO Object" }
  @runners = []
  @listeners = []
  @options = opts.fetch(:options) { "" }

  load_worker_initializer

  @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil })

  @runner_event_listeners.select{|l| l.is_a? String}.each do |l|
    @runner_event_listeners.delete_at(@runner_event_listeners.index(l))
    listener = Array.wrap eval(l)

    @runner_event_listeners << listener[0] if listener[0].is_a?(Hydra::RunnerListener::Abstract)
  end

  @runner_log_file = opts.fetch(:runner_log_file) { nil }

  boot_runners(opts.fetch(:runners) { 1 })
  @io.write(Hydra::Messages::Worker::WorkerBegin.new)

  process_messages

  @runners.each{|r| Process.wait r[:pid] }
end
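
The constructor reads each option with Hash#fetch and a block default, so only :io is mandatory. The following is a minimal sketch of that pattern on its own, not Hydra code: `read_worker_opts` is a hypothetical helper, and a StringIO stands in for the real connection to the master.

```ruby
require 'stringio'

# Hypothetical helper (not part of Hydra) that mirrors how the
# constructor reads its options hash.
def read_worker_opts(opts = {})
  {
    verbose: opts.fetch(:verbose) { false },            # optional, defaults to false
    io:      opts.fetch(:io) { raise "No IO Object" },  # required, raises when missing
    runners: opts.fetch(:runners) { 1 }                 # optional, defaults to 1
  }
end

# StringIO is only a stand-in for the IO object a Master would supply.
settings = read_worker_opts(io: StringIO.new, runners: 4)
```

Because the default is given as a block, the `raise` runs only when the :io key is absent; passing an explicit `nil` for :io would not trigger it.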

Instance Attribute Details

#runners ⇒ Object (readonly)

Returns the value of attribute runners.



# File 'lib/hydra/worker.rb', line 13

def runners
  @runners
end

Instance Method Details

#delegate_file(message) ⇒ Object

When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.



# File 'lib/hydra/worker.rb', line 65

def delegate_file(message)
  runner = idle_runner
  runner[:idle] = false
  runner[:io].write(RunFile.new(eval(message.serialize)))
end
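
The source of `idle_runner` is not shown on this page. The sketch below assumes it returns the first runner record whose :idle flag is set, and uses plain strings in place of RunFile messages. `pick_idle_runner` and `delegate` are hypothetical names used only for illustration.

```ruby
require 'stringio'

# Runner records as the code above appears to keep them:
# hashes with an :io entry and an :idle flag.
runners = [
  { io: StringIO.new, idle: false },
  { io: StringIO.new, idle: true }
]

# Assumed lookup: the first runner whose :idle flag is set.
def pick_idle_runner(runners)
  runners.find { |r| r[:idle] }
end

# Hypothetical stand-in for delegate_file. It marks the chosen runner
# busy and writes a plain string instead of a RunFile message.
def delegate(runners, file)
  runner = pick_idle_runner(runners)
  return nil unless runner            # no idle runner available
  runner[:idle] = false
  runner[:io].write("run #{file}\n")
  runner
end

chosen = delegate(runners, "test/sample_test.rb")
```

Unlike this sketch, `delegate_file` as listed has no guard for the case where no runner is idle.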

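#load_worker_initializer ⇒ Object

Requires ./hydra_worker_init.rb when that file exists in the current working directory; otherwise it only traces that the file is not present.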



# File 'lib/hydra/worker.rb', line 45

def load_worker_initializer
  if File.exist?('./hydra_worker_init.rb')
    trace('Requiring hydra_worker_init.rb')
    require './hydra_worker_init'
  else
    trace('hydra_worker_init.rb not present')
  end
end
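
The same check-then-require pattern can be tried outside Hydra. This is a sketch under stated assumptions: `load_initializer` is a hypothetical helper that takes the directory as an argument instead of using the working directory, and the one-line initializer body is invented for the demonstration.

```ruby
require 'tmpdir'

# Hypothetical helper (not part of Hydra): requires the initializer
# from the given directory if present and reports whether it was found.
def load_initializer(dir)
  path = File.join(dir, 'hydra_worker_init.rb')
  return false unless File.exist?(path)
  require path
  true
end

loaded_when_missing = nil
loaded_when_present = nil

Dir.mktmpdir do |dir|
  # First call: the file does not exist yet.
  loaded_when_missing = load_initializer(dir)

  # Invented initializer content, used only to observe that it ran.
  File.write(File.join(dir, 'hydra_worker_init.rb'), "$hydra_init_ran = true\n")
  loaded_when_present = load_initializer(dir)
end
```

Since `require` loads a given file only once per process, an initializer loaded this way will not be re-run if the method is called again.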

#relay_results(message, runner) ⇒ Object

When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.



# File 'lib/hydra/worker.rb', line 73

def relay_results(message, runner)
  runner[:idle] = true
  @io.write(Results.new(eval(message.serialize)))
end

#request_file(message, runner) ⇒ Object

When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.



# File 'lib/hydra/worker.rb', line 58

def request_file(message, runner)
  @io.write(RequestFile.new)
  runner[:idle] = true
end

#shutdown ⇒ Object

When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.



# File 'lib/hydra/worker.rb', line 80

def shutdown
  @running = false
  trace "Notifying #{@runners.size} Runners of Shutdown"
  @runners.each do |r|
    trace "Sending Shutdown to Runner"
    trace "\t#{r.inspect}"
    r[:io].write(Shutdown.new)
  end
  Thread.exit
end
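
`Thread.exit` terminates only the thread that calls it, so nothing after the broadcast loop runs in that thread. The sketch below illustrates this, assuming that shutdown is invoked from a dedicated message-handling thread (the page does not show where it is called from). The runner records and the "shutdown" strings are stand-ins for the real runner IO and Shutdown messages.

```ruby
require 'stringio'

# Stand-in runner records: hashes with an :io entry, as in the source above.
runners = Array.new(3) { { io: StringIO.new } }
steps = []

worker_thread = Thread.new do
  steps << :notifying
  # Plain string instead of Shutdown.new, written to every runner.
  runners.each { |r| r[:io].write("shutdown\n") }
  Thread.exit               # ends only this thread
  steps << :unreachable     # never executed
end

# The calling thread keeps running and can wait for the worker thread.
worker_thread.join
```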