Class: GnipApi::Apis::PowerTrack::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/gnip_api/apis/power_track/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Stream

Returns a new instance of Stream.

[View source]

7
8
9
10
11
12
# File 'lib/gnip_api/apis/power_track/stream.rb', line 7

def initialize params = {}
  @stream = params[:stream] || GnipApi.config.label
  @output_format = GnipApi.config.stream_output_format
  raise GnipApi::Errors::Configuration::InvalidOutputFormat unless GnipApi::Configuration::OUTPUT_FORMATS.include?(@output_format)
  set_config
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.


5
6
7
# File 'lib/gnip_api/apis/power_track/stream.rb', line 5

def adapter
  @adapter
end

Instance Method Details

#build_message(params) ⇒ Object

[View source]

48
49
50
# File 'lib/gnip_api/apis/power_track/stream.rb', line 48

def build_message params
  Gnip::Message.build(params)
end

#consumeObject

[View source]

18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/gnip_api/apis/power_track/stream.rb', line 18

def consume
  request = create_request
  adapter.stream_get request do |chunk|
    @buffer.insert! chunk
    begin
      yield process_entries(@buffer.read!)
    rescue Exception => e
      puts e.class
      puts e.message
      puts e.backtrace[0..10].join("\n")
      raise e
    end
  end
end

#log_system_messages(entries) ⇒ Object

[View source]

42
43
44
45
46
# File 'lib/gnip_api/apis/power_track/stream.rb', line 42

def log_system_messages entries
  entries.select{|message| message.system_message? }.each do |system_message|
    GnipApi.logger.send(system_message.log_method, system_message.message)
  end
end

#loggerObject

[View source]

14
15
16
# File 'lib/gnip_api/apis/power_track/stream.rb', line 14

def logger
  GnipApi.logger
end

#parse_json(json) ⇒ Object

[View source]

52
53
54
55
56
57
58
# File 'lib/gnip_api/apis/power_track/stream.rb', line 52

def parse_json json
  begin 
    GnipApi::JsonParser.new.parse json
  rescue GnipApi::Errors::JsonParser::ParseError
    nil
  end
end

#process_entries(entries) ⇒ Object

[View source]

33
34
35
36
37
38
39
40
# File 'lib/gnip_api/apis/power_track/stream.rb', line 33

def process_entries entries
  return entries if @output_format == :json
  return entries.map{|e| parse_json(e)}.compact if @output_format == :parsed_json
  data = entries.map{|e| parse_json(e)}.compact
  data.map!{|e| build_message(e)} 
  log_system_messages(data)
  return data
end