Class: QueueToTheFuture::Coordinator
- Inherits:
-
Object
- Object
- QueueToTheFuture::Coordinator
- Includes:
- Singleton
- Defined in:
- lib/queue_to_the_future/coordinator.rb
Overview
The coordinator schedules jobs and maintains the workforce size to match. As jobs get added the coordinator creates workers to complete them. The number of workers created will never exceed maximum_workers.
Instance Method Summary collapse
-
#initialize ⇒ Coordinator
constructor
Creates the coordinator.
-
#queue_length ⇒ Fixnum
The current length of the job queue.
-
#schedule(job) ⇒ Object
Append a QueueToTheFuture::Job to the queue.
-
#shutdown(force = false) ⇒ Object
Prevents more workers from being created and waits for all jobs to finish.
-
#workforce_size ⇒ Fixnum
The number of workers being utilized to complete jobs.
Constructor Details
#initialize ⇒ Coordinator
Creates the coordinator.
Note: This is a singleton class. To access the instance use Coordinator::instance.
18 19 20 21 22 23 |
# File 'lib/queue_to_the_future/coordinator.rb', line 18 def initialize @job_queue = Queue.new @workforce = [] @workforce.extend(Mutex_m) end |
Instance Method Details
#queue_length ⇒ Fixnum
The current length of the job queue
28 29 30 |
# File 'lib/queue_to_the_future/coordinator.rb', line 28 def queue_length @job_queue.length end |
#schedule(job) ⇒ Object
Append a QueueToTheFuture::Job to the queue.
If there are workers available, the first available worker will be woken up to perform the QueueToTheFuture::Job. If there are no available workers, one will be created as long as doing so will not cause the workforce to exceed QueueToTheFuture::maximum_workers.
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/queue_to_the_future/coordinator.rb', line 47 def schedule(job) # If we can't get a lock on the @workforce then the Coordinator is most likely shutting down. # We want to skip creating new workers in this case. if @job_queue.num_waiting == 0 && @workforce.size < QueueToTheFuture.maximum_workers && @workforce.mu_try_lock @workforce.push Thread.new() { while job = @job_queue.shift; job.__execute__; end } @workforce.mu_unlock end @job_queue.push(job) nil end |
#shutdown(force = false) ⇒ Object
Prevents more workers from being created and waits for all jobs to finish. Once the jobs have completed the workers are terminated.
To start up again just QueueToTheFuture::schedule more jobs once this method returns.
69 70 71 72 73 74 75 76 |
# File 'lib/queue_to_the_future/coordinator.rb', line 69 def shutdown(force = false) @workforce.mu_synchronize do Thread.pass until @job_queue.empty? unless force while worker = @workforce.shift; worker.terminate; end end nil end |
#workforce_size ⇒ Fixnum
The number of workers being utilized to complete jobs.
35 36 37 |
# File 'lib/queue_to_the_future/coordinator.rb', line 35 def workforce_size @workforce.size end |