Class: Firehose::Client::Producer::Http
- Inherits: Object
  - Object
    - Firehose::Client::Producer::Http
- Defined in: lib/firehose/client/producer.rb
Overview
Publish messages to Firehose via an HTTP interface.
Defined Under Namespace
Classes: Builder
Constant Summary
- PublishError = Class.new(RuntimeError)
  Raised when the server does not respond with a 202 after a message is published.
- TimeoutError = Class.new(Faraday::Error::TimeoutError)
- DEFAULT_TIMEOUT = 1
  Number of seconds to wait for a publish to complete.
- DEFAULT_ERROR_HANDLER = ->(e) { raise e }
Instance Attribute Summary
- #timeout ⇒ Object (readonly)
  Number of seconds to wait for a publish to complete.
- #uri ⇒ Object (readonly)
  URI for the Firehose server.
Class Method Summary
- .adapter ⇒ Object
  Returns the Faraday adapter used to publish messages. Defaults to Faraday.default_adapter (:net_http).
- .adapter=(adapter) ⇒ Object
  Sets the adapter Firehose uses to PUT the message. A list of adapters is available at github.com/technoweenie/faraday.
Instance Method Summary
- #error_handler ⇒ Object
  Returns the handler set with #on_error, or DEFAULT_ERROR_HANDLER, which raises the error.
- #initialize(uri = Firehose::URI, timeout = DEFAULT_TIMEOUT) ⇒ Http (constructor)
  A new instance of Http.
- #on_error(&block) ⇒ Object
  Registers a block that handles errors raised while publishing a message.
- #publish(message) ⇒ Object
  A DSL for publishing messages. Returns a Builder for the message.
- #put(message, channel, opts, &block) ⇒ Object
  Publish the message via HTTP.
Constructor Details
Instance Attribute Details
#timeout ⇒ Object (readonly)
Number of seconds to wait for a publish to complete. Defaults to DEFAULT_TIMEOUT and can be overridden per call with the :timeout option of #put.
# File 'lib/firehose/client/producer.rb', line 30
def timeout
  @timeout
end
#uri ⇒ Object (readonly)
URI for the Firehose server. This URI does not include the path of the channel.
# File 'lib/firehose/client/producer.rb', line 30
def uri
  @uri
end
Class Method Details
.adapter ⇒ Object
Returns the Faraday adapter used to publish messages. Unless one has been set with .adapter=, this is Faraday.default_adapter (:net_http).
# File 'lib/firehose/client/producer.rb', line 100
def self.adapter
  @adapter ||= Faraday.default_adapter
end
.adapter=(adapter) ⇒ Object
Sets the adapter Firehose uses to PUT the message. A list of adapters is available at github.com/technoweenie/faraday.
# File 'lib/firehose/client/producer.rb', line 95
def self.adapter=(adapter)
  @adapter = adapter
end
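The two accessors above form a class-level setting with a memoized fallback. The following is a minimal, standalone sketch of that pattern; `SketchProducer` and `FALLBACK_ADAPTER` are hypothetical stand-ins (the real class falls back to Faraday.default_adapter), and `:excon` is only an illustrative value.

```ruby
# Stand-in for Firehose::Client::Producer::Http; only the adapter
# accessors are modelled here.
class SketchProducer
  # Hypothetical default. The real class uses Faraday.default_adapter.
  FALLBACK_ADAPTER = :net_http

  class << self
    # Override the adapter for the whole class.
    attr_writer :adapter

    # Return the configured adapter, memoizing the fallback on first read.
    def adapter
      @adapter ||= FALLBACK_ADAPTER
    end
  end
end

SketchProducer.adapter            # falls back to :net_http
SketchProducer.adapter = :excon   # illustrative adapter name
```

Because the value lives on the class, setting it once affects every producer instance created afterwards.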
Instance Method Details
#error_handler ⇒ Object
Returns the handler that is called when publishing fails. If no handler was registered with #on_error, this is DEFAULT_ERROR_HANDLER, which raises the error.
# File 'lib/firehose/client/producer.rb', line 89
def error_handler
  @error_handler || DEFAULT_ERROR_HANDLER
end
#on_error(&block) ⇒ Object
Handle errors that could happen while publishing a message.
# File 'lib/firehose/client/producer.rb', line 84
def on_error(&block)
  @error_handler = block
end
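How #on_error and #error_handler interact can be illustrated without a server. The sketch below copies the two methods and the two constants from the listings above into a hypothetical `SketchProducer` class; `simulate_failure` is an invented helper that stands in for a failed publish.

```ruby
# Stand-in class that reuses the handler logic shown above.
class SketchProducer
  PublishError = Class.new(RuntimeError)
  DEFAULT_ERROR_HANDLER = ->(e) { raise e }

  # Register a custom handler.
  def on_error(&block)
    @error_handler = block
  end

  # Custom handler if one was registered, otherwise the raising default.
  def error_handler
    @error_handler || DEFAULT_ERROR_HANDLER
  end

  # Hypothetical helper: pretend a publish failed.
  def simulate_failure
    error_handler.call(PublishError.new("simulated failure"))
  end
end

producer = SketchProducer.new
errors = []
producer.on_error { |e| errors << e.message } # collect instead of raising
producer.simulate_failure
```

Without the call to `on_error`, `simulate_failure` would raise `PublishError`, since the default handler simply raises whatever it is given.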
#publish(message) ⇒ Object
A DSL for publishing messages. Returns a Builder bound to this producer and the given message.
# File 'lib/firehose/client/producer.rb', line 39
def publish(message)
  Builder.new(self, message)
end
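This page does not document Builder, so the following is only a sketch of how such a builder could hand the message on to #put. The classes `SketchProducer` and `SketchBuilder` are hypothetical, and the method name `to` is an assumption rather than something confirmed here.

```ruby
# Stand-in producer that records calls instead of issuing HTTP requests.
class SketchProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Mirrors #publish above: wrap the message in a builder.
  def publish(message)
    SketchBuilder.new(self, message)
  end

  # Same signature as #put, but only records its arguments.
  def put(message, channel, opts, &block)
    @calls << [message, channel, opts]
  end
end

# Hypothetical builder; `to` is an assumed method name.
class SketchBuilder
  def initialize(producer, message)
    @producer = producer
    @message = message
  end

  # Forward the stored message, the channel, and the options to the producer.
  def to(channel, opts = {}, &block)
    @producer.put(@message, channel, opts, &block)
  end
end

producer = SketchProducer.new
producer.publish("hello").to("/greetings", ttl: 60)
```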
#put(message, channel, opts, &block) ⇒ Object
Publish the message to the given channel with an HTTP PUT. Recognized keys in opts are :ttl (sent as a Cache-Control max-age header), :timeout (overrides the instance timeout), and :buffer_size (sent as an X-Firehose-Buffer-Size header). The block is called with the response when the server answers 202. Any other status is passed to the error handler as a PublishError, and a Faraday timeout as a TimeoutError.
# File 'lib/firehose/client/producer.rb', line 44
def put(message, channel, opts, &block)
  ttl = opts[:ttl]
  timeout = opts[:timeout] || @timeout || DEFAULT_TIMEOUT
  buffer_size = opts[:buffer_size]

  response = conn.put do |req|
    req.options[:timeout] = timeout
    if conn.path_prefix.nil? || conn.path_prefix == '/'
      # This avoids a double / if the channel starts with a / too (which is expected).
      req.path = channel
    else
      if conn.path_prefix =~ /\/\Z/ || channel =~ /\A\//
        req.path = [conn.path_prefix, channel].compact.join
      else
        # Add a / so the prefix and channel aren't just rammed together.
        req.path = [conn.path_prefix, channel].compact.join('/')
      end
    end
    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
    req.headers["X-Firehose-Buffer-Size"] = buffer_size.to_s if buffer_size
  end

  response.on_complete do
    case response.status
    when 202 # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      # don't pass along basic auth header, if present
      response_data = response.inspect.gsub(/"Authorization"=>"Basic \S+"/, '"Authorization" => "Basic [HIDDEN]"')
      endpoint = "#{uri}/#{channel}".gsub(/:\/\/\S+@/, "://")
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{endpoint}': #{response_data}")
    end
  end

# Hide Faraday with this Timeout exception, and through the error handler.
rescue Faraday::Error::TimeoutError => e
  error_handler.call TimeoutError.new(e)
end
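The path joining, header construction, and credential stripping inside #put can be tried in isolation. The helpers below (`join_path`, `publish_headers`, `redact_endpoint`) are hypothetical names that extract the same expressions from the listing above; the host and credentials are made up.

```ruby
# Join a connection path prefix and a channel the way #put does.
def join_path(prefix, channel)
  return channel if prefix.nil? || prefix == '/'

  if prefix.match?(%r{/\Z}) || channel.match?(%r{\A/})
    [prefix, channel].join      # one side already supplies the slash
  else
    [prefix, channel].join('/') # add the missing slash
  end
end

# Build the optional request headers from the publish options.
def publish_headers(opts)
  headers = {}
  headers['Cache-Control'] = "max-age=#{opts[:ttl].to_i}" if opts[:ttl]
  headers['X-Firehose-Buffer-Size'] = opts[:buffer_size].to_s if opts[:buffer_size]
  headers
end

# Strip userinfo (user:password@) from the endpoint used in error messages.
def redact_endpoint(uri, channel)
  "#{uri}/#{channel}".gsub(%r{://\S+@}, '://')
end

join_path('/prefix', 'chan')                               # "/prefix/chan"
publish_headers(ttl: 30)                                   # only Cache-Control is set
redact_endpoint('http://user:secret@example.com', 'chan')  # "http://example.com/chan"
```

Note that, as in the original expression, a prefix ending in / combined with a channel starting with / is joined without removing the duplicate slash.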