Class: ThreadPool

Inherits:
Array
  • Object
show all
Defined in:
lib/thread-pool/pool.rb,
lib/thread-pool/version.rb

Constant Summary collapse

THREAD_LIMIT =

Max number of worker threads available to this pool.

10
REAP_INTERVAL =

Interval at which to poll the workers, at which time any inactive threads are destroyed.

0.002
JOIN_INTERVAL =

Interval at which to poll workers, preventing the parent thread from exiting.

0.002
VERSION =
'0.0.1'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_threads = nil, options = {}, &block) ⇒ ThreadPool

Initialize with max_threads and options. When a block is passed, it is either evaluated in the context of the thread pool (block takes no parameters) or called with the thread pool as its argument (block takes parameters); the pool then calls #join before the constructor returns.

Options

:reap_interval  interval between reaps; defaults to REAP_INTERVAL


51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/thread-pool/pool.rb', line 51

# Build a pool and, when a block is supplied, run it and wait for the work.
#
# @param max_threads [Integer, nil] worker limit; THREAD_LIMIT is used when nil/false
# @param options [Hash] recognised key: +:reap_interval+ (falls back to REAP_INTERVAL)
# @yield [pool] a block declaring at least one parameter is called with the pool;
#   any other block is instance_eval'd against the pool
def initialize(max_threads = nil, options = {}, &block)
  @threads = []
  @data = []
  @max_threads = max_threads || THREAD_LIMIT
  @reap_interval = options.fetch(:reap_interval, REAP_INTERVAL)
  return unless block

  # Arity decides the calling convention: |pool| style vs. DSL style.
  if block.arity > 0
    block.call(self)
  else
    instance_eval(&block)
  end

  # Block form is "configure then run": wait for the queued work before returning.
  join
end

Instance Attribute Details

#max_threads ⇒ Object (readonly)

Maximum threads.



33
34
35
# File 'lib/thread-pool/pool.rb', line 33

# Reader for the pool's worker-thread limit.
#
# @return [Object] the value held in @max_threads
def max_threads
  @max_threads
end

#reap_interval ⇒ Object (readonly)

Reap interval.



38
39
40
# File 'lib/thread-pool/pool.rb', line 38

# Reader for the pool's reap interval.
#
# @return [Object] the value held in @reap_interval
def reap_interval
  @reap_interval
end

#threads ⇒ Object (readonly)

Live threads.



28
29
30
# File 'lib/thread-pool/pool.rb', line 28

# Reader for the pool's thread list.
#
# @return [Object] the very object held in @threads (not a copy)
def threads
  @threads
end

Instance Method Details

#<<(object) ⇒ Object

Queue an object, which must respond to #call, to be processed in a worker thread.



68
69
70
71
72
# File 'lib/thread-pool/pool.rb', line 68

# Hand a callable to #process, wrapped so that whatever arguments the job
# eventually receives are forwarded to +object.call+.
#
# NOTE(review): the return value is whatever #process returns, not the pool
# itself, so chaining (+pool << a << b+) does not route +b+ through this method.
#
# @param object [#call] the job to enqueue
# @return [Object] the result of #process
def <<(object)
  # Kept as a lambda (not a bare block) so the block #process receives is a lambda.
  forwarder = ->(*args) { object.call(*args) }
  process(&forwarder)
end

#busy? ⇒ Boolean

Check if any thread pool workers are busy.

Returns:

  • (Boolean)


130
131
132
# File 'lib/thread-pool/pool.rb', line 130

# Check if any thread pool workers are busy.
#
# The pool counts as busy only while it is running and at least one thread
# in #threads is still alive.
#
# @return [Boolean] always +true+ or +false+ (never +nil+, even when
#   #running? itself returns +nil+)
def busy?
  # The ternary keeps a nil/false running flag from leaking out as the result.
  running? ? threads.any?(&:alive?) : false
end

#inspect ⇒ Object

Return inspection string.



84
85
86
# File 'lib/thread-pool/pool.rb', line 84

# Return inspection string.
#
# @return [String] "#<ThreadPool:ID max_threads:N threads:M>", where ID is
#   +__id__+, N is #max_threads and M is the current length of #threads
def inspect
  format('#<ThreadPool:%s max_threads:%s threads:%s>', __id__, max_threads, threads.length)
end

#join ⇒ Object

Start the workers if the pool is not already running, then block until no worker thread is busy.



137
138
139
140
# File 'lib/thread-pool/pool.rb', line 137

# Start the pool if needed, then block the caller until #busy? turns falsy.
#
# Waiting is done by polling: the caller sleeps JOIN_INTERVAL between checks.
#
# @return [nil]
def join
  start unless running?

  loop do
    break unless busy?

    sleep(JOIN_INTERVAL)
  end
end

#process(*args, &block) ⇒ Object

Queue block together with args; a worker thread is created for it, and the block executed with args, when the pool is started.



77
78
79
# File 'lib/thread-pool/pool.rb', line 77

# Enqueue a job: the argument list and the block are stored as one
# +[args, block]+ pair at the end of @data. Nothing is executed here.
#
# @param args [Array] arguments recorded for the job
# @return [Array] @data itself, after the pair has been appended
def process(*args, &block)
  @data.push([args, block])
end

#running? ⇒ Boolean

Check if the thread pool is running.

Returns:

  • (Boolean)


123
124
125
# File 'lib/thread-pool/pool.rb', line 123

# Check if the thread pool is running.
#
# @return [Boolean] +true+ when @running holds a truthy value, +false+
#   otherwise -- including when @running has never been assigned
def running?
  # Normalise so an unset (nil) flag is reported as false, not nil.
  @running ? true : false
end

#start ⇒ Object

Run workers.



103
104
105
106
107
108
# File 'lib/thread-pool/pool.rb', line 103

# Mark the pool as running and pass every queued +[args, block]+ pair in
# @data to #worker, in queue order.
#
# @return [Array] @data (the receiver of the iteration)
def start
  @running = true
  @data.each { |args, job| worker(*args, &job) }
end

#stop ⇒ Object

Stop the thread pool, killing its workers.



113
114
115
116
117
118
# File 'lib/thread-pool/pool.rb', line 113

# Stop the thread pool: kill every thread in #threads, then clear the
# running flag.
#
# @return [false] the value assigned to @running
def stop
  threads.each(&:kill)
  @running = false
end

#worker(*args, &block) ⇒ Object

Run worker thread with args and block.



91
92
93
94
95
96
97
98
# File 'lib/thread-pool/pool.rb', line 91

# Run worker thread with args and block.
#
# While the pool is at capacity, the caller sleeps for #reap_interval and
# calls #reap, repeating until a slot is free; only then is the new thread
# created and appended to #threads.
#
# Changes from the previous version:
# * the wait is a plain loop -- +retry+ outside a +rescue+ clause is a syntax
#   error on Ruby 1.9 and later;
# * the capacity test is +>=+, so at most #max_threads threads are tracked
#   (the old +>+ test admitted one extra);
# * the wait uses the pool's configured #reap_interval rather than the
#   REAP_INTERVAL constant.
#
# NOTE(review): #reap is defined outside this listing; it is presumably what
# removes finished threads from #threads -- confirm. If it does not, this
# loop never ends once the pool is full.
#
# @param args [Array] arguments handed to the new thread's block
# @return [Array] #threads, after the new thread has been appended
def worker(*args, &block)
  # A limit below 1 could never be satisfied; treat it as a single slot, which
  # is also how a limit of 0 behaved under the previous comparison.
  limit = [max_threads, 1].max

  while threads.length >= limit
    sleep reap_interval
    reap
  end

  threads << Thread.new(*args, &block)
end