Class: CloudCrowd::Job
- Inherits: ActiveRecord::Base
  - Object
  - ActiveRecord::Base
  - CloudCrowd::Job
- Includes: ModelStatus
- Defined in: lib/cloud_crowd/models/job.rb
Overview
A chunk of work that will be farmed out into many WorkUnits to be processed in parallel by each active CloudCrowd::Worker. Jobs are defined by a list of inputs (usually public URLs to files), an action (the name of a script that CloudCrowd knows how to run), and, eventually, a corresponding list of outputs.
Class Method Summary
-
.create_from_request(h) ⇒ Object
Create a Job from an incoming JSON request, and add it to the queue.
Instance Method Summary
-
#action_class ⇒ Object
Retrieve the class for this Job’s Action.
-
#all_work_units_complete? ⇒ Boolean
Have all of the WorkUnits finished?
-
#any_work_units_failed? ⇒ Boolean
Have any of the WorkUnits failed?
-
#check_for_completion ⇒ Object
After work units are marked successful, we check whether all of them have finished; if so, we continue on to the next phase of the job.
-
#cleanup_assets ⇒ Object
Cleaning up after a job will remove all of its files from S3 or the filesystem.
-
#color ⇒ Object
Generate a stable six-digit hex color code (8 bits per channel), based on the Job’s id.
-
#done_splitting? ⇒ Boolean
This job is done splitting if it’s finished with its splitting work units.
-
#fire_callback ⇒ Object
If a callback_url is defined, post the Job’s JSON to it upon completion.
-
#mergeable? ⇒ Boolean
This job is mergeable if its Action has a merge method.
-
#percent_complete ⇒ Object
How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards.
-
#set_next_status ⇒ Object
Transition this Job’s current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
-
#splittable? ⇒ Boolean
This job is splittable if its Action has a split method.
-
#time_taken ⇒ Object
How long has this Job taken?
-
#to_json(opts = {}) ⇒ Object
A JSON representation of this job includes the statuses of its component WorkUnits, as well as any completed outputs.
Methods included from ModelStatus
#complete?, #display_status, #failed?, included, #incomplete?, #merging?, #processing?, #splitting?, #succeeded?
Class Method Details
.create_from_request(h) ⇒ Object
Create a Job from an incoming JSON request, and add it to the queue.
    # File 'lib/cloud_crowd/models/job.rb', line 19
    def self.create_from_request(h)
      self.create(
        :inputs       => h['inputs'].to_json,
        :action       => h['action'],
        :options      => (h['options'] || {}).to_json,
        :email        => h['email'],
        :callback_url => h['callback_url']
      )
    end
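As a rough illustration of the attribute mapping, the sketch below builds the same attribute hash from a parsed request without touching ActiveRecord. The helper name `job_attributes_from` and the sample request values are hypothetical; only the keys are taken from the method shown here.

```ruby
require 'json'

# Hypothetical helper (not part of CloudCrowd): mirrors the attribute mapping
# in create_from_request, minus the database write.
def job_attributes_from(h)
  {
    :inputs       => h['inputs'].to_json,
    :action       => h['action'],
    :options      => (h['options'] || {}).to_json,
    :email        => h['email'],
    :callback_url => h['callback_url']
  }
end

# Sample request; the action name and URL are made up for illustration.
request = {
  'inputs' => ['http://example.com/image.jpg'],
  'action' => 'process_images'
}

attrs = job_attributes_from(request)
puts attrs[:inputs]   # the inputs, serialized as a JSON array
puts attrs[:options]  # "{}" because the request carries no options
```

Inputs and options are stored as JSON strings, while keys missing from the request (here `email` and `callback_url`) simply come through as `nil`.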
Instance Method Details
#action_class ⇒ Object
Retrieve the class for this Job’s Action.
    # File 'lib/cloud_crowd/models/job.rb', line 103
    def action_class
      @action_class ||= CloudCrowd.actions[self.action]
      return @action_class if @action_class
      raise Error::ActionNotFound, "no action named: '#{self.action}' could be found"
    end
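The lookup-then-raise pattern can be tried in isolation. In this minimal sketch, `ACTIONS`, `ActionNotFound`, `WordCount`, and `JobSketch` are hypothetical stand-ins for `CloudCrowd.actions`, `Error::ActionNotFound`, a registered action, and the Job model.

```ruby
# Hypothetical stand-ins for CloudCrowd.actions and Error::ActionNotFound.
class ActionNotFound < StandardError; end
class WordCount; end

ACTIONS = { 'word_count' => WordCount }

class JobSketch
  def initialize(action)
    @action = action
  end

  # Memoize a successful lookup; raise a descriptive error otherwise.
  def action_class
    @action_class ||= ACTIONS[@action]
    return @action_class if @action_class
    raise ActionNotFound, "no action named: '#{@action}' could be found"
  end
end

puts JobSketch.new('word_count').action_class

begin
  JobSketch.new('missing').action_class
rescue ActionNotFound => e
  puts e.message
end
```

Because a failed lookup leaves the instance variable `nil`, the registry is consulted again on every call until the action name resolves.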
#all_work_units_complete? ⇒ Boolean
Have all of the WorkUnits finished?
    # File 'lib/cloud_crowd/models/job.rb', line 78
    def all_work_units_complete?
      self.work_units.incomplete.count <= 0
    end
#any_work_units_failed? ⇒ Boolean
Have any of the WorkUnits failed?
    # File 'lib/cloud_crowd/models/job.rb', line 83
    def any_work_units_failed?
      self.work_units.failed.count > 0
    end
#check_for_completion ⇒ Object
After work units are marked successful, we check whether all of them have finished; if so, we continue on to the next phase of the job.

    # File 'lib/cloud_crowd/models/job.rb', line 31
    def check_for_completion
      return unless all_work_units_complete?
      set_next_status
      outs = gather_outputs_from_work_units
      return queue_for_workers(outs) if merging?
      if complete?
        update_attributes(:outputs => outs, :time => time_taken)
        fire_callback if callback_url
      end
      self
    end
#cleanup_assets ⇒ Object
Cleaning up after a job will remove all of its files from S3 or the filesystem. Destroying a Job will cleanup_assets first. Run this in a separate thread to get out of the transaction’s way. TODO: Convert this into a ‘cleanup’ work unit that gets run by a worker.
    # File 'lib/cloud_crowd/models/job.rb', line 73
    def cleanup_assets
      AssetStore.new.cleanup(self)
    end
#color ⇒ Object
Generate a stable six-digit hex color code (8 bits per channel), based on the Job’s id.

    # File 'lib/cloud_crowd/models/job.rb', line 127
    def color
      @color ||= Digest::MD5.hexdigest(self.id.to_s)[-7...-1]
    end
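The slice can be checked outside the model. This sketch applies the same expression to an arbitrary id; the helper name `color_for` and the id 42 are illustrative only.

```ruby
require 'digest/md5'

# Same slice as the color method: six hex characters taken from near the end
# of the MD5 digest of the id.
def color_for(id)
  Digest::MD5.hexdigest(id.to_s)[-7...-1]
end

c = color_for(42)
puts c
puts c.length            # 6
puts c == color_for(42)  # true: the same id always yields the same color
```

The exclusive range `-7...-1` stops one character short of the end of the digest, so the result is always six lowercase hex characters.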
#done_splitting? ⇒ Boolean
This job is done splitting if it’s finished with its splitting work units.
    # File 'lib/cloud_crowd/models/job.rb', line 93
    def done_splitting?
      splittable? && work_units.splitting.count <= 0
    end
#fire_callback ⇒ Object
If a callback_url is defined, post the Job’s JSON to it upon completion. The callback_url may include HTTP basic authentication, if you like:
http://user:password@example.com/job_complete
If the callback_url is successfully pinged, we proceed to clean up the job. TODO: This should be moved into a Work Unit…

    # File 'lib/cloud_crowd/models/job.rb', line 60
    def fire_callback
      begin
        RestClient.post(callback_url, {:job => self.to_json})
        self.destroy
      rescue RestClient::Exception => e
        puts "Failed to fire job callback.  Hmmm, what should happen here?"
      end
    end
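To see how basic-auth credentials sit inside such a URL, the sketch below parses one with Ruby's standard `uri` library. The URL and its credentials are hypothetical, and the sketch only inspects the URL; it does not show how RestClient itself treats the credentials.

```ruby
require 'uri'

# Hypothetical callback URL with embedded basic-auth credentials.
callback_url = 'http://user:password@example.com/job_complete'

uri = URI.parse(callback_url)
puts uri.user      # user
puts uri.password  # password
puts uri.host      # example.com
puts uri.path      # /job_complete
```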
#mergeable? ⇒ Boolean
This job is mergeable if its Action has a merge method.
    # File 'lib/cloud_crowd/models/job.rb', line 98
    def mergeable?
      self.processing? && self.action_class.public_instance_methods.include?('merge')
    end
#percent_complete ⇒ Object
How complete is this Job? Unfortunately, with the current processing sequence, the percent_complete can pull a fast one and go backwards. This happens when there’s a single large input that takes a long time to split, and when it finally does it creates a whole swarm of work units. This seems unavoidable.
    # File 'lib/cloud_crowd/models/job.rb', line 114
    def percent_complete
      return 99 if merging?
      return 100 if complete?
      (work_units.complete.count / work_units.count.to_f * 100).round
    end
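A small numeric sketch shows how the percentage can move backwards when the total number of work units grows. The helper `percent_of` is hypothetical and takes plain counts instead of querying work units; the counts themselves are made up.

```ruby
# Hypothetical pure-function version of the ratio used in percent_complete.
def percent_of(complete, total)
  (complete / total.to_f * 100).round
end

# One of two work units is finished.
before = percent_of(1, 2)
# A split then adds 48 new, unfinished work units.
after = percent_of(1, 50)

puts before  # 50
puts after   # 2
```

In this sketch a total of zero makes the division produce NaN, and `round` then raises `FloatDomainError`. The source does not say whether the real method can be reached with zero work units.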
#set_next_status ⇒ Object
Transition this Job’s current status to the appropriate next one, based on the state of the WorkUnits and the nature of the Action.
    # File 'lib/cloud_crowd/models/job.rb', line 45
    def set_next_status
      update_attribute(:status,
        any_work_units_failed? ? FAILED     :
        self.splitting?        ? PROCESSING :
        self.mergeable?        ? MERGING    :
                                 SUCCEEDED
      )
    end
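The chained ternary reads as a priority list: failure wins, then splitting, then mergeability, and success is the fallback. Here is a sketch of the same decision as a standalone function. The symbols are hypothetical stand-ins for the FAILED, PROCESSING, MERGING, and SUCCEEDED constants, whose actual values are not shown on this page.

```ruby
# Hypothetical standalone version of the status decision. The keyword
# arguments replace the model's predicate methods.
def next_status(any_failed:, splitting:, mergeable:)
  return :failed     if any_failed
  return :processing if splitting
  return :merging    if mergeable
  :succeeded
end

puts next_status(any_failed: false, splitting: true,  mergeable: false)  # processing
puts next_status(any_failed: false, splitting: false, mergeable: true)   # merging
puts next_status(any_failed: true,  splitting: true,  mergeable: true)   # failed
```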
#splittable? ⇒ Boolean
This job is splittable if its Action has a split method.
    # File 'lib/cloud_crowd/models/job.rb', line 88
    def splittable?
      self.action_class.public_instance_methods.include? 'split'
    end
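On Ruby 1.9 and later, `public_instance_methods` returns symbols rather than strings, so a string comparison like this one only matches on Ruby 1.8. The sketch below shows a version-independent check using `Module#public_method_defined?`. The action classes are hypothetical, and this is an alternative illustration, not the library's own code.

```ruby
# Hypothetical actions for illustration.
class SplittingAction
  def split; end
  def process; end
end

class PlainAction
  def process; end
end

# public_method_defined? accepts either a symbol or a string.
def splittable_action?(action_class)
  action_class.public_method_defined?(:split)
end

puts splittable_action?(SplittingAction)  # true
puts splittable_action?(PlainAction)      # false
```

The same check with `:merge` would cover the merge case.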
#time_taken ⇒ Object
How long has this Job taken?
    # File 'lib/cloud_crowd/models/job.rb', line 121
    def time_taken
      return self.time if self.time
      Time.now - self.created_at
    end
#to_json(opts = {}) ⇒ Object
A JSON representation of this job includes the statuses of its component WorkUnits, as well as any completed outputs.
    # File 'lib/cloud_crowd/models/job.rb', line 133
    def to_json(opts={})
      atts = {
        'id'                => id,
        'color'             => color,
        'status'            => display_status,
        'percent_complete'  => percent_complete,
        'work_units'        => work_units.count,
        'time_taken'        => time_taken
      }
      atts['outputs'] = JSON.parse(outputs) if outputs
      atts['email']   = email if email
      atts.to_json
    end
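To make the shape of the resulting document concrete, this sketch assembles the same keys from plain values. The helper `job_json` and all sample values are hypothetical; in the real method they come from the Job record.

```ruby
require 'json'

# Hypothetical sketch: values are passed in rather than read from a record.
def job_json(id:, color:, status:, percent_complete:, work_units:, time_taken:,
             outputs: nil, email: nil)
  atts = {
    'id'               => id,
    'color'            => color,
    'status'           => status,
    'percent_complete' => percent_complete,
    'work_units'       => work_units,
    'time_taken'       => time_taken
  }
  # Stored outputs are a JSON string, so parse them before re-serializing.
  atts['outputs'] = JSON.parse(outputs) if outputs
  atts['email']   = email if email
  atts.to_json
end

json = job_json(
  id: 1, color: 'a1b2c3', status: 'succeeded',
  percent_complete: 100, work_units: 3, time_taken: 12.5,
  outputs: '["http://example.com/out.txt"]'
)
puts json
```

Parsing `outputs` first keeps it from being embedded as an escaped string inside the outer JSON, and the optional keys are omitted entirely when their values are absent.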