Class: Vayacondios::Server::StreamHandler
- Inherits: EventsHandler
- Ancestors: Object, DocumentHandler, EventsHandler, Vayacondios::Server::StreamHandler
- Defined in: lib/vayacondios/server/handlers/stream_handler.rb
Instance Attribute Summary
- #cursor ⇒ Object (readonly): Returns the value of attribute cursor.
- #on_data ⇒ Object (readonly): Returns the value of attribute on_data.
- #timer ⇒ Object (readonly): Returns the value of attribute timer.
Attributes inherited from DocumentHandler
Instance Method Summary
- #close_stream! ⇒ Object
- #retrieve(params, query) ⇒ Object
- #stream_data(&on_data) ⇒ Object
- #stream_events ⇒ Object
- #update_cursor(latest) ⇒ Object
Methods inherited from EventsHandler
#base_retrieve, #delete, #search
Methods inherited from DocumentHandler
#action_successful, #base_create, #base_delete, #base_retrieve, #base_search, #base_update, #call, #initialize
Constructor Details
This class inherits a constructor from Vayacondios::Server::DocumentHandler
Instance Attribute Details
#cursor ⇒ Object (readonly)
Returns the value of attribute cursor.
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4
    def cursor
      @cursor
    end
#on_data ⇒ Object (readonly)
Returns the value of attribute on_data.
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4
    def on_data
      @on_data
    end
#timer ⇒ Object (readonly)
Returns the value of attribute timer.
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 4
    def timer
      @timer
    end
Instance Method Details
#close_stream! ⇒ Object
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 17
    def close_stream!
      timer.cancel
    end
#retrieve(params, query) ⇒ Object
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 6
    def retrieve(params, query)
      @timer = EM::Synchrony.add_periodic_timer(1){ stream_events }
      @cursor = Event.receive(params).prepare_search(query)
      Goliath::Response::STREAMING
    end
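The listing above starts a one-second periodic timer that calls #stream_events, and #close_stream! later cancels that timer. The following is a minimal sketch of that lifecycle, not the handler itself: ManualTimer and LifecycleSketch are hypothetical stand-ins, the timer is ticked by hand instead of by an EventMachine reactor, and the :streaming symbol is a placeholder for Goliath::Response::STREAMING.

```ruby
# Hypothetical manual timer standing in for the periodic timer returned by
# EM::Synchrony.add_periodic_timer; #tick is called by hand, no reactor needed.
class ManualTimer
  def initialize(&callback)
    @callback = callback
    @cancelled = false
  end

  # Fire the callback once, unless the timer has been cancelled.
  def tick
    @callback.call unless @cancelled
  end

  def cancel
    @cancelled = true
  end
end

# Hypothetical class that mirrors only the timer lifecycle of StreamHandler.
class LifecycleSketch
  attr_reader :timer, :polls

  def initialize
    @polls = 0
  end

  # Register the polling callback and signal a streaming response.
  def retrieve
    @timer = ManualTimer.new { stream_events }
    :streaming # placeholder for Goliath::Response::STREAMING
  end

  # Stand-in for the real poll; it only counts invocations.
  def stream_events
    @polls += 1
  end

  def close_stream!
    timer.cancel
  end
end

sketch = LifecycleSketch.new
sketch.retrieve
2.times { sketch.timer.tick } # two polls are delivered
sketch.close_stream!
sketch.timer.tick             # ignored after cancellation
```

In this sketch the poll count stays at two after the stream is closed, which is the effect timer.cancel is presumably meant to have in the real handler.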
#stream_data(&on_data) ⇒ Object
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 12
    def stream_data(&on_data)
      @on_data = on_data
      self
    end
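Because #stream_data stores the given block and returns self, a caller can register the callback and keep the handler in a single expression. A minimal sketch of that pattern follows, assuming a hypothetical SketchHandler class that copies only this one method; the event hash is made up for illustration.

```ruby
# Hypothetical stand-in that mirrors only the block-capturing behaviour
# of StreamHandler#stream_data; it is not the real handler.
class SketchHandler
  attr_reader :on_data

  # Store the block for later use and return self so calls can be chained.
  def stream_data(&on_data)
    @on_data = on_data
    self
  end
end

received = []
handler = SketchHandler.new.stream_data { |event| received << event }

# Later, whoever produces events invokes the stored block.
handler.on_data.call({ id: 1 })
```

The stored block is a plain Proc, so the producer side (here the last line) needs nothing beyond on_data.call.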
#stream_events ⇒ Object
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 26
    def stream_events
      log.debug 'Streaming events'
      log.debug " Stream cursor is #{cursor.filter}"
      available = database.call(:search, cursor, cursor.filter.dup, {})
      unless available.empty?
        available.each do |result|
          event = Event.new.format_response(result)
          on_data.call event
          update_cursor event[:time]
        end
      end
    end
#update_cursor(latest) ⇒ Object
    # File 'lib/vayacondios/server/handlers/stream_handler.rb', line 21
    def update_cursor latest
      cursor.filter.delete(:_t)
      cursor.prepare_search(after: latest)
    end
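Taken together, #stream_events and #update_cursor appear to implement an advancing cursor: each delivered event replaces the time bound in the filter, so the next poll should only return newer events. The sketch below illustrates that idea with in-memory stand-ins. Cursor, SketchStream, and the shape of the :_t entry are assumptions made for illustration; the real Event#prepare_search and database driver may build the filter differently.

```ruby
# Hypothetical in-memory stand-ins; none of these names come from Vayacondios.
Cursor = Struct.new(:filter) do
  # Assumed behaviour: record a lower time bound in the filter under :_t.
  def prepare_search(query)
    filter[:_t] = { after: query[:after] } if query[:after]
    self
  end
end

class SketchStream
  attr_reader :cursor, :on_data

  def initialize(store, &on_data)
    @store = store
    @cursor = Cursor.new({})
    @on_data = on_data
  end

  # Stand-in for the database search: events strictly newer than the bound.
  def search
    bound = cursor.filter.dig(:_t, :after)
    @store.select { |event| bound.nil? || event[:time] > bound }
  end

  # Deliver every available event, advancing the cursor after each one.
  def stream_events
    search.each do |event|
      on_data.call(event)
      update_cursor(event[:time])
    end
  end

  # Drop the old time bound and install one at the latest delivered time.
  def update_cursor(latest)
    cursor.filter.delete(:_t)
    cursor.prepare_search(after: latest)
  end
end

store = [{ time: 1, msg: 'a' }, { time: 2, msg: 'b' }]
seen = []
stream = SketchStream.new(store) { |event| seen << event[:msg] }

stream.stream_events            # first poll delivers 'a' and 'b'
store << { time: 3, msg: 'c' }
stream.stream_events            # second poll delivers only 'c'
```

Advancing the bound after every event, as the original method does, means a poll that fails partway through would resume from the last delivered event instead of replaying the whole batch.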