Class: ZK::Threadpool
- Inherits:
- Object
- ZK::Threadpool
- Includes:
- Exceptions, Logger
- Defined in:
- lib/zk/threadpool.rb
Overview
A simple threadpool for running blocks of code off the main thread.
Constant Summary
- DEFAULT_SIZE = 5
Class Attribute Summary
-
.default_size ⇒ Object
Size of the ZK.defer threadpool (defaults to 5).
Instance Attribute Summary
-
#size ⇒ Object
readonly
The size of this threadpool.
Instance Method Summary
-
#alive? ⇒ Boolean
Returns true only if the pool has at least one thread and every thread is alive.
-
#defer(callable = nil, &blk) ⇒ Object
Queue an operation to be run on an internal threadpool.
-
#initialize(size = nil) ⇒ Threadpool
constructor
A new instance of Threadpool.
-
#on_exception(&blk) ⇒ Object
Register a block to be called back with unhandled exceptions that occur in the threadpool.
-
#on_threadpool? ⇒ Boolean
Returns true if the current thread is one of the threadpool threads.
-
#running? ⇒ Boolean
Returns true if the threadpool is in the :running state.
-
#shutdown(timeout = 2) ⇒ Object
Join all threads in this threadpool. Each thread is given a maximum of +timeout+ seconds to exit before it is considered hung and ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info).
-
#start! ⇒ Object
Starts the threadpool if not already running.
Methods included from Logger
#logger, wrapped_logger, wrapped_logger=
Constructor Details
#initialize(size = nil) ⇒ Threadpool
Returns a new instance of Threadpool.
# File 'lib/zk/threadpool.rb', line 18
def initialize(size=nil)
  @size = size || self.class.default_size
  @threadpool = []
  @state = :new
  @queue = []
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @error_callbacks = []
  start!
end
Class Attribute Details
.default_size ⇒ Object
Size of the ZK.defer threadpool (defaults to 5).
# File 'lib/zk/threadpool.rb', line 11
def default_size
  @default_size
end
Instance Attribute Details
#size ⇒ Object (readonly)
The size of this threadpool.
# File 'lib/zk/threadpool.rb', line 16
def size
  @size
end
Instance Method Details
#alive? ⇒ Boolean
Returns true only if the pool has at least one thread and every thread is alive; returns false if there are no threads.
# File 'lib/zk/threadpool.rb', line 35
def alive?
  @mutex.lock
  begin
    !@threadpool.empty? and @threadpool.all?(&:alive?)
  ensure
    @mutex.unlock rescue nil
  end
end
#defer(callable = nil, &blk) ⇒ Object
Queue an operation to be run on an internal threadpool. You may either provide an object that responds to call (that is, respond_to?(:call) is true) or pass a block; anything else raises ArgumentError. There is no mechanism for retrieving the result of the operation: it is purely fire-and-forget and always returns nil, so callers are expected to make their own arrangements for this in their code.
# File 'lib/zk/threadpool.rb', line 50
def defer(callable=nil, &blk)
  callable ||= blk

  raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

  @mutex.lock
  begin
    @queue << callable
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  nil
end
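Because defer discards the block's return value, one common arrangement is to have the deferred block push its own result into a thread-safe Queue owned by the caller. The following is a minimal sketch of that pattern, not part of the library. So that it runs without the zk gem installed, it uses a hypothetical stand-in class, SketchPool, that only mimics the defer(callable = nil, &blk) signature; the names SketchPool and results are assumptions made for illustration.

```ruby
require 'thread'

# Hypothetical stand-in with the same defer(callable = nil, &blk) shape as
# ZK::Threadpool#defer, so this sketch runs without the zk gem installed.
class SketchPool
  def initialize
    @jobs = Queue.new
    @worker = Thread.new do
      # A nil job is the stop signal.
      while (job = @jobs.pop)
        job.call
      end
    end
  end

  def defer(callable = nil, &blk)
    callable ||= blk
    raise ArgumentError, 'must respond_to?(:call)' unless callable.respond_to?(:call)
    @jobs << callable
    nil # fire-and-forget: the block's value is discarded
  end

  def shutdown
    @jobs << nil
    @worker.join
  end
end

pool = SketchPool.new
results = Queue.new # thread-safe mailbox owned by the caller

# The deferred block pushes its own outcome (value or error) into the mailbox.
pool.defer do
  begin
    results << [:ok, 6 * 7]
  rescue StandardError => e
    results << [:error, e]
  end
end

status, value = results.pop # blocks until the worker has pushed
pool.shutdown
```

With a real ZK::Threadpool instance the caller-side half (the results queue and the block body) should carry over unchanged, since it relies only on defer accepting a block.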
#on_exception(&blk) ⇒ Object
Register a block to be called back with unhandled exceptions that occur in the threadpool.
Note: if your exception callback block itself raises an exception, I will make fun of you.
# File 'lib/zk/threadpool.rb', line 156
def on_exception(&blk)
  @mutex.synchronize do
    @error_callbacks << blk
  end
end
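The worker-side code that invokes these callbacks is not shown on this page. As a rough illustration of how registered callbacks could be dispatched, here is a minimal sketch under that assumption. SketchPool and its run method are hypothetical stand-ins written for this example, not the zk implementation; only the on_exception body mirrors the source above.

```ruby
# Hypothetical stand-in, not the zk implementation: one worker thread that
# reports unhandled errors from deferred blocks to registered callbacks.
class SketchPool
  def initialize
    @jobs = Queue.new
    @mutex = Mutex.new
    @error_callbacks = []
    @worker = Thread.new { run }
  end

  def defer(&blk)
    @jobs << blk
    nil
  end

  def on_exception(&blk)
    @mutex.synchronize { @error_callbacks << blk }
  end

  def shutdown
    @jobs << nil # nil is the stop signal
    @worker.join
  end

  private

  def run
    while (job = @jobs.pop)
      begin
        job.call
      rescue StandardError => e
        # Copy the list under the lock, then call outside it, so a callback
        # that registers another callback cannot deadlock on the mutex.
        callbacks = @mutex.synchronize { @error_callbacks.dup }
        callbacks.each { |cb| cb.call(e) }
      end
    end
  end
end

pool = SketchPool.new
seen = Queue.new
pool.on_exception { |e| seen << e }

pool.defer { raise ArgumentError, 'boom' }
pool.defer { :fine } # the worker survives the earlier failure

pool.shutdown
error = seen.pop
```

In this sketch a callback that raises would take the worker thread down with it, so callbacks should rescue their own errors.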
#on_threadpool? ⇒ Boolean
Returns true if the current thread is one of the threadpool threads.
# File 'lib/zk/threadpool.rb', line 76
def on_threadpool?
  tp = nil

  @mutex.synchronize do
    return false unless @threadpool # you can't dup nil
    tp = @threadpool.dup
  end

  tp.respond_to?(:include?) and tp.include?(Thread.current)
end
#running? ⇒ Boolean
# File 'lib/zk/threadpool.rb', line 66
def running?
  @mutex.lock
  begin
    @state == :running
  ensure
    @mutex.unlock rescue nil
  end
end
#shutdown(timeout = 2) ⇒ Object
Join all threads in this threadpool. Each thread is given a maximum of +timeout+ seconds to exit before it is considered hung and ignored (this is an issue with threads in general: see http://blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info). Operations still waiting in the queue are discarded. Returns false if the threadpool was already shut down, nil otherwise.
The default timeout is 2 seconds per thread.
# File 'lib/zk/threadpool.rb', line 169
def shutdown(timeout=2)
  threads = nil

  @mutex.lock
  begin
    return false if @state == :shutdown
    @state = :shutdown

    @queue.clear
    threads, @threadpool = @threadpool, []
    @cond.broadcast
  ensure
    @mutex.unlock rescue nil
  end

  join_all(threads)

  nil
end
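The private join_all helper is not shown on this page, so how the per-thread timeout is applied cannot be confirmed from here. As a minimal sketch of the general technique described above, assuming nothing about the library's internals, Ruby's Thread#join accepts a time limit and returns nil when the limit expires. The helper name join_all_sketch is hypothetical.

```ruby
# Hypothetical helper: the real join_all is private and not shown on this page.
# Gives each thread up to +timeout+ seconds to finish, then moves on.
def join_all_sketch(threads, timeout)
  hung = []
  threads.each do |th|
    # Thread#join(limit) returns the thread on success and nil on timeout.
    hung << th if th.join(timeout).nil?
  end
  hung
end

quick = Thread.new { :done }
stuck = Thread.new { sleep } # sleeps forever; stands in for a hung worker

hung = join_all_sketch([quick, stuck], 0.1)

stuck.kill # clean up so the example exits promptly
stuck.join
```

Because the limit applies to each thread in turn, the worst-case wait in this sketch is the timeout multiplied by the number of threads.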
#start! ⇒ Object
Starts the threadpool if not already running. Returns false if it was already running, true otherwise.
# File 'lib/zk/threadpool.rb', line 88
def start!
  @mutex.synchronize do
    return false if @state == :running
    @state = :running
    spawn_threadpool
  end

  true
end