Class: NoBrainer::QueryRunner::EMDriver::ResponseHandler

Inherits:
RethinkDB::Handler
  • Object
show all
Defined in:
lib/no_brainer/query_runner/em_driver.rb

Defined Under Namespace

Classes: Cursor

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ResponseHandler

Returns a new instance of ResponseHandler.



29
30
31
# File 'lib/no_brainer/query_runner/em_driver.rb', line 29

# Creates the deferrable used to signal that a response has arrived.
# @ready is succeeded by #response_ready! and is awaited, then cleared,
# by #wait_for_response.
def initialize
  @ready = EventMachine::DefaultDeferrable.new
end

Instance Method Details

#close_query_handle ⇒ Object



33
34
35
# File 'lib/no_brainer/query_runner/em_driver.rb', line 33

# Closes the query handle that #on_dispatch stored in @query_handle.
#
# @return [Object] whatever the handle's #close returns
def close_query_handle
  handle = @query_handle
  handle.close
end

#on_array(arr, caller) ⇒ Object



60
61
62
# File 'lib/no_brainer/query_runner/em_driver.rb', line 60

# Handler callback for an array result. The array is stored as the atom
# value through #set_atom, the same path #on_atom uses.
#
# @param arr [Object] the result handed to the callback (presumably an Array,
#   going by the callback name)
# @param caller [Object] not used by this method
# @return [Object] the result of #set_atom
def on_array(arr, caller)
  set_atom(arr)
end

#on_atom(val, caller) ⇒ Object



56
57
58
# File 'lib/no_brainer/query_runner/em_driver.rb', line 56

# Handler callback for a single (atom) result. The value is stored through
# #set_atom, which also marks the response as ready.
#
# @param val [Object] the result value
# @param caller [Object] not used by this method
# @return [Object] the result of #set_atom
def on_atom(val, caller)
  set_atom(val)
end

#on_close(caller) ⇒ Object



45
46
47
48
49
# File 'lib/no_brainer/query_runner/em_driver.rb', line 45

# Handler callback invoked when the query is closed.
#
# * If an atom was already stored, there is nothing left to do.
# * If #on_open never ran (@has_data unset), the close is reported as an
#   error through #on_error.
# * If a stream queue exists, a :close marker is pushed onto it.
# * Otherwise the result is recorded as an empty array atom.
#
# @param caller [Object] forwarded to #on_error in the no-data case
# @return [Object, nil] nil when an atom was already set, otherwise the
#   result of the delegated call
def on_close(caller)
  if @has_atom
    nil
  elsif !@has_data
    no_data = RethinkDB::RqlRuntimeError.new("NoBrainer EM driver: No data received")
    on_error(no_data, caller)
  elsif @queue
    push(:close)
  else
    set_atom([])
  end
end

#on_dispatch(caller) ⇒ Object



37
38
39
# File 'lib/no_brainer/query_runner/em_driver.rb', line 37

# Handler callback that records the object it is given as the query handle.
# #close_query_handle later calls #close on that object.
#
# @param caller [Object] stored in @query_handle
def on_dispatch(caller)
  @query_handle = caller
end

#on_error(err, caller) ⇒ Object



51
52
53
54
# File 'lib/no_brainer/query_runner/em_driver.rb', line 51

# Handler callback for a failed query. The error is kept in @error, which
# #value re-raises, and is also pushed onto the stream queue through #push.
#
# @param err [Exception] the error to record
# @param caller [Object] not used by this method
# @return [Object] the result of #push
def on_error(err, caller)
  push(@error = err)
end

#on_open(caller) ⇒ Object



41
42
43
# File 'lib/no_brainer/query_runner/em_driver.rb', line 41

# Handler callback that sets @has_data. #on_close checks this flag and
# reports "No data received" when it was never set.
#
# @param caller [Object] not used by this method
def on_open(caller)
  @has_data = true
end

#on_stream_val(val, caller) ⇒ Object



64
65
66
# File 'lib/no_brainer/query_runner/em_driver.rb', line 64

# Handler callback for one value of a streamed result. The value is wrapped
# in a one-element array and queued through #push.
# NOTE(review): the array wrapper presumably lets the consumer tell values
# apart from the :close marker and pushed errors — confirm against Cursor.
#
# @param val [Object] the streamed value
# @param caller [Object] not used by this method
# @return [Object] the result of #push
def on_stream_val(val, caller)
  push([val])
end

#on_unhandled_change(val, caller) ⇒ Object



68
69
70
# File 'lib/no_brainer/query_runner/em_driver.rb', line 68

# Handler callback for a change with no more specific handler. It is queued
# exactly like a stream value: wrapped in a one-element array and passed to
# #push.
#
# @param val [Object] the change payload
# @param caller [Object] not used by this method
# @return [Object] the result of #push
def on_unhandled_change(val, caller)
  push([val])
end

#push(v) ⇒ Object



72
73
74
75
76
77
# File 'lib/no_brainer/query_runner/em_driver.rb', line 72

# Appends an item to the stream queue, creating the queue on first use, and
# then signals that a response is available.
#
# @param v [Object] the item to enqueue (callers in this class pass
#   one-element arrays, errors, or :close)
# @return [Object] the result of #response_ready!
# @raise [RuntimeError] if an atom result was already recorded
def push(v)
  raise "internal error: unexpected stream" if @has_atom

  queue = (@queue ||= EventMachine::Queue.new)
  queue.push(v)
  response_ready!
end

#response_ready! ⇒ Object



86
87
88
# File 'lib/no_brainer/query_runner/em_driver.rb', line 86

# Succeeds the @ready deferrable (with nil) so that a pending
# #wait_for_response can continue. Does nothing once @ready has been
# cleared.
#
# @return [Object, nil] the result of succeed, or nil when @ready is unset
def response_ready!
  return unless @ready

  @ready.succeed(nil)
end

#set_atom(v) ⇒ Object



79
80
81
82
83
84
# File 'lib/no_brainer/query_runner/em_driver.rb', line 79

# Records a non-streamed result and signals that a response is available.
#
# @param v [Object] the value that #value will return
# @return [Object] the result of #response_ready!
# @raise [RuntimeError] if a stream queue already exists
def set_atom(v)
  raise(RuntimeError, "internal error: unexpected atom") if @queue

  @value = v
  @has_atom = true
  response_ready!
end

#value ⇒ Object

Raises:

  • (@error)


95
96
97
98
99
# File 'lib/no_brainer/query_runner/em_driver.rb', line 95

# Waits for the response, then returns the query result.
#
# @return [Object] the stored atom value, or a Cursor built from this handler
#   and its queue when the result is a stream
# @raise [Exception] the error recorded by #on_error, if any
def value
  wait_for_response
  raise @error if @error
  return @value if @has_atom

  Cursor.new(self, @queue)
end

#wait_for_response ⇒ Object



90
91
92
93
# File 'lib/no_brainer/query_runner/em_driver.rb', line 90

# Waits, via EMDriver.sync, until the @ready deferrable fires, then clears
# @ready so later calls return immediately.
#
# @return [nil]
def wait_for_response
  if @ready
    NoBrainer::QueryRunner::EMDriver.sync do |resume|
      # The callable yielded by sync is registered as the deferrable's callback.
      @ready.callback(&resume)
    end
  end
  @ready = nil
end