Class: Workers::Pool

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/workers/pool.rb

Constant Summary collapse

DEFAULT_POOL_SIZE =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#concat_e, #log_debug, #log_error, #log_info, #log_warn

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/workers/pool.rb', line 9

# Builds the pool's internal state and grows it to its starting size.
#
# @param options [Hash] optional settings read by key:
#   :logger       - passed to Workers::LogProxy.new
#   :worker_class - class instantiated for each worker
#                   (falls back to Workers::Worker)
#   :on_exception - stored unchanged in @on_exception
#   :size         - number handed to #expand
#                   (falls back to Workers::Pool::DEFAULT_POOL_SIZE)
def initialize(options = {})
  # Plain state first; none of these depend on one another.
  @size = 0
  @workers = Set.new
  @lock = Monitor.new
  @input_queue = Queue.new

  # Caller-supplied collaborators.
  @on_exception = options[:on_exception]
  @worker_class = options[:worker_class] || Workers::Worker
  @logger = Workers::LogProxy.new(options[:logger])

  initial_size = options[:size] || Workers::Pool::DEFAULT_POOL_SIZE
  expand(initial_size)

  nil
end

Instance Attribute Details

#on_exception ⇒ Object

Returns the value of attribute on_exception.



7
8
9
# File 'lib/workers/pool.rb', line 7

# Reader for the @on_exception instance variable.
#
# @return [Object] whatever is currently stored in @on_exception
def on_exception; @on_exception; end

Instance Method Details

#contract(count, &block) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/workers/pool.rb', line 83

# Shrinks the pool by queueing one :shutdown event per worker to remove.
#
# Each event carries a callback that calls remove_worker with the argument
# it receives and then invokes the optional block. (Presumably the worker
# that handles the event passes itself in; that code is not visible here.)
#
# @param count [Integer] how many workers to retire
# @yield optional block, called with no arguments from each callback
# @raise [Workers::PoolSizeError] if count is greater than the current size
# @return [nil]
def contract(count, &on_removed)
  @lock.synchronize do
    if count > @size
      raise Workers::PoolSizeError, 'Count is too large.'
    end

    count.times do
      retire = Proc.new do |worker|
        remove_worker(worker)
        on_removed.call unless on_removed.nil?
      end

      # Queue the event first; the size counter only drops once it is queued.
      enqueue(:shutdown, retire)
      @size -= 1
    end
  end

  nil
end

#dispose(max_wait = nil, &block) ⇒ Object



53
54
55
56
57
58
59
# File 'lib/workers/pool.rb', line 53

# Requests shutdown and then joins, in that order.
#
# A block is always handed to #shutdown; it invokes the caller's block when
# one was given and otherwise does nothing.
#
# @param max_wait [Object, nil] forwarded unchanged to #join
# @yield optional block, invoked from the block passed to #shutdown
# @return [Object] whatever #join returns
def dispose(max_wait = nil, &on_shutdown)
  shutdown { on_shutdown.call unless on_shutdown.nil? }
  join(max_wait)
end

#enqueue(command, data = nil) ⇒ Object



23
24
25
26
27
# File 'lib/workers/pool.rb', line 23

# Wraps a command and its payload in an Event and appends it to the
# pool's input queue.
#
# @param command [Object] first argument given to Event.new
# @param data [Object, nil] second argument given to Event.new
# @return [nil]
def enqueue(command, data = nil)
  event = Event.new(command, data)
  @input_queue << event

  nil
end

#expand(count) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/workers/pool.rb', line 71

# Grows the pool by creating new workers while holding the pool lock.
#
# Every worker is built from @worker_class and receives the pool's input
# queue, exception handler and logger as options.
#
# @param count [Integer] number of workers to add
# @return [nil]
def expand(count)
  @lock.synchronize do
    count.times do
      # A fresh options hash per worker, so no two workers share one.
      worker_options = {
        :input_queue => @input_queue,
        :on_exception => @on_exception,
        :logger => @logger
      }

      @workers.add(@worker_class.new(worker_options))
      @size += 1
    end
  end

  nil
end

#inspect ⇒ Object



61
62
63
# File 'lib/workers/pool.rb', line 61

# Short human-readable description of the pool.
#
# The hexadecimal part is object_id shifted left by one bit; the trailing
# field is the value returned by #size.
#
# @return [String] of the form "#<ClassName:0x<hex> size=<n>>"
def inspect
  class_name = self.class.to_s
  hex_id = (object_id << 1).to_s(16)

  "#<#{class_name}:0x#{hex_id} size=#{size}>"
end

#join(max_wait = nil) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/workers/pool.rb', line 45

# Joins every worker currently in the pool, then resets the pool to empty.
#
# The worker set and size counter are only touched while holding @lock,
# consistent with the other methods that read or change them. The joins
# themselves run on a snapshot taken under the lock, so the lock is not held
# while waiting and a worker added to the set in the meantime cannot disturb
# the iteration.
#
# @param max_wait [Object, nil] forwarded unchanged to each worker's #join.
#   NOTE(review): it is applied per worker, one after another, so if it is a
#   timeout the total wait may be up to (workers x max_wait) - confirm
#   against the worker implementation.
# @return [Array] one entry per joined worker, holding whatever that
#   worker's #join returned, in the set's iteration order
def join(max_wait = nil)
  pending = @lock.synchronize { @workers.to_a }

  results = pending.map { |worker| worker.join(max_wait) }

  @lock.synchronize do
    @workers.clear
    @size = 0
  end

  results
end

#perform(&block) ⇒ Object



29
30
31
32
33
# File 'lib/workers/pool.rb', line 29

# Queues the given block as the payload of a :perform event.
#
# @yield the block to hand to #enqueue; nil is passed when none is given
# @return [nil] always, regardless of what #enqueue returns
def perform(&task)
  enqueue(:perform, task)
  nil
end

#resize(new_size) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/workers/pool.rb', line 101

# Brings the pool to the requested size by delegating to #expand or
# #contract; does nothing when the size already matches.
#
# The comparison and the delegated call happen inside one @lock.synchronize
# block.
#
# @param new_size [Integer] desired number of workers
# @return [nil]
def resize(new_size)
  @lock.synchronize do
    # Positive: workers to add. Negative: workers to retire.
    delta = new_size - @size

    case delta <=> 0
    when 1 then expand(delta)
    when -1 then contract(-delta)
    end
  end

  nil
end

#shutdown(&block) ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/workers/pool.rb', line 35

# Queues one :shutdown event for each unit of the current pool size, all
# carrying the same optional block as payload.
#
# The size counter itself is left unchanged here.
#
# @yield optional block passed as the data of every :shutdown event
# @return [nil]
def shutdown(&on_stop)
  @lock.synchronize { @size.times { enqueue(:shutdown, on_stop) } }
  nil
end

#size ⇒ Object



65
66
67
68
69
# File 'lib/workers/pool.rb', line 65

# Current value of the pool's size counter, read while holding the pool lock.
#
# @return [Integer] the value of @size
def size
  @lock.synchronize { @size }
end