Class: ZK::ThreadedCallback
- Inherits: Object
- Includes: Logging
- Defined in: lib/zk/threaded_callback.rb
Overview
A class that encapsulates the queue and the thread used to invoke a callback.
It responds to call, but instead of invoking the callback directly it places the arguments on a queue to be delivered by a dedicated thread.
call does not return the callback's result, so this class is only useful for background processing.
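The pattern described above can be sketched with nothing but the standard library. This is a minimal illustration and not the ZK source: the class name SketchThreadedCallback and its KILL_TOKEN constant are hypothetical stand-ins, and the mutex and logging of the real class are left out.

```ruby
# Hypothetical stand-in for the queue + thread pattern; not the ZK implementation.
class SketchThreadedCallback
  KILL_TOKEN = Object.new # hypothetical sentinel that tells the thread to stop

  def initialize(callback = nil, &blk)
    @callback = callback || blk
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        args = @queue.pop                 # blocks until something is queued
        break if args.equal?(KILL_TOKEN)
        @callback.call(*args)             # runs on the background thread
      end
    end
  end

  # Enqueues the arguments; the callback's result is never returned here.
  def call(*args)
    @queue.push(args)
  end

  def shutdown(timeout = 2)
    @queue.push(KILL_TOKEN)
    @thread.join(timeout)
  end
end

results = Queue.new
cb = SketchThreadedCallback.new { |a, b| results.push(a + b) }

ret = cb.call(1, 2)   # returns immediately, before the callback has necessarily run
sum = results.pop     # wait for the background thread to deliver the call
stopped = cb.shutdown # the joined thread, or nil if the join timed out
```

Because the result has to travel back through some side channel (here a second queue), the wrapper suits fire-and-forget notifications better than request/response calls.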
Instance Attribute Summary
- #callback ⇒ Object (readonly): Returns the value of attribute callback.
Instance Method Summary
- #call(*args) ⇒ Object
- #initialize(callback = nil, &blk) ⇒ ThreadedCallback (constructor): A new instance of ThreadedCallback.
- #running? ⇒ Boolean
- #setup_dispatch_thread ⇒ Object (protected)
- #shutdown(timeout = 2) ⇒ Object: timeout is how long, in seconds, to wait for the dispatch thread to shut down before returning.
Constructor Details
#initialize(callback = nil, &blk) ⇒ ThreadedCallback
Returns a new instance of ThreadedCallback.
# File 'lib/zk/threaded_callback.rb', line 11
def initialize(callback=nil, &blk)
  @callback = callback || blk
  @mutex = Monitor.new
  @queue = Queue.new
  @running = true
  setup_dispatch_thread
end
Instance Attribute Details
#callback ⇒ Object (readonly)
Returns the value of attribute callback.
# File 'lib/zk/threaded_callback.rb', line 9
def callback
  @callback
end
Instance Method Details
#call(*args) ⇒ Object
# File 'lib/zk/threaded_callback.rb', line 35
def call(*args)
  @queue.push(args)
end
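Since call only pushes onto the queue, its return value is whatever Queue#push returns. The short check below uses only the standard library and illustrates that, on current MRI, push returns the queue itself; the queued argument values are made up for the example.

```ruby
queue = Queue.new

# push returns the queue, not anything derived from the pushed value
returned = queue.push([:event, "/some/path"])
same_object = returned.equal?(queue)

# the argument array is what a consumer thread would later pop and splat
queued_args = queue.pop
```

A caller therefore should not branch on the value returned by call.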
#running? ⇒ Boolean
# File 'lib/zk/threaded_callback.rb', line 19
def running?
  @mutex.synchronize { @running }
end
#setup_dispatch_thread ⇒ Object (protected)
# File 'lib/zk/threaded_callback.rb', line 40
def setup_dispatch_thread
  @thread ||= Thread.new do
    while running?
      args = @queue.pop               # blocks until call or shutdown pushes something
      break if args == KILL_TOKEN
      begin
        callback.call(*args)
      rescue Exception => e
        # log the failure and keep the dispatch thread alive; only names
        # defined on this class (callback) are interpolated here
        logger.error { "error caught in threaded callback: #{callback.inspect}" }
        logger.error { e.to_std_format }
      end
    end
  end
end
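The dispatch loop rescues errors so that one failing callback does not end the thread and strand the calls queued behind it. The following is a standalone sketch of that idea using only the standard library; the kill sentinel, the lambda, and the errors array are hypothetical, and it rescues StandardError where the method above rescues Exception.

```ruby
kill = Object.new # hypothetical sentinel, plays the role of KILL_TOKEN
queue = Queue.new
handled = []
errors = []

# A callback that fails on odd input, to exercise the rescue path
callback = lambda do |n|
  raise ArgumentError, "bad input #{n}" if n.odd?
  handled << n
end

worker = Thread.new do
  loop do
    args = queue.pop
    break if args.equal?(kill)
    begin
      callback.call(*args)
    rescue StandardError => e
      errors << e.message # record the failure and continue with the next item
    end
  end
end

[1, 2, 3, 4].each { |n| queue.push([n]) }
queue.push(kill)
worker.join
```

Items are processed in the order they were queued, and the sentinel is only seen after everything pushed before it has been handled.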
#shutdown(timeout = 2) ⇒ Object
timeout is how long, in seconds, to wait for the dispatch thread to shut down before returning.
# File 'lib/zk/threaded_callback.rb', line 24
def shutdown(timeout=2)
  @mutex.synchronize do
    @running = false
    @queue.push(KILL_TOKEN)   # wake the dispatch thread if it is blocked on pop
    return unless @thread
    # wait up to timeout seconds for the dispatch thread to exit
    unless @thread.join(timeout)
      logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
    end
  end
end
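The timeout check in shutdown relies on the return value of Thread#join: given a limit, it returns nil if the thread is still running when the limit expires, and the thread itself otherwise. A small standard-library illustration follows; the sleep durations are arbitrary values chosen for the example.

```ruby
slow = Thread.new { sleep 0.5 }
fast = Thread.new { :done }

timed_out = slow.join(0.05).nil?      # limit expires first, so join returns nil
finished = fast.join(1).equal?(fast)  # thread ends in time, so join returns it

# clean up the still-running thread so the example exits promptly
slow.kill
slow.join
```

A nil result only means the wait ended; the thread keeps running until it finishes or is killed.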