Class: CloudCrowd::Worker
- Inherits: Object
  - Object
    - CloudCrowd::Worker
- Defined in: lib/cloud_crowd/worker.rb
Overview
The Worker, forked off from the Node when a new WorkUnit is received, launches an Action for processing. Workers will only ever receive WorkUnits that they are able to handle (for which they have a corresponding action in their actions directory). If communication with the central server is interrupted, the Worker will repeatedly attempt to complete its unit – every Worker::RETRY_WAIT seconds. Any exceptions that take place during the course of the Action will cause the Worker to mark the WorkUnit as having failed. When finished, the Worker’s process exits, minimizing the potential for memory leaks.
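The fork-per-unit lifecycle described above can be sketched roughly as follows. This is a minimal illustration, not CloudCrowd's actual Node code: it assumes a Unix-like platform where Kernel#fork is available, and process_unit, the pipe, and the sample unit hash are hypothetical stand-ins.

```ruby
# Hypothetical stand-in for an Action: whatever work the child performs.
def process_unit(unit)
  "processed unit ##{unit['id']}"
end

unit = { 'id' => 1, 'action' => 'demo' }   # illustrative unit, not a real WorkUnit
reader, writer = IO.pipe

# The parent forks one child per unit; the child does the work and exits,
# so any memory it allocated is returned to the operating system.
pid = fork do
  reader.close
  writer.write(process_unit(unit))
  writer.close
  exit!(0)
end

# Back in the parent: collect the child's output and reap the process.
writer.close
child_output = reader.read
reader.close
_, child_status = Process.wait2(pid)
```

Because each unit runs in a short-lived process, a leak inside one action cannot accumulate across units.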
Constant Summary
- RETRY_WAIT = 5
  Wait five seconds before retrying after internal communication errors.
Instance Attribute Summary
- #node ⇒ Object (readonly)
  Returns the value of attribute node.
- #pid ⇒ Object (readonly)
  Returns the value of attribute pid.
- #status ⇒ Object (readonly)
  Returns the value of attribute status.
- #unit ⇒ Object (readonly)
  Returns the value of attribute unit.
Instance Method Summary
- #complete_work_unit(result) ⇒ Object
  Return output to the central server, marking the WorkUnit done.
- #display_work_unit ⇒ Object
  Loggable details describing what the Worker is up to.
- #enhanced_unit_options ⇒ Object
  There are some potentially important attributes of the WorkUnit that we’d like to pass into the Action – in case it needs to know them.
- #fail_work_unit(exception) ⇒ Object
  Mark the WorkUnit failed, returning the exception to central.
- #initialize(node, unit) ⇒ Worker (constructor)
  A new Worker customizes itself to its WorkUnit at instantiation.
- #keep_trying_to(title) ⇒ Object
  We expect and require internal communication between the central server and the workers to succeed.
- #run ⇒ Object
  Wraps run_work_unit to benchmark the execution time, if requested.
- #run_work_unit ⇒ Object
  Executes the WorkUnit by running the Action, catching all exceptions as failures.
- #time_taken ⇒ Object
  How long has this worker been running for?
Constructor Details
#initialize(node, unit) ⇒ Worker
A new Worker customizes itself to its WorkUnit at instantiation.
# File 'lib/cloud_crowd/worker.rb', line 20
def initialize(node, unit)
  @start_time = Time.now
  @pid        = $$
  @node       = node
  @unit       = unit
  @status     = @unit['status']
  @retry_wait = RETRY_WAIT
end
Instance Attribute Details
#node ⇒ Object (readonly)
Returns the value of attribute node.
# File 'lib/cloud_crowd/worker.rb', line 17
def node
  @node
end
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
# File 'lib/cloud_crowd/worker.rb', line 17
def pid
  @pid
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/cloud_crowd/worker.rb', line 17
def status
  @status
end
#unit ⇒ Object (readonly)
Returns the value of attribute unit.
# File 'lib/cloud_crowd/worker.rb', line 17
def unit
  @unit
end
Instance Method Details
#complete_work_unit(result) ⇒ Object
Return output to the central server, marking the WorkUnit done.
# File 'lib/cloud_crowd/worker.rb', line 30
def complete_work_unit(result)
  keep_trying_to "complete work unit" do
    data = base_params.merge({:status => 'succeeded', :output => result})
    @node.server["/work/#{data[:id]}"].put(data)
    log "finished #{display_work_unit} in #{data[:time]} seconds"
  end
end
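As a rough illustration of the payload assembled above, the sketch below builds the same kind of hash without contacting a server. The base_params helper is not shown on this page, so the version here is a hypothetical stand-in that supplies only the :id and :time keys the method body reads; the sample unit and result are also made up.

```ruby
require 'json'

# Hypothetical stand-in for base_params, which this page does not show.
# It supplies only the keys that complete_work_unit reads (:id and :time).
def base_params(unit, started_at)
  { :id => unit['id'], :time => Time.now - started_at }
end

unit   = { 'id' => 42 }                               # illustrative unit
result = { 'output' => 'thumbnail.png' }.to_json      # JSON-encoded, as in run_work_unit

data = base_params(unit, Time.now).merge({ :status => 'succeeded', :output => result })
path = "/work/#{data[:id]}"                           # resource the payload is PUT to
```

Note that :output is already a JSON string when it is merged in, so whoever receives the payload has to parse it to recover the result.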
#display_work_unit ⇒ Object
Loggable details describing what the Worker is up to.
# File 'lib/cloud_crowd/worker.rb', line 65
def display_work_unit
  "unit ##{@unit['id']} (#{@unit['action']}/#{CloudCrowd.display_status(@status)})"
end
#enhanced_unit_options ⇒ Object
There are some potentially important attributes of the WorkUnit that we’d like to pass into the Action – in case it needs to know them. They will always be made available in the options hash.
# File 'lib/cloud_crowd/worker.rb', line 107
def enhanced_unit_options
  @unit['options'].merge({
    'job_id'       => @unit['job_id'],
    'work_unit_id' => @unit['id'],
    'attempts'     => @unit['attempts']
  })
end
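To see what an Action would receive, the merge can be tried on a plain hash. The unit below is an illustrative example, not real CloudCrowd data.

```ruby
# Illustrative unit hash; the keys mirror the ones read by enhanced_unit_options.
unit = {
  'id'       => 7,
  'job_id'   => 3,
  'attempts' => 0,
  'options'  => { 'benchmark' => true }
}

# Hash#merge returns a new hash, so the unit's own options are left untouched.
options = unit['options'].merge({
  'job_id'       => unit['job_id'],
  'work_unit_id' => unit['id'],
  'attempts'     => unit['attempts']
})
```

Because the WorkUnit attributes are merged last, a user-supplied option named 'job_id', 'work_unit_id', or 'attempts' would be overwritten by the WorkUnit's value.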
#fail_work_unit(exception) ⇒ Object
Mark the WorkUnit failed, returning the exception to central.
# File 'lib/cloud_crowd/worker.rb', line 39
def fail_work_unit(exception)
  keep_trying_to "mark work unit as failed" do
    data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
    @node.server["/work/#{data[:id]}"].put(data)
    log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
  end
end
#keep_trying_to(title) ⇒ Object
We expect and require internal communication between the central server and the workers to succeed. If it fails for any reason, log it, and then keep trying the same request.
# File 'lib/cloud_crowd/worker.rb', line 50
def keep_trying_to(title)
  begin
    yield
  rescue RestClient::ResourceNotFound => e
    # The unit no longer exists on central, so there is nothing to retry.
    log "work unit ##{@unit['id']} doesn't exist. discarding..."
  rescue Exception => e
    log "failed to #{title} -- retry in #{@retry_wait} seconds"
    log e.message
    log e.backtrace
    sleep @retry_wait
    retry
  end
end
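The retry behaviour can be sketched in isolation with a block that fails twice before succeeding. This is a simplified stand-in, not the method above: keep_trying, its wait: and log: parameters, and the simulated IOError are all hypothetical, and it rescues StandardError rather than Exception so that signals and exit requests still get through.

```ruby
# Simplified, hypothetical variant of the retry loop: re-runs the block until
# it returns without raising, recording each failure and pausing in between.
def keep_trying(title, wait: 0.01, log: [])
  begin
    yield
  rescue StandardError => e
    log << "failed to #{title} -- retry in #{wait} seconds (#{e.message})"
    sleep wait
    retry   # re-enters the begin block, which yields again
  end
end

attempts = 0
log      = []

result = keep_trying("reach central", log: log) do
  attempts += 1
  raise IOError, "connection refused" if attempts < 3   # simulated outage
  :ok
end
```

The loop has no attempt limit, matching the stated expectation that communication with central must eventually succeed; a caller that needs a bound would have to add one.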
#run ⇒ Object
Wraps run_work_unit to benchmark the execution time, if requested.
# File 'lib/cloud_crowd/worker.rb', line 97
def run
  trap_signals
  log "starting #{display_work_unit}"
  return run_work_unit unless @unit['options']['benchmark']
  log("ran #{display_work_unit} in " + Benchmark.measure { run_work_unit }.to_s)
end
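For readers unfamiliar with Benchmark.measure, here is a small sketch of the benchmarked path. The run_work_unit below is a hypothetical placeholder that just does some arithmetic; only the Benchmark calls come from Ruby's standard library.

```ruby
require 'benchmark'

# Hypothetical placeholder for the real work; any block of code will do.
def run_work_unit
  (1..10_000).reduce(:+)
end

# Benchmark.measure returns a Benchmark::Tms with user, system and real time.
# The block's own return value is discarded.
timing  = Benchmark.measure { run_work_unit }
message = "ran demo unit in " + timing.to_s
```

Since the block's return value is discarded, the benchmarked path of run yields whatever log returns rather than the result of run_work_unit.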
#run_work_unit ⇒ Object
Executes the WorkUnit by running the Action, catching all exceptions as failures. We capture the thread so that we can kill it from the outside, when exiting.
# File 'lib/cloud_crowd/worker.rb', line 72
def run_work_unit
  @worker_thread = Thread.new do
    begin
      result = nil
      action_class = CloudCrowd.actions[@unit['action']]
      action = action_class.new(@status, @unit['input'], enhanced_unit_options, @node.asset_store)
      # Run the step that matches the unit's status from inside the action's work directory.
      Dir.chdir(action.work_directory) do
        result = case @status
        when PROCESSING then action.process
        when SPLITTING  then action.split
        when MERGING    then action.merge
        else raise Error::StatusUnspecified, "work units must specify their status"
        end
      end
      complete_work_unit({'output' => result}.to_json)
    rescue Exception => e
      fail_work_unit(e)
    ensure
      action.cleanup_work_directory if action
    end
  end
  @worker_thread.join
end
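The dispatch-and-rescue pattern can be sketched without CloudCrowd itself. In this illustration the statuses are plain symbols and the action is a hash of lambdas; both are hypothetical stand-ins for the real status constants and Action classes, and run_unit returns an outcome pair instead of reporting to central.

```ruby
# Runs one step of a hypothetical action on its own thread and reports either
# [:succeeded, result] or [:failed, message]; any error counts as a failure.
def run_unit(status, action)
  outcome = nil
  worker_thread = Thread.new do
    begin
      result = case status
               when :processing then action[:process].call
               when :splitting  then action[:split].call
               when :merging    then action[:merge].call
               else raise ArgumentError, "work units must specify their status"
               end
      outcome = [:succeeded, result]
    rescue StandardError => e
      outcome = [:failed, e.message]
    end
  end
  worker_thread.join   # block until the work has finished either way
  outcome
end

# Hypothetical action whose split step always fails.
action = {
  process: -> { "processed" },
  split:   -> { raise "disk full" },
  merge:   -> { "merged" }
}

ok      = run_unit(:processing, action)
failed  = run_unit(:splitting, action)
unknown = run_unit(nil, action)
```

Holding on to the thread object is what would let outside code, such as a signal handler, kill the work while it is in progress.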
#time_taken ⇒ Object
How long has this worker been running for?
# File 'lib/cloud_crowd/worker.rb', line 116
def time_taken
  Time.now - @start_time
end