Class: WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/work_queue.rb

Overview

WorkQueue

Description

A tunable work queue, designed to coordinate work between a producer and a pool of worker threads.

Usage

wq = WorkQueue.new
wq.enqueue_b { puts "Hello from the WorkQueue" }
wq.join

Constant Summary collapse

VERSION =
"2.0.1"

Instance Method Summary collapse

Constructor Details

#initialize(max_threads = nil, max_tasks = nil) ⇒ WorkQueue

Creates a new work queue with the desired parameters.

wq = WorkQueue.new(5,10,20)


44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/work_queue.rb', line 44

def initialize(max_threads=nil, max_tasks=nil)
    self.max_threads = max_threads
    self.max_tasks = max_tasks
    @threads = Array.new
    @threads.extend(MonitorMixin)
    @threads_waiting = 0
    @tasks = Array.new
    @tasks.extend(MonitorMixin)
    @task_enqueued = @tasks.new_cond
    @task_completed = @tasks.new_cond
    @cur_tasks = 0
end

Instance Method Details

#cur_tasksObject

Returns the current number of active tasks. This value is just a snapshot, and may change immediately upon returning.

wq = WorkQueue.new(1)
wq.enqueue_b { sleep(1) }
wq.cur_tasks		#=> 0
wq.enqueue_b {}
wq.cur_tasks		#=> 1


106
107
108
# File 'lib/work_queue.rb', line 106

def cur_tasks
    @cur_tasks
end

#cur_threadsObject

Returns the current number of worker threads. This value is just a snapshot, and may change immediately upon returning.

wq = WorkQueue.new(10)
wq.cur_threads		#=> 0
wq.enqueue_b {}
wq.cur_threads		#=> 1


79
80
81
# File 'lib/work_queue.rb', line 79

def cur_threads
    @threads.size
end

#enqueue_b(*args, &block) ⇒ Object

Schedules the given Block for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.

wq = WorkQueue.new(1)
wq.enqueue_b {}


128
129
130
# File 'lib/work_queue.rb', line 128

def enqueue_b(*args, &block)
   enqueue(block, args) 
end

#enqueue_p(proc, *args) ⇒ Object

Schedules the given Proc for future execution by a worker thread. If there is no space left in the queue, waits until space becomes available.

wq = WorkQueue.new(1)
wq.enqueue_p(Proc.new {})


117
118
119
# File 'lib/work_queue.rb', line 117

def enqueue_p(proc, *args)
    enqueue(proc, args) 
end

#joinObject

Waits until the tasks queue is empty and all worker threads have finished.

wq = WorkQueue.new(1)
wq.enqueue_b { sleep(1) }
wq.join


139
140
141
142
143
# File 'lib/work_queue.rb', line 139

def join
    @tasks.synchronize do
        @task_completed.wait_while { cur_tasks > 0 }
    end
end

#killObject

Stops all worker threads immediately, aborting any ongoing tasks.

wq = WorkQueue.new(1)
wq.enqueue_b { sleep(1) }
wq.kill


152
153
154
155
156
157
158
159
160
# File 'lib/work_queue.rb', line 152

def kill
    @tasks.synchronize do
        @threads.dup.each { |thread| thread.exit.join }
        @threds.clear
        @threads_waiting = 0
        @tasks.clear
        @cur_tasks = 0
    end
end

#max_tasksObject

Returns the maximum number of queued tasks. This value is set upon initialization and cannot be changed afterwards.

wq = WorkQueue.new()
wq.max_tasks		#=> Infinity
wq = WorkQueue.new(nil,1)
wq.max_tasks		#=> 1


92
93
94
# File 'lib/work_queue.rb', line 92

def max_tasks
    @max_tasks
end

#max_threadsObject

Returns the maximum number of worker threads. This value is set upon initialization and cannot be changed afterwards.

wq = WorkQueue.new()
wq.max_threads		#=> Infinity
wq = WorkQueue.new(1)
wq.max_threads		#=> 1


66
67
68
# File 'lib/work_queue.rb', line 66

def max_threads
    @max_threads
end