Class: ActionPool::Pool
- Inherits:
-
Object
- Object
- ActionPool::Pool
- Defined in:
- lib/actionpool/Pool.rb
Instance Method Summary collapse
-
#<<(action) ⇒ Object
- action
-
proc to be executed or array of [proc, [*args]] Add a new proc/lambda to be executed (alias for queue).
-
#action ⇒ Object
Returns the next action to be processed.
-
#action_size ⇒ Object
Number of actions in the queue.
-
#action_timeout ⇒ Object
Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action).
-
#action_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinte) Set maximum allowed time thread may work on a given action.
-
#add_jobs(jobs) ⇒ Object
- jobs
-
Array of proc/lambdas Will queue a list of jobs into the pool.
-
#create_thread(*args) ⇒ Object
- args
-
:force forces a new thread.
-
#fill_pool ⇒ Object
Fills the pool with the minimum number of threads Returns array of created threads.
-
#flush ⇒ Object
Flush the thread pool.
-
#initialize(args = {}) ⇒ Pool
constructor
- :min_threads
- minimum number of threads in pool :max_threads
- maximum number of threads in pool :t_to
- thread timeout waiting for action to process :a_to
- maximum time action may be worked on before aborting :logger
-
logger to print logging messages to Creates a new pool.
-
#max ⇒ Object
Maximum allowed number of threads.
-
#max=(m) ⇒ Object
- m
-
new max Set maximum number of threads.
-
#min ⇒ Object
Minimum allowed number of threads.
-
#min=(m) ⇒ Object
- m
-
new min Set minimum number of threads.
-
#pool_closed? ⇒ Boolean
Pool is closed.
-
#pool_open? ⇒ Boolean
Pool is open.
-
#process(*args, &block) ⇒ Object
- block
-
block to process Adds a block to be processed.
-
#queue(action, *args) ⇒ Object
- action
-
proc to be executed Add a new proc/lambda to be executed.
-
#remove(t) ⇒ Object
- t
-
ActionPool::Thread to remove Removes a thread from the pool.
-
#shutdown(force = false) ⇒ Object
- force
-
force immediate stop Stop the pool.
-
#size ⇒ Object
Current size of pool.
-
#status(arg) ⇒ Object
- arg
-
:open or :closed Set pool status.
-
#thread_timeout ⇒ Object
Maximum number of seconds a thread is allowed to idle in the pool.
-
#thread_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite) Set maximum allowed time thead may idle in pool.
-
#working ⇒ Object
Returns current number of threads in the pool working.
Constructor Details
#initialize(args = {}) ⇒ Pool
- :min_threads
-
minimum number of threads in pool
- :max_threads
-
maximum number of threads in pool
- :t_to
-
thread timeout waiting for action to process
- :a_to
-
maximum time action may be worked on before aborting
- :logger
-
logger to print logging messages to
Creates a new pool
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/actionpool/Pool.rb', line 18 def initialize(args={}) raise ArgumentError.new('Hash required for initialization') unless args.is_a?(Hash) @logger = LogHelper.new(args[:logger]) @queue = ActionPool::Queue.new @threads = [] @lock = Mutex.new @thread_timeout = args[:t_to] ? args[:t_to] : 0 @action_timeout = args[:a_to] ? args[:a_to] : 0 @max_threads = args[:max_threads] ? args[:max_threads] : 100 @min_threads = args[:min_threads] ? args[:min_threads] : 10 @min_threads = @max_threads if @max_threads < @min_threads @respond_to = args[:respond_thread] || ::Thread.current @open = true fill_pool end |
Instance Method Details
#<<(action) ⇒ Object
- action
-
proc to be executed or array of [proc, [*args]]
Add a new proc/lambda to be executed (alias for queue)
100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/actionpool/Pool.rb', line 100 def <<(action) case action when Proc queue(action) when Array raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless action.size == 2 and action[0].is_a?(Proc) and action[1].is_a?(Array) queue(action[0], action[1]) else raise ArgumentError.new('Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') end nil end |
#action ⇒ Object
Returns the next action to be processed
241 242 243 |
# File 'lib/actionpool/Pool.rb', line 241 def action @queue.pop end |
#action_size ⇒ Object
Number of actions in the queue
246 247 248 |
# File 'lib/actionpool/Pool.rb', line 246 def action_size @queue.size end |
#action_timeout ⇒ Object
Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)
215 216 217 |
# File 'lib/actionpool/Pool.rb', line 215 def action_timeout @action_timeout end |
#action_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinte)
Set maximum allowed time thread may work on a given action
232 233 234 235 236 237 238 |
# File 'lib/actionpool/Pool.rb', line 232 def action_timeout=(t) t = t.to_f raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0 @action_timeout = t @threads.each{|thread|thread.action_timeout = t} t end |
#add_jobs(jobs) ⇒ Object
- jobs
-
Array of proc/lambdas
Will queue a list of jobs into the pool
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/actionpool/Pool.rb', line 125 def add_jobs(jobs) raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed? raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array) @queue.pause begin jobs.each do |job| case job when Proc @queue << [job, []] when Array raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless job.size == 2 and job[0].is_a?(Proc) and job[1].is_a?(Array) @queue << [job.shift, job] else raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') end end ensure num = jobs.size - @threads.select{|t|t.waiting?}.size num.times{ create_thread(:nowait) } if num > 0 @queue.unpause end true end |
#create_thread(*args) ⇒ Object
- args
-
:force forces a new thread. :nowait will create a thread if threads are waiting
Create a new thread for pool. Returns newly created thread of nil if pool is at maximum size
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/actionpool/Pool.rb', line 54 def create_thread(*args) return if pool_closed? thread = nil @lock.synchronize do if(((size == working || args.include?(:nowait)) && @threads.size < @max_threads) || args.include?(:force)) thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger) @threads << thread end end return thread end |
#fill_pool ⇒ Object
Fills the pool with the minimum number of threads Returns array of created threads
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/actionpool/Pool.rb', line 68 def fill_pool threads = [] @lock.synchronize do required = min - size if(required > 0) required.times do thread = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger) @threads << thread threads << thread end end end return threads end |
#flush ⇒ Object
Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.
253 254 255 256 257 258 259 260 |
# File 'lib/actionpool/Pool.rb', line 253 def flush lock = Mutex.new guard = ConditionVariable.new @threads.size.times{ queue{ lock.synchronize{ guard.wait(lock) } } } Thread.pass sleep(0.01) lock.synchronize{ guard.broadcast } end |
#max ⇒ Object
Maximum allowed number of threads
162 163 164 |
# File 'lib/actionpool/Pool.rb', line 162 def max @max_threads end |
#max=(m) ⇒ Object
- m
-
new max
Set maximum number of threads
173 174 175 176 177 178 179 180 |
# File 'lib/actionpool/Pool.rb', line 173 def max=(m) m = m.to_i raise ArgumentError.new('Maximum value must be greater than 0') unless m > 0 @max_threads = m @min_threads = m if m < @min_threads resize if m < size m end |
#min ⇒ Object
Minimum allowed number of threads
167 168 169 |
# File 'lib/actionpool/Pool.rb', line 167 def min @min_threads end |
#min=(m) ⇒ Object
- m
-
new min
Set minimum number of threads
184 185 186 187 188 189 |
# File 'lib/actionpool/Pool.rb', line 184 def min=(m) m = m.to_i raise ArgumentError.new("Minimum value must be greater than 0 and less than or equal to maximum (#{max})") unless m > 0 && m <= max @min_threads = m m end |
#pool_closed? ⇒ Boolean
Pool is closed
35 36 37 |
# File 'lib/actionpool/Pool.rb', line 35 def pool_closed? !@open end |
#pool_open? ⇒ Boolean
Pool is open
40 41 42 |
# File 'lib/actionpool/Pool.rb', line 40 def pool_open? @open end |
#process(*args, &block) ⇒ Object
- block
-
block to process
Adds a block to be processed
151 152 153 154 |
# File 'lib/actionpool/Pool.rb', line 151 def process(*args, &block) queue(block, *args) nil end |
#queue(action, *args) ⇒ Object
- action
-
proc to be executed
Add a new proc/lambda to be executed
115 116 117 118 119 120 121 |
# File 'lib/actionpool/Pool.rb', line 115 def queue(action, *args) raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed? raise ArgumentError.new('Expecting block') unless action.is_a?(Proc) @queue << [action, args] ::Thread.pass create_thread end |
#remove(t) ⇒ Object
- t
-
ActionPool::Thread to remove
Removes a thread from the pool
193 194 195 196 197 198 199 200 201 202 |
# File 'lib/actionpool/Pool.rb', line 193 def remove(t) raise ArgumentError.new('Expecting an ActionPool::Thread object') unless t.is_a?(ActionPool::Thread) t.stop if(@threads.include?(t)) @threads.delete(t) return true else return false end end |
#shutdown(force = false) ⇒ Object
- force
-
force immediate stop
Stop the pool
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/actionpool/Pool.rb', line 85 def shutdown(force=false) status(:closed) args = [] args.push(:force) if force @logger.info("Pool is now shutting down #{force ? 'using force' : ''}") @queue.clear if force @queue.wait_empty while(t = @threads.pop) do t.stop(*args) end nil end |
#size ⇒ Object
Current size of pool
157 158 159 |
# File 'lib/actionpool/Pool.rb', line 157 def size @threads.size end |
#status(arg) ⇒ Object
- arg
-
:open or :closed
Set pool status
46 47 48 49 |
# File 'lib/actionpool/Pool.rb', line 46 def status(arg) @open = arg == :open fill_pool if @open end |
#thread_timeout ⇒ Object
Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)
207 208 209 |
# File 'lib/actionpool/Pool.rb', line 207 def thread_timeout @thread_timeout end |
#thread_timeout=(t) ⇒ Object
- t
-
timeout in seconds (nil for infinite)
Set maximum allowed time thead may idle in pool
221 222 223 224 225 226 227 |
# File 'lib/actionpool/Pool.rb', line 221 def thread_timeout=(t) t = t.to_f raise ArgumentError.new('Value must be greater than zero or nil') unless t >= 0 @thread_timeout = t @threads.each{|thread|thread.thread_timeout = t} t end |
#working ⇒ Object
Returns current number of threads in the pool working
263 264 265 |
# File 'lib/actionpool/Pool.rb', line 263 def working @threads.find_all{|t|!t.waiting?}.size end |