Class: Pione::DRbPatch::ReplyReader
- Inherits: Object
- Object
- Pione::DRbPatch::ReplyReader
- Defined in: lib/pione/patch/drb-patch.rb
Instance Method Summary
- #add_watcher(watcher) ⇒ Object
Registers a watcher thread, which receives an exception if the reply reader thread fails.
- #initialize ⇒ ReplyReader (constructor)
A new instance of ReplyReader.
- #remove_watcher(watcher) ⇒ Object
Removes the given watcher thread from the reader.
- #start(protocol) ⇒ Object
Constructor Details
#initialize ⇒ ReplyReader
Returns a new instance of ReplyReader.
# File 'lib/pione/patch/drb-patch.rb', line 24
def initialize
  @watcher_lock = Mutex.new
  @watchers = Set.new
end
Instance Method Details
#add_watcher(watcher) ⇒ Object
Registers a watcher thread, which receives an exception if the reply reader thread fails.
# File 'lib/pione/patch/drb-patch.rb', line 56
def add_watcher(watcher)
  @watcher_lock.synchronize do
    @watchers << watcher
  end
end
#remove_watcher(watcher) ⇒ Object
Removes the given watcher thread from the reader.
# File 'lib/pione/patch/drb-patch.rb', line 63
def remove_watcher(watcher)
  @watcher_lock.synchronize do
    @watchers.delete_if {|th| th == watcher}
  end
end
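The watcher bookkeeping above can be sketched in isolation. This is a minimal illustration, not PIONE code: `WatcherRegistry` and `watcher_count` are hypothetical names, and the class only mirrors the mutex-guarded `Set` pattern used by `add_watcher` and `remove_watcher`.

```ruby
require "set"

# Hypothetical stand-in that mirrors only the watcher bookkeeping shown above.
class WatcherRegistry
  def initialize
    @watcher_lock = Mutex.new
    @watchers = Set.new
  end

  # Register a thread while holding the lock.
  def add_watcher(watcher)
    @watcher_lock.synchronize { @watchers << watcher }
  end

  # Unregister a thread while holding the lock.
  def remove_watcher(watcher)
    @watcher_lock.synchronize { @watchers.delete(watcher) }
  end

  # Helper for this sketch only (assumption, not part of ReplyReader).
  def watcher_count
    @watcher_lock.synchronize { @watchers.size }
  end
end

registry = WatcherRegistry.new

# Each worker registers itself, does some work, and always unregisters.
workers = 4.times.map do
  Thread.new do
    begin
      registry.add_watcher(Thread.current)
      sleep 0.01
    ensure
      registry.remove_watcher(Thread.current)
    end
  end
end
workers.each(&:join)

final_count = registry.watcher_count
```

Pairing registration with an `ensure` block is one way to keep the set from accumulating threads that have already finished waiting.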
#start(protocol) ⇒ Object
# File 'lib/pione/patch/drb-patch.rb', line 29
def start(protocol)
  @thread ||= Thread.new do
    begin
      # loop for receiving replies and waiting for results
      while true
        # receive a reply
        req_id, succ, result = protocol.recv_reply
        # register it to waiter table
        DRbPatch.waiter_table.push(req_id, [succ, result])
      end
    rescue => e
      @watcher_lock.synchronize do
        # pass the exception to watchers
        @watchers.each do |watcher|
          Log::Debug.communication("connection error happened in receiving reply.")
          Log::Debug.communication(e)
          watcher.raise(ReplyReaderError.new(e)) if watcher.alive?
        end
        # remove dead watchers
        @watchers.delete_if {|watcher| not(watcher.alive?)}
      end
    end
  end
end
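The error-forwarding behavior of `#start` can be illustrated with a self-contained sketch. Everything here is an assumption made for illustration: `FakeProtocol` and `SketchReaderError` are hypothetical stand-ins for the real protocol object and `ReplyReaderError`, and a plain `Queue` replaces `DRbPatch.waiter_table`. The sketch only shows the general technique: a reader thread loops on `recv_reply`, and when that call fails it uses `Thread#raise` to wake each live watcher.

```ruby
require "set"

# Hypothetical error class standing in for ReplyReaderError.
class SketchReaderError < StandardError; end

# Hypothetical protocol: returns two replies, then fails like a closed connection.
class FakeProtocol
  def initialize
    @replies = [[1, true, "a"], [2, true, "b"]]
  end

  def recv_reply
    @replies.shift or raise IOError, "connection closed"
  end
end

received = Queue.new   # stands in for the waiter table
watchers = Set.new
lock     = Mutex.new
ready    = Queue.new

# A watcher blocks waiting for a result and is interrupted by the reader's error.
watcher = Thread.new do
  begin
    ready << true      # signal that the rescue below is now in effect
    sleep              # wait for a reply that never arrives
    :finished
  rescue SketchReaderError => e
    e.message
  end
end
lock.synchronize { watchers << watcher }
ready.pop              # make sure the watcher is inside its begin block

reader = Thread.new do
  protocol = FakeProtocol.new
  begin
    loop do
      req_id, succ, result = protocol.recv_reply
      received << [req_id, [succ, result]]
    end
  rescue => e
    lock.synchronize do
      # pass the failure on to every watcher that is still running
      watchers.each { |w| w.raise(SketchReaderError.new(e.message)) if w.alive? }
      # drop watchers that have already finished
      watchers.delete_if { |w| !w.alive? }
    end
  end
end

reader.join
outcome = watcher.value
```

In this sketch the watcher ends with the forwarded error message instead of sleeping forever, which is the point of keeping a watcher set: threads blocked on a reply learn about a broken connection rather than waiting indefinitely.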