Class: GnipApi::Apis::PowerTrack::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/gnip_api/apis/power_track/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Stream

Returns a new instance of Stream.

[View source]

7
8
9
10
11
# File 'lib/gnip_api/apis/power_track/stream.rb', line 7

def initialize params = {}
  @stream = params[:stream] || GnipApi.config.label
  @source = params[:source] || GnipApi.config.source
  set_config
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.


5
6
7
# File 'lib/gnip_api/apis/power_track/stream.rb', line 5

def adapter
  @adapter
end

Instance Method Details

#build_message(params) ⇒ Object

[View source]

45
46
47
# File 'lib/gnip_api/apis/power_track/stream.rb', line 45

def build_message params
  Gnip::Message.build(params)
end

#consumeObject

[View source]

17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/gnip_api/apis/power_track/stream.rb', line 17

def consume
  request = create_request
  adapter.stream_get request do |chunk|
    @buffer.insert! chunk
    begin
      yield process_entries(@buffer.read!)
    rescue Exception => e
      puts e.class
      puts e.message
      puts e.backtrace[0..10].join("\n")
      raise e
    end
  end
end

#log_system_messages(entries) ⇒ Object

[View source]

39
40
41
42
43
# File 'lib/gnip_api/apis/power_track/stream.rb', line 39

def log_system_messages entries
  entries.select{|message| message.system_message? }.each do |system_message|
    GnipApi.logger.send(system_message.log_method, system_message.message)
  end
end

#loggerObject

[View source]

13
14
15
# File 'lib/gnip_api/apis/power_track/stream.rb', line 13

def logger
  GnipApi.logger
end

#parse_json(json) ⇒ Object

[View source]

49
50
51
52
53
54
55
# File 'lib/gnip_api/apis/power_track/stream.rb', line 49

def parse_json json
  begin 
    GnipApi::JsonParser.parse json
  rescue GnipApi::Errors::JsonParser::ParseError
    nil
  end
end

#process_entries(entries) ⇒ Object

[View source]

32
33
34
35
36
37
# File 'lib/gnip_api/apis/power_track/stream.rb', line 32

def process_entries entries
  entries.map!{|e| parse_json(e)}.compact!
  entries.map!{|e| build_message(e)}
  log_system_messages(entries)
  entries
end