Class: Vayacondios::Server::StreamHandler

Inherits:
EventsHandler show all
Defined in:
lib/vayacondios/server/handlers/stream_handler.rb

Instance Attribute Summary collapse

Attributes inherited from DocumentHandler

#database, #log

Instance Method Summary collapse

Methods inherited from EventsHandler

#base_retrieve, #delete, #search

Methods inherited from DocumentHandler

#action_successful, #base_create, #base_delete, #base_retrieve, #base_search, #base_update, #call, #initialize

Constructor Details

This class inherits a constructor from Vayacondios::Server::DocumentHandler

Instance Attribute Details

#cursor ⇒ Object (readonly)

Returns the value of attribute cursor.



4
5
6
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4

# Reader for the @cursor instance variable.
#
# @cursor is assigned in #retrieve from
# Event.receive(params).prepare_search(query); #stream_events and
# #update_cursor read its +filter+ and call +prepare_search+ on it.
#
# @return [Object, nil] the stored cursor, or nil if it has not been assigned
def cursor
  @cursor
end

#on_data ⇒ Object (readonly)

Returns the value of attribute on_data.



4
5
6
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4

# Reader for the @on_data instance variable.
#
# @on_data is the block captured by #stream_data; #stream_events invokes it
# via +call+ with each formatted event.
#
# @return [Proc, nil] the stored block, or nil if none has been assigned
def on_data
  @on_data
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



4
5
6
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4

# Reader for the @timer instance variable.
#
# @timer is assigned in #retrieve from EM::Synchrony.add_periodic_timer and
# is cancelled by #close_stream!.
#
# @return [Object, nil] the stored timer, or nil if it has not been assigned
def timer
  @timer
end

Instance Method Details

#close_stream! ⇒ Object



17
18
19
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 17

# Stop the periodic timer that drives the stream.
#
# Does nothing when no timer has been stored yet (for example when the
# stream is closed before #retrieve ran, or after #retrieve raised), so
# closing never fails with NoMethodError on nil.
#
# @return [Object, nil] the result of cancelling the timer, or nil when
#   there was no timer to cancel
def close_stream!
  timer.cancel if timer
end

#retrieve(params, query) ⇒ Object



6
7
8
9
10
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 6

# Begin streaming events for a request.
#
# Builds the search cursor from the request and then arms a one-second
# periodic timer whose callback is #stream_events.
#
# @param params [Object] request parameters, handed to Event.receive
# @param query [Object] search query, handed to prepare_search on the
#   received event
# @return [Object] Goliath::Response::STREAMING
def retrieve(params, query)
  # Build the cursor before arming the timer: if receiving the event or
  # preparing the search raises, no timer is left running, and
  # #stream_events can never fire against a cursor that was not assigned.
  @cursor = Event.receive(params).prepare_search(query)
  @timer = EM::Synchrony.add_periodic_timer(1) { stream_events }
  Goliath::Response::STREAMING
end

#stream_data(&on_data) ⇒ Object



12
13
14
15
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 12

# Register the block that receives each streamed event.
#
# The block is stored in @on_data (nil when no block is given) and the
# handler itself is returned.
#
# @yield [event] called by #stream_events with each formatted event
# @return [self]
def stream_data(&on_data)
  tap { @on_data = on_data }
end

#stream_events ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 26

# Search for events matching the current cursor and push each one to the
# registered on_data block.
#
# The search is issued through +database+ with a copy of the cursor's
# filter. Each result is formatted via Event#format_response, handed to
# on_data, and its :time is passed to #update_cursor.
#
# @return [Object, nil] nil when the search returned nothing, otherwise the
#   collection of results that was iterated
def stream_events
  log.debug 'Streaming events'
  log.debug "  Stream cursor is #{cursor.filter}"
  results = database.call(:search, cursor, cursor.filter.dup, {})
  return if results.empty?

  results.each do |document|
    formatted = Event.new.format_response(document)
    on_data.call formatted
    update_cursor formatted[:time]
  end
end

#update_cursor(latest) ⇒ Object



21
22
23
24
# File 'lib/vayacondios/server/handlers/stream_handler.rb', line 21

# Advance the stream cursor past the given point.
#
# Removes the :_t entry from the cursor's filter, then calls
# prepare_search on the cursor with +after: latest+.
#
# @param latest [Object] value passed as the :after search option
#   (#stream_events passes an event's :time)
# @return [Object] whatever the cursor's prepare_search returns
def update_cursor(latest)
  stream_cursor = cursor
  stream_cursor.filter.delete(:_t)
  stream_cursor.prepare_search(after: latest)
end