Class: Gearman::TaskSet
- Inherits:
-
Object
- Object
- Gearman::TaskSet
- Defined in:
- lib/gearman/taskset.rb
Overview
TaskSet
Description
A set of tasks submitted to a Gearman job server.
Instance Method Summary
-
#add_task(*args) ⇒ Object
Add a new task to this TaskSet.
-
#initialize(client) ⇒ TaskSet
constructor
A new instance of TaskSet.
-
#wait(timeout = 1) ⇒ Object
Wait for all tasks in the set to finish.
Constructor Details
#initialize(client) ⇒ TaskSet
Returns a new instance of TaskSet.
13 14 15 16 17 18 19 20 |
# File 'lib/gearman/taskset.rb', line 13
#
# Create an empty task set bound to the given client.
#
# @param client the client object this set keeps a reference to in @client
# @return [TaskSet] a new instance of TaskSet
def initialize(client)
  @client = client
  @task_waiting_for_handle = nil

  # "host:port//handle" -> [job1, job2, ...]
  @tasks_in_progress = {}

  # tasks that have completed or failed
  @finished_tasks = []

  # "host:port" -> Socket
  @sockets = {}

  # merge hash (Fixnum, i.e. an Integer) -> "host:port"
  @merge_hash_to_hostport = {}
end
Instance Method Details
#add_task(*args) ⇒ Object
Add a new task to this TaskSet.
27 28 29 30 |
# File 'lib/gearman/taskset.rb', line 27
#
# Add a new task to this TaskSet.
#
# The arguments are turned into a task by Util.get_task_from_args and the
# result is passed to add_task_internal together with +true+ as the second
# argument.
# NOTE(review): the meaning of that +true+ flag is defined by
# add_task_internal, which is not visible here -- confirm before relying on it.
#
# @param args arguments forwarded unchanged to Util.get_task_from_args
# @return whatever add_task_internal returns
def add_task(*args)
  add_task_internal(Util.get_task_from_args(*args), true)
end
#wait(timeout = 1) ⇒ Object
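Wait for all tasks in the set to finish, giving up after timeout seconds (default 1). Returns true if every finished non-background task was successful; returns false if the wait timed out or if any non-background task failed.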
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/gearman/taskset.rb', line 228
#
# Block until no tasks remain in progress or the timeout elapses.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait overall;
#   when nil (or false) no deadline is set and IO.select is called without
#   a timeout
# @return [Boolean] false if the deadline passed before all tasks finished
#   (every socket in @sockets is closed through the client first), or if any
#   finished task whose +background+ is nil/false is not +successful+;
#   true otherwise
def wait(timeout = 1)
  deadline = timeout ? Time.now.to_f + timeout : nil

  until @tasks_in_progress.empty?
    # Seconds left before the deadline, floored at 0; nil means no deadline.
    time_left = nil
    if deadline
      time_left = deadline - Time.now.to_f
      time_left = 0 unless time_left > 0
    end

    # With no time left, skip the select call and treat it as a timeout.
    readable =
      if time_left == 0
        nil
      else
        IO.select(@sockets.values, nil, nil, time_left)
      end

    unless readable && readable[0]
      Util.logger.debug "GearmanRuby: Timed out while waiting for tasks to finish"
      # The state of the connections is unknown here, so every socket is
      # closed instead of being handed back to the client.
      @sockets.values.each do |sock|
        @client.close_socket(sock)
      end
      @sockets = {}
      return false
    end

    readable[0].each do |sock|
      begin
        # The time budget is recomputed for each socket; nil when there is
        # no deadline.
        read_packet(sock, deadline ? deadline - Time.now.to_f : nil)
      rescue ProtocolError
        peer = @client.get_hostport_for_socket(sock)
        Util.logger.debug "GearmanRuby: Ignoring bad packet from #{peer}"
      rescue NetworkError
        peer = @client.get_hostport_for_socket(sock)
        Util.logger.debug "GearmanRuby: Got timeout on read from #{peer}"
      end
    end
  end

  # Nothing is in progress any more: hand the sockets back to the client.
  @sockets.values.each do |sock|
    @client.return_socket(sock)
  end
  @sockets = {}

  @finished_tasks.each do |task|
    # Only tasks whose background flag is nil or false count towards failure.
    foreground = task.background.nil? || task.background == false
    next unless foreground
    next if task.successful

    Util.logger.debug "GearmanRuby: Taskset failed"
    return false
  end

  true
end