Class: ZK::ThreadedCallback

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/zk/threaded_callback.rb

Overview

A class that encapsulates the queue + thread that calls a callback. Repsonds to call but places call on a queue to be delivered by a thread. You will not have a useful return value from call so this is only useful for background processing.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(callback = nil, &blk) ⇒ ThreadedCallback

Returns a new instance of ThreadedCallback.



11
12
13
14
15
16
17
# File 'lib/zk/threaded_callback.rb', line 11

def initialize(callback=nil, &blk)
  @callback = callback || blk
  @mutex = Monitor.new
  @queue = Queue.new
  @running = true
  setup_dispatch_thread
end

Instance Attribute Details

#callbackObject (readonly)

Returns the value of attribute callback.



9
10
11
# File 'lib/zk/threaded_callback.rb', line 9

def callback
  @callback
end

Instance Method Details

#call(*args) ⇒ Object



35
36
37
# File 'lib/zk/threaded_callback.rb', line 35

def call(*args)
  @queue.push(args)
end

#running?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/zk/threaded_callback.rb', line 19

def running?
  @mutex.synchronize { @running }
end

#setup_dispatch_threadObject (protected)



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/zk/threaded_callback.rb', line 40

def setup_dispatch_thread
  @thread ||= Thread.new do
    while running?
      args = @queue.pop
      break if args == KILL_TOKEN
      begin
        callback.call(*args)
      rescue Exception => e
        logger.error { "error caught in handler for path: #{path.inspect}, interests: #{interests.inspect}" }
        logger.error { e.to_std_format }
      end
    end
  end
end

#shutdown(timeout = 2) ⇒ Object

how long to wait on thread shutdown before we return



24
25
26
27
28
29
30
31
32
33
# File 'lib/zk/threaded_callback.rb', line 24

def shutdown(timeout=2)
  @mutex.synchronize do
    @running = false
    @queue.push(KILL_TOKEN)
    return unless @thread 
    unless @thread.join(2)
      logger.error { "#{self.class} timed out waiting for dispatch thread, callback: #{callback.inspect}" }
    end
  end
end