Class: WorkQueue
- Inherits:
-
Object
- Object
- WorkQueue
- Defined in:
- lib/work_queue.rb
Overview
WorkQueue
Description
A tunable work queue, designed to coordinate work between a producer and a pool of worker threads.
Usage
wq = WorkQueue.new
wq.enqueue_b { puts "Hello from the WorkQueue" }
wq.join
Constant Summary collapse
- VERSION =
"2.0.1"
Instance Method Summary collapse
-
#cur_tasks ⇒ Object
Returns the current number of active tasks.
-
#cur_threads ⇒ Object
Returns the current number of worker threads.
-
#enqueue_b(*args, &block) ⇒ Object
Schedules the given Block for future execution by a worker thread.
-
#enqueue_p(proc, *args) ⇒ Object
Schedules the given Proc for future execution by a worker thread.
-
#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue
constructor
Creates a new work queue with the desired parameters.
-
#join ⇒ Object
Waits until the tasks queue is empty and all worker threads have finished.
-
#kill ⇒ Object
Stops all worker threads immediately, aborting any ongoing tasks.
-
#max_tasks ⇒ Object
Returns the maximum number of queued tasks.
-
#max_threads ⇒ Object
Returns the maximum number of worker threads.
Constructor Details
#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue
Creates a new work queue with the desired parameters.
wq = WorkQueue.new(5,10,20)
44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/work_queue.rb', line 44 def initialize(max_threads=nil, max_tasks=nil) self.max_threads = max_threads self.max_tasks = max_tasks @threads = Array.new @threads.extend(MonitorMixin) @threads_waiting = 0 @tasks = Array.new @tasks.extend(MonitorMixin) @task_enqueued = @tasks.new_cond @task_completed = @tasks.new_cond @cur_tasks = 0 end |
Instance Method Details
#cur_tasks ⇒ Object
106 107 108 |
# File 'lib/work_queue.rb', line 106 def cur_tasks @cur_tasks end |
#cur_threads ⇒ Object
79 80 81 |
# File 'lib/work_queue.rb', line 79 def cur_threads @threads.size end |
#enqueue_b(*args, &block) ⇒ Object
128 129 130 |
# File 'lib/work_queue.rb', line 128 def enqueue_b(*args, &block) enqueue(block, args) end |
#enqueue_p(proc, *args) ⇒ Object
117 118 119 |
# File 'lib/work_queue.rb', line 117 def enqueue_p(proc, *args) enqueue(proc, args) end |
#join ⇒ Object
139 140 141 142 143 |
# File 'lib/work_queue.rb', line 139 def join @tasks.synchronize do @task_completed.wait_while { cur_tasks > 0 } end end |
#kill ⇒ Object
152 153 154 155 156 157 158 159 160 |
# File 'lib/work_queue.rb', line 152 def kill @tasks.synchronize do @threads.dup.each { |thread| thread.exit.join } @threds.clear @threads_waiting = 0 @tasks.clear @cur_tasks = 0 end end |
#max_tasks ⇒ Object
92 93 94 |
# File 'lib/work_queue.rb', line 92 def max_tasks @max_tasks end |
#max_threads ⇒ Object
66 67 68 |
# File 'lib/work_queue.rb', line 66 def max_threads @max_threads end |