Class: Isono::Util::DeferedMsg
- Inherits:
-
Queue
- Object
- Queue
- Isono::Util::DeferedMsg
- Defined in:
- lib/isono/util.rb
Overview
A utility class to interact success/error message with two different threads. dm = DeferedMsg.new EM.schedule
if condition
dm.success
else
dm.error(RuntimeError.new("fail"))
end
dm.wait rescue abort("got failure")
# will raise RuntimeError in case of error(). dm.wait unless EM.reactor_thread?
Defined Under Namespace
Classes: TimeoutError
Instance Method Summary collapse
- #cancel ⇒ Object
- #error(ex) ⇒ Object
-
#initialize(timeout = 60*15, th = Thread.current) ⇒ DeferedMsg
constructor
A new instance of DeferedMsg.
- #on_timeout(&blk) ⇒ Object
- #success(returnval = true) ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(timeout = 60*15, th = Thread.current) ⇒ DeferedMsg
Returns a new instance of DeferedMsg.
169 170 171 172 173 174 175 176 |
# File 'lib/isono/util.rb', line 169 def initialize(timeout=60*15, th=Thread.current) super() @thread_wait = th @timer_sig = EventMachine.add_timer(timeout) { @on_timeout_hook.call if @on_timeout_hook error(TimeoutError.new) } end |
Instance Method Details
#cancel ⇒ Object
193 194 195 196 |
# File 'lib/isono/util.rb', line 193 def cancel EventMachine.cancel_timer(@timer_sig) rescue nil @on_timeout_hook = nil end |
#error(ex) ⇒ Object
183 184 185 186 187 |
# File 'lib/isono/util.rb', line 183 def error(ex) raise TypeError unless ex.is_a?(Exception) self.enq(ex) @thread_called = Thread.current end |
#on_timeout(&blk) ⇒ Object
189 190 191 |
# File 'lib/isono/util.rb', line 189 def on_timeout(&blk) @on_timeout_hook = blk end |
#success(returnval = true) ⇒ Object
178 179 180 181 |
# File 'lib/isono/util.rb', line 178 def success(returnval=true) self.enq(returnval) @thread_called = Thread.current end |
#wait ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/isono/util.rb', line 198 def wait if (@thread_called == @thread_wait || @thread_called == Thread.current ) && self.empty? raise "do success() or error() prior to calling wait()" end ret = self.deq() # requeue the message to distribute to another wait(). self.enq(ret) if ret.is_a?(Exception) raise ret else return ret end ensure @thread_wait = nil self.cancel end |