Class: Threaded::Master

Inherits:
Object
  • Object
show all
Includes:
Timeout
Defined in:
lib/threaded/master.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

seconds, 1 minute

60
DEFAULT_SIZE =
16

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Timeout

#timeout

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



9
10
11
12
13
14
15
16
17
# File 'lib/threaded/master.rb', line 9

# Builds a master with an empty job queue and an empty worker list.
#
# Each option falls back to its default whenever the supplied value is
# nil or false (the fallbacks use `||`).
#
# @param options [Hash] optional settings
# @option options [Object] :size    stored in @max; defaults to DEFAULT_SIZE
# @option options [Object] :timeout stored in @timeout; defaults to DEFAULT_TIMEOUT
# @option options [Object] :logger  stored in @logger; defaults to Threaded.logger
def initialize(options = {})
  # Caller-supplied configuration.
  @logger  = options[:logger]  || Threaded.logger
  @timeout = options[:timeout] || DEFAULT_TIMEOUT
  @max     = options[:size]    || DEFAULT_SIZE

  # Fresh runtime state.
  @workers  = []
  @stopping = false
  @mutex    = Mutex.new
  @queue    = Queue.new
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/threaded/master.rb', line 4

# Read-only accessor for the logger.
#
# @return [Object] the value of @logger, which #initialize sets from
#   options[:logger] or Threaded.logger
def logger
  @logger
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



4
5
6
# File 'lib/threaded/master.rb', line 4

# Read-only accessor for the worker list.
#
# @return [Array] the value of @workers, which #initialize sets to an
#   empty array
def workers
  @workers
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
30
# File 'lib/threaded/master.rb', line 27

# Reports whether at least one worker is still alive.
#
# Always answers with a real boolean, as the documented return type
# promises. No worker object is handed back to the caller, and an empty
# worker list yields false.
#
# @return [Boolean] true if any worker's alive? is truthy, false otherwise
def alive?
  workers.any?(&:alive?)
end

#enqueue(job, *json) ⇒ Object

Raises:

  • (NoWorkersError)



19
20
21
22
23
24
25
# File 'lib/threaded/master.rb', line 19

# Pushes a job and its arguments onto the queue as a [job, json] pair.
#
# After pushing, calls new_worker when needs_workers? is truthy and the
# queue is still non-empty. The alive? check runs last, so the pair is
# already on the queue if NoWorkersError is raised.
#
# @param job  [Object] first element of the queued pair
# @param json [Array]  remaining arguments, queued as an array
# @return [true]
# @raise [NoWorkersError] when alive? is falsy after the push
def enqueue(job, *json)
  @queue << [job, json]

  if needs_workers? && !@queue.empty?
    new_worker
  end

  raise NoWorkersError unless alive?
  true
end

#size ⇒ Object



49
50
51
# File 'lib/threaded/master.rb', line 49

# Number of entries currently held in the worker list.
#
# @return [Integer] element count of @workers
def size
  @workers.length
end

#start ⇒ Object



32
33
34
35
36
# File 'lib/threaded/master.rb', line 32

# Calls new_worker @max times, unless alive? already reports a live
# worker, in which case nothing is started.
#
# @return [self] the receiver, so calls can be chained
def start
  @max.times { new_worker } unless alive?
  self
end

#stop(timeout = 10) ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/threaded/master.rb', line 38

# Calls poison, then polls alive? every 0.1 seconds until it is falsy and
# finally calls join. The polling and the join both run inside the
# included timeout helper.
#
# NOTE(review): what poison, join and the timeout helper do on expiry is
# not visible here — confirm against their definitions.
#
# @param timeout [Numeric] limit handed to the timeout helper (default 10)
# @return [self]
def stop(timeout = 10)
  poison
  # `timeout(...)` with parentheses is the helper method, not the parameter.
  timeout(timeout, "waiting for workers to stop") do
    sleep 0.1 while alive?
    join
  end
  self
end