Class: PowerTrack::Stream
- Inherits:
-
Object
- Object
- PowerTrack::Stream
- Includes:
- API, VoidLogger::LoggerMixin
- Defined in:
- lib/powertrack/streaming/stream.rb
Overview
A PowerTrack stream to be used for both updating the rules and collecting new messages.
Constant Summary collapse
- FEATURE_URL_FORMAT =
The format of the URL to connect to the stream service
- hostname, domain, feature, stream type, account, source, label, sub-feature
'https://gnip-%s.%s.com/%s/%s/accounts/%s/publishers/%s/%s%s.json'.freeze
- DEFAULT_CONNECTION_TIMEOUT =
The default timeout on a connection to PowerTrack. Can be overriden per call.
30
- DEFAULT_INACTIVITY_TIMEOUT =
The default timeout for inactivity on a connection to PowerTrack. Can be overriden per call.
50
- DEFAULT_STREAM_OPTIONS =
The default options for using the stream.
{ # override the default connection timeout connect_timeout: DEFAULT_CONNECTION_TIMEOUT, # override the default inactivity timeout inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT, # enable the replay mode to get activities over the last 5 days # see http://support.gnip.com/apis/replay/api_reference.html replay: false }.freeze
- DEFAULT_OK_RESPONSE_STATUS =
200
- HEARTBEAT_MESSAGE_PATTERN =
The patterns used to identify the various types of message received from GNIP everything else is an activity
/\A\s*\z/.freeze
- SYSTEM_MESSAGE_PATTERN =
/\A\s*\{\s*"(info|warn|error)":/mi.freeze
- REPLAY_TIMESTAMP_FORMAT =
The format used to send UTC timestamps in Replay mode
'%Y%m%d%H%M'.freeze
- DEFAULT_LIST_RULES_OPTIONS =
{ compressed: true, objectify: true }.freeze
- DEFAULT_TRACK_OPTIONS =
{ # receive GZip-compressed payloads ? compressed: true, # max number of retries after a disconnection max_retries: 2, # advanced options to configure exponential backoff used for retries backoff: nil, # max number of seconds to wait for last message handlers to complete stop_timeout: 10, # pass message in raw form (JSON formatted string) instead of JSON-decoded # Ruby objects to message handlers raw: false, # the starting date from which the activities will be recovered (replay mode only) from: nil, # the ending date to which the activities will be recovered (replay mode only) to: nil, # specify a number of minutes to leverage the Backfill feature backfill_minutes: nil, # called for each message received, except heartbeats on_message: nil, # called for each activity received on_activity: nil, # called for each system message received on_system: nil, # called for each heartbeat received on_heartbeat: nil, # called periodically to detect if the tracked has to be closed close_now: nil }.freeze
Instance Attribute Summary collapse
-
#account_name ⇒ Object
readonly
Returns the value of attribute account_name.
-
#data_source ⇒ Object
readonly
Returns the value of attribute data_source.
-
#label ⇒ Object
readonly
Returns the value of attribute label.
-
#username ⇒ Object
readonly
Returns the value of attribute username.
Instance Method Summary collapse
-
#add_rules(*rules) ⇒ Object
Adds many rules to your PowerTrack stream’s ruleset.
-
#delete_rules(*rules) ⇒ Object
Removes the specified rules from the stream.
-
#initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream
constructor
A new instance of Stream.
-
#list_rules(options = nil) ⇒ Object
Retrieves all existing rules for a stream.
-
#track(options = nil) ⇒ Object
Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.
Methods included from API
Constructor Details
#initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream
Returns a new instance of Stream.
54 55 56 57 58 59 60 61 62 |
# File 'lib/powertrack/streaming/stream.rb', line 54 def initialize(username, password, account_name, data_source, label, =nil) @username = username @password = password @account_name = account_name @data_source = data_source @label = label @options = DEFAULT_STREAM_OPTIONS.merge( || {}) @replay = !!@options[:replay] end |
Instance Attribute Details
#account_name ⇒ Object (readonly)
Returns the value of attribute account_name.
52 53 54 |
# File 'lib/powertrack/streaming/stream.rb', line 52 def account_name @account_name end |
#data_source ⇒ Object (readonly)
Returns the value of attribute data_source.
52 53 54 |
# File 'lib/powertrack/streaming/stream.rb', line 52 def data_source @data_source end |
#label ⇒ Object (readonly)
Returns the value of attribute label.
52 53 54 |
# File 'lib/powertrack/streaming/stream.rb', line 52 def label @label end |
#username ⇒ Object (readonly)
Returns the value of attribute username.
52 53 54 |
# File 'lib/powertrack/streaming/stream.rb', line 52 def username @username end |
Instance Method Details
#add_rules(*rules) ⇒ Object
Adds many rules to your PowerTrack stream’s ruleset.
POST /rules
See support.gnip.com/apis/powertrack/api_reference.html#AddRules
69 70 71 72 73 74 |
# File 'lib/powertrack/streaming/stream.rb', line 69 def add_rules(*rules) # flatten the rules in case it was provided as an array make_rules_request(:post, body: MultiJson.encode('rules' => rules.flatten), ok: 201) end |
#delete_rules(*rules) ⇒ Object
Removes the specified rules from the stream.
DELETE /rules
See support.gnip.com/apis/powertrack/api_reference.html#DeleteRules
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/powertrack/streaming/stream.rb', line 81 def delete_rules(*rules) # v2 does not use DELETE anymore delete_verb = :post # flatten the rules in case it was provided as an array = { body: MultiJson.encode('rules' => rules.flatten) } # v2 uses a query parameter [:query] = { '_method' => 'delete' } make_rules_request(delete_verb, ) end |
#list_rules(options = nil) ⇒ Object
Retrieves all existing rules for a stream.
Returns an array of PowerTrack::Rule objects when the response permits so.
GET /rules
See support.gnip.com/apis/powertrack/api_reference.html#ListRules
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/powertrack/streaming/stream.rb', line 104 def list_rules(=nil) = DEFAULT_LIST_RULES_OPTIONS.merge( || {}) res = make_rules_request(:get, headers: gzip_compressed_header([:compressed])) # return Rule objects when required and feasible/appropriate if [:objectify] && res.is_a?(Hash) && (rules = res['rules']).is_a?(Array) && rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') } rules.map do |rule| PowerTrack::Rule.new(rule['value'], tag: rule['tag'], id: rule['id']) end else res end end |
#track(options = nil) ⇒ Object
Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.
Manages reconnections when being disconnected.
GET /track/:stream
See support.gnip.com/apis/powertrack/api_reference.html#Stream
159 160 161 162 163 |
# File 'lib/powertrack/streaming/stream.rb', line 159 def track(=nil) = DEFAULT_TRACK_OPTIONS.merge( || {}) retrier = PowerTrack::Retrier.new([:max_retries]) handle_api_response(*retrier.retry { track_once(, retrier) }) end |