Class: DatWorkerPool
- Inherits:
-
Object
show all
- Defined in:
- lib/dat-worker-pool.rb,
lib/dat-worker-pool/queue.rb,
lib/dat-worker-pool/worker.rb,
lib/dat-worker-pool/version.rb,
lib/dat-worker-pool/worker_pool_spy.rb
Defined Under Namespace
Modules: Logger, OptionalTimeout
Classes: Queue, Worker, WorkerPoolSpy, WorkersWaiting
Constant Summary
collapse
- VERSION =
"0.5.0"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(min = 0, max = 1, debug = false, &do_work_proc) ⇒ DatWorkerPool
Returns a new instance of DatWorkerPool.
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/dat-worker-pool.rb', line 18
# Sets up a pool's configuration, shared state and callback lists. Nothing is
# spawned here; the pool begins in the "not started" state.
#
# @param min [Integer] stored as the minimum worker count (default 0)
# @param max [Integer] stored as the maximum worker count (default 1)
# @param debug [Boolean] stored and handed to `Logger.new`
# @param do_work_proc [Proc] block kept in `@do_work_proc`
def initialize(min = 0, max = 1, debug = false, &do_work_proc)
  # Sizing limits.
  @min_workers, @max_workers = min, max

  # Diagnostics: the logger is built from the debug flag.
  @debug  = debug
  @logger = Logger.new(@debug)

  # The work to run plus the state shared across the pool.
  @do_work_proc    = do_work_proc
  @queue           = Queue.new
  @workers_waiting = WorkersWaiting.new
  @mutex           = Mutex.new
  @workers         = []
  @spawned         = 0

  # Worker lifecycle callbacks. Shutdown, sleep and wakeup are pre-seeded
  # with the pool's own bookkeeping procs.
  @on_worker_error_callbacks    = []
  @on_worker_start_callbacks    = []
  @on_worker_shutdown_callbacks = [proc { |worker| despawn_worker(worker) }]
  @on_worker_sleep_callbacks    = [proc { @workers_waiting.increment }]
  @on_worker_wakeup_callbacks   = [proc { @workers_waiting.decrement }]

  # Work callbacks start out empty.
  @before_work_callbacks = []
  @after_work_callbacks  = []

  @started = false
end
|
Instance Attribute Details
#after_work_callbacks ⇒ Object
Returns the value of attribute after_work_callbacks.
16
17
18
|
# File 'lib/dat-worker-pool.rb', line 16
# Reader for the after-work callback list.
#
# @return [Array] the object held in `@after_work_callbacks`
def after_work_callbacks; @after_work_callbacks; end
|
#before_work_callbacks ⇒ Object
Returns the value of attribute before_work_callbacks.
16
17
18
|
# File 'lib/dat-worker-pool.rb', line 16
# Reader for the before-work callback list.
#
# @return [Array] the object held in `@before_work_callbacks`
def before_work_callbacks; @before_work_callbacks; end
|
#logger ⇒ Object
Returns the value of attribute logger.
11
12
13
|
# File 'lib/dat-worker-pool.rb', line 11
# Reader for the pool's logger.
#
# @return [Object] the object held in `@logger`
def logger; @logger; end
|
#on_worker_error_callbacks ⇒ Object
Returns the value of attribute on_worker_error_callbacks.
13
14
15
|
# File 'lib/dat-worker-pool.rb', line 13
# Reader for the worker-error callback list.
#
# @return [Array] the object held in `@on_worker_error_callbacks`
def on_worker_error_callbacks; @on_worker_error_callbacks; end
|
#on_worker_shutdown_callbacks ⇒ Object
Returns the value of attribute on_worker_shutdown_callbacks.
14
15
16
|
# File 'lib/dat-worker-pool.rb', line 14
# Reader for the worker-shutdown callback list.
#
# @return [Array] the object held in `@on_worker_shutdown_callbacks`
def on_worker_shutdown_callbacks; @on_worker_shutdown_callbacks; end
|
#on_worker_sleep_callbacks ⇒ Object
Returns the value of attribute on_worker_sleep_callbacks.
15
16
17
|
# File 'lib/dat-worker-pool.rb', line 15
# Reader for the worker-sleep callback list.
#
# @return [Array] the object held in `@on_worker_sleep_callbacks`
def on_worker_sleep_callbacks; @on_worker_sleep_callbacks; end
|
#on_worker_start_callbacks ⇒ Object
Returns the value of attribute on_worker_start_callbacks.
14
15
16
|
# File 'lib/dat-worker-pool.rb', line 14
# Reader for the worker-start callback list.
#
# @return [Array] the object held in `@on_worker_start_callbacks`
def on_worker_start_callbacks; @on_worker_start_callbacks; end
|
#on_worker_wakeup_callbacks ⇒ Object
Returns the value of attribute on_worker_wakeup_callbacks.
15
16
17
|
# File 'lib/dat-worker-pool.rb', line 15
# Reader for the worker-wakeup callback list.
#
# @return [Array] the object held in `@on_worker_wakeup_callbacks`
def on_worker_wakeup_callbacks; @on_worker_wakeup_callbacks; end
|
#queue ⇒ Object
Returns the value of attribute queue.
12
13
14
|
# File 'lib/dat-worker-pool.rb', line 12
# Reader for the pool's work queue.
#
# @return [Object] the object held in `@queue`
def queue; @queue; end
|
#spawned ⇒ Object
Returns the value of attribute spawned.
11
12
13
|
# File 'lib/dat-worker-pool.rb', line 11
# Reader for the spawned-worker counter.
#
# @return [Integer] the value held in `@spawned`
def spawned; @spawned; end
|
Instance Method Details
#add_work(work_item) ⇒ Object
64
65
66
67
68
69
|
# File 'lib/dat-worker-pool.rb', line 64
# Pushes a work item onto the queue. When the pool has been started and every
# spawned worker was busy at the time of the call, one more worker is spawned
# unless the maximum has been reached.
#
# @param work_item [Object] item to enqueue; `nil` is ignored
# @return [Object, nil] whatever `spawn_worker` returns when a worker is
#   spawned, otherwise nil
def add_work(work_item)
  return if work_item.nil?

  # Availability is sampled before the push, not after it.
  had_no_idle_worker = all_spawned_workers_are_busy?
  @queue.push(work_item)

  return unless @started && had_no_idle_worker
  spawn_worker unless reached_max_workers?
end
|
#after_work(&block) ⇒ Object
108
|
# File 'lib/dat-worker-pool.rb', line 108
# Registers a block in the after-work callback list.
#
# @yield block appended to `@after_work_callbacks`
# @return [Array] the callback list, including the new block
def after_work(&block)
  @after_work_callbacks.push(block)
end
|
#all_spawned_workers_are_busy? ⇒ Boolean
87
88
89
|
# File 'lib/dat-worker-pool.rb', line 87
# Reports whether no worker is currently counted as waiting.
#
# @return [Boolean] true when the waiting-worker count is zero or less
def all_spawned_workers_are_busy?
  idle_count = @workers_waiting.count
  idle_count <= 0
end
|
#before_work(&block) ⇒ Object
107
|
# File 'lib/dat-worker-pool.rb', line 107
# Registers a block in the before-work callback list.
#
# @yield block appended to `@before_work_callbacks`
# @return [Array] the callback list, including the new block
def before_work(&block)
  @before_work_callbacks.push(block)
end
|
#on_queue_pop(&block) ⇒ Object
98
|
# File 'lib/dat-worker-pool.rb', line 98
# Registers a block with the queue's pop callbacks.
#
# @yield block appended to `@queue.on_pop_callbacks`
# @return [Object] the result of appending to that collection
def on_queue_pop(&block)
  pop_callbacks = @queue.on_pop_callbacks
  pop_callbacks << block
end
|
#on_queue_pop_callbacks ⇒ Object
95
|
# File 'lib/dat-worker-pool.rb', line 95
# Exposes the queue's pop callbacks.
#
# @return [Object] whatever `@queue.on_pop_callbacks` returns
def on_queue_pop_callbacks
  @queue.on_pop_callbacks
end
|
#on_queue_push(&block) ⇒ Object
99
|
# File 'lib/dat-worker-pool.rb', line 99
# Registers a block with the queue's push callbacks.
#
# @yield block appended to `@queue.on_push_callbacks`
# @return [Object] the result of appending to that collection
def on_queue_push(&block)
  push_callbacks = @queue.on_push_callbacks
  push_callbacks << block
end
|
#on_queue_push_callbacks ⇒ Object
96
|
# File 'lib/dat-worker-pool.rb', line 96
# Exposes the queue's push callbacks.
#
# @return [Object] whatever `@queue.on_push_callbacks` returns
def on_queue_push_callbacks
  @queue.on_push_callbacks
end
|
#on_worker_error(&block) ⇒ Object
101
|
# File 'lib/dat-worker-pool.rb', line 101
# Registers a block in the worker-error callback list.
#
# @yield block appended to `@on_worker_error_callbacks`
# @return [Array] the callback list, including the new block
def on_worker_error(&block)
  @on_worker_error_callbacks.push(block)
end
|
#on_worker_shutdown(&block) ⇒ Object
103
|
# File 'lib/dat-worker-pool.rb', line 103
# Registers a block in the worker-shutdown callback list.
#
# @yield block appended to `@on_worker_shutdown_callbacks`
# @return [Array] the callback list, including the new block
def on_worker_shutdown(&block)
  @on_worker_shutdown_callbacks.push(block)
end
|
#on_worker_sleep(&block) ⇒ Object
104
|
# File 'lib/dat-worker-pool.rb', line 104
# Registers a block in the worker-sleep callback list.
#
# @yield block appended to `@on_worker_sleep_callbacks`
# @return [Array] the callback list, including the new block
def on_worker_sleep(&block)
  @on_worker_sleep_callbacks.push(block)
end
|
#on_worker_start(&block) ⇒ Object
102
|
# File 'lib/dat-worker-pool.rb', line 102
# Registers a block in the worker-start callback list.
#
# @yield block appended to `@on_worker_start_callbacks`
# @return [Array] the callback list, including the new block
def on_worker_start(&block)
  @on_worker_start_callbacks.push(block)
end
|
#on_worker_wakeup(&block) ⇒ Object
105
|
# File 'lib/dat-worker-pool.rb', line 105
# Registers a block in the worker-wakeup callback list.
#
# @yield block appended to `@on_worker_wakeup_callbacks`
# @return [Array] the callback list, including the new block
def on_worker_wakeup(&block)
  @on_worker_wakeup_callbacks.push(block)
end
|
#queue_empty? ⇒ Boolean
75
76
77
|
# File 'lib/dat-worker-pool.rb', line 75
# Asks the queue whether it is empty.
#
# @return [Boolean] the result of `@queue.empty?`
def queue_empty?; @queue.empty?; end
|
#reached_max_workers? ⇒ Boolean
91
92
93
|
# File 'lib/dat-worker-pool.rb', line 91
def reached_max_workers?
@mutex.synchronize{ @spawned >= @max_workers }
end
|
#shutdown(timeout = nil) ⇒ Object
51
52
53
54
55
56
57
58
|
# File 'lib/dat-worker-pool.rb', line 51
# Marks the pool as not started and runs `graceful_shutdown` inside
# `OptionalTimeout.new(timeout)`. If that raises a timeout error, falls back
# to `force_shutdown`, passing along the timeout and the current call stack.
#
# @param timeout [Numeric, nil] handed to `OptionalTimeout.new`
#   NOTE(review): presumably seconds, with nil meaning "no limit" — confirm
#   against OptionalTimeout.
def shutdown(timeout = nil)
  @started = false
  begin
    OptionalTimeout.new(timeout) { graceful_shutdown }
  rescue Timeout::Error
    # Rescue `Timeout::Error` rather than the top-level `TimeoutError` alias:
    # the alias was deprecated in Ruby 2.3 and is no longer defined on current
    # Rubies, where naming it here would raise NameError instead of reaching
    # the forced shutdown. On older Rubies both names are the same class.
    # NOTE(review): assumes OptionalTimeout signals expiry with
    # Timeout::Error, and that the file already loads the `timeout` library.
    force_shutdown(timeout, caller)
  end
end
|
#start ⇒ Object
43
44
45
46
47
|
# File 'lib/dat-worker-pool.rb', line 43
# Flags the pool as started, starts the queue, then spawns the minimum number
# of workers.
#
# @return [Integer] `@min_workers` (the receiver of `times`)
def start
  @started = true
  @queue.start
  @min_workers.times do
    spawn_worker
  end
end
|
#waiting ⇒ Object
79
80
81
|
# File 'lib/dat-worker-pool.rb', line 79
# Number of workers currently counted as waiting.
#
# @return [Integer] the result of `@workers_waiting.count`
def waiting; @workers_waiting.count; end
|
#work_items ⇒ Object
71
72
73
|
# File 'lib/dat-worker-pool.rb', line 71
# Exposes the queue's work items.
#
# @return [Object] whatever `@queue.work_items` returns
def work_items; @queue.work_items; end
|
#worker_available? ⇒ Boolean
83
84
85
|
# File 'lib/dat-worker-pool.rb', line 83
# Reports whether work could be picked up: either the pool is still below its
# maximum worker count, or at least one worker is counted as waiting.
#
# @return [Boolean]
def worker_available?
  # The waiting count is only consulted once the maximum has been reached.
  return true unless reached_max_workers?

  @workers_waiting.count > 0
end
|