Class: CloudCrowd::WorkUnit

Inherits:
ActiveRecord::Base
  • Object
Includes:
ModelStatus
Defined in:
lib/cloud_crowd/models/work_unit.rb

Overview

A WorkUnit is an atomic chunk of work from a job, processing a single input through a single action. The WorkUnits are run in parallel, with each worker daemon processing one at a time. The splitting and merging stages of a job are each run as a single WorkUnit.

Constant Summary

MAX_RESERVATION = 2147483647

We use a random number in (0...MAX_RESERVATION) to reserve work units. The bound is the maximum value of a signed integer in MySQL; SQLite has no limit.
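As a minimal sketch of how such a reservation number can be drawn, the snippet below uses `SecureRandom` from Ruby's standard library. The helper name `new_reservation_token` is hypothetical and is not part of CloudCrowd.

```ruby
require 'securerandom'

# Same value as the constant documented above: 2**31 - 1.
MAX_RESERVATION = 2147483647

# Hypothetical helper: draw a token from the half-open range
# 0...MAX_RESERVATION (the upper bound itself is never returned).
def new_reservation_token
  SecureRandom.random_number(MAX_RESERVATION)
end

puts new_reservation_token
```

Two processes could in principle draw the same number, but with roughly two billion possible values a collision is very unlikely.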
RESERVATION_LIMIT = 25

We only reserve a certain number of WorkUnits in a single go, to avoid reserving the entire table.

Class Method Summary

Instance Method Summary

Methods included from ModelStatus

#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?

Class Method Details

.cancel_all_reservations ⇒ Object

Cancels all outstanding WorkUnit reservations for all processes. (Useful in the console for debugging.)



# File 'lib/cloud_crowd/models/work_unit.rb', line 76

def self.cancel_all_reservations
  WorkUnit.update_all('reservation = null')
end

.cancel_reservations(reservation) ⇒ Object

Cancels all outstanding WorkUnit reservations for this process.



# File 'lib/cloud_crowd/models/work_unit.rb', line 70

def self.cancel_reservations(reservation)
  WorkUnit.reserved(reservation).update_all('reservation = null')
end

.distribute_to_nodes ⇒ Object

Attempt to send a list of WorkUnits to nodes with available capacity. A single central server process stops the same WorkUnit from being distributed to multiple nodes by reserving it first. The algorithm used should be lock-free.

We reserve WorkUnits for this process in chunks of RESERVATION_LIMIT size, and try to match them to Nodes that are capable of handling the Action. WorkUnits get removed from the availability list when they are successfully sent, and Nodes get removed when they are busy or have the action in question disabled.



# File 'lib/cloud_crowd/models/work_unit.rb', line 40

def self.distribute_to_nodes
  reservation = nil
  loop do
    return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT)
    work_units = WorkUnit.reserved(reservation)
    available_nodes = NodeRecord.available
    while node = available_nodes.shift and unit = work_units.shift do
      if node.actions.include? unit.action
        if node.send_work_unit(unit)
          available_nodes.push(node) unless node.busy?
          next
        end
      end
      work_units.push(unit)
    end
    return if work_units.any? || available_nodes.empty?
  end
ensure
  WorkUnit.cancel_reservations(reservation) if reservation
end
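The matching loop can be tried in isolation with plain Ruby objects. This is a sketch under stated assumptions: `SketchUnit`, `SketchNode` and `match_units_to_nodes` are hypothetical stand-ins, the reservation step is omitted, and "sending" a unit only appends it to an in-memory list.

```ruby
# Hypothetical stand-in for a WorkUnit: just an id and an action name.
SketchUnit = Struct.new(:id, :action)

# Hypothetical stand-in for a NodeRecord with a fixed capacity.
class SketchNode
  attr_reader :actions, :sent

  def initialize(actions, capacity)
    @actions  = actions
    @capacity = capacity
    @sent     = []
  end

  # Pretend the unit was delivered successfully.
  def send_work_unit(unit)
    @sent << unit
    true
  end

  def busy?
    @sent.size >= @capacity
  end
end

# Mirrors the inner while loop: pair the head of each list, requeue
# units that could not be placed, and drop nodes that are busy or
# that do not support the unit's action.
def match_units_to_nodes(work_units, available_nodes)
  while (node = available_nodes.shift) && (unit = work_units.shift)
    if node.actions.include?(unit.action) && node.send_work_unit(unit)
      available_nodes.push(node) unless node.busy?
      next
    end
    work_units.push(unit)
  end
  work_units
end
```

The method returns the units that were left unsent. In the real method a non-empty remainder ends the outer loop, and the `ensure` clause then cancels the outstanding reservation.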

.find_by_worker_name(name) ⇒ Object

Look up a WorkUnit by the worker that’s currently processing it. The worker is specified by name, in the form pid@host.



# File 'lib/cloud_crowd/models/work_unit.rb', line 82

def self.find_by_worker_name(name)
  pid, host = name.split('@')
  node = NodeRecord.find_by_host(host)
  node && node.work_units.find_by_worker_pid(pid)
end
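To illustrate the pid@host convention on its own, here is a small sketch that performs only the split, with no database lookup. The helper name `parse_worker_name` and the sample host are hypothetical.

```ruby
# Hypothetical helper: split a worker name of the form pid@host.
# Both parts come back as Strings; the pid is not converted to an Integer.
def parse_worker_name(name)
  pid, host = name.split('@')
  { :pid => pid, :host => host }
end

p parse_worker_name('1234@node1.example.com')
```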

.reserve_available(options = {}) ⇒ Object

Reserves available WorkUnits for this process, up to options[:limit] if one is given. Returns the reservation number, or false if there were none available.



# File 'lib/cloud_crowd/models/work_unit.rb', line 63

def self.reserve_available(options={})
  reservation = ActiveSupport::SecureRandom.random_number(MAX_RESERVATION)
  any = WorkUnit.available.update_all("reservation = #{reservation}", nil, options) > 0
  any && reservation
end
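The reserve-then-return contract can be sketched in memory, without a database. In this sketch each row is a plain Hash, and "available" is simplified to mean that `:reservation` is nil; both are assumptions made for illustration, as is the name `reserve_available_sketch`. The real method issues a single SQL UPDATE instead.

```ruby
require 'securerandom'

# Hypothetical in-memory version of the reservation step.
# rows  - Array of Hashes with a :reservation key (nil means unreserved).
# limit - optional maximum number of rows to reserve in one call.
def reserve_available_sketch(rows, limit = nil)
  reservation = SecureRandom.random_number(2147483647)
  free = rows.select { |row| row[:reservation].nil? }
  free = free.first(limit) if limit
  free.each { |row| row[:reservation] = reservation }
  # Return the token if anything was reserved, false otherwise.
  free.any? && reservation
end
```

Callers can treat the return value as a boolean: every Integer, including 0, is truthy in Ruby, so only the "nothing available" case yields false.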

.start(job, action, input, status) ⇒ Object

Convenience method for starting a new WorkUnit.



# File 'lib/cloud_crowd/models/work_unit.rb', line 89

def self.start(job, action, input, status)
  input = input.to_json unless input.is_a? String
  self.create(:job => job, :action => action, :input => input, :status => status)
end
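The first line of `.start` means that String inputs are stored untouched, while anything else is serialized to JSON. A minimal sketch of that rule, using a hypothetical helper named `normalize_input`:

```ruby
require 'json'

# Hypothetical helper isolating the input handling in .start:
# Strings pass through unchanged, other values are serialized to JSON.
def normalize_input(input)
  input.is_a?(String) ? input : input.to_json
end

puts normalize_input('http://example.com/a.pdf')
puts normalize_input({ 'page' => 1 })
```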

Instance Method Details

#assign_to(node_record, worker_pid) ⇒ Object

When a Node checks out a WorkUnit, establish the connection between WorkUnit and NodeRecord and record the worker_pid.



# File 'lib/cloud_crowd/models/work_unit.rb', line 144

def assign_to(node_record, worker_pid)
  update_attributes!(:node_record => node_record, :worker_pid => worker_pid)
end

#fail(output, time_taken) ⇒ Object

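Mark this unit as having failed. If the number of tries so far, counting this one, is still below CloudCrowd.config[:work_unit_retries], the unit is retried via #try_again instead of being marked as failed.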



# File 'lib/cloud_crowd/models/work_unit.rb', line 119

def fail(output, time_taken)
  tries = self.attempts + 1
  return try_again if tries < CloudCrowd.config[:work_unit_retries]
  update_attributes({
    :status         => FAILED,
    :node_record    => nil,
    :worker_pid     => nil,
    :attempts       => tries,
    :output         => output,
    :time           => time_taken
  })
  job && job.check_for_completion
end
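The retry decision reduces to a single comparison, sketched below. The helper name `failure_outcome` is hypothetical, and the value 3 in the usage lines is only an illustrative setting for `work_unit_retries`.

```ruby
# Hypothetical helper: what #fail does for a unit that has already
# recorded `attempts` tries, given the configured retry count.
def failure_outcome(attempts, work_unit_retries)
  tries = attempts + 1
  tries < work_unit_retries ? :retry : :failed
end

p failure_outcome(0, 3)
p failure_outcome(2, 3)
```

Under this rule a unit with `work_unit_retries` set to 3 is run at most three times in total: the first two failures trigger a retry and the third marks it as failed.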

#finish(result, time_taken) ⇒ Object

Mark this unit as having finished successfully. Splitting work units are handled differently (an optimization) – they immediately fire off all of their resulting WorkUnits for processing, without waiting for the rest of their splitting cousins to complete.



# File 'lib/cloud_crowd/models/work_unit.rb', line 98

def finish(result, time_taken)
  if splitting?
    [parsed_output(result)].flatten.each do |new_input|
      WorkUnit.start(job, action, new_input, PROCESSING)
    end
    self.destroy
    job.set_next_status if job && job.done_splitting?
  else
    update_attributes({
      :status         => SUCCEEDED,
      :node_record    => nil,
      :worker_pid     => nil,
      :attempts       => attempts + 1,
      :output         => result,
      :time           => time_taken
    })
    job && job.check_for_completion
  end
end
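In the splitting branch, the expression `[parsed_output(result)].flatten` lets a split action return either one value or a list of values. A small sketch of that normalization, assuming the result string follows the `{"output": ...}` wrapping used by `#parsed_output`; the helper name `split_inputs` and the sample file names are hypothetical.

```ruby
require 'json'

# Hypothetical helper: the list of inputs that a finished splitting
# unit would hand to WorkUnit.start, one new unit per element.
def split_inputs(result)
  [JSON.parse(result)['output']].flatten
end

p split_inputs('{"output":["a.pdf","b.pdf"]}')
p split_inputs('{"output":"only.pdf"}')
```

Because `flatten` is recursive, nested arrays in the output are also collapsed into a single flat list of inputs.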

#parsed_output(out = self.output) ⇒ Object

All output needs to be wrapped in a JSON object for consistency (unfortunately, JSON.parse needs the top-level to be an object or array). Convenience method to provide the parsed version.



# File 'lib/cloud_crowd/models/work_unit.rb', line 151

def parsed_output(out = self.output)
  JSON.parse(out)['output']
end
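The wrapping convention can be shown as a round trip. This sketch assumes the producer side wraps its value under the `'output'` key before serializing; `wrap_output` and `unwrap_output` are hypothetical names, with the second mirroring the body of `#parsed_output`.

```ruby
require 'json'

# Hypothetical producer side: wrap any value in a JSON object.
def wrap_output(value)
  { 'output' => value }.to_json
end

# Mirrors #parsed_output: parse the object and return the wrapped value.
def unwrap_output(json)
  JSON.parse(json)['output']
end

puts wrap_output('done')
p unwrap_output(wrap_output([1, 2, 3]))
```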

#to_json ⇒ Object

The JSON representation of a WorkUnit shares the Job’s options with all its cousin WorkUnits.



# File 'lib/cloud_crowd/models/work_unit.rb', line 157

def to_json
  {
    'id'        => self.id,
    'job_id'    => self.job_id,
    'input'     => self.input,
    'attempts'  => self.attempts,
    'action'    => self.action,
    'options'   => JSON.parse(self.job.options),
    'status'    => self.status
  }.to_json
end
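To make the shared options concrete, the sketch below builds the JSON for two units of the same job from plain Hashes. The helper `work_unit_json`, the sample attribute values, and the numeric status are hypothetical; only the key names come from the method above.

```ruby
require 'json'

# Hypothetical helper: merge the job's parsed options into a unit's
# attributes and serialize the result, as #to_json does.
def work_unit_json(unit_attributes, job_options_json)
  unit_attributes.merge('options' => JSON.parse(job_options_json)).to_json
end

job_options = '{"width":100}'
first  = { 'id' => 1, 'job_id' => 7, 'input' => 'a.jpg', 'attempts' => 0,
           'action' => 'resize', 'status' => 1 }
second = first.merge('id' => 2, 'input' => 'b.jpg')

puts work_unit_json(first, job_options)
puts work_unit_json(second, job_options)
```

Both serialized units carry the same `options` object because it is parsed from the job rather than stored on each unit.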

#try_again ⇒ Object

Ever tried. Ever failed. No matter. Try again. Fail again. Fail better.



# File 'lib/cloud_crowd/models/work_unit.rb', line 134

def try_again
  update_attributes({
    :node_record  => nil,
    :worker_pid   => nil,
    :attempts     => self.attempts + 1
  })
end