Class: Gnip::GnipStream::Stream
- Inherits:
-
Object
- Object
- Gnip::GnipStream::Stream
- Defined in:
- lib/gnip/gnip-stream/stream.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#backfill_client ⇒ Object
Returns the value of attribute backfill_client.
-
#url ⇒ Object
Returns the value of attribute url.
-
#version ⇒ Object
Returns the value of attribute version.
Instance Method Summary collapse
- #configure_handlers ⇒ Object
- #connect ⇒ Object
- #consume(&block) ⇒ Object
- #handle_connection_close(http_connection) ⇒ Object
- #handle_error(http_connection) ⇒ Object
-
#initialize(client) ⇒ Stream
constructor
A new instance of Stream.
- #on_connection_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_message(&block) ⇒ Object
- #process_chunk(chunk) ⇒ Object
Constructor Details
#initialize(client) ⇒ Stream
Returns a new instance of Stream.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/gnip/gnip-stream/stream.rb', line 11
#
# Builds a stream bound to the PowerTrack endpoint that matches the client's
# declared version, then wires up the default reconnect handlers.
#
# @param client [Object] must respond to +power_track_version+, +account+,
#   +publisher+, +label+, +backfill_client+, +username+ and +password+
# @raise [ArgumentError] when the client's PowerTrack version is neither
#   '1.0' nor '2.0'
def initialize(client)
  self.version = client.power_track_version

  # Each supported PowerTrack version lives on a different host/path layout.
  @url =
    case version
    when '1.0'
      "https://stream.gnip.com:443/accounts/#{client.account}/publishers/#{client.publisher}/streams/track/#{client.label}.json"
    when '2.0'
      "https://gnip-stream.twitter.com/stream/powertrack/accounts/#{client.account}/publishers/#{client.publisher}/#{client.label}.json"
    else
      # ArgumentError (a StandardError) instead of a bare Exception, so that an
      # ordinary `rescue` in calling code can handle a misconfigured client.
      raise ArgumentError, "version #{version} is not supported from this gem."
    end

  @backfill_client = client.backfill_client
  # The buffer is given the "\r\n" separator and a pattern matching one
  # brace-delimited entry terminated by that separator.
  @processor = JsonDataBuffer.new("\r\n", /^\{.*\}\r\n/)
  @headers = {
    'authorization' => [client.username, client.password],
    'accept-encoding' => 'gzip, compressed'
  }

  # Both handlers are told to re-enter #consume on this stream.
  @error_handler = ErrorReconnect.new(self, :consume)
  @connection_close_handler = ErrorReconnect.new(self, :consume)
  configure_handlers
end
Instance Attribute Details
#backfill_client ⇒ Object
Returns the value of attribute backfill_client.
9 10 11 |
# File 'lib/gnip/gnip-stream/stream.rb', line 9
#
# Reader for the +backfill_client+ attribute.
#
# @return [Object] the current value of +@backfill_client+
def backfill_client
  @backfill_client
end
#url ⇒ Object
Returns the value of attribute url.
9 10 11 |
# File 'lib/gnip/gnip-stream/stream.rb', line 9
#
# Reader for the +url+ attribute.
#
# @return [Object] the current value of +@url+
def url
  @url
end
#version ⇒ Object
Returns the value of attribute version.
9 10 11 |
# File 'lib/gnip/gnip-stream/stream.rb', line 9
#
# Reader for the +version+ attribute.
#
# @return [Object] the current value of +@version+
def version
  @version
end
Instance Method Details
#configure_handlers ⇒ Object
29 30 31 32 |
# File 'lib/gnip/gnip-stream/stream.rb', line 29
#
# Registers the default error and connection-close callbacks. Each callback
# forwards a descriptive message to its reconnect handler
# (+@error_handler+ / +@connection_close_handler+).
#
# @return [Proc] the connection-close callback, as returned by
#   #on_connection_close
def configure_handlers
  on_error do |error|
    @error_handler.attempt_to_reconnect("Gnip Connection Error. Reason was: #{error.inspect}")
  end

  on_connection_close do
    @connection_close_handler.attempt_to_reconnect('Gnip Connection Closed')
  end
end
#connect ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/gnip/gnip-stream/stream.rb', line 52
#
# Runs an EventMachine reactor that issues a GET to #url with the stored
# headers and feeds every received chunk to #process_chunk. The reactor is
# stopped after the request completes, whether it succeeded or failed.
def connect
  EM.run do
    # Extra request options; a 'client' query parameter is added only when a
    # backfill client is present.
    options = {}
    options = { query: { 'client' => backfill_client } } if backfill_client.present?

    # NOTE(review): timeouts are presumably in seconds — confirm against em-http-request.
    http = EM::HttpRequest.new(url, inactivity_timeout: 45, connection_timeout: 75)
                          .get({ head: @headers }.merge!(options))

    http.stream { |chunk| process_chunk(chunk) }

    # Request finished: notify the close handler, then stop the reactor.
    http.callback do
      handle_connection_close(http)
      EM.stop
    end

    # Request failed: notify the error handler, then stop the reactor.
    http.errback do
      handle_error(http)
      EM.stop
    end
  end
end
#consume(&block) ⇒ Object
34 35 36 37 38 |
# File 'lib/gnip/gnip-stream/stream.rb', line 34
#
# Starts consuming the stream, delivering messages to the given block.
#
# The block is remembered in +@client_callback+, so a later call without a
# block re-registers the previously supplied callback.
#
# @yield [entry] invoked for each message (see #on_message)
# @return [Object] whatever #connect returns
def consume(&block)
  @client_callback = block if block
  on_message(&@client_callback)
  connect
end
#handle_connection_close(http_connection) ⇒ Object
80 81 82 |
# File 'lib/gnip/gnip-stream/stream.rb', line 80
#
# Invokes the callback registered through #on_connection_close.
#
# @param http_connection [Object] passed through to the callback unchanged
# @return [Object] the callback's return value
def handle_connection_close(http_connection)
  @on_connection_close.call(http_connection)
end
#handle_error(http_connection) ⇒ Object
76 77 78 |
# File 'lib/gnip/gnip-stream/stream.rb', line 76
#
# Invokes the callback registered through #on_error.
#
# @param http_connection [Object] passed through to the callback unchanged
# @return [Object] the callback's return value
def handle_error(http_connection)
  @on_error.call(http_connection)
end
#on_connection_close(&block) ⇒ Object
44 45 46 |
# File 'lib/gnip/gnip-stream/stream.rb', line 44
#
# Stores the block to be called by #handle_connection_close, replacing any
# previously stored one.
#
# @return [Proc, nil] the stored block (nil when called without a block)
def on_connection_close(&block)
  @on_connection_close = block
end
#on_error(&block) ⇒ Object
48 49 50 |
# File 'lib/gnip/gnip-stream/stream.rb', line 48
#
# Stores the block to be called by #handle_error, replacing any previously
# stored one.
#
# @return [Proc, nil] the stored block (nil when called without a block)
def on_error(&block)
  @on_error = block
end
#on_message(&block) ⇒ Object
40 41 42 |
# File 'lib/gnip/gnip-stream/stream.rb', line 40
#
# Stores the block to be called for each message, replacing any previously
# stored one.
#
# @return [Proc, nil] the stored block (nil when called without a block)
def on_message(&block)
  @on_message = block
end
#process_chunk(chunk) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/gnip/gnip-stream/stream.rb', line 69
#
# Feeds a raw chunk into the buffer, then hands every entry the buffer
# reports as complete to the message callback via EM.defer.
#
# @param chunk [Object] raw data as delivered by the HTTP stream
# @return [Object] the value returned by iterating the buffer's complete entries
def process_chunk(chunk)
  buffer = @processor
  buffer.process(chunk)
  # @on_message is looked up when the deferred block runs, not when it is queued.
  buffer.complete_entries.each { |message| EM.defer { @on_message.call(message) } }
end