Class: Gearman::Task

Inherits:
Object
Defined in:
lib/gearman/task.rb

Overview

Task

Description

A task submitted to a Gearman job server.

Direct Known Subclasses

BackgroundTask

Constructor Details

#initialize(func, arg = '', opts = {}) ⇒ Task

Create a new Task object.

Parameters:

  • func

    function name

  • arg (defaults to: '')

    argument to the function

  • opts (defaults to: {})

    hash of additional options



# File 'lib/gearman/task.rb', line 19

def initialize(func, arg='', opts={})
  @func = func.to_s
  # `||` binds tighter than assignment, so a nil arg falls back to ''.
  @arg = arg || ''  # TODO: use something more ref-like?
  @uniq = nil       # Initialize to nil
  # Copy each recognized option into an instance variable and remove it
  # from opts (note: this mutates the hash passed in by the caller).
  %w{on_complete on_fail on_retry on_exception on_status on_warning on_data
     uniq retry_count priority hash background}.map {|s| s.to_sym }.each do |k|
    instance_variable_set "@#{k}", opts[k]
    opts.delete k
  end
  # Anything left over is an unknown option.
  if opts.size > 0
    raise InvalidArgsError, 'Invalid task args: ' + opts.keys.sort.join(', ')
  end
  @retry_count ||= 0
  @successful = false
  @retries_done = 0
end
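
The option handling in the constructor can be sketched in isolation. This is a minimal stand-alone illustration, not the library's code: `KNOWN_TASK_OPTIONS`, `split_task_options`, and the local `InvalidArgsError` class are hypothetical names used only here. It assumes symbol keys, and unlike the listing above it leaves the caller's hash untouched.

```ruby
# Hypothetical stand-in for the library's error class.
class InvalidArgsError < StandardError; end

# Keys the constructor recognizes (copied from the listing above).
KNOWN_TASK_OPTIONS = %i[on_complete on_fail on_retry on_exception on_status
                        on_warning on_data uniq retry_count priority hash
                        background].freeze

# Returns a hash holding every recognized option (nil when not given).
# Raises InvalidArgsError if opts contains any other key.
def split_task_options(opts)
  unknown = opts.keys - KNOWN_TASK_OPTIONS
  unless unknown.empty?
    raise InvalidArgsError,
          'Invalid task args: ' + unknown.map(&:to_s).sort.join(', ')
  end
  KNOWN_TASK_OPTIONS.each_with_object({}) { |k, known| known[k] = opts[k] }
end

settings = split_task_options(priority: :high, retry_count: 2)
puts settings[:priority]     # prints high
puts settings[:retry_count]  # prints 2
```

Passing an unrecognized key such as `bogus: 1` raises `InvalidArgsError` with the message `Invalid task args: bogus`.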

Instance Attribute Details

#arg ⇒ Object (readonly)

Returns the value of attribute arg.

# File 'lib/gearman/task.rb', line 38

def arg
  @arg
end

#background ⇒ Object

Returns the value of attribute background.

# File 'lib/gearman/task.rb', line 37

def background
  @background
end

#epoch ⇒ Object

Returns the value of attribute epoch.

# File 'lib/gearman/task.rb', line 37

def epoch
  @epoch
end

#func ⇒ Object (readonly)

Returns the value of attribute func.

# File 'lib/gearman/task.rb', line 38

def func
  @func
end

#priority ⇒ Object

Returns the value of attribute priority.

# File 'lib/gearman/task.rb', line 37

def priority
  @priority
end

#retries_done ⇒ Object (readonly)

Returns the value of attribute retries_done.

# File 'lib/gearman/task.rb', line 38

def retries_done
  @retries_done
end

#retry_count ⇒ Object

Returns the value of attribute retry_count.

# File 'lib/gearman/task.rb', line 37

def retry_count
  @retry_count
end

#successful ⇒ Object (readonly)

Returns the value of attribute successful.

# File 'lib/gearman/task.rb', line 38

def successful
  @successful
end

#uniq ⇒ Object

Returns the value of attribute uniq.

# File 'lib/gearman/task.rb', line 37

def uniq
  @uniq
end

Instance Method Details

#get_submit_packet ⇒ Object

Construct a packet to submit this task to a job server.

Returns:

  • String representation of packet



# File 'lib/gearman/task.rb', line 191

def get_submit_packet
  modes = ['submit_job']

  if @scheduled_at
    # Scheduled tasks are sent as submit_job_epoch; priority and
    # background settings are not applied on this path.
    modes << 'epoch'
    args = [func, get_uniq_hash, @scheduled_at.to_i, arg]
  else
    modes << 'high' if @priority == :high
    modes << 'low'  if @priority == :low
    modes << 'bg' if @background

    args = [func, get_uniq_hash, arg]
  end

  # e.g. "submit_job_high_bg"; the argument fields are NUL-separated
  mode = modes.join('_')
  Util.pack_request(mode, args.join("\0"))
end
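
How the request type name is assembled can be seen without a job server. The sketch below copies only the mode-building branch of the listing above. `submit_mode` and its keyword arguments are hypothetical names for this example, and `scheduled: true` stands in for `@scheduled_at` being set.

```ruby
# Builds the request type name in the same order as the listing above.
def submit_mode(priority: nil, background: false, scheduled: false)
  modes = ['submit_job']
  if scheduled
    modes << 'epoch'  # priority and background are ignored on this path
  else
    modes << priority.to_s if %i[high low].include?(priority)
    modes << 'bg' if background
  end
  modes.join('_')
end

puts submit_mode                                     # prints submit_job
puts submit_mode(priority: :high)                    # prints submit_job_high
puts submit_mode(priority: :low, background: true)   # prints submit_job_low_bg
puts submit_mode(scheduled: true, background: true)  # prints submit_job_epoch
```

The last call shows that a scheduled task does not pick up the `bg` suffix even when background is requested.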

#get_uniq_hash ⇒ Object

Return a hash that we can use to execute identical tasks on the same job server.

Returns:

  • SHA1 hex digest of @func + @arg if @uniq is nil, or the SHA1 hex digest of @uniq if it is set; the value is cached in @hash after the first call



# File 'lib/gearman/task.rb', line 175

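def get_uniq_hash
  # Reuse the cached digest (or a hash supplied through opts[:hash]).
  return @hash if @hash

  # to_s is applied to each part so that a non-String arg or uniq
  # does not raise before hashing.
  string = @uniq.nil? ? @func + @arg.to_s : @uniq.to_s

  @hash = Digest::SHA1.hexdigest(string)
end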
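
The effect of the unique hash can be checked with the standard library alone. In this sketch, `uniq_hash` is a hypothetical free function that mirrors the branch above, and the function names and arguments are made up for illustration.

```ruby
require 'digest/sha1'

# Hash uniq when it is given; otherwise hash the function name
# concatenated with the argument.
def uniq_hash(func, arg, uniq = nil)
  source = uniq.nil? ? func.to_s + arg.to_s : uniq.to_s
  Digest::SHA1.hexdigest(source)
end

a = uniq_hash('resize', 'cat.png')
b = uniq_hash('resize', 'cat.png')
c = uniq_hash('resize', 'cat.png', 'job-42')

puts a == b    # true: same function and argument give the same key
puts a == c    # false: an explicit uniq value takes over
puts a.length  # 40 hex characters
```

Two tasks with the same function and argument therefore share a key unless the caller sets uniq explicitly.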

#handle_completion(data) ⇒ Object

Handle completion of the task.

Parameters:

  • data

    data returned from the server (doesn’t include handle)



# File 'lib/gearman/task.rb', line 118

def handle_completion(data)
  @successful = true
  @on_complete.call(data) if @on_complete
  self
end

#handle_data(data) ⇒ Object

Handle a (partial) data packet for the task.



# File 'lib/gearman/task.rb', line 163

def handle_data(data)
  @on_data.call(data) if @on_data
  self
end

#handle_exception(exception) ⇒ Object

Record an exception.



# File 'lib/gearman/task.rb', line 141

def handle_exception(exception)
  @on_exception.call(exception) if @on_exception
  self
end

#handle_failure ⇒ Object

Record a failure and check whether we should be retried.

Returns:

  • true if we should be resubmitted; false otherwise



# File 'lib/gearman/task.rb', line 128

def handle_failure
  if @retries_done >= @retry_count
    @on_fail.call if @on_fail
    return false
  end
  @retries_done += 1
  @on_retry.call(@retries_done) if @on_retry
  true
end
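
The retry bookkeeping is easy to trace with a small stand-in. `RetryTracker` below is a hypothetical class that copies only the counters and the two callbacks from the listing above. It is a sketch for following the control flow, not a replacement for Task.

```ruby
# Hypothetical stand-in that copies only the retry bookkeeping.
class RetryTracker
  attr_reader :retries_done

  def initialize(retry_count, on_fail: nil, on_retry: nil)
    @retry_count = retry_count
    @retries_done = 0
    @on_fail = on_fail
    @on_retry = on_retry
  end

  # true means "resubmit"; false means the retry budget is used up.
  def handle_failure
    if @retries_done >= @retry_count
      @on_fail.call if @on_fail
      return false
    end
    @retries_done += 1
    @on_retry.call(@retries_done) if @on_retry
    true
  end
end

events = []
tracker = RetryTracker.new(2,
                           on_fail: -> { events << :failed },
                           on_retry: ->(n) { events << [:retry, n] })
results = 3.times.map { tracker.handle_failure }
p results  # [true, true, false]
p events   # [[:retry, 1], [:retry, 2], :failed]
```

With a retry count of 2, the first two failures request a resubmission and the third one triggers the failure callback.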

#handle_status(numerator, denominator) ⇒ Object

Handle a status update for the task.



# File 'lib/gearman/task.rb', line 148

def handle_status(numerator, denominator)
  @on_status.call(numerator, denominator) if @on_status
  self
end

#handle_warning(message) ⇒ Object

Handle a warning.



# File 'lib/gearman/task.rb', line 156

def handle_warning(message)
  @on_warning.call(message) if @on_warning
  self
end

#on_complete(&f) ⇒ Object

Set a block of code to be executed when this task completes successfully. The returned data will be passed to the block.



# File 'lib/gearman/task.rb', line 61

def on_complete(&f)
  @on_complete = f
end
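
The block is stored when it is registered and only runs when completion is reported. The miniature below shows that pattern. `MiniTask` is a hypothetical class that reproduces just `on_complete` and `handle_completion` as listed on this page.

```ruby
# Hypothetical miniature of the callback pattern used by Task.
class MiniTask
  attr_reader :successful

  def initialize
    @successful = false
    @on_complete = nil
  end

  # Store the block; it runs later, when completion is reported.
  def on_complete(&f)
    @on_complete = f
  end

  def handle_completion(data)
    @successful = true
    @on_complete.call(data) if @on_complete
    self
  end
end

received = nil
task = MiniTask.new
task.on_complete { |data| received = data.upcase }
task.handle_completion('done')
puts received         # prints DONE
puts task.successful  # prints true
```

A task without a registered block still becomes successful. The callback is optional.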

#on_data(&f) ⇒ Object

Set a block of code to be executed when we receive a (partial) data packet for this task. The data received will be passed as an argument to the block.



# File 'lib/gearman/task.rb', line 110

def on_data(&f)
  @on_data = f
end

#on_exception(&f) ⇒ Object

Set a block of code to be executed when a worker sends a remote exception. The block receives the exception message passed from the worker. The block can return true to retry the task or false to mark it as finished.

NOTE: this is actually deprecated, cf. bugs.launchpad.net/gearmand/+bug/405732



# File 'lib/gearman/task.rb', line 86

def on_exception(&f)
  @on_exception = f
end

#on_fail(&f) ⇒ Object

Set a block of code to be executed when this task fails.



# File 'lib/gearman/task.rb', line 67

def on_fail(&f)
  @on_fail = f
end

#on_retry(&f) ⇒ Object

Set a block of code to be executed when this task is retried after failing. The number of retries that have been attempted (including the current one) will be passed to the block.



# File 'lib/gearman/task.rb', line 75

def on_retry(&f)
  @on_retry = f
end

#on_status(&f) ⇒ Object

Set a block of code to be executed when we receive a status update for this task. The block will receive two arguments, a numerator and denominator describing the task’s status.



# File 'lib/gearman/task.rb', line 94

def on_status(&f)
  @on_status = f
end
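
A status block typically turns the numerator and denominator into something readable. The lambda below is one possible shape for such a block. `format_progress` is a hypothetical name, and the string conversion and zero-denominator guard are assumptions of this sketch rather than documented behavior.

```ruby
# Turns a numerator/denominator pair into a percentage string.
# Values are converted with to_f in case they arrive as strings.
format_progress = lambda do |numerator, denominator|
  n = numerator.to_f
  d = denominator.to_f
  return 'unknown' if d.zero?
  format('%.1f%%', 100.0 * n / d)
end

puts format_progress.call(1, 4)       # prints 25.0%
puts format_progress.call('3', '10')  # prints 30.0%
puts format_progress.call(0, 0)       # prints unknown
```

A lambda like this could be handed over as the block with `task.on_status(&format_progress)`.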

#on_warning(&f) ⇒ Object

Set a block of code to be executed when we receive a warning from a worker. Workers are encouraged to send work_warning, followed by work_fail, if an exception occurs on their side. Workers that do not use this library may not currently behave this way, though (cf. bugs.launchpad.net/gearmand/+bug/405732).



# File 'lib/gearman/task.rb', line 103

def on_warning(&f)
  @on_warning = f
end

#reset_state ⇒ Object

Internal method to reset this task’s state so it can be run again. Called by TaskSet#add_task.



# File 'lib/gearman/task.rb', line 52

def reset_state
  @retries_done = 0
  @successful = false
  self
end

#schedule(time) ⇒ Object

Schedule this job to run at a certain time. Unlike `cron`, it takes a single Time rather than a wildcard pattern.

Parameters:

  • time

    Ruby Time object that represents when to run the thing



# File 'lib/gearman/task.rb', line 45

def schedule(time)
  @scheduled_at = time
end
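
The scheduled time reaches the server as an integer epoch. This sketch shows how a Time becomes part of the NUL-separated argument string, following the field order used in get_submit_packet above. `epoch_args` is a hypothetical helper, and the function name, hash, and argument values are made up.

```ruby
# Assembles the NUL-separated argument string for a scheduled task:
# function name, unique hash, epoch seconds, argument.
def epoch_args(func, uniq_hash, time, arg)
  [func, uniq_hash, time.to_i, arg].join("\0")
end

run_at = Time.at(1_700_000_000)
payload = epoch_args('send_report', 'abc123', run_at, 'weekly')
p payload.split("\0")  # ["send_report", "abc123", "1700000000", "weekly"]
```

Because `to_i` truncates to whole seconds, any sub-second part of the Time is dropped.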