Class: TweetStream::Client
- Inherits:
-
Object
- Object
- TweetStream::Client
- Extended by:
- Forwardable
- Defined in:
- lib/tweetstream/client.rb
Overview
Provides simple access to the Twitter Streaming API (https://dev.twitter.com/docs/streaming-api) for Ruby scripts that need to create a long connection to Twitter for tracking and other purposes.
Basic usage of the library is to call one of the provided methods and provide a block that will perform actions on a yielded Twitter::Tweet. For example:
TweetStream::Client.new.track('fail') do |status|
puts "[#{status.user.screen_name}] #{status.text}"
end
For information about a daemonized TweetStream client, view the TweetStream::Daemon class.
Direct Known Subclasses
Constant Summary collapse
- OPTION_CALLBACKS =
[:delete, :scrub_geo, :limit, :error, :enhance_your_calm, :unauthorized, :reconnect, :inited, :direct_message, :timeline_status, :anything, :no_data_received, :status_withheld, :user_withheld].freeze
Instance Attribute Summary collapse
-
#control ⇒ Object
readonly
Returns the value of attribute control.
-
#control_uri ⇒ Object
readonly
Returns the value of attribute control_uri.
-
#options ⇒ Object
Returns the value of attribute options.
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Instance Method Summary collapse
-
#close_connection ⇒ Object
Close the connection to twitter without closing the eventmachine loop.
-
#connect(path, options = {}, &block) ⇒ Object
connect to twitter without starting a new EventMachine run loop.
- #controllable? ⇒ Boolean
-
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API, you may provide :follow, :track or both as options to follow the tweets of specified users or track keywords.
-
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses.
-
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users.
-
#initialize(options = {}) ⇒ Client
constructor
Creates a new API.
-
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:.
-
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track.
- #on(event, &block) ⇒ Object
-
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
-
#on_control(&block) ⇒ Object
Set a Proc to be run when sitestream control is received.
-
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream.
-
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
-
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when enhance_your_calm signal is received.
-
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream.
-
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events.
-
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
-
#on_inited(&block) ⇒ Object
Set a Proc to be run when connection established.
-
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream.
-
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs.
-
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
-
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream.
-
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
-
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
-
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
-
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter.
-
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
-
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets.
-
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses.
-
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the userstream api.
-
#start(path, query_parameters = {}, &block) ⇒ Object
connect to twitter while starting a new EventMachine run loop.
-
#stop ⇒ Object
Terminate the currently running TweetStream and close EventMachine loop.
- #stop_stream ⇒ Object
-
#track(*keywords, &block) ⇒ Object
Specify keywords to track.
-
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream api for currently authenticated user.
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a new API
50 51 52 53 54 55 56 57 58 59 |
# File 'lib/tweetstream/client.rb', line 50 def initialize( = {}) self. = = TweetStream..merge() Configuration::VALID_OPTIONS_KEYS.each do |key| send("#{key}=", [key]) end @control_uri = nil @control = nil @callbacks = {} end |
Instance Attribute Details
#control ⇒ Object (readonly)
Returns the value of attribute control.
45 46 47 |
# File 'lib/tweetstream/client.rb', line 45 def control @control end |
#control_uri ⇒ Object (readonly)
Returns the value of attribute control_uri.
45 46 47 |
# File 'lib/tweetstream/client.rb', line 45 def control_uri @control_uri end |
#options ⇒ Object
Returns the value of attribute options.
44 45 46 |
# File 'lib/tweetstream/client.rb', line 44 def @options end |
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
45 46 47 |
# File 'lib/tweetstream/client.rb', line 45 def stream @stream end |
Instance Method Details
#close_connection ⇒ Object
Close the connection to twitter without closing the eventmachine loop
466 467 468 |
# File 'lib/tweetstream/client.rb', line 466 def close_connection @stream.close_connection if @stream end |
#connect(path, options = {}, &block) ⇒ Object
connect to twitter without starting a new EventMachine run loop
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/tweetstream/client.rb', line 410 def connect(path, = {}, &block) stream_parameters, callbacks = (path, ) @stream = EM::Twitter::Client.connect(stream_parameters) @stream.each do |item| begin hash = MultiJson.decode(item, :symbolize_keys => true) rescue MultiJson::DecodeError invoke_callback(callbacks['error'], "MultiJson::DecodeError occured in stream: #{item}") next end unless hash.is_a?(::Hash) invoke_callback(callbacks['error'], "Unexpected JSON object in stream: #{item}") next end respond_to(hash, callbacks, &block) (callbacks['anything'], hash) end @stream.on_error do || invoke_callback(callbacks['error'], ) end @stream. do invoke_callback(callbacks['unauthorized']) end @stream.on_enhance_your_calm do invoke_callback(callbacks['enhance_your_calm']) end @stream.on_reconnect do |timeout, retries| invoke_callback(callbacks['reconnect'], timeout, retries) end @stream.on_max_reconnects do |timeout, retries| fail TweetStream::ReconnectError.new(timeout, retries) end @stream.on_no_data_received do invoke_callback(callbacks['no_data_received']) end @stream end |
#controllable? ⇒ Boolean
474 475 476 |
# File 'lib/tweetstream/client.rb', line 474 def controllable? !!@control end |
#filter(query_params = {}, &block) ⇒ Object
Make a call to the statuses/filter method of the Streaming API, you may provide :follow, :track or both as options to follow the tweets of specified users or track keywords. This method is provided separately for cases when it would conserve the number of HTTP connections to combine track and follow.
135 136 137 |
# File 'lib/tweetstream/client.rb', line 135 def filter(query_params = {}, &block) start('/1.1/statuses/filter.json', query_params.merge(:method => :post), &block) end |
#firehose(query_parameters = {}, &block) ⇒ Object
Returns all public statuses. The Firehose is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
65 66 67 |
# File 'lib/tweetstream/client.rb', line 65 def firehose(query_parameters = {}, &block) start('/1.1/statuses/firehose.json', query_parameters, &block) end |
#follow(*user_ids, &block) ⇒ Object
Returns public statuses from or in reply to a set of users. Mentions ("Hello @user!") and implicit replies ("@user Hello!" created without pressing the reply "swoosh") are not matched. Requires integer user IDs, not screen names. Query parameters may be passed as the last argument.
112 113 114 115 |
# File 'lib/tweetstream/client.rb', line 112 def follow(*user_ids, &block) query = TweetStream::Arguments.new(user_ids) filter(query..merge(:follow => query), &block) end |
#links(query_parameters = {}, &block) ⇒ Object
Returns all statuses containing http: and https:. The links stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case.
73 74 75 |
# File 'lib/tweetstream/client.rb', line 73 def links(query_parameters = {}, &block) start('/1.1/statuses/links.json', query_parameters, &block) end |
#locations(*locations_map, &block) ⇒ Object
Specifies a set of bounding boxes to track. Only tweets that are both created using the Geotagging API and are placed from within a tracked bounding box will be included in the stream -- the user's location field is not used to filter tweets (e.g. if a user has their location set to "San Francisco", but the tweet was not created using the Geotagging API and has no geo element, it will not be included in the stream). Bounding boxes are specified as a comma separate list of longitude/latitude pairs, with the first pair denoting the southwest corner of the box longitude/latitude pairs, separated by commas. The first pair specifies the southwest corner of the box.
125 126 127 128 |
# File 'lib/tweetstream/client.rb', line 125 def locations(*locations_map, &block) query = TweetStream::Arguments.new(locations_map) filter(query..merge(:locations => query), &block) end |
#on(event, &block) ⇒ Object
386 387 388 389 390 391 392 393 |
# File 'lib/tweetstream/client.rb', line 386 def on(event, &block) if block_given? @callbacks[event.to_s] = block self else @callbacks[event.to_s] end end |
#on_anything(&block) ⇒ Object
Set a Proc to be run whenever anything is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_anything do |status|
# do something with the status
end
Block can take one or two arguments. |status (, client)| If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
262 263 264 |
# File 'lib/tweetstream/client.rb', line 262 def on_anything(&block) on('anything', &block) end |
#on_control(&block) ⇒ Object
Set a Proc to be run when sitestream control is received
@client = TweetStream::Client.new
@client.on_control do
# do something with the status
end
382 383 384 |
# File 'lib/tweetstream/client.rb', line 382 def on_control(&block) on('control', &block) end |
#on_delete(&block) ⇒ Object
Set a Proc to be run when a deletion notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_delete do |status_id, user_id|
Tweet.delete(status_id)
end
Block must take two arguments: the status id and the user id. If no block is given, it will return the currently set deletion proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
170 171 172 |
# File 'lib/tweetstream/client.rb', line 170 def on_delete(&block) on('delete', &block) end |
#on_direct_message(&block) ⇒ Object
Set a Proc to be run when a direct message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client. do ||
# do something with the direct message
end
Block must take one argument: the direct message. If no block is given, it will return the currently set direct message proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
246 247 248 |
# File 'lib/tweetstream/client.rb', line 246 def (&block) on('direct_message', &block) end |
#on_enhance_your_calm(&block) ⇒ Object
Set a Proc to be run when enhance_your_calm signal is received.
@client = TweetStream::Client.new
@client.on_enhance_your_calm do
# do something, your account has been blocked
end
322 323 324 |
# File 'lib/tweetstream/client.rb', line 322 def on_enhance_your_calm(&block) on('enhance_your_calm', &block) end |
#on_error(&block) ⇒ Object
Set a Proc to be run when an HTTP error is encountered in the processing of the stream. Note that TweetStream will automatically try to reconnect, this is for reference only. Don't panic!
@client = TweetStream::Client.new
@client.on_error do ||
# Make note of error message
end
Block must take one argument: the error message. If no block is given, it will return the currently set error proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
219 220 221 |
# File 'lib/tweetstream/client.rb', line 219 def on_error(&block) on('error', &block) end |
#on_event(event, &block) ⇒ Object
Set a Proc to be run on userstream events
@client = TweetStream::Client.new
@client.on_event(:favorite) do |event|
# do something with the status
end
372 373 374 |
# File 'lib/tweetstream/client.rb', line 372 def on_event(event, &block) on(event, &block) end |
#on_friends(&block) ⇒ Object
Set a Proc to be run when a Site Stream friends list is received.
@client = TweetStream::Client.new
@client.on_friends do |friends|
# do something with the friends list
end
352 353 354 |
# File 'lib/tweetstream/client.rb', line 352 def on_friends(&block) on('friends', &block) end |
#on_inited(&block) ⇒ Object
Set a Proc to be run when connection established. Called in EventMachine::Connection#post_init
@client = TweetStream::Client.new
@client.on_inited do
puts 'Connected...'
end
301 302 303 |
# File 'lib/tweetstream/client.rb', line 301 def on_inited(&block) on('inited', &block) end |
#on_limit(&block) ⇒ Object
Set a Proc to be run when a rate limit notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_limit do |discarded_count|
# Make note of discarded count
end
Block must take one argument: the number of discarded tweets. If no block is given, it will return the currently set limit proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
202 203 204 |
# File 'lib/tweetstream/client.rb', line 202 def on_limit(&block) on('limit', &block) end |
#on_no_data_received(&block) ⇒ Object
Set a Proc to be run when no data is received from the server and a stall occurs. Twitter defines this to be 90 seconds.
@client = TweetStream::Client.new
@client.on_no_data_received do
# Make note of no data, possi
end
312 313 314 |
# File 'lib/tweetstream/client.rb', line 312 def on_no_data_received(&block) on('no_data_received', &block) end |
#on_reconnect(&block) ⇒ Object
Set a Proc to be run on reconnect.
@client = TweetStream::Client.new
@client.on_reconnect do |timeout, retries|
# Make note of the reconnection
end
289 290 291 |
# File 'lib/tweetstream/client.rb', line 289 def on_reconnect(&block) on('reconnect', &block) end |
#on_scrub_geo(&block) ⇒ Object
Set a Proc to be run when a scrub_geo notice is received from the Twitter stream. For example:
@client = TweetStream::Client.new
@client.on_scrub_geo do |up_to_status_id, user_id|
Tweet.where(:status_id <= up_to_status_id)
end
Block must take two arguments: the upper status id and the user id. If no block is given, it will return the currently set scrub_geo proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
186 187 188 |
# File 'lib/tweetstream/client.rb', line 186 def on_scrub_geo(&block) on('scrub_geo', &block) end |
#on_stall_warning(&block) ⇒ Object
Set a Proc to be run when a stall warning is received.
@client = TweetStream::Client.new
@client.on_stall_warning do |warning|
# do something with the friends list
end
362 363 364 |
# File 'lib/tweetstream/client.rb', line 362 def on_stall_warning(&block) on('stall_warning', &block) end |
#on_status_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
@client = TweetStream::Client.new
@client.on_status_withheld do |status|
# do something with the status
end
332 333 334 |
# File 'lib/tweetstream/client.rb', line 332 def on_status_withheld(&block) on('status_withheld', &block) end |
#on_timeline_status(&block) ⇒ Object
Set a Proc to be run when a regular timeline message is encountered in the processing of the stream.
@client = TweetStream::Client.new
@client.on_timeline_status do |status|
# do something with the status
end
Block can take one or two arguments. |status (, client)| If no block is given, it will return the currently set timeline status proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
278 279 280 |
# File 'lib/tweetstream/client.rb', line 278 def on_timeline_status(&block) on('timeline_status', &block) end |
#on_unauthorized(&block) ⇒ Object
Set a Proc to be run when an HTTP status 401 is encountered while connecting to Twitter. This could happen when system clock drift has occured.
If no block is given, it will return the currently set unauthorized proc. When a block is given, the TweetStream::Client object is returned to allow for chaining.
230 231 232 |
# File 'lib/tweetstream/client.rb', line 230 def (&block) on('unauthorized', &block) end |
#on_user_withheld(&block) ⇒ Object
Set a Proc to be run when a status_withheld message is received.
@client = TweetStream::Client.new
@client.on_user_withheld do |status|
# do something with the status
end
342 343 344 |
# File 'lib/tweetstream/client.rb', line 342 def on_user_withheld(&block) on('user_withheld', &block) end |
#retweet(query_parameters = {}, &block) ⇒ Object
Returns all retweets. The retweet stream is not a generally available resource. Few applications require this level of access. Creative use of a combination of other resources and various access levels can satisfy nearly every application use case. As of 9/11/2009, the site-wide retweet feature has not yet launched, so there are currently few, if any, retweets on this stream.
83 84 85 |
# File 'lib/tweetstream/client.rb', line 83 def retweet(query_parameters = {}, &block) start('/1.1/statuses/retweet.json', query_parameters, &block) end |
#sample(query_parameters = {}, &block) ⇒ Object
Returns a random sample of all public statuses. The default access level provides a small proportion of the Firehose. The "Gardenhose" access level provides a proportion more suitable for data mining and research applications that desire a larger proportion to be statistically significant sample.
92 93 94 |
# File 'lib/tweetstream/client.rb', line 92 def sample(query_parameters = {}, &block) start('/1.1/statuses/sample.json', query_parameters, &block) end |
#sitestream(user_ids = [], query_params = {}, &block) ⇒ Object
Make a call to the userstream api
147 148 149 150 151 152 153 154 155 156 |
# File 'lib/tweetstream/client.rb', line 147 def sitestream(user_ids = [], query_params = {}, &block) stream_params = {:host => 'sitestream.twitter.com'} query_params.merge!( :method => :post, :follow => user_ids, :extra_stream_parameters => stream_params ) query_params.merge!(:with => 'followings') if query_params.delete(:followings) start('/1.1/site.json', query_params, &block) end |
#start(path, query_parameters = {}, &block) ⇒ Object
connect to twitter while starting a new EventMachine run loop
396 397 398 399 400 401 402 403 404 405 406 407 |
# File 'lib/tweetstream/client.rb', line 396 def start(path, query_parameters = {}, &block) if EventMachine.reactor_running? connect(path, query_parameters, &block) else EventMachine.epoll EventMachine.kqueue EventMachine.run do connect(path, query_parameters, &block) end end end |
#stop ⇒ Object
Terminate the currently running TweetStream and close EventMachine loop
460 461 462 463 |
# File 'lib/tweetstream/client.rb', line 460 def stop EventMachine.stop_event_loop @last_status end |
#stop_stream ⇒ Object
470 471 472 |
# File 'lib/tweetstream/client.rb', line 470 def stop_stream @stream.stop if @stream end |
#track(*keywords, &block) ⇒ Object
Specify keywords to track. Queries are subject to Track Limitations, described in Track Limiting and subject to access roles, described in the statuses/filter method. Track keywords are case-insensitive logical ORs. Terms are exact-matched, and also exact-matched ignoring punctuation. Phrases, keywords with spaces, are not supported. Keywords containing punctuation will only exact match tokens. Query parameters may be passed as the last argument.
103 104 105 106 |
# File 'lib/tweetstream/client.rb', line 103 def track(*keywords, &block) query = TweetStream::Arguments.new(keywords) filter(query..merge(:track => query), &block) end |
#userstream(query_params = {}, &block) ⇒ Object
Make a call to the userstream api for currently authenticated user
140 141 142 143 144 |
# File 'lib/tweetstream/client.rb', line 140 def userstream(query_params = {}, &block) stream_params = {:host => 'userstream.twitter.com'} query_params.merge!(:extra_stream_parameters => stream_params) start('/1.1/user.json', query_params, &block) end |