Class: Twitter::JSONStream
- Inherits: EventMachine::Connection
- Inheritance chain: Object < EventMachine::Connection < Twitter::JSONStream
- Defined in: lib/twitter/json_stream.rb
Constant Summary collapse
- MAX_LINE_LENGTH = 1024*1024
- NF_RECONNECT_START = 0.25 (network failure reconnections)
- NF_RECONNECT_ADD = 0.25
- NF_RECONNECT_MAX = 16
- AF_RECONNECT_START = 10 (app failure reconnections)
- AF_RECONNECT_MUL = 2
- RECONNECT_MAX = 320
- RETRIES_MAX = 10
- NO_DATA_TIMEOUT = 90
- DEFAULT_OPTIONS =
{ :method => 'GET', :path => '/', :content_type => "application/x-www-form-urlencoded", :content => '', :path => '/1/statuses/filter.json', :host => 'stream.twitter.com', :port => 443, :ssl => true, :user_agent => 'TwitterStream', :timeout => 0, :proxy => ENV['HTTP_PROXY'], :auth => nil, :oauth => {}, :filters => [], :params => {}, :auto_reconnect => true }
Instance Attribute Summary collapse
-
#af_last_reconnect ⇒ Object
Returns the value of attribute af_last_reconnect.
-
#code ⇒ Object
Returns the value of attribute code.
-
#headers ⇒ Object
Returns the value of attribute headers.
-
#last_data_received_at ⇒ Object
Returns the value of attribute last_data_received_at.
-
#nf_last_reconnect ⇒ Object
Returns the value of attribute nf_last_reconnect.
-
#proxy ⇒ Object
Returns the value of attribute proxy.
-
#reconnect_retries ⇒ Object
Returns the value of attribute reconnect_retries.
Class Method Summary collapse
- .connect(options = {}) ⇒ Object
Instance Method Summary collapse
- #connection_completed ⇒ Object
- #each_item(&block) ⇒ Object
- #immediate_reconnect ⇒ Object
-
#initialize(options = {}) ⇒ JSONStream
constructor
A new instance of JSONStream.
- #on_close(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_max_reconnects(&block) ⇒ Object
-
#on_no_data(&block) ⇒ Object
Called when no data has been received for NO_DATA_TIMEOUT seconds.
- #on_reconnect(&block) ⇒ Object
- #post_init ⇒ Object
-
#receive_data(data) ⇒ Object
Receives raw data from the HTTP connection and pushes it into the HTTP parser which then drives subsequent callbacks.
- #stop ⇒ Object
- #unbind ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ JSONStream
Returns a new instance of JSONStream.
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/twitter/json_stream.rb', line 69
def initialize options = {}
  @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly
  @gracefully_closed = false
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @reconnect_retries = 0
  @immediate_reconnect = false
  @on_inited_callback = options.delete(:on_inited)
  @proxy = URI.parse(options[:proxy]) if options[:proxy]
  @last_data_received_at = nil
end
Instance Attribute Details
#af_last_reconnect ⇒ Object
Returns the value of attribute af_last_reconnect.
47 48 49 |
# File 'lib/twitter/json_stream.rb', line 47 def af_last_reconnect @af_last_reconnect end |
#code ⇒ Object
Returns the value of attribute code.
44 45 46 |
# File 'lib/twitter/json_stream.rb', line 44 def code @code end |
#headers ⇒ Object
Returns the value of attribute headers.
45 46 47 |
# File 'lib/twitter/json_stream.rb', line 45 def headers @headers end |
#last_data_received_at ⇒ Object
Returns the value of attribute last_data_received_at.
49 50 51 |
# File 'lib/twitter/json_stream.rb', line 49 def last_data_received_at @last_data_received_at end |
#nf_last_reconnect ⇒ Object
Returns the value of attribute nf_last_reconnect.
46 47 48 |
# File 'lib/twitter/json_stream.rb', line 46 def nf_last_reconnect @nf_last_reconnect end |
#proxy ⇒ Object
Returns the value of attribute proxy.
50 51 52 |
# File 'lib/twitter/json_stream.rb', line 50 def proxy @proxy end |
#reconnect_retries ⇒ Object
Returns the value of attribute reconnect_retries.
48 49 50 |
# File 'lib/twitter/json_stream.rb', line 48 def reconnect_retries @reconnect_retries end |
Class Method Details
.connect(options = {}) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/twitter/json_stream.rb', line 52
def self.connect options = {}
  options[:port] = 443 if options[:ssl] && !options.has_key?(:port)
  options = DEFAULT_OPTIONS.merge(options)

  host = options[:host]
  port = options[:port]

  if options[:proxy]
    proxy_uri = URI.parse(options[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  connection = EventMachine.connect host, port, self, options
  connection
end
Instance Method Details
#connection_completed ⇒ Object
134 135 136 137 |
# File 'lib/twitter/json_stream.rb', line 134 def connection_completed start_tls if @options[:ssl] send_request end |
#each_item(&block) ⇒ Object
81 82 83 |
# File 'lib/twitter/json_stream.rb', line 81 def each_item &block @each_item_callback = block end |
#immediate_reconnect ⇒ Object
112 113 114 115 116 |
# File 'lib/twitter/json_stream.rb', line 112 def immediate_reconnect @immediate_reconnect = true @gracefully_closed = false close_connection end |
#on_close(&block) ⇒ Object
103 104 105 |
# File 'lib/twitter/json_stream.rb', line 103 def on_close &block @close_callback = block end |
#on_error(&block) ⇒ Object
85 86 87 |
# File 'lib/twitter/json_stream.rb', line 85 def on_error &block @error_callback = block end |
#on_max_reconnects(&block) ⇒ Object
99 100 101 |
# File 'lib/twitter/json_stream.rb', line 99 def on_max_reconnects &block @max_reconnects_callback = block end |
#on_no_data(&block) ⇒ Object
Called when no data has been received for NO_DATA_TIMEOUT seconds. Reconnecting is probably in order, as per the Twitter recommendations.
95 96 97 |
# File 'lib/twitter/json_stream.rb', line 95 def on_no_data &block @no_data_callback = block end |
#on_reconnect(&block) ⇒ Object
89 90 91 |
# File 'lib/twitter/json_stream.rb', line 89 def on_reconnect &block @reconnect_callback = block end |
#post_init ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/twitter/json_stream.rb', line 139 def post_init reset_state @on_inited_callback.call if @on_inited_callback @reconnect_timer = EventMachine.add_periodic_timer(5) do if @gracefully_closed @reconnect_timer.cancel elsif @last_data_received_at && Time.now - @last_data_received_at > NO_DATA_TIMEOUT no_data end end end |
#receive_data(data) ⇒ Object
Receives raw data from the HTTP connection and pushes it into the HTTP parser which then drives subsequent callbacks.
129 130 131 132 |
# File 'lib/twitter/json_stream.rb', line 129 def receive_data(data) @last_data_received_at = Time.now @parser << data end |
#stop ⇒ Object
107 108 109 110 |
# File 'lib/twitter/json_stream.rb', line 107 def stop @gracefully_closed = true close_connection end |
#unbind ⇒ Object
118 119 120 121 122 123 124 125 |
# File 'lib/twitter/json_stream.rb', line 118 def unbind if @state == :stream && !@buffer.empty? parse_stream_line(@buffer.flush) end schedule_reconnect if @options[:auto_reconnect] && !@gracefully_closed @close_callback.call if @close_callback @state = :init end |