Class: PowerTrack::Stream

Inherits:
Object
Includes:
API, VoidLogger::LoggerMixin
Defined in:
lib/powertrack/streaming/stream.rb

Overview

A PowerTrack stream to be used for both updating the rules and collecting new messages.

Constant Summary

FEATURE_URL_FORMAT =

The format of the URL to connect to the stream service

Placeholders, in order: hostname, domain, feature, stream type, account, source, label, sub-feature
'https://gnip-%s.%s.com/%s/%s/accounts/%s/publishers/%s/%s%s.json'.freeze
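As a rough illustration of how the eight placeholders are filled in, the sketch below expands the format string with Ruby's `format`. Every value passed in (hostname, account, label and so on) is made up for the example and does not come from the gem.

```ruby
# Copied from the constant documented above.
FEATURE_URL_FORMAT =
  'https://gnip-%s.%s.com/%s/%s/accounts/%s/publishers/%s/%s%s.json'.freeze

# All values below are hypothetical and only show the placeholder order.
url = format(FEATURE_URL_FORMAT,
             'stream',      # hostname
             'twitter',     # domain
             'stream',      # feature
             'powertrack',  # stream type
             'acme',        # account
             'twitter',     # source
             'prod',        # label
             '')            # sub-feature (left empty here)

puts url
```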
DEFAULT_CONNECTION_TIMEOUT =

The default timeout on a connection to PowerTrack. Can be overridden per call.

30
DEFAULT_INACTIVITY_TIMEOUT =

The default timeout for inactivity on a connection to PowerTrack. Can be overridden per call.

50
DEFAULT_STREAM_OPTIONS =

The default options for using the stream.

{
  # override the default connection timeout
  connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  # override the default inactivity timeout
  inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
  # enable the replay mode to get activities over the last 5 days
  # see http://support.gnip.com/apis/replay/api_reference.html
  replay: false
}.freeze
DEFAULT_OK_RESPONSE_STATUS =
200
HEARTBEAT_MESSAGE_PATTERN =

The patterns used to identify the various types of messages received from GNIP. Everything else is an activity.

/\A\s*\z/.freeze
SYSTEM_MESSAGE_PATTERN =
/\A\s*\{\s*"(info|warn|error)":/mi.freeze
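The two patterns can be read as a three-way classification: blank payloads are heartbeats, JSON objects starting with an `info`, `warn` or `error` key are system messages, and anything else is an activity. A minimal sketch follows; `classify` is a hypothetical helper written for this example, not a method of the gem.

```ruby
# Copied from the constants documented above.
HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/.freeze
SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi.freeze

# Hypothetical helper: returns the kind of a raw message string.
def classify(message)
  case message
  when HEARTBEAT_MESSAGE_PATTERN then :heartbeat
  when SYSTEM_MESSAGE_PATTERN then :system
  else :activity
  end
end

puts classify("\r\n")
puts classify('{"error": {"message": "forced disconnect"}}')
puts classify('{"id": "tag:example,2016:1", "body": "hello"}')
```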
REPLAY_TIMESTAMP_FORMAT =

The format used to send UTC timestamps in Replay mode

'%Y%m%d%H%M'.freeze
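For illustration, a timestamp in this format can be produced with `Time#strftime`. The dates below are arbitrary, and converting local times with `getutc` first is an assumption about how a caller would prepare them, not something the gem is documented to do.

```ruby
# Copied from the constant documented above.
REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'.freeze

# A time that is already in UTC.
from = Time.utc(2016, 3, 1, 9, 30)
from_stamp = from.strftime(REPLAY_TIMESTAMP_FORMAT)

# A local time is converted to UTC before formatting.
local = Time.new(2016, 3, 1, 10, 30, 0, '+01:00')
local_stamp = local.getutc.strftime(REPLAY_TIMESTAMP_FORMAT)

puts from_stamp
puts local_stamp
```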
DEFAULT_LIST_RULES_OPTIONS =
{
  compressed: true,
  objectify: true
}.freeze
DEFAULT_TRACK_OPTIONS =
{
  # receive GZip-compressed payloads ?
  compressed: true,
  # max number of retries after a disconnection
  max_retries: 2,
  # advanced options to configure exponential backoff used for retries
  backoff: nil,
  # max number of seconds to wait for last message handlers to complete
  stop_timeout: 10,
  # pass message in raw form (JSON formatted string) instead of JSON-decoded
  # Ruby objects to message handlers
  raw: false,
  # the starting date from which the activities will be recovered (replay mode only)
  from: nil,
  # the ending date to which the activities will be recovered (replay mode only)
  to: nil,
  # specify a number of minutes to leverage the Backfill feature
  backfill_minutes: nil,
  # called for each message received, except heartbeats
  on_message: nil,
  # called for each activity received
  on_activity: nil,
  # called for each system message received
  on_system: nil,
  # called for each heartbeat received
  on_heartbeat: nil,
  # called periodically to detect if the tracker has to be closed
  close_now: nil
}.freeze
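The defaults are overlaid with whatever the caller passes, as the `merge` call in `#track` shows. The sketch below uses an abbreviated copy of the defaults; the handler signatures (one argument for `on_activity`, none for `close_now`) are assumptions made for the example.

```ruby
# Abbreviated copy of the defaults documented above.
DEFAULT_TRACK_OPTIONS = {
  compressed: true,
  max_retries: 2,
  stop_timeout: 10,
  raw: false,
  on_activity: nil,
  close_now: nil
}.freeze

received = []
deadline = Time.now + 60

# Hypothetical caller options; the lambda arities are assumed.
caller_options = {
  max_retries: 5,
  on_activity: ->(activity) { received << activity },
  close_now: -> { Time.now >= deadline }
}

# Same merge pattern as in #track: caller values win, defaults fill the gaps.
options = DEFAULT_TRACK_OPTIONS.merge(caller_options || {})

puts options[:max_retries]
puts options[:compressed]
```

Because the defaults hash is frozen and `merge` returns a new hash, the constant itself is never modified.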

Instance Attribute Summary

Instance Method Summary

Methods included from API

#add_rule, #delete_rule

Constructor Details

#initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream

Returns a new instance of Stream.



# File 'lib/powertrack/streaming/stream.rb', line 54

def initialize(username, password, account_name, data_source, label, options=nil)
  @username = username
  @password = password
  @account_name = account_name
  @data_source = data_source
  @label = label
  @options = DEFAULT_STREAM_OPTIONS.merge(options || {})
  @replay = !!@options[:replay]
end

Instance Attribute Details

#account_name ⇒ Object (readonly)

Returns the value of attribute account_name.



# File 'lib/powertrack/streaming/stream.rb', line 52

def account_name
  @account_name
end

#data_source ⇒ Object (readonly)

Returns the value of attribute data_source.



# File 'lib/powertrack/streaming/stream.rb', line 52

def data_source
  @data_source
end

#label ⇒ Object (readonly)

Returns the value of attribute label.



# File 'lib/powertrack/streaming/stream.rb', line 52

def label
  @label
end

#username ⇒ Object (readonly)

Returns the value of attribute username.



# File 'lib/powertrack/streaming/stream.rb', line 52

def username
  @username
end

Instance Method Details

#add_rules(*rules) ⇒ Object

Adds many rules to your PowerTrack stream’s ruleset.

POST /rules

See support.gnip.com/apis/powertrack/api_reference.html#AddRules



# File 'lib/powertrack/streaming/stream.rb', line 69

def add_rules(*rules)
  # flatten the rules in case it was provided as an array
  make_rules_request(:post,
    body: MultiJson.encode('rules' => rules.flatten),
    ok: 201)
end
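Because the arguments are flattened, rules can be passed either one by one or as a single array, and the request body comes out the same. The sketch below reproduces only the body encoding; it uses the standard `json` library in place of MultiJson and plain hashes in place of rule objects, both of which are substitutions made for the example. `rules_body` is a hypothetical helper.

```ruby
require 'json'

# Hypothetical helper mirroring the body built in #add_rules.
def rules_body(*rules)
  JSON.generate('rules' => rules.flatten)
end

coffee = { 'value' => 'coffee', 'tag' => 'drinks' }
tea = { 'value' => 'tea' }

as_list = rules_body(coffee, tea)
as_array = rules_body([coffee, tea])

puts as_list
puts as_list == as_array
```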

#delete_rules(*rules) ⇒ Object

Removes the specified rules from the stream.

POST /rules?_method=delete (v2 no longer uses the DELETE verb)

See support.gnip.com/apis/powertrack/api_reference.html#DeleteRules



# File 'lib/powertrack/streaming/stream.rb', line 81

def delete_rules(*rules)
  # v2 does not use DELETE anymore
  delete_verb = :post
  # flatten the rules in case it was provided as an array
  delete_options = { body: MultiJson.encode('rules' => rules.flatten) }
  # v2 uses a query parameter
  delete_options[:query] = { '_method' => 'delete' }

  make_rules_request(delete_verb, delete_options)
end
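To make the resulting request shape concrete, the sketch below rebuilds the verb, query and body without sending anything. `delete_rules_request` is a hypothetical helper, the `/rules` path is shortened for display, and the standard `json` library stands in for MultiJson.

```ruby
require 'json'
require 'uri'

# Hypothetical helper returning what #delete_rules hands to the HTTP layer.
def delete_rules_request(*rules)
  verb = :post
  options = {
    body: JSON.generate('rules' => rules.flatten),
    query: { '_method' => 'delete' }
  }
  [verb, options]
end

verb, options = delete_rules_request({ 'value' => 'coffee' })
query_string = URI.encode_www_form(options[:query])

puts "#{verb.to_s.upcase} /rules?#{query_string}"
puts options[:body]
```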

#list_rules(options = nil) ⇒ Object

Retrieves all existing rules for a stream.

Returns an array of PowerTrack::Rule objects when the :objectify option is set and the response has the expected shape; otherwise returns the response as received.

GET /rules

See support.gnip.com/apis/powertrack/api_reference.html#ListRules



# File 'lib/powertrack/streaming/stream.rb', line 104

def list_rules(options=nil)
  options = DEFAULT_LIST_RULES_OPTIONS.merge(options || {})
  res = make_rules_request(:get, headers: gzip_compressed_header(options[:compressed]))

  # return Rule objects when required and feasible/appropriate
  if options[:objectify] &&
     res.is_a?(Hash) &&
     (rules = res['rules']).is_a?(Array) &&
     rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
    rules.map do |rule|
      PowerTrack::Rule.new(rule['value'], tag: rule['tag'], id: rule['id'])
    end
  else
    res
  end
end
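The shape check can be tried in isolation. In the sketch below, `SketchRule` is a stand-in Struct used instead of PowerTrack::Rule, and `objectify` is a hypothetical helper that copies only the conditional from the method above.

```ruby
# Stand-in for PowerTrack::Rule, defined only for this example.
SketchRule = Struct.new(:value, :tag, :id)

# Hypothetical helper: converts a decoded response into rule objects
# when every entry is a Hash with a 'value' key, else returns it as is.
def objectify(res)
  if res.is_a?(Hash) &&
     (rules = res['rules']).is_a?(Array) &&
     rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
    rules.map { |rule| SketchRule.new(rule['value'], rule['tag'], rule['id']) }
  else
    res
  end
end

well_formed = { 'rules' => [{ 'value' => 'coffee', 'tag' => 'drinks', 'id' => 1 }] }
malformed = { 'rules' => [{ 'tag' => 'no value here' }] }

converted = objectify(well_formed)
untouched = objectify(malformed)

puts converted.first.value
puts untouched.equal?(malformed)
```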

#track(options = nil) ⇒ Object

Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.

Reconnects automatically after a disconnection, up to max_retries times.

GET /track/:stream

See support.gnip.com/apis/powertrack/api_reference.html#Stream



# File 'lib/powertrack/streaming/stream.rb', line 159

def track(options=nil)
  options = DEFAULT_TRACK_OPTIONS.merge(options || {})
  retrier = PowerTrack::Retrier.new(options[:max_retries])
  handle_api_response(*retrier.retry { track_once(options, retrier) })
end
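The internals of PowerTrack::Retrier are not shown on this page. As a generic illustration of retrying with exponential backoff, and not the gem's implementation, a bounded retry loop might look like the sketch below. `with_retries`, its `base_delay` parameter and the choice of `IOError` as the retried exception are all assumptions.

```ruby
# Generic retry helper (hypothetical, not PowerTrack::Retrier).
# Runs the block, retrying up to max_retries times on IOError and
# doubling the delay between attempts.
def with_retries(max_retries, base_delay: 0.0)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue IOError
    raise if attempts > max_retries
    sleep(base_delay * (2**(attempts - 1)))
    retry
  end
end

calls = 0
result = with_retries(2) do |attempt|
  calls += 1
  # Simulate two disconnections before a successful connection.
  raise IOError, 'disconnected' if attempt < 3
  :connected
end

puts result
puts calls
```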