Class: Gnip::GnipStream::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/gnip/gnip-stream/stream.rb

Direct Known Subclasses

Replay

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Stream

Returns a new instance of Stream.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/gnip/gnip-stream/stream.rb', line 11

def initialize(client)
  self.version = client.power_track_version
  case version
  when '1.0'
    @url = "https://stream.gnip.com:443/accounts/#{client.}/publishers/#{client.publisher}/streams/track/#{client.label}.json"
  when '2.0'
    @url = "https://gnip-stream.twitter.com/stream/powertrack/accounts/#{client.}/publishers/#{client.publisher}/#{client.label}.json"
  else
    raise Exception.new("version #{version} is not supported from this gem.")
  end
  @backfill_client = client.backfill_client
  @processor  = JsonDataBuffer.new("\r\n", Regexp.new(/^\{.*\}\r\n/))
  @headers    = { 'authorization' => [client.username, client.password], 'accept-encoding' => 'gzip, compressed' }
  @error_handler = ErrorReconnect.new(self, :consume)
  @connection_close_handler = ErrorReconnect.new(self, :consume)
  configure_handlers
end

Instance Attribute Details

#backfill_clientObject

Returns the value of attribute backfill_client.



9
10
11
# File 'lib/gnip/gnip-stream/stream.rb', line 9

def backfill_client
  @backfill_client
end

#urlObject

Returns the value of attribute url.



9
10
11
# File 'lib/gnip/gnip-stream/stream.rb', line 9

def url
  @url
end

#versionObject

Returns the value of attribute version.



9
10
11
# File 'lib/gnip/gnip-stream/stream.rb', line 9

def version
  @version
end

Instance Method Details

#configure_handlersObject



29
30
31
32
# File 'lib/gnip/gnip-stream/stream.rb', line 29

def configure_handlers
  on_error { |error| @error_handler.attempt_to_reconnect("Gnip Connection Error. Reason was: #{error.inspect}") }
  on_connection_close { @connection_close_handler.attempt_to_reconnect('Gnip Connection Closed') }
end

#connectObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/gnip/gnip-stream/stream.rb', line 52

def connect
  EM.run do
    options = {}
    options = { query: { 'client' => backfill_client } } if backfill_client.present?
    http = EM::HttpRequest.new(url, inactivity_timeout: 45, connection_timeout: 75).get({ head: @headers }.merge!(options))
    http.stream { |chunk| process_chunk(chunk) }
    http.callback {
      handle_connection_close(http)
      EM.stop
    }
    http.errback {
      handle_error(http)
      EM.stop
    }
  end
end

#consume(&block) ⇒ Object



34
35
36
37
38
# File 'lib/gnip/gnip-stream/stream.rb', line 34

def consume(&block)
  @client_callback = block if block
  on_message(&@client_callback)
  connect
end

#handle_connection_close(http_connection) ⇒ Object



80
81
82
# File 'lib/gnip/gnip-stream/stream.rb', line 80

def handle_connection_close(http_connection)
  @on_connection_close.call(http_connection)
end

#handle_error(http_connection) ⇒ Object



76
77
78
# File 'lib/gnip/gnip-stream/stream.rb', line 76

def handle_error(http_connection)
  @on_error.call(http_connection)
end

#on_connection_close(&block) ⇒ Object



44
45
46
# File 'lib/gnip/gnip-stream/stream.rb', line 44

def on_connection_close(&block)
  @on_connection_close = block
end

#on_error(&block) ⇒ Object



48
49
50
# File 'lib/gnip/gnip-stream/stream.rb', line 48

def on_error(&block)
  @on_error = block
end

#on_message(&block) ⇒ Object



40
41
42
# File 'lib/gnip/gnip-stream/stream.rb', line 40

def on_message(&block)
  @on_message = block
end

#process_chunk(chunk) ⇒ Object



69
70
71
72
73
74
# File 'lib/gnip/gnip-stream/stream.rb', line 69

def process_chunk(chunk)
  @processor.process(chunk)
  @processor.complete_entries.each do |entry|
    EM.defer { @on_message.call(entry) }
  end
end