Class: Gearman::Task
- Inherits: Object
- Defined in: lib/gearman/task.rb
Overview
Task
Description
A task submitted to a Gearman job server.
Direct Known Subclasses
Instance Attribute Summary
- #arg ⇒ Object (readonly)
  Returns the value of attribute arg.
- #background ⇒ Object
  Returns the value of attribute background.
- #epoch ⇒ Object
  Returns the value of attribute epoch.
- #func ⇒ Object (readonly)
  Returns the value of attribute func.
- #priority ⇒ Object
  Returns the value of attribute priority.
- #retries_done ⇒ Object (readonly)
  Returns the value of attribute retries_done.
- #retry_count ⇒ Object
  Returns the value of attribute retry_count.
- #successful ⇒ Object (readonly)
  Returns the value of attribute successful.
- #uniq ⇒ Object
  Returns the value of attribute uniq.
Instance Method Summary
- #get_submit_packet ⇒ Object
  Construct a packet to submit this task to a job server.
- #get_uniq_hash ⇒ Object
  Return a hash that we can use to execute identical tasks on the same job server.
- #handle_completion(data) ⇒ Object
  Handle completion of the task.
- #handle_data(data) ⇒ Object
  Handle (partial) data.
- #handle_exception(exception) ⇒ Object
  Record an exception.
- #handle_failure ⇒ Object
  Record a failure and check whether we should be retried.
- #handle_status(numerator, denominator) ⇒ Object
  Handle a status update for the task.
- #handle_warning(message) ⇒ Object
  Handle a warning.
- #initialize(func, arg = '', opts = {}) ⇒ Task (constructor)
  Create a new Task object.
- #on_complete(&f) ⇒ Object
  Set a block of code to be executed when this task completes successfully.
- #on_data(&f) ⇒ Object
  Set a block of code to be executed when we receive a (partial) data packet for this task.
- #on_exception(&f) ⇒ Object
  Set a block of code to be executed when a remote exception is sent by a worker.
- #on_fail(&f) ⇒ Object
  Set a block of code to be executed when this task fails.
- #on_retry(&f) ⇒ Object
  Set a block of code to be executed when this task is retried after failing.
- #on_status(&f) ⇒ Object
  Set a block of code to be executed when we receive a status update for this task.
- #on_warning(&f) ⇒ Object
  Set a block of code to be executed when we receive a warning from a worker.
- #reset_state ⇒ Object
  Internal method to reset this task’s state so it can be run again.
- #schedule(time) ⇒ Object
  Schedule this job to run at a certain time (like `cron`). XXX: But there is no wildcard?
Constructor Details
#initialize(func, arg = '', opts = {}) ⇒ Task
Create a new Task object.
# File 'lib/gearman/task.rb', line 19
def initialize(func, arg='', opts={})
  @func = func.to_s
  @arg = arg or ''  # TODO: use something more ref-like?
  @uniq = nil       # Initialize to nil
  %w{on_complete on_fail on_retry on_exception on_status on_warning on_data
     uniq retry_count priority hash background}.map {|s| s.to_sym }.each do |k|
    instance_variable_set "@#{k}", opts[k]
    opts.delete k
  end
  if opts.size > 0
    raise InvalidArgsError, 'Invalid task args: ' + opts.keys.sort.join(', ')
  end
  @retry_count ||= 0
  @successful = false
  @retries_done = 0
end
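The option handling in the constructor can be sketched on its own, without the gem installed. This is a minimal illustration, not the library's code: `extract_task_opts`, `ALLOWED_TASK_OPTS`, and the local `InvalidArgsError` class are hypothetical stand-ins. Unlike the constructor, the sketch works on a copy of the hash rather than deleting keys from the caller's `opts`.

```ruby
# Hypothetical stand-in for the library's InvalidArgsError.
class InvalidArgsError < StandardError; end

# The option names the constructor listing accepts.
ALLOWED_TASK_OPTS = %i[on_complete on_fail on_retry on_exception on_status
                       on_warning on_data uniq retry_count priority hash
                       background].freeze

# Pull the recognised options out of opts and reject anything left over.
def extract_task_opts(opts)
  remaining = opts.dup # the real constructor mutates opts instead
  settings = {}
  ALLOWED_TASK_OPTS.each { |key| settings[key] = remaining.delete(key) }
  unless remaining.empty?
    raise InvalidArgsError, 'Invalid task args: ' + remaining.keys.sort.join(', ')
  end
  settings[:retry_count] ||= 0 # same default as the constructor
  settings
end
```

Calling `extract_task_opts({ priority: :high })` returns a hash with `:priority` set and `:retry_count` defaulted to 0, while an unknown key raises with the offending names listed in sorted order.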
Instance Attribute Details
#arg ⇒ Object (readonly)
Returns the value of attribute arg.
# File 'lib/gearman/task.rb', line 38
def arg
  @arg
end
#background ⇒ Object
Returns the value of attribute background.
# File 'lib/gearman/task.rb', line 37
def background
  @background
end
#epoch ⇒ Object
Returns the value of attribute epoch.
# File 'lib/gearman/task.rb', line 37
def epoch
  @epoch
end
#func ⇒ Object (readonly)
Returns the value of attribute func.
# File 'lib/gearman/task.rb', line 38
def func
  @func
end
#priority ⇒ Object
Returns the value of attribute priority.
# File 'lib/gearman/task.rb', line 37
def priority
  @priority
end
#retries_done ⇒ Object (readonly)
Returns the value of attribute retries_done.
# File 'lib/gearman/task.rb', line 38
def retries_done
  @retries_done
end
#retry_count ⇒ Object
Returns the value of attribute retry_count.
# File 'lib/gearman/task.rb', line 37
def retry_count
  @retry_count
end
#successful ⇒ Object (readonly)
Returns the value of attribute successful.
# File 'lib/gearman/task.rb', line 38
def successful
  @successful
end
#uniq ⇒ Object
Returns the value of attribute uniq.
# File 'lib/gearman/task.rb', line 37
def uniq
  @uniq
end
Instance Method Details
#get_submit_packet ⇒ Object
Construct a packet to submit this task to a job server.
# File 'lib/gearman/task.rb', line 191
def get_submit_packet()
  modes = ['submit_job']

  if @scheduled_at
    modes << 'epoch'
    args = [func, get_uniq_hash, @scheduled_at.to_i, arg]
  else
    if @priority
      modes << 'high' if @priority == :high
      modes << 'low' if @priority == :low
    end

    modes << 'bg' if @background

    args = [func, get_uniq_hash, arg]
  end

  mode = modes.join('_')
  Util::pack_request(mode, args.join("\0"))
end
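To see which request name and payload the method above would hand to `Util::pack_request`, the branching can be reproduced in isolation. The helpers `submit_mode` and `submit_payload` below are hypothetical names used only for this sketch; the packing step itself is left out because its behavior is not shown here.

```ruby
# Build the request-type name the same way the listing does.
def submit_mode(priority: nil, background: false, scheduled_at: nil)
  modes = ['submit_job']
  if scheduled_at
    modes << 'epoch' # a scheduled task skips the priority and bg suffixes
  else
    modes << 'high' if priority == :high
    modes << 'low'  if priority == :low
    modes << 'bg'   if background
  end
  modes.join('_')
end

# Join the fields with NUL bytes, as the listing does before packing.
def submit_payload(func, uniq_hash, arg, scheduled_at: nil)
  fields = if scheduled_at
             [func, uniq_hash, scheduled_at.to_i, arg] # epoch seconds
           else
             [func, uniq_hash, arg]
           end
  fields.join("\0")
end
```

One consequence visible in the sketch: once a task has been scheduled, its priority and background settings no longer affect the request name.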
#get_uniq_hash ⇒ Object
Return a hash that we can use to execute identical tasks on the same job server.
# File 'lib/gearman/task.rb', line 175
def get_uniq_hash
  return @hash if @hash

  if @uniq.nil?
    string = (@func+@arg).to_s
  else
    string = @uniq
  end

  @hash = Digest::SHA1.hexdigest(string)
end
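The hashing rule above is easy to try outside the class. The function `uniq_hash` below is a hypothetical standalone version written for illustration; it omits the memoisation in `@hash` and assumes `func` and `arg` are both strings.

```ruby
require 'digest'

# SHA1 of uniq when given, otherwise of func and arg concatenated.
def uniq_hash(func, arg, uniq = nil)
  source = uniq.nil? ? (func + arg).to_s : uniq
  Digest::SHA1.hexdigest(source)
end
```

Two tasks with the same function and argument produce the same digest, and an explicit `uniq` overrides both. Because the function name and argument are concatenated with no separator, `uniq_hash('ab', 'c')` and `uniq_hash('a', 'bc')` also produce the same digest, which may matter if function names share prefixes.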
#handle_completion(data) ⇒ Object
Handle completion of the task.
# File 'lib/gearman/task.rb', line 118
def handle_completion(data)
  @successful = true
  @on_complete.call(data) if @on_complete
  self
end
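The callback pattern used by `handle_completion` and `on_complete` can be sketched with a small stand-in class. `MiniTask` is hypothetical and exists only for this example; it copies the two method bodies from the listings and nothing else.

```ruby
# Minimal stand-in showing how a registered block is stored and later invoked.
class MiniTask
  attr_reader :successful

  def initialize
    @successful = false
  end

  # Store the block; like the original, this returns the block, not the task.
  def on_complete(&block)
    @on_complete = block
  end

  # Mark success, invoke the block if one was registered, and return self.
  def handle_completion(data)
    @successful = true
    @on_complete.call(data) if @on_complete
    self
  end
end
```

With `task = MiniTask.new` and `task.on_complete { |data| puts data }`, calling `task.handle_completion('done')` runs the block and returns the task. A task without a registered block is still marked successful.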
#handle_data(data) ⇒ Object
Handle (partial) data.
# File 'lib/gearman/task.rb', line 163
def handle_data(data)
  @on_data.call(data) if @on_data
  self
end
#handle_exception(exception) ⇒ Object
Record an exception.
# File 'lib/gearman/task.rb', line 141
def handle_exception(exception)
  @on_exception.call(exception) if @on_exception
  self
end
#handle_failure ⇒ Object
Record a failure and check whether we should be retried.
# File 'lib/gearman/task.rb', line 128
def handle_failure
  if @retries_done >= @retry_count
    @on_fail.call if @on_fail
    return false
  end
  @retries_done += 1
  @on_retry.call(@retries_done) if @on_retry
  true
end
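The retry bookkeeping above can be exercised with a small stand-in. `RetryTracker` is a hypothetical class written for this sketch; it mirrors the method body from the listing and takes its callbacks as keyword arguments purely for convenience.

```ruby
# Stand-in for the retry accounting in handle_failure.
class RetryTracker
  attr_reader :retries_done

  def initialize(retry_count, on_fail: nil, on_retry: nil)
    @retry_count = retry_count
    @retries_done = 0
    @on_fail = on_fail
    @on_retry = on_retry
  end

  # Returns true while a retry is still allowed, false once they are used up.
  def handle_failure
    if @retries_done >= @retry_count
      @on_fail.call if @on_fail
      return false
    end
    @retries_done += 1
    @on_retry.call(@retries_done) if @on_retry
    true
  end
end
```

With `retry_count` set to 2, the first two failures return true and pass 1 and then 2 to the retry callback; the third failure runs the fail callback and returns false.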
#handle_status(numerator, denominator) ⇒ Object
Handle a status update for the task.
# File 'lib/gearman/task.rb', line 148
def handle_status(numerator, denominator)
  @on_status.call(numerator, denominator) if @on_status
  self
end
#handle_warning(message) ⇒ Object
Handle a warning.
# File 'lib/gearman/task.rb', line 156
def handle_warning(message)
  @on_warning.call(message) if @on_warning
  self
end
#on_complete(&f) ⇒ Object
Set a block of code to be executed when this task completes successfully. The returned data will be passed to the block.
# File 'lib/gearman/task.rb', line 61
def on_complete(&f)
  @on_complete = f
end
#on_data(&f) ⇒ Object
Set a block of code to be executed when we receive a (partial) data packet for this task. The data received will be passed as an argument to the block.
# File 'lib/gearman/task.rb', line 110
def on_data(&f)
  @on_data = f
end
#on_exception(&f) ⇒ Object
Set a block of code to be executed when a remote exception is sent by a worker. The block will receive the message of the exception passed from the worker. The block can return true to request a retry or false to mark the task as finished.
NOTE: This is actually deprecated, cf. bugs.launchpad.net/gearmand/+bug/405732
# File 'lib/gearman/task.rb', line 86
def on_exception(&f)
  @on_exception = f
end
#on_fail(&f) ⇒ Object
Set a block of code to be executed when this task fails.
# File 'lib/gearman/task.rb', line 67
def on_fail(&f)
  @on_fail = f
end
#on_retry(&f) ⇒ Object
Set a block of code to be executed when this task is retried after failing. The number of retries that have been attempted (including the current one) will be passed to the block.
# File 'lib/gearman/task.rb', line 75
def on_retry(&f)
  @on_retry = f
end
#on_status(&f) ⇒ Object
Set a block of code to be executed when we receive a status update for this task. The block will receive two arguments, a numerator and denominator describing the task’s status.
# File 'lib/gearman/task.rb', line 94
def on_status(&f)
  @on_status = f
end
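A status block receives the numerator and denominator and can turn them into whatever progress measure it needs. The sketch below converts them to a percentage. `percent_complete` and the `on_status` lambda are hypothetical, and whether the two values arrive as strings or integers is an assumption, so the helper converts both with `to_f`.

```ruby
# Convert a numerator/denominator pair into a percentage, guarding against
# a zero denominator. Accepts integers or numeric strings.
def percent_complete(numerator, denominator)
  total = denominator.to_f
  return 0.0 if total.zero?
  (numerator.to_f / total * 100).round(1)
end

# A block body of the kind that could be passed to on_status.
on_status = lambda do |numerator, denominator|
  "#{percent_complete(numerator, denominator)}% complete"
end
```

The lambda could be handed to a task as `task.on_status(&on_status)`.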
#on_warning(&f) ⇒ Object
Set a block of code to be executed when we receive a warning from a worker. Workers are recommended to send work_warning, followed by work_fail, if an exception occurs on their side. Do not expect this behavior from workers that are not using this library at the moment, though (cf. bugs.launchpad.net/gearmand/+bug/405732).
# File 'lib/gearman/task.rb', line 103
def on_warning(&f)
  @on_warning = f
end
#reset_state ⇒ Object
Internal method to reset this task’s state so it can be run again. Called by TaskSet#add_task.
# File 'lib/gearman/task.rb', line 52
def reset_state
  @retries_done = 0
  @successful = false
  self
end
#schedule(time) ⇒ Object
Schedule this job to run at a certain time (like `cron`). XXX: But there is no wildcard?
# File 'lib/gearman/task.rb', line 45
def schedule(time)
  @scheduled_at = time
end