Class: NoBrainer::QueryRunner::EMDriver::ResponseHandler
- Inherits: RethinkDB::Handler
- Inheritance chain (root first): Object, RethinkDB::Handler, NoBrainer::QueryRunner::EMDriver::ResponseHandler
- Defined in:
- lib/no_brainer/query_runner/em_driver.rb
Defined Under Namespace
Classes: Cursor
Instance Method Summary
- #close_query_handle ⇒ Object
- #initialize ⇒ ResponseHandler (constructor): A new instance of ResponseHandler.
- #on_array(arr, caller) ⇒ Object
- #on_atom(val, caller) ⇒ Object
- #on_close(caller) ⇒ Object
- #on_dispatch(caller) ⇒ Object
- #on_error(err, caller) ⇒ Object
- #on_open(caller) ⇒ Object
- #on_stream_val(val, caller) ⇒ Object
- #on_unhandled_change(val, caller) ⇒ Object
- #push(v) ⇒ Object
- #response_ready! ⇒ Object
- #set_atom(v) ⇒ Object
- #value ⇒ Object
- #wait_for_response ⇒ Object
Constructor Details
#initialize ⇒ ResponseHandler
Returns a new instance of ResponseHandler.
29 30 31 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 29
#
# Creates the deferrable stored in @ready. `response_ready!` marks it as
# succeeded and `wait_for_response` registers a callback on it.
#
# @return [ResponseHandler] a new instance of ResponseHandler
def initialize
  @ready = EventMachine::DefaultDeferrable.new
end
Instance Method Details
#close_query_handle ⇒ Object
33 34 35 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 33
#
# Closes the query handle that `on_dispatch` stored in @query_handle.
#
# @return [Object] whatever the handle's `close` returns
def close_query_handle
  @query_handle.close
end
#on_array(arr, caller) ⇒ Object
60 61 62 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 60
#
# Stores a whole-array response as the atom result via `set_atom`.
#
# @param arr [Object] the array response to store
# @param caller [Object] unused by this method
# @return [Object] the result of `set_atom`
def on_array(arr, caller)
  set_atom(arr)
end
#on_atom(val, caller) ⇒ Object
56 57 58 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 56
#
# Stores a single-value response as the atom result via `set_atom`.
#
# @param val [Object] the value to store
# @param caller [Object] unused by this method
# @return [Object] the result of `set_atom`
def on_atom(val, caller)
  set_atom(val)
end
#on_close(caller) ⇒ Object
45 46 47 48 49 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 45
#
# Finalizes the response when the query is closed.
#
# * Does nothing if an atom result has already been stored.
# * If `on_open` never ran (@has_data is unset), reports a
#   RethinkDB::RqlRuntimeError through `on_error`.
# * Otherwise pushes the :close marker onto the existing stream queue, or,
#   when no queue exists, stores an empty array as the atom result.
#
# @param caller [Object] forwarded to `on_error` in the no-data case
# @return [Object, nil] nil when an atom is already set, otherwise the result
#   of `on_error`, `push` or `set_atom`
def on_close(caller)
  return if @has_atom
  return on_error(RethinkDB::RqlRuntimeError.new("NoBrainer EM driver: No data received"), caller) unless @has_data
  @queue ? push(:close) : set_atom([])
end
#on_dispatch(caller) ⇒ Object
37 38 39 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 37
#
# Remembers the caller as @query_handle so that `close_query_handle` can
# close it later.
#
# @param caller [Object] the object to keep as the query handle
# @return [Object] the stored handle
def on_dispatch(caller)
  @query_handle = caller
end
#on_error(err, caller) ⇒ Object
51 52 53 54 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 51
#
# Records the error in @error (re-raised later by `value`) and also pushes it
# through `push`.
#
# @param err [Exception] the error to record
# @param caller [Object] unused by this method
# @return [Object] the result of `push`
def on_error(err, caller)
  @error = err
  push(err)
end
#on_open(caller) ⇒ Object
41 42 43 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 41
#
# Sets @has_data, the flag `on_close` checks before deciding whether to
# report a "No data received" error.
#
# @param caller [Object] unused by this method
# @return [true]
def on_open(caller)
  @has_data = true
end
#on_stream_val(val, caller) ⇒ Object
64 65 66 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 64
#
# Pushes a streamed value, wrapped in a one-element array, through `push`.
#
# @param val [Object] the streamed value
# @param caller [Object] unused by this method
# @return [Object] the result of `push`
def on_stream_val(val, caller)
  push([val])
end
#on_unhandled_change(val, caller) ⇒ Object
68 69 70 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 68
#
# Pushes a change value, wrapped in a one-element array, through `push`;
# the body is the same as `on_stream_val`.
#
# @param val [Object] the change value
# @param caller [Object] unused by this method
# @return [Object] the result of `push`
def on_unhandled_change(val, caller)
  push([val])
end
#push(v) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 72
#
# Appends an item to the stream queue, creating the queue on first use, and
# then signals that a response is available.
#
# @param v [Object] the item to enqueue
# @return [Object] the result of `response_ready!`
# @raise [RuntimeError] if an atom result has already been stored
def push(v)
  raise "internal error: unexpected stream" if @has_atom
  # The queue is created lazily; `set_atom` and `on_close` test @queue to
  # tell whether anything was ever pushed.
  @queue ||= EventMachine::Queue.new
  @queue.push(v)
  response_ready!
end
#response_ready! ⇒ Object
86 87 88 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 86
#
# Marks the @ready deferrable as succeeded (with nil) if it is still set.
# `wait_for_response` clears @ready, after which this method does nothing.
#
# @return [Object, nil] the result of `succeed`, or nil when @ready is unset
def response_ready!
  @ready.succeed(nil) if @ready
end
#set_atom(v) ⇒ Object
79 80 81 82 83 84 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 79
#
# Stores a non-streamed result, flags the response as an atom, and signals
# that a response is available.
#
# @param v [Object] the result to store in @value
# @return [Object] the result of `response_ready!`
# @raise [RuntimeError] if a stream queue already exists
def set_atom(v)
  raise "internal error: unexpected atom" if @queue
  @has_atom = true
  @value = v
  response_ready!
end
#value ⇒ Object
95 96 97 98 99 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 95
#
# Waits for the response and returns it.
#
# @return [Object, Cursor] the stored atom value, or a Cursor built from this
#   handler and its queue when the response was not an atom
# @raise [Exception] the error recorded by `on_error`, if any
def value
  wait_for_response
  raise @error if @error
  @has_atom ? @value : Cursor.new(self, @queue)
end
#wait_for_response ⇒ Object
90 91 92 93 |
# File 'lib/no_brainer/query_runner/em_driver.rb', line 90
#
# Waits, through NoBrainer::QueryRunner::EMDriver.sync, for the @ready
# deferrable's callback, then clears @ready. With @ready cleared, later calls
# skip the wait and `response_ready!` no longer does anything.
#
# @return [nil]
def wait_for_response
  # The object yielded by `sync` is registered as the deferrable's callback.
  NoBrainer::QueryRunner::EMDriver.sync { |w| @ready.callback(&w) } if @ready
  @ready = nil
end