Class: PowerTrack::Stream
- Inherits:
- Object
  - Object
  - PowerTrack::Stream
- Includes:
- API, VoidLogger::LoggerMixin
- Defined in:
- lib/powertrack/streaming/stream.rb
Overview
A PowerTrack stream to be used for both updating the rules and collecting new messages.
Constant Summary
- FEATURE_URL_FORMAT =
The format of the URLs used to connect to the various stream services.
"https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze
- DEFAULT_CONNECTION_TIMEOUT =
The default timeout on a connection to PowerTrack. Can be overridden per call.
30
- DEFAULT_INACTIVITY_TIMEOUT =
The default timeout for inactivity on a connection to PowerTrack. Can be overridden per call.
50
- DEFAULT_STREAM_OPTIONS =
The default options for using the stream.
{
  connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
  # use a client id if you want to leverage the Backfill feature
  client_id: nil,
  # enable the replay mode to get activities over the last 5 days
  # see http://support.gnip.com/apis/replay/api_reference.html
  replay: false
}
- DEFAULT_OK_RESPONSE_STATUS =
200
- HEARTBEAT_MESSAGE_PATTERN =
The patterns used to identify the various types of messages received from GNIP; everything else is an activity.
/\A\s*\z/
- SYSTEM_MESSAGE_PATTERN =
/\A\s*\{\s*"(info|warn|error)":/mi
- REPLAY_TIMESTAMP_FORMAT =
The format used to send UTC timestamps in Replay mode.
'%Y%m%d%H%M'
- DEFAULT_LIST_RULES_OPTIONS =
{ compressed: true, objectify: true }.freeze
- DEFAULT_TRACK_OPTIONS =
{
  # receive GZip-compressed payloads ?
  compressed: true,
  # max number of retries after a disconnection
  max_retries: 2,
  # advanced options to configure exponential backoff used for retries
  backoff: nil,
  # max number of seconds to wait for last message handlers to complete
  stop_timeout: 10,
  # pass message in raw form (JSON formatted string) instead of JSON-decoded
  # Ruby objects to message handlers
  raw: false,
  # the starting date from which the activities will be recovered (replay mode only)
  from: nil,
  # the ending date to which the activities will be recovered (replay mode only)
  to: nil,
  # called for each message received, except heartbeats
  on_message: nil,
  # called for each activity received
  on_activity: nil,
  # called for each system message received
  on_system: nil,
  # called for each heartbeat received
  on_heartbeat: nil,
  # called periodically to detect if the tracker has to be closed
  close_now: nil
}.freeze
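The two message patterns imply a three-way classification: blank lines are heartbeats, JSON objects whose first key is info, warn or error are system messages, and everything else is an activity. A minimal sketch of that dispatch follows. The patterns are copied from the constants; the helper name classify_message and the sample payloads are made up for illustration and are not part of the library.

```ruby
# Patterns copied verbatim from the constant summary.
HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi

# Hypothetical helper (not a library method): classifies one raw message.
def classify_message(raw)
  case raw
  when HEARTBEAT_MESSAGE_PATTERN then :heartbeat # empty or whitespace-only
  when SYSTEM_MESSAGE_PATTERN then :system       # starts with an info/warn/error key
  else :activity                                 # anything else
  end
end

# Sample payloads are invented for this sketch.
p classify_message("\r\n")
p classify_message('{"error": {"message": "sample"}}')
p classify_message('{"id": "sample-activity", "body": "hello"}')
```

The heartbeat check comes first so that whitespace-only keep-alive lines never reach the JSON-oriented pattern.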
Instance Attribute Summary
- #account_name ⇒ Object (readonly)
Returns the value of attribute account_name.
- #data_source ⇒ Object (readonly)
Returns the value of attribute data_source.
- #label ⇒ Object (readonly)
Returns the value of attribute label.
- #username ⇒ Object (readonly)
Returns the value of attribute username.
Instance Method Summary
- #add_rules(*rules) ⇒ Object
Adds many rules to your PowerTrack stream’s ruleset.
- #delete_rules(*rules) ⇒ Object
Removes the specified rules from the stream.
- #initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream (constructor)
A new instance of Stream.
- #list_rules(options = nil) ⇒ Object
Retrieves all existing rules for a stream.
- #track(options = nil) ⇒ Object
Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.
Constructor Details
#initialize(username, password, account_name, data_source, label, options = nil) ⇒ Stream
Returns a new instance of Stream.
# File 'lib/powertrack/streaming/stream.rb', line 53
def initialize(username, password, account_name, data_source, label, options=nil)
  @username = username
  @password = password
  @account_name = account_name
  @data_source = data_source
  @label = label
  @options = DEFAULT_STREAM_OPTIONS.merge(options || {})
  @client_id = @options[:client_id]
  @replay = !!@options[:replay]
  @stream_mode = @replay ? 'replay' : 'streams'
end
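The constructor merges caller options over DEFAULT_STREAM_OPTIONS and derives the stream mode from the :replay flag. The sketch below reproduces only that option handling in plain Ruby so it can be run without the gem. The constants are copied from the constant summary; the helper name resolve_stream_options is hypothetical.

```ruby
DEFAULT_CONNECTION_TIMEOUT = 30
DEFAULT_INACTIVITY_TIMEOUT = 50
DEFAULT_STREAM_OPTIONS = {
  connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
  inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
  client_id: nil,
  replay: false
}

# Hypothetical helper mirroring the constructor's option handling.
def resolve_stream_options(options = nil)
  merged = DEFAULT_STREAM_OPTIONS.merge(options || {}) # nil means "defaults only"
  replay = !!merged[:replay]                           # coerce any truthy value to true
  { options: merged, stream_mode: replay ? 'replay' : 'streams' }
end

live = resolve_stream_options
replayed = resolve_stream_options(replay: true, connect_timeout: 10)

puts live[:stream_mode]                     # streams
puts replayed[:stream_mode]                 # replay
puts replayed[:options][:connect_timeout]   # 10
puts replayed[:options][:inactivity_timeout] # 50
```

With the gem loaded, the equivalent call would follow the documented signature, for example `PowerTrack::Stream.new(username, password, account_name, data_source, label, replay: true)`.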
Instance Attribute Details
#account_name ⇒ Object (readonly)
Returns the value of attribute account_name.
# File 'lib/powertrack/streaming/stream.rb', line 51
def account_name
  @account_name
end
#data_source ⇒ Object (readonly)
Returns the value of attribute data_source.
# File 'lib/powertrack/streaming/stream.rb', line 51
def data_source
  @data_source
end
#label ⇒ Object (readonly)
Returns the value of attribute label.
# File 'lib/powertrack/streaming/stream.rb', line 51
def label
  @label
end
#username ⇒ Object (readonly)
Returns the value of attribute username.
# File 'lib/powertrack/streaming/stream.rb', line 51
def username
  @username
end
Instance Method Details
#add_rules(*rules) ⇒ Object
Adds many rules to your PowerTrack stream’s ruleset.
POST /rules
See support.gnip.com/apis/powertrack/api_reference.html#AddRules
# File 'lib/powertrack/streaming/stream.rb', line 70
def add_rules(*rules)
  # flatten the rules in case it was provided as an array
  make_rules_request(:post, body: MultiJson.encode('rules' => rules.flatten), ok: 201)
end
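Because the rules are flattened before encoding, passing a list of arguments or a single array should produce the same request body. The sketch below illustrates this with the standard json library standing in for MultiJson, and with plain hashes standing in for rule objects. The 'value' and 'tag' keys come from the list_rules code; how PowerTrack::Rule itself serializes is not shown in this page, so treat the hash shape and the helper name rules_request_body as assumptions.

```ruby
require 'json'

# Hypothetical helper: builds the body the way add_rules does,
# using stdlib JSON instead of MultiJson.
def rules_request_body(*rules)
  # flatten so that both f(a, b) and f([a, b]) yield one flat list
  JSON.generate('rules' => rules.flatten)
end

# Plain hashes stand in for rule objects (an assumption of this sketch).
rule_a = { 'value' => 'coffee', 'tag' => 'drinks' }
rule_b = { 'value' => 'tea' }

body_from_list  = rules_request_body(rule_a, rule_b)
body_from_array = rules_request_body([rule_a, rule_b])

puts body_from_list
puts body_from_list == body_from_array
```

The same flattening applies to delete_rules, which builds its body identically but sends it with DELETE and expects the default OK status instead of 201.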
#delete_rules(*rules) ⇒ Object
Removes the specified rules from the stream.
DELETE /rules
See support.gnip.com/apis/powertrack/api_reference.html#DeleteRules
# File 'lib/powertrack/streaming/stream.rb', line 80
def delete_rules(*rules)
  # flatten the rules in case it was provided as an array
  make_rules_request(:delete, body: MultiJson.encode('rules' => rules.flatten))
end
#list_rules(options = nil) ⇒ Object
Retrieves all existing rules for a stream.
Returns an array of PowerTrack::Rule objects when the :objectify option is set and the response has the expected shape; otherwise returns the response as is.
GET /rules
See support.gnip.com/apis/powertrack/api_reference.html#ListRules
# File 'lib/powertrack/streaming/stream.rb', line 97
def list_rules(options=nil)
  options = DEFAULT_LIST_RULES_OPTIONS.merge(options || {})
  res = make_rules_request(:get, headers: gzip_compressed_header(options[:compressed]))

  # return Rule objects when required and feasible/appropriate
  if options[:objectify] &&
    res.is_a?(Hash) &&
    (rules = res['rules']).is_a?(Array) &&
    rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
    rules.map { |rule| PowerTrack::Rule.new(rule['value'], rule['tag']) }
  else
    res
  end
end
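The conversion to rule objects only happens when every check passes; any unexpected response is handed back untouched. The sketch below isolates that guard so it can be tried on sample data. SketchRule is a stand-in Struct for PowerTrack::Rule (whose definition is not shown here), and objectify_rules is a hypothetical helper name.

```ruby
# Stand-in for PowerTrack::Rule; the real class is not shown in this page.
SketchRule = Struct.new(:value, :tag)

# Hypothetical helper reproducing the guard used by list_rules.
def objectify_rules(res, objectify = true)
  if objectify &&
     res.is_a?(Hash) &&
     (rules = res['rules']).is_a?(Array) &&
     rules.all? { |rule| rule.is_a?(Hash) && rule.key?('value') }
    # every entry has a 'value', so the whole list can be converted
    rules.map { |rule| SketchRule.new(rule['value'], rule['tag']) }
  else
    # anything else is returned exactly as received
    res
  end
end

well_formed = { 'rules' => [{ 'value' => 'coffee', 'tag' => 'drinks' }, { 'value' => 'tea' }] }
malformed   = { 'rules' => [{ 'tag' => 'no value here' }] }

p objectify_rules(well_formed).map(&:value)
p objectify_rules(malformed).equal?(malformed)
p objectify_rules(well_formed, false).equal?(well_formed)
```

Note that the check is all-or-nothing: a single entry without a 'value' key makes the method fall back to the raw response for the whole list.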
#track(options = nil) ⇒ Object
Establishes a persistent connection to the PowerTrack data stream, through which the social data will be delivered.
GET /track/:stream
See support.gnip.com/apis/powertrack/api_reference.html#Stream
# File 'lib/powertrack/streaming/stream.rb', line 146
def track(options=nil)
  options = DEFAULT_TRACK_OPTIONS.merge(options || {})
  retrier = PowerTrack::Retrier.new(options[:max_retries])
  handle_api_response(*retrier.retry { track_once(options, retrier) })
end
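In replay mode the :from and :to options bound the period to recover, and REPLAY_TIMESTAMP_FORMAT describes how UTC timestamps are sent. The sketch below shows only the formatting step. How track_once attaches these values to the request is not shown in this page, so the helper name replay_timestamp and the idea of formatting a Time this way are assumptions.

```ruby
# Format copied from the constant summary.
REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'

# Hypothetical helper: renders a Time as a UTC replay timestamp.
def replay_timestamp(time)
  # getutc returns a new Time in UTC and leaves the argument untouched
  time.getutc.strftime(REPLAY_TIMESTAMP_FORMAT)
end

# 14:30 at UTC+02:00 is 12:30 UTC.
from = Time.new(2015, 3, 1, 14, 30, 0, '+02:00')
puts replay_timestamp(from) # 201503011230
```

The format has minute resolution, so any seconds in the supplied time are dropped.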