Class: CloudCrowd::Worker
- Inherits: Object
  - Object
    - CloudCrowd::Worker
- Defined in: lib/cloud_crowd/worker.rb
Overview
The Worker, forked off from the Node when a new WorkUnit is received, launches an Action for processing. Workers will only ever receive WorkUnits that they are able to handle (for which they have a corresponding action in their actions directory). If communication with the central server is interrupted, the Worker will repeatedly attempt to complete its unit – every Worker::RETRY_WAIT seconds. Any exceptions that take place during the course of the Action will cause the Worker to mark the WorkUnit as having failed. When finished, the Worker’s process exits, minimizing the potential for memory leaks.
Constant Summary
- RETRY_WAIT = 5
  Wait five seconds to retry after internal communication errors.
Instance Attribute Summary
- #node ⇒ Object (readonly): Returns the value of attribute node.
- #pid ⇒ Object (readonly): Returns the value of attribute pid.
- #status ⇒ Object (readonly): Returns the value of attribute status.
- #unit ⇒ Object (readonly): Returns the value of attribute unit.
Instance Method Summary
- #base_params ⇒ Object (private): Common parameters to send back to central upon unit completion, regardless of success or failure.
- #complete_work_unit(result) ⇒ Object: Return output to the central server, marking the WorkUnit done.
- #display_work_unit ⇒ Object: Loggable details describing what the Worker is up to.
- #enhanced_unit_options ⇒ Object: There are some potentially important attributes of the WorkUnit that we’d like to pass into the Action – in case it needs to know them.
- #fail_work_unit(exception) ⇒ Object: Mark the WorkUnit failed, returning the exception to central.
- #initialize(node, unit) ⇒ Worker (constructor): A new Worker customizes itself to its WorkUnit at instantiation.
- #keep_trying_to(title) ⇒ Object: We expect and require internal communication between the central server and the workers to succeed.
- #log(message) ⇒ Object (private): Log a message to the daemon log.
- #run ⇒ Object: Run this worker inside of a fork.
- #run_work_unit ⇒ Object: Executes the WorkUnit by running the Action, catching all exceptions as failures.
- #time_taken ⇒ Object: How long has this worker been running for?
- #trap_signals ⇒ Object (private): When signaled to exit, make sure that the Worker shuts down without firing the Node’s at_exit callbacks.
Constructor Details
#initialize(node, unit) ⇒ Worker
A new Worker customizes itself to its WorkUnit at instantiation.
# File 'lib/cloud_crowd/worker.rb', line 22
def initialize(node, unit)
  @start_time = Time.now
  @pid = $$
  @node = node
  @unit = unit
  @status = @unit['status']
  @retry_wait = RETRY_WAIT
  $0 = "#{unit['action']} (#{unit['id']}) [cloud-crowd-worker]"
end
Instance Attribute Details
#node ⇒ Object (readonly)
Returns the value of attribute node.
# File 'lib/cloud_crowd/worker.rb', line 19
def node
  @node
end
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
# File 'lib/cloud_crowd/worker.rb', line 19
def pid
  @pid
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/cloud_crowd/worker.rb', line 19
def status
  @status
end
#unit ⇒ Object (readonly)
Returns the value of attribute unit.
# File 'lib/cloud_crowd/worker.rb', line 19
def unit
  @unit
end
Instance Method Details
#base_params ⇒ Object (private)
Common parameters to send back to central upon unit completion, regardless of success or failure.
# File 'lib/cloud_crowd/worker.rb', line 131
def base_params
  { :pid  => @pid,
    :id   => @unit['id'],
    :time => time_taken }
end
#complete_work_unit(result) ⇒ Object
Return output to the central server, marking the WorkUnit done.
# File 'lib/cloud_crowd/worker.rb', line 33
def complete_work_unit(result)
  keep_trying_to "complete work unit" do
    data = base_params.merge({:status => 'succeeded', :output => result})
    @node.central["/work/#{data[:id]}"].put(data)
    log "finished #{display_work_unit} in #{data[:time]} seconds"
  end
end
#display_work_unit ⇒ Object
Loggable details describing what the Worker is up to.
# File 'lib/cloud_crowd/worker.rb', line 68
def display_work_unit
  "unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})"
end
#enhanced_unit_options ⇒ Object
There are some potentially important attributes of the WorkUnit that we’d like to pass into the Action – in case it needs to know them. They will always be made available in the options hash.
# File 'lib/cloud_crowd/worker.rb', line 113
def enhanced_unit_options
  @unit['options'].merge({
    'job_id' => @unit['job_id'],
    'work_unit_id' => @unit['id'],
    'attempts' => @unit['attempts']
  })
end
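To make the merge concrete, here is a minimal sketch that applies the same expression to a sample work unit. The `unit` hash and its values are hypothetical and only mirror the keys that the method reads.

```ruby
# A hypothetical work unit, shaped like the hash the Worker reads from.
unit = {
  'id'       => 42,
  'job_id'   => 7,
  'attempts' => 1,
  'options'  => { 'benchmark' => true }
}

# The same merge as enhanced_unit_options, applied to the sample unit.
options = unit['options'].merge({
  'job_id'       => unit['job_id'],
  'work_unit_id' => unit['id'],
  'attempts'     => unit['attempts']
})

puts options.inspect
```

Hash#merge returns a new hash, so the unit's original options are left untouched. Because the merged-in keys win, a user-supplied option named 'job_id', 'work_unit_id', or 'attempts' would be overwritten.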
#fail_work_unit(exception) ⇒ Object
Mark the WorkUnit failed, returning the exception to central.
# File 'lib/cloud_crowd/worker.rb', line 42
def fail_work_unit(exception)
  keep_trying_to "mark work unit as failed" do
    data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
    @node.central["/work/#{data[:id]}"].put(data)
    log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
  end
end
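The payloads built by complete_work_unit and fail_work_unit have the same shape and differ only in :status and in what is wrapped under 'output'. The sketch below builds both by hand; the `base` values are hypothetical stand-ins for base_params, and no request is sent.

```ruby
require 'json'

# Hypothetical values standing in for base_params (pid, unit id, elapsed seconds).
base = { :pid => 1234, :id => 42, :time => 0.5 }

# Success: the Action's result arrives already wrapped under 'output' and JSON-encoded.
succeeded = base.merge({ :status => 'succeeded', :output => { 'output' => 'done' }.to_json })

# Failure: the exception message takes the place of the result.
error  = RuntimeError.new('disk full')
failed = base.merge({ :status => 'failed', :output => { 'output' => error.message }.to_json })

puts succeeded[:output]
puts failed[:output]
```

In both cases :output is a JSON string rather than a nested hash, so the receiving side presumably has to parse it to read the value or the error message.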
#keep_trying_to(title) ⇒ Object
We expect and require internal communication between the central server and the workers to succeed. If it fails for any reason, log it, and then keep trying the same request.
# File 'lib/cloud_crowd/worker.rb', line 53
def keep_trying_to(title)
  begin
    yield
  rescue RestClient::ResourceNotFound => e
    log "work unit ##{@unit['id']} doesn't exist. discarding..."
  rescue Exception => e
    log "failed to #{title} -- retry in #{@retry_wait} seconds"
    log e.message
    log e.backtrace
    sleep @retry_wait
    retry
  end
end
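The retry loop can be tried out in isolation. The following is a minimal sketch, not CloudCrowd's code: `NotFound` is a hypothetical stand-in for RestClient::ResourceNotFound, `keep_trying` and its `wait` and `log` parameters are illustrative, and it rescues StandardError rather than Exception so that interrupts are not swallowed.

```ruby
# Stand-in for RestClient::ResourceNotFound (hypothetical; avoids a gem dependency).
class NotFound < StandardError; end

# Retry the block until it succeeds; give up only when the resource is gone.
# `wait` and `log` are illustrative parameters, not part of CloudCrowd's API.
def keep_trying(title, wait: 0.01, log: [])
  begin
    yield
  rescue NotFound
    log << "resource doesn't exist. discarding..."
    nil
  rescue StandardError => e
    log << "failed to #{title} -- retry in #{wait} seconds (#{e.message})"
    sleep wait
    retry
  end
end

# Simulate a request that fails twice before succeeding.
attempts = 0
messages = []
result = keep_trying('complete work unit', log: messages) do
  attempts += 1
  raise IOError, 'connection refused' if attempts < 3
  :ok
end
```

There is no upper bound on retries here, matching the method above: a persistent failure keeps the loop running until the process is stopped from outside.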
#log(message) ⇒ Object (private)
Log a message to the daemon log. Includes PID for identification.
# File 'lib/cloud_crowd/worker.rb', line 138
def log(message)
  puts "Worker ##{@pid}: #{message}" unless ENV['RACK_ENV'] == 'test'
end
#run ⇒ Object
Run this worker inside of a fork. Attempts to exit cleanly. Wraps run_work_unit to benchmark the execution time, if requested.
# File 'lib/cloud_crowd/worker.rb', line 99
def run
  trap_signals
  log "starting #{display_work_unit}"
  if @unit['options']['benchmark']
    log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
  else
    run_work_unit
  end
  Process.exit!
end
#run_work_unit ⇒ Object
Executes the WorkUnit by running the Action, catching all exceptions as failures. We capture the thread so that we can kill it from the outside, when exiting.
# File 'lib/cloud_crowd/worker.rb', line 75
def run_work_unit
  begin
    result = nil
    action_class = CloudCrowd.actions[@unit['action']]
    action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
    Dir.chdir(action.work_directory) do
      result = case @status
      when PROCESSING then action.process
      when SPLITTING then action.split
      when MERGING then action.merge
      else raise Error::StatusUnspecified, "work units must specify their status"
      end
    end
    action.cleanup_work_directory if action
    complete_work_unit({'output' => result}.to_json)
  rescue Exception => e
    action.cleanup_work_directory if action
    fail_work_unit(e)
  end
  @node.resolve_work(@unit['id'])
end
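The core of this method is a dispatch on the unit's status, with any raised error turned into a failure. The sketch below shows that pattern on its own. Everything in it is hypothetical: the status values, the `WordCount` action, and the `run_unit` helper are illustrations, and it rescues StandardError where the method above rescues Exception.

```ruby
# Hypothetical status codes; CloudCrowd defines its own constants.
PROCESSING = 1
SPLITTING  = 2
MERGING    = 3

# A toy action exposing the three entry points the Worker may call.
class WordCount
  def initialize(input)
    @input = input
  end

  def process; @input.split.size; end  # count words in a string
  def split;   @input.split; end       # break a string into words
  def merge;   @input.sum; end         # add up an array of partial counts
end

# Dispatch on status and report the outcome as a [state, payload] pair.
def run_unit(status, input)
  action = WordCount.new(input)
  result = case status
           when PROCESSING then action.process
           when SPLITTING  then action.split
           when MERGING    then action.merge
           else raise ArgumentError, 'work units must specify their status'
           end
  [:succeeded, result]
rescue StandardError => e
  [:failed, e.message]
end
```

A unit with a missing or unknown status takes the same failure path as an action that raises, which is why the Worker can report both through fail_work_unit.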
#time_taken ⇒ Object
How long has this worker been running for?
# File 'lib/cloud_crowd/worker.rb', line 122
def time_taken
  Time.now - @start_time
end
#trap_signals ⇒ Object (private)
When signaled to exit, make sure that the Worker shuts down without firing the Node’s at_exit callbacks.
# File 'lib/cloud_crowd/worker.rb', line 144
def trap_signals
  Signal.trap('QUIT') { Process.exit! }
  Signal.trap('INT') { Process.exit! }
  Signal.trap('TERM') { Process.exit! }
end
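The reason for Process.exit! rather than exit is that it skips at_exit handlers, including any the forked Worker inherited from the Node. The sketch below demonstrates the difference with a forked child and a pipe. The `at_exit_output` helper is hypothetical, and the example assumes a platform where fork is available.

```ruby
# Fork a child that registers an at_exit hook, then exits either normally
# or via Process.exit!. Returns whatever the hook wrote to the pipe.
def at_exit_output(hard_exit)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    at_exit { writer.write('at_exit ran'); writer.flush }
    hard_exit ? Process.exit!(0) : exit(0)
  end
  writer.close        # parent keeps only the read end
  Process.wait(pid)   # wait for the child to finish
  output = reader.read
  reader.close
  output
end

soft = at_exit_output(false)  # normal exit: the hook runs
hard = at_exit_output(true)   # Process.exit!: the hook is skipped
puts "exit:  #{soft.inspect}"
puts "exit!: #{hard.inspect}"
```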