Class: Firehose::Client::Consumer::HttpLongPoll
- Inherits:
-
Object
- Object
- Firehose::Client::Consumer::HttpLongPoll
- Defined in:
- lib/firehose/client/consumer.rb
Overview
Connect to Firehose via HTTP Long Polling and consume messages.
Constant Summary collapse
- JITTER =
0.003
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#url ⇒ Object
readonly
Returns the value of attribute url.
Instance Method Summary collapse
-
#initialize(url, logger = Firehose.logger) ⇒ HttpLongPoll
constructor
A new instance of HttpLongPoll.
- #request(last_sequence = 0) ⇒ Object
Constructor Details
#initialize(url, logger = Firehose.logger) ⇒ HttpLongPoll
Returns a new instance of HttpLongPoll.
59 60 61 |
# File 'lib/firehose/client/consumer.rb', line 59 def initialize(url, logger = Firehose.logger) @url, @logger = url, logger end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
57 58 59 |
# File 'lib/firehose/client/consumer.rb', line 57 def logger @logger end |
#url ⇒ Object (readonly)
Returns the value of attribute url.
57 58 59 |
# File 'lib/firehose/client/consumer.rb', line 57 def url @url end |
Instance Method Details
#request(last_sequence = 0) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/firehose/client/consumer.rb', line 63 def request(last_sequence=0) http = EM::HttpRequest.new(url, :inactivity_timeout => 0).get(:query => {'last_message_sequence' => last_sequence}) http.callback do case status = http.response_header.status when 200 json = JSON.parse(http.response) next_sequence = json['last_sequence'].to_i = json['message'] logger.info "HTTP 200 | Next Sequence: #{next_sequence} - #{[0...40].inspect}" EM::add_timer(jitter) { request next_sequence } when 204 logger.info "HTTP 204 | Last Sequence #{last_sequence}" EM::add_timer(jitter) { request last_sequence } else logger.error "HTTP #{status} | Failed" end end http.errback do logger.error "Connection Failed" end end |