Class: RethinkDB::RQL

Inherits:
Object
  • Object
show all
Defined in:
lib/nobrainer_streams/rethinkdb_monkeypatch.rb

Instance Method Summary collapse

Instance Method Details

#async_run(*args, &b) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/nobrainer_streams/rethinkdb_monkeypatch.rb', line 224

def async_run(*args, &b)
  unbound_if(@body == RQL)
  args = parse(*args, &b)
  if args[:block].is_a?(Proc)
    args[:block] = CallbackHandler.new(args[:block])
  end
  if !args[:block].is_a?(Handler)
    raise ArgumentError, "No handler specified."
  end

  async_handler = args[:async_handler]
  if !async_handler.is_a?(AsyncHandler)
    raise ArgumentError, "No async handler specified."
  end

  # If the user has defined the `on_state` method, we assume they want states.
  if args[:block].respond_to?(:on_state)
    args[:opts] = args[:opts].merge(include_states: true)
  end

  async_handler.callback = args[:block]
  async_handler.connection = args[:conn]
  async_handler.options = args[:opts]

  async_handler.run do
    async_handler.connection.run(@body, async_handler.options, async_handler.handler)
  end
end

#em_run(*args, &b) ⇒ Object



220
221
222
# File 'lib/nobrainer_streams/rethinkdb_monkeypatch.rb', line 220

def em_run(*args, &b)
  async_run(*args, EMHandler, &b)
end

#parse(*args, &b) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/nobrainer_streams/rethinkdb_monkeypatch.rb', line 164

def parse(*args, &b)
  conn = nil
  opts = nil
  block = nil
  async_handler = nil
  args = args.map{|x| x.is_a?(Class) ? x.new : x}
  args.each {|arg|
    case arg
    when RethinkDB::Connection
      raise ArgumentError, "Unexpected second Connection #{arg.inspect}." if conn
      conn = arg
    when Hash
      raise ArgumentError, "Unexpected second Hash #{arg.inspect}." if opts
      opts = arg
    when Proc
      raise ArgumentError, "Unexpected second callback #{arg.inspect}." if block
      block = arg
    when Handler
      raise ArgumentError, "Unexpected second callback #{arg.inspect}." if block
      block = arg
    when AsyncHandler
      raise ArgumentError, "Unexpected second AsyncHandler #{arg.inspect}." if async_handler
      async_handler = arg
    else
      raise ArgumentError, "Unexpected argument #{arg.inspect} " +
        "(got #{args.inspect})."
    end
  }
  conn = @@default_conn if !conn
  opts = {} if !opts
  block = b if !block
  if (tf = opts[:time_format])
    opts[:time_format] = (tf = tf.to_s)
    if tf != 'raw' && tf != 'native'
      raise ArgumentError, "`time_format` must be 'raw' or 'native' (got `#{tf}`)."
    end
  end
  if (gf = opts[:group_format])
    opts[:group_format] = (gf = gf.to_s)
    if gf != 'raw' && gf != 'native'
      raise ArgumentError, "`group_format` must be 'raw' or 'native' (got `#{gf}`)."
    end
  end
  if (bf = opts[:binary_format])
    opts[:binary_format] = (bf = bf.to_s)
    if bf != 'raw' && bf != 'native'
      raise ArgumentError, "`binary_format` must be 'raw' or 'native' (got `#{bf}`)."
    end
  end
  if !conn
    raise ArgumentError, "No connection specified!\n" \
    "Use `query.run(conn)` or `conn.repl(); query.run`."
  end
  {conn: conn, opts: opts, block: block, async_handler: async_handler}
end