Class: CloudCrowd::WorkUnit

Inherits:
ActiveRecord::Base
Includes:
ModelStatus
Defined in:
lib/cloud_crowd/models/work_unit.rb

Overview

A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.

Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

.cancel_all_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)

# File 'lib/cloud_crowd/models/work_unit.rb', line 56

def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end

.cancel_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for this process.

# File 'lib/cloud_crowd/models/work_unit.rb', line 50

def self.cancel_reservations
  WorkUnit.reserved.update_all('reservation = null')
end

.distribute_to_nodes ⇒ Object

Attempts to send the reserved WorkUnits to nodes with available capacity. To keep the same WorkUnit from being sent to more than one node, the central server process first reserves the available WorkUnits (tagging them with its PID), and it releases any reservations left unsent when it finishes. The algorithm is intended to be lock-free.

# File 'lib/cloud_crowd/models/work_unit.rb', line 24

def self.distribute_to_nodes
  return unless WorkUnit.reserve_available
  work_units = WorkUnit.reserved
  available_nodes = NodeRecord.available
  until work_units.empty? do
    node = available_nodes.shift
    unit = work_units.first
    break unless node && unit
    next unless node.actions.include? unit.action
    sent = node.send_work_unit(unit)
    if sent
      work_units.shift
      available_nodes.push(node) unless node.busy?
    end
  end
ensure
  WorkUnit.cancel_reservations
end
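
The loop above can be sketched with plain Ruby objects in place of the ActiveRecord models. This is a minimal, hypothetical illustration rather than CloudCrowd code: SketchNode stands in for NodeRecord (accepting work until it holds max_workers units), PendingUnit stands in for a reserved WorkUnit, and the reservation step and its ensure cleanup are left out.

```ruby
# Hypothetical stand-in for NodeRecord: accepts work until it holds max_workers units.
SketchNode = Struct.new(:host, :actions, :max_workers, :assigned) do
  def busy?
    assigned.size >= max_workers
  end

  # Returns true if the unit was accepted.
  def send_work_unit(unit)
    return false if busy?
    assigned << unit
    true
  end
end

# Hypothetical stand-in for a reserved WorkUnit.
PendingUnit = Struct.new(:id, :action)

# Same control flow as distribute_to_nodes, minus reservations.
# Returns the units that could not be sent during this pass.
def distribute(work_units, available_nodes)
  work_units = work_units.dup
  available_nodes = available_nodes.dup
  until work_units.empty?
    node = available_nodes.shift
    unit = work_units.first
    break unless node && unit
    # A node that cannot run this action is dropped for the rest of the pass.
    next unless node.actions.include?(unit.action)
    if node.send_work_unit(unit)
      work_units.shift
      # Requeue the node at the back if it still has spare capacity.
      available_nodes.push(node) unless node.busy?
    end
  end
  work_units
end

a = SketchNode.new('a', ['word_count'], 2, [])
b = SketchNode.new('b', ['word_count'], 1, [])
left = distribute((1..4).map { |i| PendingUnit.new(i, 'word_count') }, [a, b])
a.assigned.map(&:id) # => [1, 3]
b.assigned.map(&:id) # => [2]
left.map(&:id)       # => [4]
```

With one node that accepts two units and one that accepts one, the units are handed out round-robin and the fourth is left for the next pass. Note that, as written, a node that cannot run the first unit's action is shifted off and never pushed back, so a unit that no available node supports ends the pass for every unit queued behind it.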

.find_by_worker_name(name) ⇒ Object

Looks up a WorkUnit by the worker that is currently processing it. The worker is identified by a name of the form pid@host.

# File 'lib/cloud_crowd/models/work_unit.rb', line 62

def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
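
The pid@host parsing and lookup can be tried out in isolation. In this sketch, WORKERS_BY_HOST is a hypothetical Hash standing in for NodeRecord and its work_units association; it is not part of CloudCrowd.

```ruby
# Hypothetical stand-in for NodeRecord: host => { worker PID => WorkUnit id }.
WORKERS_BY_HOST = {
  'worker-01.example.com' => { 4242 => 'unit-7' }
}.freeze

# Mirrors find_by_worker_name: split "pid@host", find the node, then the unit.
def find_by_worker_name(name)
  pid, host = name.split('@')   # "4242@host" => ["4242", "host"]
  units = WORKERS_BY_HOST[host] # nil when the host is unknown or "@" is missing
  # String#split yields a String PID; this sketch converts it to match the
  # Integer keys above.
  units && units[pid.to_i]
end

find_by_worker_name('4242@worker-01.example.com') # => "unit-7"
find_by_worker_name('4242@unknown.example.com')   # => nil
```

The `node && ...` guard in the original plays the same role as `units && ...` here: an unknown host short-circuits to nil instead of raising.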

.reserve_available ⇒ Object

Reserves all available WorkUnits for this process by setting their reservation column to this process's PID ($$). Returns false if none were available.

# File 'lib/cloud_crowd/models/work_unit.rb', line 45

def self.reserve_available
  WorkUnit.available.update_all("reservation = #{$$}") > 0
end
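
The reservation scheme shared by reserve_available, cancel_reservations, and cancel_all_reservations can be modeled in memory to see which rows each call touches. This is a sketch, not the ActiveRecord implementation; ReservableUnit and ReservationTable are hypothetical, and the sketch assumes that "available" means a unit with no reservation and no worker_pid, which the scope definitions on this page do not show.

```ruby
# Hypothetical row type: reservation holds the PID of the reserving process.
ReservableUnit = Struct.new(:id, :reservation, :worker_pid)

# In-memory stand-in for the work_units table and its scopes.
class ReservationTable
  attr_reader :units

  def initialize(units)
    @units = units
  end

  # ASSUMPTION: "available" means unreserved and not held by a worker.
  def available
    @units.select { |u| u.reservation.nil? && u.worker_pid.nil? }
  end

  # reserve_available: tag every available unit with this PID ($$ in the original).
  def reserve_available(pid = Process.pid)
    rows = available
    rows.each { |u| u.reservation = pid }
    rows.size > 0 # update_all returns the number of rows changed
  end

  def reserved(pid = Process.pid)
    @units.select { |u| u.reservation == pid }
  end

  # cancel_reservations: release only this process's reservations.
  def cancel_reservations(pid = Process.pid)
    reserved(pid).each { |u| u.reservation = nil }
  end

  # cancel_all_reservations: release reservations held by any process.
  def cancel_all_reservations
    @units.each { |u| u.reservation = nil }
  end
end

table = ReservationTable.new([
  ReservableUnit.new(1, nil, nil),  # available
  ReservableUnit.new(2, nil, 4242), # already held by a worker
  ReservableUnit.new(3, 999, nil)   # reserved by another process
])
table.reserve_available(100)   # => true
table.reserved(100).map(&:id)  # => [1]
table.reserve_available(101)   # => false
table.cancel_reservations(100) # unit 3 keeps PID 999's reservation
table.cancel_all_reservations  # clears unit 3 as well
```

Because each server process only touches rows tagged with its own PID, two processes running distribute_to_nodes concurrently do not hand out the same unit, which is why the original can avoid explicit locks.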

.start(job, action, input, status) ⇒ Object

Convenience method for starting a new WorkUnit.

# File 'lib/cloud_crowd/models/work_unit.rb', line 69

def self.start(job, action, input, status)
  self.create(:job => job, :action => action, :input => input, :status => status)
end

Instance Method Details

#assign_to(node_record, worker_pid) ⇒ Object

When a Node checks out a WorkUnit, associates the WorkUnit with that Node's NodeRecord and records the PID of the worker processing it.

# File 'lib/cloud_crowd/models/work_unit.rb', line 123

def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end

#fail(output, time_taken) ⇒ Object

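Marks this unit as having failed. If the incremented attempt count is still below CloudCrowd.config[:work_unit_retries], the unit is instead released for another attempt (see #try_again); otherwise its status is set to FAILED and its Job checks whether it has completed.

# File 'lib/cloud_crowd/models/work_unit.rb', line 98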

def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status         => FAILED,
    :node_record    => nil,
    :worker_pid     => nil,
    :attempts       => tries,
    :output         => output,
    :time           => time_taken
  })
  self.job.check_for_completion
end
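
The retry decision in #fail can be sketched with a plain Struct. MAX_RETRIES is a hypothetical constant standing in for CloudCrowd.config[:work_unit_retries], and only the attributes the decision touches are modeled.

```ruby
# Hypothetical stand-in for CloudCrowd.config[:work_unit_retries].
MAX_RETRIES = 3

# Only the attributes the retry decision touches are modeled.
RetryUnit = Struct.new(:status, :attempts, :worker_pid, :output) do
  # Mirrors #try_again: release the unit and count the attempt.
  def try_again
    self.worker_pid = nil
    self.attempts += 1
    self
  end

  # Mirrors #fail: retry while attempts + 1 < MAX_RETRIES, else mark as failed.
  def fail(output)
    tries = attempts + 1
    return try_again if tries < MAX_RETRIES
    self.status = :failed
    self.worker_pid = nil
    self.attempts = tries
    self.output = output
    self
  end
end

unit = RetryUnit.new(:processing, 0, 4242, nil)
unit.fail('timeout') # first failure: released for a retry
unit.attempts        # => 1
unit.status          # => :processing
2.times { unit.fail('timeout') }
unit.status          # => :failed
unit.attempts        # => 3
```

Because tries is compared with `<`, the setting effectively caps the total number of attempts: with a value of 3, the third failure marks the unit as failed.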

#finish(result, time_taken) ⇒ Object

Marks this unit as having finished successfully. Splitting WorkUnits are handled differently, as an optimization: each one immediately starts a processing WorkUnit for every input in its output, without waiting for the other splitting WorkUnits in the Job to complete. The splitting unit is then destroyed, and once the Job is done splitting, it moves on to its next status.

# File 'lib/cloud_crowd/models/work_unit.rb', line 77

def finish(result, time_taken)
  if splitting?
    [JSON.parse(parsed_output(result))].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status         => SUCCEEDED,
      :node_record    => nil,
      :worker_pid     => nil,
      :attempts       => attempts + 1,
      :output         => result,
      :time           => time_taken
    })
    job && job.check_for_completion
  end
end
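
The splitting branch decodes the worker's result twice: once in parsed_output to unwrap the 'output' key, and again with JSON.parse, so the value under 'output' must itself be a JSON-encoded string for the second parse to succeed. The sketch below assumes that shape; split_inputs is a hypothetical helper that returns the inputs the original would pass to WorkUnit.start. Wrapping the parsed value in an Array before flatten turns a single non-array value into a one-element list.

```ruby
require 'json'

# Hypothetical helper mirroring the splitting branch of #finish: unwrap the
# 'output' key (as parsed_output does), decode it again, and return one
# new input per element.
def split_inputs(result)
  inner = JSON.parse(result)['output']
  [JSON.parse(inner)].flatten
end

# Worker result whose 'output' is itself a JSON-encoded array.
result = { 'output' => ['part-1.txt', 'part-2.txt'].to_json }.to_json
split_inputs(result) # => ["part-1.txt", "part-2.txt"]

# A single JSON object becomes a one-element list of inputs.
split_inputs({ 'output' => { 'url' => 'a.txt' }.to_json }.to_json).size # => 1
```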

#parsed_output(out = self.output) ⇒ Object

All output is wrapped in a JSON object for consistency (unfortunately, JSON.parse requires the top level to be an object or an array). This convenience method returns the parsed value stored under the 'output' key.

# File 'lib/cloud_crowd/models/work_unit.rb', line 130

def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
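
The wrapping convention can be shown end to end. wrap_output is a hypothetical helper (this page does not show where the wrapping happens); parsed_output mirrors the method above. Wrapping keeps the JSON top level an object even when the result is a bare string or number.

```ruby
require 'json'

# Hypothetical helper: wrap any result so the JSON top level is an object.
def wrap_output(value)
  { 'output' => value }.to_json
end

# Mirrors WorkUnit#parsed_output: parse the wrapper and return its payload.
def parsed_output(out)
  JSON.parse(out)['output']
end

parsed_output(wrap_output(42))     # => 42
parsed_output(wrap_output('done')) # => "done"
parsed_output(wrap_output([1, 2])) # => [1, 2]
```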

#to_json ⇒ Object

The JSON representation of a WorkUnit includes its Job's options, which are shared by all of the Job's other WorkUnits.

# File 'lib/cloud_crowd/models/work_unit.rb', line 136

def to_json
  {
    'id'        => self.id,
    'job_id'    => self.job_id,
    'input'     => self.input,
    'attempts'  => self.attempts,
    'action'    => self.action,
    'options'   => JSON.parse(self.job.options),
    'status'    => self.status
  }.to_json
end
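
Because options is stored on the Job as a JSON string, to_json parses it first so that it nests as an object rather than as a doubly encoded string. The sketch uses hypothetical FakeJob and FakeUnit Structs in place of the models. Unlike the original, its to_json accepts and ignores optional arguments, so it also works when the json library calls to_json with a generator state (for example, when generating an Array of units).

```ruby
require 'json'

# Hypothetical stand-ins for Job and WorkUnit.
FakeJob = Struct.new(:options) # options holds a JSON string, as in the original

FakeUnit = Struct.new(:id, :job_id, :input, :attempts, :action, :status, :job) do
  # Accepts optional args so JSON.generate can call it with a generator state.
  def to_json(*_args)
    {
      'id'       => id,
      'job_id'   => job_id,
      'input'    => input,
      'attempts' => attempts,
      'action'   => action,
      'options'  => JSON.parse(job.options), # decode so it nests as an object
      'status'   => status
    }.to_json
  end
end

job  = FakeJob.new({ 'language' => 'en' }.to_json)
unit = FakeUnit.new(7, 3, 'doc-1.txt', 0, 'word_count', 2, job)
JSON.parse(unit.to_json)['options']['language'] # => "en"
```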

#try_again ⇒ Object

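Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.

Releases this unit for another attempt: clears its node_record and worker_pid and increments its attempt count, leaving its status unchanged.

# File 'lib/cloud_crowd/models/work_unit.rb', line 113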

def try_again
  update_attributes({
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => self.attempts + 1
  })
end