Class: BoundedAsyncTaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/bounded_async_task_runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_workers = 20) ⇒ BoundedAsyncTaskRunner

Create a new BoundedAsyncTaskRunner.

Parameters:

  • max_workers (Integer) (defaults to: 20)

    the maximum number of asynchronous tasks to simultaneously execute.



18
19
20
21
# File 'lib/bounded_async_task_runner.rb', line 18

def initialize(max_workers=20)
  @threads = Array.new
  @max_workers = max_workers
end

Instance Attribute Details

#max_workersObject (readonly)

Returns the value of attribute max_workers.



12
13
14
# File 'lib/bounded_async_task_runner.rb', line 12

def max_workers
  @max_workers
end

Instance Method Details

#do(*args) {|*args| ... } ⇒ Object Also known as: do_async

Execute the provided block asynchronously.

Parameters:

  • *args

    Objects to yield to the provided block when it is invoked.

Yields:

  • The task to execute asynchronously.

Yield Parameters:

  • *args

    The arguments passed into this method.

Raises:

  • (AsyncTaskError)

    If any previously executed task has failed (i.e. raised an error) an AsyncTaskError will be raised instead of executing the async task.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/bounded_async_task_runner.rb', line 35

def do(*args, &block)
  purge_dead
  until @threads.size < max_workers
    purge_dead
    break if @threads.size < max_workers
    # We have still have the maximum number of active threads so we
    # need to wait for one to finish.

    @threads.first.join(0.1)  # Join on the oldest thread because it will probably finish first.
  end
  # We can start new thread without exceeding the threshold

  thread_args = [block] + args

  @threads << Thread.start([block] + args) do |args|
    task = args.shift
    
    begin
      task.call(*args)
    rescue Exception => e
      AsyncTaskError.new(e)
    end
  end
end

#wait_for_all_to_finishObject

Blocks until all currently running tasks to complete.

Raises:

  • (AsyncTaskError)

    If any previously executed task has failed (i.e. raised an error) an AsyncTaskError will be raised.



66
67
68
69
70
71
# File 'lib/bounded_async_task_runner.rb', line 66

def wait_for_all_to_finish
  until @threads.empty?
    @threads.first.join
    purge_dead
  end    
end