Class: Firehose::Client::Consumer::HttpLongPoll

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/client/consumer.rb

Overview

Connect to Firehose via HTTP Long Polling and consume messages.

Constant Summary collapse

JITTER =
0.003

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, logger = Firehose.logger) ⇒ HttpLongPoll

Returns a new instance of HttpLongPoll.



59
60
61
# File 'lib/firehose/client/consumer.rb', line 59

# Builds a long-poll consumer bound to a single endpoint.
#
# @param url [Object] endpoint later handed to EM::HttpRequest by #request
# @param logger [Object] sink for status lines; must respond to +info+ and
#   +error+ (those are the only calls #request makes on it). Falls back to
#   Firehose.logger when omitted.
def initialize(url, logger = Firehose.logger)
  @url = url
  @logger = logger
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



57
58
59
# File 'lib/firehose/client/consumer.rb', line 57

# Reader for the logger stored in @logger by the constructor.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



57
58
59
# File 'lib/firehose/client/consumer.rb', line 57

# Reader for the endpoint stored in @url by the constructor.
#
# @return [Object] the current value of @url
def url
  @url
end

Instance Method Details

#request(last_sequence = 0) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/firehose/client/consumer.rb', line 63

# Issues one long-poll GET against +url+ and, depending on the response,
# schedules the follow-up request through EM::add_timer.
#
# Response handling:
# * 200 - the body is parsed as JSON; its 'last_sequence' becomes the sequence
#   for the next request and the first 40 characters of 'message' are logged.
# * 204 - nothing new; the same +last_sequence+ is requested again.
# * any other status, a connection failure, or a 200 whose body is not valid
#   JSON - an error is logged and no further request is scheduled.
#
# @param last_sequence [Integer] value sent as the 'last_message_sequence'
#   query parameter
# @return [Object] whatever registering the errback returns; the useful work
#   happens in the callbacks
def request(last_sequence=0)
  # NOTE(review): :inactivity_timeout => 0 presumably disables the idle timeout
  # so the poll can stay open - confirm against the em-http-request docs.
  http = EM::HttpRequest.new(url, :inactivity_timeout => 0).get(:query => {'last_message_sequence' => last_sequence})
  http.callback do
    case status = http.response_header.status
    when 200
      begin
        json = JSON.parse(http.response)
      rescue JSON::ParserError => e
        # A malformed body is treated like any other failed response: log it
        # and stop, instead of letting the exception escape the callback.
        logger.error "HTTP 200 | Invalid JSON: #{e.message}"
        next
      end
      next_sequence = json['last_sequence'].to_i
      message = json['message']

      # to_s keeps the log line working when the payload carries no 'message'.
      logger.info "HTTP 200 | Next Sequence: #{next_sequence} - #{message.to_s[0...40].inspect}"
      # jitter is defined outside this block; it supplies the delay in seconds.
      EM::add_timer(jitter) { request next_sequence }
    when 204
      logger.info "HTTP 204 | Last Sequence #{last_sequence}"
      EM::add_timer(jitter) { request last_sequence }
    else
      logger.error "HTTP #{status} | Failed"
    end
  end
  http.errback do
    logger.error "Connection Failed"
  end
end