Class: RethinkDB::QueryHandle

Inherits:
Object
  • Object
show all
Defined in:
lib/nobrainer_streams/rethinkdb_monkeypatch.rb

Instance Method Summary collapse

Instance Method Details

#callback(res) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/nobrainer_streams/rethinkdb_monkeypatch.rb', line 48

def callback(res)
  begin
    if @handler.stopped?
      @closed = true
      @conn.stop(@token)
      return
    elsif res
      is_cfeed = (res['n'] & [Response::ResponseNote::SEQUENCE_FEED,
                              Response::ResponseNote::ATOM_FEED,
                              Response::ResponseNote::ORDER_BY_LIMIT_FEED,
                              Response::ResponseNote::UNIONED_FEED]) != []

      case res['t']
      when Response::ResponseType::SUCCESS_PARTIAL,
           Response::ResponseType::SUCCESS_SEQUENCE
        guarded_async_run do
          handle_open
          if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
            @conn.register_query(@token, @all_opts, self) if !@conn.closed?
            @conn.dispatch([Query::QueryType::CONTINUE], @token) if !@conn.closed?
          end
          Shim.response_to_native(res, @msg, @all_opts).each do |row|
            if is_cfeed
              if (row.has_key?('new_val') && row.has_key?('old_val') &&
                  @handler.respond_to?(:on_change))
                handle(:on_change, row['old_val'], row['new_val'])
              elsif (row.has_key?('new_val') && !row.has_key?('old_val') &&
                     @handler.respond_to?(:on_initial_val))
                handle(:on_initial_val, row['new_val'])
              elsif (row.has_key?('old_val') && !row.has_key?('new_val') &&
                     @handler.respond_to?(:on_uninitial_val))
                handle(:on_uninitial_val, row['old_val'])
              elsif row.has_key?('error') && @handler.respond_to?(:on_change_error)
                handle(:on_change_error, row['error'])
              elsif row.has_key?('state') && @handler.respond_to?(:on_state)
                handle(:on_state, row['state'])
              else
                handle(:on_unhandled_change, row)
              end
            else
              handle(:on_stream_val, row)
            end
          end
          if res['t'] == Response::ResponseType::SUCCESS_SEQUENCE ||
              @conn.closed?
            handle_close
          end
        end
      when Response::ResponseType::SUCCESS_ATOM
        guarded_async_run do
          return if @closed
          handle_open
          val = Shim.response_to_native(res, @msg, @all_opts)
          if val.is_a?(Array)
            handle(:on_array, val)
          else
            handle(:on_atom, val)
          end
          handle_close
        end
      when Response::ResponseType::WAIT_COMPLETE
        guarded_async_run do
          return if @closed
          handle_open
          handle(:on_wait_complete)
          handle_close
        end
      else
        exc = nil
        begin
          exc = Shim.response_to_native(res, @msg, @all_opts)
        rescue Exception => e
          exc = e
        end
        guarded_async_run do
          return if @closed
          handle_open
          handle(:on_error, e)
          handle_close
        end
      end
    else
      guarded_async_run {
        return if @closed
        handle_close
      }
    end
  rescue Exception => e
    guarded_async_run do
      return if @closed
      handle_open
      handle(:on_error, e)
      handle_close
    end
  end
end

#guarded_async_run(&b) ⇒ Object

Override this method with an async dispatch, making sure that when the block is run @closed == false



44
45
46
# File 'lib/nobrainer_streams/rethinkdb_monkeypatch.rb', line 44

def guarded_async_run(&b)
  raise "Must override QueryHandle#guarded_async_run"
end