Class: ActionPool::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/actionpool/Pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Pool

:min_threads

minimum number of threads in pool

:max_threads

maximum number of threads in pool

:t_to

thread timeout waiting for action to process

:a_to

maximum time action may be worked on before aborting

:logger

logger to print logging messages to

Creates a new pool

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/actionpool/Pool.rb', line 18

# Creates a new pool and fills it via fill_pool.
# args:: Hash of options. Recognised keys are :min_threads (default 10),
#        :max_threads (default 100), :t_to (default 0), :a_to (default 0),
#        :logger (handed to LogHelper) and :respond_thread (defaults to
#        the thread creating the pool).
# Raises ArgumentError when args is not a Hash.
def initialize(args={})
    unless args.is_a?(Hash)
        raise ArgumentError, 'Hash required for initialization'
    end
    @logger = LogHelper.new(args[:logger])
    @queue = ActionPool::Queue.new
    @threads = []
    @lock = Mutex.new
    @thread_timeout = args[:t_to] || 0
    @action_timeout = args[:a_to] || 0
    @max_threads = args[:max_threads] || 100
    @min_threads = args[:min_threads] || 10
    # The minimum is clamped so it never exceeds the maximum
    if @max_threads < @min_threads
        @min_threads = @max_threads
    end
    @respond_to = args[:respond_thread] || ::Thread.current
    @open = true
    fill_pool
end

Instance Method Details

#<<(action) ⇒ Object

action

proc to be executed or array of [proc, [*args]]

Add a new proc/lambda to be executed (alias for queue)



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/actionpool/Pool.rb', line 100

# Hands an action to queue.
# action:: either a proc/lambda, or a two element array of
#          [proc/lambda, [*args]]
# Always returns nil.
# Raises ArgumentError for anything that does not match those two shapes.
def <<(action)
    complaint = 'Actions to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]'
    case action
    when Proc
        queue(action)
    when Array
        callable, params = action[0], action[1]
        well_formed = action.size == 2 && callable.is_a?(Proc) && params.is_a?(Array)
        raise ArgumentError, complaint unless well_formed
        # The argument list is passed on as one array, not splatted
        queue(callable, params)
    else
        raise ArgumentError, complaint
    end
    nil
end

#action ⇒ Object

Returns the next action to be processed



241
242
243
# File 'lib/actionpool/Pool.rb', line 241

# Returns whatever @queue.pop hands back (the next queued action)
def action
    return @queue.pop
end

#action_size ⇒ Object

Number of actions in the queue



246
247
248
# File 'lib/actionpool/Pool.rb', line 246

# Reports the size of the pool's queue
def action_size
    return @queue.size
end

#action_timeout ⇒ Object

Maximum number of seconds a thread is allowed to work on a given action (nil means thread is given unlimited time to work on action)



215
216
217
# File 'lib/actionpool/Pool.rb', line 215

# Reader for the action timeout currently stored on the pool
def action_timeout
    return @action_timeout
end

#action_timeout=(t) ⇒ Object

t

timeout in seconds (nil for infinite)

Set maximum allowed time thread may work on a given action

Raises:

  • (ArgumentError)


232
233
234
235
236
237
238
# File 'lib/actionpool/Pool.rb', line 232

# Stores a new action timeout and pushes it to every thread in the pool.
# t:: timeout in seconds; converted with to_f, so nil becomes 0.0
# Returns the converted value.
# Raises ArgumentError unless the converted value is >= 0.
def action_timeout=(t)
    seconds = t.to_f
    unless seconds >= 0
        raise ArgumentError, 'Value must be greater than zero or nil'
    end
    @action_timeout = seconds
    @threads.each do |worker|
        worker.action_timeout = seconds
    end
    seconds
end

#add_jobs(jobs) ⇒ Object

jobs

Array of proc/lambdas

Will queue a list of jobs into the pool

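Raises:

  • (PoolClosed) — if the pool is closed
  • (ArgumentError) — if jobs is not an Array or contains a malformed job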



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/actionpool/Pool.rb', line 125

# Queues a whole list of jobs in one pass.
# jobs:: Array whose items are either a proc/lambda or a two element
#        array of [proc/lambda, [*args]]
# The queue is paused while the list is loaded and unpaused afterwards,
# also when a malformed job aborts the load part way through.
# The caller's job arrays are left untouched.
# Returns true.
# Raises PoolClosed when the pool is closed and ArgumentError when jobs is
# not an Array or contains a malformed job.
def add_jobs(jobs)
    raise PoolClosed.new("Pool #{self} is currently closed") if pool_closed?
    raise ArgumentError.new("Expecting an array but received: #{jobs.class}") unless jobs.is_a?(Array)
    @queue.pause
    begin
        jobs.each do |job|
            case job
            when Proc
                @queue << [job, []]
            when Array
                raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]') unless job.size == 2 && job[0].is_a?(Proc) && job[1].is_a?(Array)
                # Build the entry from the job's elements instead of shifting
                # the proc off the caller's array. The queued shape stays
                # [proc, [[*args]]], exactly what was queued before.
                @queue << [job[0], [job[1]]]
            else
                raise ArgumentError.new('Jobs to be processed by the pool must be a proc/lambda or [proc/lambda, [*args]]')
            end
        end
    ensure
        # Ask for one extra thread per job that has no waiting thread
        num = jobs.size - @threads.select{|t|t.waiting?}.size
        num.times{ create_thread(:nowait) } if num > 0
        @queue.unpause
    end
    true
end

#create_thread(*args) ⇒ Object

args

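:force forces a new thread even when the pool is at its maximum size. :nowait will create a thread even if existing threads are waiting, as long as the pool is below its maximum size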

Create a new thread for the pool. Returns the newly created thread, or nil if no thread was created (for example, when the pool is at maximum size)



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/actionpool/Pool.rb', line 54

# Adds one thread to the pool when the rules below allow it.
# args:: optional flags, :force and/or :nowait
# A thread is created when :force is given, or when the pool is below
# @max_threads and either :nowait is given or size equals working.
# Returns the new thread, or nil when nothing was created or the pool is
# closed.
def create_thread(*args)
    return if pool_closed?
    @lock.synchronize do
        wanted = size == working || args.include?(:nowait)
        spawn = (wanted && @threads.size < @max_threads) || args.include?(:force)
        if spawn
            worker = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger)
            @threads << worker
            worker
        end
    end
end

#fill_pool ⇒ Object

Fills the pool with the minimum number of threads. Returns an array of the created threads.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/actionpool/Pool.rb', line 68

# Tops the pool up to min threads while holding @lock.
# Returns an array with the threads created by this call (empty when the
# pool already holds at least min threads).
def fill_pool
    @lock.synchronize do
        shortfall = min - size
        if shortfall > 0
            shortfall.times.map do
                worker = ActionPool::Thread.new(:pool => self, :respond_thread => @respond_to, :a_timeout => @action_timeout, :t_timeout => @thread_timeout, :logger => @logger)
                @threads << worker
                worker
            end
        else
            []
        end
    end
end

#flush ⇒ Object

Flush the thread pool. Mainly used for forcibly resizing the pool if existing threads have a long thread life waiting for input.



253
254
255
256
257
258
259
260
# File 'lib/actionpool/Pool.rb', line 253

# Flushes the pool by handing it one blocking action per thread currently
# in the pool, pausing briefly, and then releasing all of those actions.
# Returns the ConditionVariable used for the release (result of broadcast).
def flush
    lock = Mutex.new
    guard = ConditionVariable.new
    released = false
    # Blockers go through process because queue requires the action as a
    # positional argument and ignores a bare block.
    @threads.size.times do
        process do
            # Loop on the flag: a blocker that is only picked up after the
            # broadcast returns immediately instead of waiting forever.
            lock.synchronize{ guard.wait(lock) until released }
        end
    end
    # Explicit top level constant, as used elsewhere in this class, so the
    # lookup cannot land on ActionPool::Thread
    ::Thread.pass
    sleep(0.01)
    lock.synchronize do
        released = true
        guard.broadcast
    end
end

#max ⇒ Object

Maximum allowed number of threads



162
163
164
# File 'lib/actionpool/Pool.rb', line 162

# Reader for the pool's current thread maximum
def max
    return @max_threads
end

#max=(m) ⇒ Object

m

new max

Set maximum number of threads

Raises:

  • (ArgumentError)


173
174
175
176
177
178
179
180
# File 'lib/actionpool/Pool.rb', line 173

# Stores a new thread maximum.
# m:: new maximum; converted with to_i
# The minimum is lowered to match when it would exceed the new maximum,
# and resize is called when the pool currently holds more threads.
# Returns the converted maximum.
# Raises ArgumentError unless the converted value is greater than 0.
def max=(m)
    limit = m.to_i
    unless limit > 0
        raise ArgumentError, 'Maximum value must be greater than 0'
    end
    @max_threads = limit
    if limit < @min_threads
        @min_threads = limit
    end
    resize if limit < size
    limit
end

#min ⇒ Object

Minimum allowed number of threads



167
168
169
# File 'lib/actionpool/Pool.rb', line 167

# Reader for the pool's current thread minimum
def min
    return @min_threads
end

#min=(m) ⇒ Object

m

new min

Set minimum number of threads

Raises:

  • (ArgumentError)


184
185
186
187
188
189
# File 'lib/actionpool/Pool.rb', line 184

# Stores a new thread minimum.
# m:: new minimum; converted with to_i
# Returns the converted minimum.
# Raises ArgumentError unless the converted value is greater than 0 and
# not above max.
def min=(m)
    floor = m.to_i
    unless floor > 0 && floor <= max
        raise ArgumentError, "Minimum value must be greater than 0 and less than or equal to maximum (#{max})"
    end
    @min_threads = floor
    floor
end

#pool_closed? ⇒ Boolean

Pool is closed

Returns:

  • (Boolean)


35
36
37
# File 'lib/actionpool/Pool.rb', line 35

# True when the pool's open flag is not set
def pool_closed?
    @open ? false : true
end

#pool_open? ⇒ Boolean

Pool is open

Returns:

  • (Boolean)


40
41
42
# File 'lib/actionpool/Pool.rb', line 40

# Returns the pool's open flag as stored
def pool_open?
    return @open
end

#process(*args, &block) ⇒ Object

block

block to process

Adds a block to be processed



151
152
153
154
# File 'lib/actionpool/Pool.rb', line 151

# Block flavoured front end for queue.
# args:: arguments queued together with the block
# block:: block to be processed
# Always returns nil.
def process(*args, &block)
    job = block
    queue(job, *args)
    return nil
end

#queue(action, *args) ⇒ Object

action

proc to be executed

Add a new proc/lambda to be executed

Raises:

  • (PoolClosed) — if the pool is closed
  • (ArgumentError) — if action is not a proc/lambda



115
116
117
118
119
120
121
# File 'lib/actionpool/Pool.rb', line 115

# Appends an action together with its arguments to the pool's queue, yields
# the processor to other threads and then calls create_thread.
# action:: proc/lambda to be executed
# args:: arguments stored alongside the action
# Returns the result of create_thread.
# Raises PoolClosed when the pool is closed and ArgumentError when action
# is not a Proc.
def queue(action, *args)
    raise PoolClosed, "Pool #{self} is currently closed" if pool_closed?
    unless action.is_a?(Proc)
        raise ArgumentError, 'Expecting block'
    end
    entry = [action, args]
    @queue << entry
    ::Thread.pass
    create_thread
end

#remove(t) ⇒ Object

t

ActionPool::Thread to remove

Removes a thread from the pool

Raises:

  • (ArgumentError)


193
194
195
196
197
198
199
200
201
202
# File 'lib/actionpool/Pool.rb', line 193

# Stops the given thread and takes it out of the pool's thread list.
# t:: ActionPool::Thread to remove
# Returns true when the thread was in the list, false otherwise. The
# thread is stopped in both cases.
# Raises ArgumentError when t is not an ActionPool::Thread.
def remove(t)
    unless t.is_a?(ActionPool::Thread)
        raise ArgumentError, 'Expecting an ActionPool::Thread object'
    end
    t.stop
    member = @threads.include?(t)
    @threads.delete(t) if member
    member
end

#shutdown(force = false) ⇒ Object

force

force immediate stop

Stop the pool



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/actionpool/Pool.rb', line 85

# Closes the pool and stops all of its threads.
# force:: when true the queue is cleared first and every thread is
#         stopped with :force
# The pool status is set to :closed, the queue is waited on until empty,
# then threads are popped off the list and stopped one by one.
# Always returns nil.
def shutdown(force=false)
    status(:closed)
    stop_args = force ? [:force] : []
    @logger.info("Pool is now shutting down #{force ? 'using force' : ''}")
    @queue.clear if force
    @queue.wait_empty
    while(worker = @threads.pop) do
        worker.stop(*stop_args)
    end
    nil
end

#size ⇒ Object

Current size of pool



157
158
159
# File 'lib/actionpool/Pool.rb', line 157

# Number of threads currently held in the pool's thread list
def size
    return @threads.size
end

#status(arg) ⇒ Object

arg

:open or :closed

Set pool status



46
47
48
49
# File 'lib/actionpool/Pool.rb', line 46

# Sets the pool's open flag.
# arg:: :open opens the pool, any other value closes it
# When the pool ends up open it is topped up through fill_pool and that
# call's result is returned; otherwise nil is returned.
def status(arg)
    @open = (arg == :open)
    return unless @open
    fill_pool
end

#thread_timeout ⇒ Object

Maximum number of seconds a thread is allowed to idle in the pool. (nil means thread life is infinite)



207
208
209
# File 'lib/actionpool/Pool.rb', line 207

# Reader for the thread timeout currently stored on the pool
def thread_timeout
    return @thread_timeout
end

#thread_timeout=(t) ⇒ Object

t

timeout in seconds (nil for infinite)

Set maximum allowed time a thread may idle in the pool

Raises:

  • (ArgumentError)


221
222
223
224
225
226
227
# File 'lib/actionpool/Pool.rb', line 221

# Stores a new thread timeout and pushes it to every thread in the pool.
# t:: timeout in seconds; converted with to_f, so nil becomes 0.0
# Returns the converted value.
# Raises ArgumentError unless the converted value is >= 0.
def thread_timeout=(t)
    seconds = t.to_f
    unless seconds >= 0
        raise ArgumentError, 'Value must be greater than zero or nil'
    end
    @thread_timeout = seconds
    @threads.each do |worker|
        worker.thread_timeout = seconds
    end
    seconds
end

#working ⇒ Object

Returns current number of threads in the pool working



263
264
265
# File 'lib/actionpool/Pool.rb', line 263

# Counts the threads in the pool whose waiting? is false
def working
    @threads.reject{|worker| worker.waiting?}.size
end