Class: QueueToTheFuture::Coordinator

Inherits:
Object
  • Object
Includes:
Singleton
Defined in:
lib/queue_to_the_future/coordinator.rb

Overview

The coordinator schedules jobs and keeps the workforce sized to match. As jobs are added, the coordinator creates workers to complete them. The number of workers never exceeds QueueToTheFuture::maximum_workers.

Constructor Details

#initialize ⇒ Coordinator

Creates the coordinator.

Note: This is a singleton class. To access the instance, use Coordinator::instance.



# File 'lib/queue_to_the_future/coordinator.rb', line 18

def initialize
  @job_queue = Queue.new
  @workforce = []
  
  @workforce.extend(Mutex_m)
end
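
As a rough illustration of the singleton note above, the sketch below uses the standard library's Singleton module on a hypothetical stand-in class. DemoCoordinator is an assumption made for this example and is not part of QueueToTheFuture.

```ruby
require 'singleton'

# Hypothetical stand-in for QueueToTheFuture::Coordinator (illustration only).
class DemoCoordinator
  include Singleton

  attr_reader :job_queue

  def initialize
    @job_queue = Queue.new
  end
end

a = DemoCoordinator.instance
b = DemoCoordinator.instance
puts a.equal?(b) # both calls return the very same object

# Singleton makes .new private, so direct construction fails.
begin
  DemoCoordinator.new
rescue NoMethodError => e
  puts "cannot call new directly: #{e.class}"
end
```

Because every caller shares one instance, all jobs end up on the same queue.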

Instance Method Details

#queue_length ⇒ Fixnum

The current length of the job queue.

Returns:

  • (Fixnum)

    Length of job queue



# File 'lib/queue_to_the_future/coordinator.rb', line 28

def queue_length
  @job_queue.length
end
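
Since queue_length delegates to Queue#length, it counts only jobs that are still waiting in the queue. A minimal sketch with a plain Queue (the variable names are made up for this example):

```ruby
job_queue = Queue.new
3.times { |i| job_queue << "job #{i}" }

before = job_queue.length # three jobs waiting
job_queue.pop             # a worker would take a job like this
after = job_queue.length  # the popped job is no longer counted

puts "#{before} -> #{after}" # prints "3 -> 2"
```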

#schedule(job) ⇒ Object

Append a QueueToTheFuture::Job to the queue.

If there are workers available, the first available worker will be woken up to perform the QueueToTheFuture::Job. If there are no available workers, one will be created as long as doing so will not cause the workforce to exceed QueueToTheFuture::maximum_workers.

Parameters:

  • job (QueueToTheFuture::Job)

    The job to append to the queue.



# File 'lib/queue_to_the_future/coordinator.rb', line 47

def schedule(job)
  
  # If we can't get a lock on the @workforce then the Coordinator is most likely shutting down.
  # We want to skip creating new workers in this case.
  if @job_queue.num_waiting == 0 && @workforce.size < QueueToTheFuture.maximum_workers && @workforce.mu_try_lock
    @workforce.push Thread.new() { while job = @job_queue.shift; job.__execute__; end }
    @workforce.mu_unlock
  end
  
  @job_queue.push(job)
  
  nil
end
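
The scheduling rule described above (create a worker only when none is idle and the limit has not been reached) can be sketched in isolation. MiniPool and its methods are hypothetical names for this example. It uses a plain Mutex rather than Mutex_m and is a simplified approximation, not the library's implementation.

```ruby
# Hypothetical miniature of the scheduling rule (illustration only).
class MiniPool
  def initialize(max_workers)
    @max_workers = max_workers
    @jobs = Queue.new
    @workers = []
    @lock = Mutex.new
  end

  def workforce_size
    @lock.synchronize { @workers.size }
  end

  def schedule(&job)
    @lock.synchronize do
      # Queue#num_waiting counts threads blocked in pop, i.e. idle workers.
      if @jobs.num_waiting.zero? && @workers.size < @max_workers
        @workers << Thread.new do
          while (task = @jobs.pop)
            task.call
          end
        end
      end
    end
    @jobs.push(job) # an idle or freshly created worker picks this up
    nil
  end
end

pool = MiniPool.new(2)
results = Queue.new
5.times { |i| pool.schedule { results << i * i } }

squares = Array.new(5) { results.pop }.sort # blocks until all five jobs ran
puts squares.inspect                        # prints "[0, 1, 4, 9, 16]"
puts pool.workforce_size                    # never more than 2
```

However many jobs are scheduled, the pool in this sketch never grows past max_workers. Extra jobs simply wait in the queue.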

#shutdown(force = false) ⇒ Object

Prevents more workers from being created and waits for all jobs to finish. Once the jobs have completed, the workers are terminated.

To start up again, schedule more jobs with QueueToTheFuture::schedule once this method returns.

Parameters:

  • force (true, false) (defaults to: false)

    If set to true, shut down immediately and clear the queue without waiting for any jobs to complete.



# File 'lib/queue_to_the_future/coordinator.rb', line 69

def shutdown(force = false)
  @workforce.mu_synchronize do
    Thread.pass until @job_queue.empty? unless force
    while worker = @workforce.shift; worker.terminate; end
  end
  
  nil
end
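
The difference between a normal and a forced shutdown can be sketched with a standalone helper. shutdown_pool is a hypothetical name, and clearing the queue when force is true is an assumption taken from the parameter description above. This is not the library's code.

```ruby
# Hypothetical helper mirroring the shutdown behaviour described above.
def shutdown_pool(jobs, workers, force = false)
  if force
    jobs.clear                    # assumption: discard pending jobs when forced
  else
    Thread.pass until jobs.empty? # wait until every job has been picked up
  end
  workers.each(&:terminate)
  workers.each(&:join)            # make sure every worker has really stopped
  workers.clear
  nil
end

jobs = Queue.new
workers = Array.new(2) do
  Thread.new do
    while (task = jobs.pop)
      task.call
    end
  end
end
threads = workers.dup # keep references so the threads can be inspected later

3.times { jobs << -> { Thread.pass } }
shutdown_pool(jobs, workers)
puts jobs.empty?             # true
puts threads.none?(&:alive?) # true

# Forced shutdown with no workers running: the pending job is discarded.
jobs << -> { :never_run }
shutdown_pool(jobs, workers, true)
puts jobs.length # 0
```

In this sketch an empty queue only means every job has been picked up. A job that is still executing when terminate is called can be cut short.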

#workforce_size ⇒ Fixnum

The number of workers being utilized to complete jobs.

Returns:

  • (Fixnum)

    Number of workers



# File 'lib/queue_to_the_future/coordinator.rb', line 35

def workforce_size
  @workforce.size
end