Class: DatWorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/dat-worker-pool.rb,
lib/dat-worker-pool/queue.rb,
lib/dat-worker-pool/worker.rb,
lib/dat-worker-pool/version.rb,
lib/dat-worker-pool/worker_pool_spy.rb

Defined Under Namespace

Modules: Logger, OptionalTimeout. Classes: Queue, Worker, WorkerPoolSpy, WorkersWaiting.

Constant Summary collapse

VERSION =
"0.5.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min = 0, max = 1, debug = false, &do_work_proc) ⇒ DatWorkerPool

Returns a new instance of DatWorkerPool.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/dat-worker-pool.rb', line 18

# Builds a pool that is not yet started; no worker is spawned here.
#
# @param min [Integer] stored as @min_workers; `start` spawns this many workers
# @param max [Integer] stored as @max_workers; `reached_max_workers?` compares
#   the spawned count against it
# @param debug [Boolean] handed to Logger.new
# @param do_work_proc [Proc] block kept in @do_work_proc (not invoked here)
def initialize(min = 0, max = 1, debug = false, &do_work_proc)
  @min_workers, @max_workers = min, max
  @debug        = debug
  @logger       = Logger.new(debug)
  @do_work_proc = do_work_proc

  @queue           = Queue.new
  @workers_waiting = WorkersWaiting.new

  @mutex   = Mutex.new
  @workers = []
  @spawned = 0

  # Hooks that start out empty; callers register blocks on them later.
  @on_worker_error_callbacks = []
  @on_worker_start_callbacks = []

  # Hooks pre-seeded with the pool's own bookkeeping: a worker that shuts down
  # is despawned, and sleep/wakeup keep the waiting-worker counter in sync.
  @on_worker_shutdown_callbacks = [Proc.new { |worker| despawn_worker(worker) }]
  @on_worker_sleep_callbacks    = [Proc.new { @workers_waiting.increment }]
  @on_worker_wakeup_callbacks   = [Proc.new { @workers_waiting.decrement }]

  @before_work_callbacks = []
  @after_work_callbacks  = []

  @started = false
end

Instance Attribute Details

#after_work_callbacks ⇒ Object (readonly)

Returns the value of attribute after_work_callbacks.



16
17
18
# File 'lib/dat-worker-pool.rb', line 16

# Reader for the collection of blocks registered through `after_work`.
def after_work_callbacks; @after_work_callbacks; end

#before_work_callbacks ⇒ Object (readonly)

Returns the value of attribute before_work_callbacks.



16
17
18
# File 'lib/dat-worker-pool.rb', line 16

# Reader for the collection of blocks registered through `before_work`.
def before_work_callbacks; @before_work_callbacks; end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/dat-worker-pool.rb', line 11

# Reader for the logger object created in the constructor.
def logger; @logger; end

#on_worker_error_callbacks ⇒ Object (readonly)

Returns the value of attribute on_worker_error_callbacks.



13
14
15
# File 'lib/dat-worker-pool.rb', line 13

# Reader for the collection of blocks registered through `on_worker_error`.
def on_worker_error_callbacks; @on_worker_error_callbacks; end

#on_worker_shutdown_callbacks ⇒ Object (readonly)

Returns the value of attribute on_worker_shutdown_callbacks.



14
15
16
# File 'lib/dat-worker-pool.rb', line 14

# Reader for the collection of blocks registered through `on_worker_shutdown`.
def on_worker_shutdown_callbacks; @on_worker_shutdown_callbacks; end

#on_worker_sleep_callbacks ⇒ Object (readonly)

Returns the value of attribute on_worker_sleep_callbacks.



15
16
17
# File 'lib/dat-worker-pool.rb', line 15

# Reader for the collection of blocks registered through `on_worker_sleep`.
def on_worker_sleep_callbacks; @on_worker_sleep_callbacks; end

#on_worker_start_callbacks ⇒ Object (readonly)

Returns the value of attribute on_worker_start_callbacks.



14
15
16
# File 'lib/dat-worker-pool.rb', line 14

# Reader for the collection of blocks registered through `on_worker_start`.
def on_worker_start_callbacks; @on_worker_start_callbacks; end

#on_worker_wakeup_callbacks ⇒ Object (readonly)

Returns the value of attribute on_worker_wakeup_callbacks.



15
16
17
# File 'lib/dat-worker-pool.rb', line 15

# Reader for the collection of blocks registered through `on_worker_wakeup`.
def on_worker_wakeup_callbacks; @on_worker_wakeup_callbacks; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/dat-worker-pool.rb', line 12

# Reader for the pool's work queue object.
def queue; @queue; end

#spawned ⇒ Object (readonly)

Returns the value of attribute spawned.



11
12
13
# File 'lib/dat-worker-pool.rb', line 11

# Reader for the spawned-worker counter (initialized to 0 by the constructor).
def spawned; @spawned; end

Instance Method Details

#add_work(work_item) ⇒ Object

  • Always check whether all workers are busy before pushing the work, because `@queue.push` can wake up a worker. If you check afterwards, all workers may appear busy simply because one just woke up to handle the item that was pushed. That would cause a worker to be spawned when one isn't needed.



64
65
66
67
68
69
# File 'lib/dat-worker-pool.rb', line 64

# Queues a work item and, when appropriate, spawns an extra worker for it.
# A nil work item is ignored.
#
# @param work_item [Object, nil] the item to push onto the queue
# @return [Object, nil] whatever `spawn_worker` returns when a worker is
#   spawned, otherwise nil
def add_work(work_item)
  unless work_item.nil?
    # Busyness must be sampled before the push: pushing can wake a waiting
    # worker, which would make every worker look busy afterwards.
    need_another = all_spawned_workers_are_busy?
    @queue.push(work_item)
    if @started && need_another && !reached_max_workers?
      spawn_worker
    end
  end
end

#after_work(&block) ⇒ Object



108
# File 'lib/dat-worker-pool.rb', line 108

# Registers a block on the after-work callback list.
#
# @return [Array] the callback list, with the block appended
def after_work(&block)
  @after_work_callbacks.push(block)
end

#all_spawned_workers_are_busy? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/dat-worker-pool.rb', line 87

# Reports whether no worker is currently counted as waiting.
#
# @return [Boolean] true when the waiting count is zero or below
def all_spawned_workers_are_busy?
  waiting_count = @workers_waiting.count
  waiting_count <= 0
end

#before_work(&block) ⇒ Object



107
# File 'lib/dat-worker-pool.rb', line 107

# Registers a block on the before-work callback list.
#
# @return [Array] the callback list, with the block appended
def before_work(&block)
  @before_work_callbacks.push(block)
end

#on_queue_pop(&block) ⇒ Object



98
# File 'lib/dat-worker-pool.rb', line 98

# Registers a block on the queue's pop callbacks.
#
# @return [Object] the result of appending to the queue's pop callbacks
def on_queue_pop(&block)
  pop_callbacks = @queue.on_pop_callbacks
  pop_callbacks << block
end

#on_queue_pop_callbacks ⇒ Object



95
# File 'lib/dat-worker-pool.rb', line 95

# Exposes the queue's pop callbacks by delegating to the queue.
def on_queue_pop_callbacks
  @queue.on_pop_callbacks
end

#on_queue_push(&block) ⇒ Object



99
# File 'lib/dat-worker-pool.rb', line 99

# Registers a block on the queue's push callbacks.
#
# @return [Object] the result of appending to the queue's push callbacks
def on_queue_push(&block)
  push_callbacks = @queue.on_push_callbacks
  push_callbacks << block
end

#on_queue_push_callbacks ⇒ Object



96
# File 'lib/dat-worker-pool.rb', line 96

# Exposes the queue's push callbacks by delegating to the queue.
def on_queue_push_callbacks
  @queue.on_push_callbacks
end

#on_worker_error(&block) ⇒ Object



101
# File 'lib/dat-worker-pool.rb', line 101

# Registers a block on the worker-error callback list.
#
# @return [Array] the callback list, with the block appended
def on_worker_error(&block)
  @on_worker_error_callbacks.push(block)
end

#on_worker_shutdown(&block) ⇒ Object



103
# File 'lib/dat-worker-pool.rb', line 103

# Registers a block on the worker-shutdown callback list; it is appended
# after any callbacks already present.
#
# @return [Array] the callback list, with the block appended
def on_worker_shutdown(&block)
  @on_worker_shutdown_callbacks.push(block)
end

#on_worker_sleep(&block) ⇒ Object



104
# File 'lib/dat-worker-pool.rb', line 104

# Registers a block on the worker-sleep callback list; it is appended after
# any callbacks already present.
#
# @return [Array] the callback list, with the block appended
def on_worker_sleep(&block)
  @on_worker_sleep_callbacks.push(block)
end

#on_worker_start(&block) ⇒ Object



102
# File 'lib/dat-worker-pool.rb', line 102

# Registers a block on the worker-start callback list.
#
# @return [Array] the callback list, with the block appended
def on_worker_start(&block)
  @on_worker_start_callbacks.push(block)
end

#on_worker_wakeup(&block) ⇒ Object



105
# File 'lib/dat-worker-pool.rb', line 105

# Registers a block on the worker-wakeup callback list; it is appended after
# any callbacks already present.
#
# @return [Array] the callback list, with the block appended
def on_worker_wakeup(&block)
  @on_worker_wakeup_callbacks.push(block)
end

#queue_empty? ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/dat-worker-pool.rb', line 75

# Asks the queue whether it is empty.
#
# @return [Boolean] the queue's own `empty?` answer
def queue_empty?; @queue.empty?; end

#reached_max_workers? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/dat-worker-pool.rb', line 91

# Reports whether the spawned-worker count has reached the configured maximum.
# The comparison runs while holding the pool's mutex.
#
# @return [Boolean]
def reached_max_workers?
  @mutex.synchronize do
    @spawned >= @max_workers
  end
end

#shutdown(timeout = nil) ⇒ Object

  • All work on the queue is left on the queue. It’s up to the controlling system to decide how it should handle this.



51
52
53
54
55
56
57
58
# File 'lib/dat-worker-pool.rb', line 51

# Stops the pool. The pool is first flagged as not started (so `add_work`
# stops spawning workers), then a graceful shutdown is attempted under the
# optional timeout; if that times out, a forced shutdown is performed.
#
# Work still on the queue is left there; the controlling system decides what
# to do with it.
#
# @param timeout [Numeric, nil] limit handed to OptionalTimeout
#   (presumably seconds, with nil meaning "no limit" -- confirm against
#   OptionalTimeout)
def shutdown(timeout = nil)
  @started = false
  begin
    OptionalTimeout.new(timeout){ graceful_shutdown }
  # Timeout::Error is the class the legacy top-level `TimeoutError` constant
  # aliased. That alias was removed in Ruby 3.0, where referencing it here
  # would raise NameError instead of reaching the forced shutdown.
  rescue Timeout::Error
    # Pass along this call's backtrace so the forced shutdown can report
    # where the shutdown was requested.
    force_shutdown(timeout, caller)
  end
end

#start ⇒ Object



43
44
45
46
47
# File 'lib/dat-worker-pool.rb', line 43

# Starts the pool: flags it as started, starts the queue, and spawns the
# configured minimum number of workers.
#
# @return [Object] the result of `@min_workers.times`
def start
  @started = true
  @queue.start
  @min_workers.times do
    spawn_worker
  end
end

#waiting ⇒ Object



79
80
81
# File 'lib/dat-worker-pool.rb', line 79

# Current count reported by the waiting-workers tracker.
def waiting; @workers_waiting.count; end

#work_items ⇒ Object



71
72
73
# File 'lib/dat-worker-pool.rb', line 71

# Exposes the queue's work items by delegating to the queue.
def work_items; @queue.work_items; end

#worker_available? ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/dat-worker-pool.rb', line 83

# Reports whether work could be picked up right away: either the pool may
# still spawn another worker, or at least one worker is counted as waiting.
#
# @return [Boolean]
def worker_available?
  # Room to spawn means a worker is available without consulting the counter.
  return true unless reached_max_workers?
  @workers_waiting.count > 0
end