Class: Gearman::TaskSet

Inherits:
Object
  • Object
show all
Defined in:
lib/gearman/taskset.rb

Overview

TaskSet

Description

A set of tasks submitted to a Gearman job server.

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ TaskSet

Returns a new instance of TaskSet.



13
14
15
16
17
18
19
20
# File 'lib/gearman/taskset.rb', line 13

def initialize(client)
  @client = client
  @task_waiting_for_handle = nil
  @tasks_in_progress = {}  # "host:port//handle" -> [job1, job2, ...]
  @finished_tasks = []  # tasks that have completed or failed
  @sockets = {}  # "host:port" -> Socket
  @merge_hash_to_hostport = {}  # Fixnum -> "host:port"
end

Instance Method Details

#add_task(*args) ⇒ Object

Add a new task to this TaskSet.

Parameters:

  • args

    either a Task or arguments for Task.new

Returns:

  • true if the task was created successfully, false otherwise



27
28
29
30
# File 'lib/gearman/taskset.rb', line 27

def add_task(*args)
  task = Util::get_task_from_args(*args)
  add_task_internal(task, true)
end

#wait(timeout = 1) ⇒ Object

Wait for all tasks in the set to finish.

Parameters:

  • timeout (defaults to: 1)

    maximum amount of time to wait, in seconds



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/gearman/taskset.rb', line 228

def wait(timeout = 1)
  end_time = if timeout
    Time.now.to_f + timeout
  else
    nil
  end

  while not @tasks_in_progress.empty?
    remaining = if end_time
      (t = end_time - Time.now.to_f) > 0 ? t : 0
    else
      nil
    end

    ready_socks = remaining == 0 ? nil : IO::select(@sockets.values, nil, nil, remaining)
    if not ready_socks or not ready_socks[0]
      Util.logger.debug "GearmanRuby: Timed out while waiting for tasks to finish"
      # not sure what state the connections are in, so just be lame and
      # close them for now
      @sockets.values.each {|s| @client.close_socket(s) }
      @sockets = {}
      return false
    end
    ready_socks[0].each do |sock|
      begin
        read_packet(sock, (end_time ? end_time - Time.now.to_f : nil))
      rescue ProtocolError
        hostport = @client.get_hostport_for_socket(sock)
        Util.logger.debug "GearmanRuby: Ignoring bad packet from #{hostport}"
      rescue NetworkError
        hostport = @client.get_hostport_for_socket(sock)
        Util.logger.debug "GearmanRuby: Got timeout on read from #{hostport}"
      end
    end
  end

  @sockets.values.each {|s| @client.return_socket(s) }
  @sockets = {}
  @finished_tasks.each do |t|
    if ( (t.background.nil? || t.background == false) && !t.successful)
      Util.logger.debug "GearmanRuby: Taskset failed"
      return false
    end
  end
  true
end