Class: ThreadedInMemoryQueue::Master

Inherits:
Object
  • Object
show all
Extended by:
Inline
Includes:
Timeout
Defined in:
lib/threaded_in_memory_queue/master.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

Default timeout in seconds (1 minute).

60
DEFAULT_SIZE =
16

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Inline

inline, inline=, inline?

Methods included from Timeout

#timeout

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



10
11
12
13
14
15
16
# File 'lib/threaded_in_memory_queue/master.rb', line 10

# Sets up the master's state: a fresh Queue, the size / timeout / logger
# settings, and an empty worker list.
#
# Each setting is looked up in +options+; an entry that is missing (or
# nil/false) falls back to DEFAULT_SIZE, DEFAULT_TIMEOUT or
# ThreadedInMemoryQueue.logger respectively.
#
# @param options [Hash] may carry :size, :timeout and :logger
def initialize(options = {})
  @queue = Queue.new

  requested_size    = options[:size]
  requested_timeout = options[:timeout]
  requested_logger  = options[:logger]

  @size    = requested_size    || DEFAULT_SIZE
  @timeout = requested_timeout || DEFAULT_TIMEOUT
  # The shared logger is only consulted when the caller supplied none.
  @logger  = requested_logger  || ThreadedInMemoryQueue.logger

  @workers = []
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



4
5
6
# File 'lib/threaded_in_memory_queue/master.rb', line 4

# Read-only accessor for the logger held in @logger.
#
# @return [Object] the current value of @logger
def logger
  return @logger
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



4
5
6
# File 'lib/threaded_in_memory_queue/master.rb', line 4

# Read-only accessor for the worker list held in @workers.
#
# @return [Object] the current value of @workers
def workers
  return @workers
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
58
# File 'lib/threaded_in_memory_queue/master.rb', line 55

# Reports whether at least one worker is still alive.
#
# Always answers with a real boolean: +false+ when there are no workers or
# none of them reports alive, +true+ otherwise. A lookup with +detect+ would
# instead hand back the first live worker object (or +nil+), which does not
# match the documented Boolean return.
#
# @return [Boolean]
def alive?
  # any? is already false for an empty list, so no separate guard is needed.
  workers.any?(&:alive?)
end

#enqueue(klass, *json) ⇒ Object



45
46
47
48
49
50
51
52
53
# File 'lib/threaded_in_memory_queue/master.rb', line 45

# Submits a job.
#
# When the class reports inline mode (+self.class.inline?+), the job runs
# immediately via +klass.call(*json)+. Otherwise the pair +[klass, json]+ is
# placed on the queue.
#
# @param klass [#call] the job; invoked directly only in inline mode
# @param json [Array] arguments splatted into +klass.call+ / queued as a list
# @return [true]
# @raise [NoWorkersError] when not inline and #alive? is falsy
def enqueue(klass, *json)
  # Inline mode bypasses the queue and the worker check entirely.
  if self.class.inline?
    klass.call(*json)
    return true
  end

  raise NoWorkersError, "No workers" if !alive?

  job = [klass, json]
  @queue.enq(job)
  true
end

#join ⇒ Object



24
25
26
27
# File 'lib/threaded_in_memory_queue/master.rb', line 24

# Calls +join+ on every worker, in list order.
#
# @return [self] so calls can be chained
def join
  workers.each(&:join)
  self
end

#poison ⇒ Object



29
30
31
32
# File 'lib/threaded_in_memory_queue/master.rb', line 29

# Calls +poison+ on every worker, in list order.
#
# @return [self] so calls can be chained
def poison
  workers.each(&:poison)
  self
end

#start ⇒ Object



18
19
20
21
22
# File 'lib/threaded_in_memory_queue/master.rb', line 18

# Starts the worker pool unless it is already running.
#
# Does nothing when #alive? is truthy. Otherwise builds @size workers, each
# created with the shared queue and +timeout: @timeout+, started, and
# recorded in @workers.
#
# Any workers left over from an earlier run are dropped first. Without that,
# each stop/start cycle would append another @size entries to the list, and
# later #join / #poison calls would keep touching workers that are no longer
# alive.
#
# @return [self] so calls can be chained
def start
  return self if alive?

  # Reaching this point means no listed worker reported alive, so the old
  # entries are stale and can be discarded before repopulating.
  @workers.clear
  @size.times { @workers << Worker.new(@queue, timeout: @timeout).start }
  self
end

#stop(timeout = 10) ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/threaded_in_memory_queue/master.rb', line 34

# Shuts the pool down: poisons every worker, then, inside the +timeout+
# helper, polls #alive? every 0.1 seconds until it is falsy and finally
# joins the workers.
#
# NOTE(review): +timeout(limit, message)+ comes from a mixin not shown here;
# presumably it bounds the block to +limit+ seconds and uses the message for
# reporting. Confirm against that module.
#
# @param timeout [Numeric] limit passed to the +timeout+ helper (default 10)
# @return [self] so calls can be chained
def stop(timeout = 10)
  poison

  # The parameter shadows the helper's name; the parenthesised call with
  # arguments still dispatches to the method.
  timeout(timeout, "waiting for workers to stop") do
    sleep 0.1 while alive?
    join
  end

  self
end