Class: Firehose::Client::Producer::Http
- Inherits:
- Object
- Defined in:
- lib/firehose/client/producer.rb
Overview
Publish messages to Firehose via an HTTP interface.
Defined Under Namespace
Classes: Builder
Constant Summary
- PublishError =
Raised when the server does not respond with a 202 after a message is published.
Class.new(RuntimeError)
- TimeoutError =
Class.new(Faraday::Error::TimeoutError)
- DEFAULT_TIMEOUT =
How many seconds to wait for a publish to complete.
1
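The source of #put (shown under Instance Method Details) resolves the timeout for each request as opts[:timeout] || @timeout || DEFAULT_TIMEOUT. The following is a minimal sketch of that fallback chain, not the library's code: effective_timeout is a hypothetical helper name, and the constant is redefined locally so the snippet runs on its own.

```ruby
# Local stand-in for Firehose::Client::Producer::Http::DEFAULT_TIMEOUT.
DEFAULT_TIMEOUT = 1

# Hypothetical helper: the per-call option wins, then the instance-level
# timeout, then the default.
def effective_timeout(opts, instance_timeout)
  opts[:timeout] || instance_timeout || DEFAULT_TIMEOUT
end

puts effective_timeout({ timeout: 5 }, 3) # 5 (per-call option)
puts effective_timeout({}, 3)             # 3 (instance timeout)
puts effective_timeout({}, nil)           # 1 (DEFAULT_TIMEOUT)
```

Because the chain uses ||, a nil or false value at any level falls through to the next one.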
Instance Attribute Summary
-
#timeout ⇒ Object
readonly
Seconds to wait for a publish before timing out.
-
#uri ⇒ Object
readonly
URI for the Firehose server.
Class Method Summary
-
.adapter ⇒ Object
Use :net_http for the default Faraday adapter.
-
.adapter=(adapter) ⇒ Object
The Faraday adapter Firehose uses to PUT the message. A list of adapters is available at github.com/technoweenie/faraday.
Instance Method Summary
-
#error_handler ⇒ Object
Raise an exception if an error occurs when connecting to the Firehose.
-
#initialize(uri = Firehose::URI, timeout = DEFAULT_TIMEOUT) ⇒ Http
constructor
A new instance of Http.
-
#on_error(&block) ⇒ Object
Handle errors that could happen while publishing a message.
-
#publish(message) ⇒ Object
A DSL for publishing messages.
-
#put(message, channel, opts, &block) ⇒ Object
Publish the message via HTTP.
Constructor Details
Instance Attribute Details
#timeout ⇒ Object (readonly)
Seconds to wait for a publish before timing out.
# File 'lib/firehose/client/producer.rb', line 29
def timeout
  @timeout
end
#uri ⇒ Object (readonly)
URI for the Firehose server. This URI does not include the path of the channel.
# File 'lib/firehose/client/producer.rb', line 29
def uri
  @uri
end
Class Method Details
.adapter ⇒ Object
Use :net_http for the default Faraday adapter.
# File 'lib/firehose/client/producer.rb', line 94
def self.adapter
  @adapter ||= Faraday.default_adapter
end
.adapter=(adapter) ⇒ Object
The Faraday adapter Firehose uses to PUT the message. A list of adapters is available at github.com/technoweenie/faraday.
# File 'lib/firehose/client/producer.rb', line 89
def self.adapter=(adapter)
  @adapter = adapter
end
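The adapter is stored in a class-level instance variable and memoized with ||=, so it is shared by every Http instance and only falls back to the default when nothing has been assigned. The following is a minimal sketch of that pattern. SketchHttp and DEFAULT_ADAPTER are hypothetical stand-ins (the latter plays the role of Faraday.default_adapter), so the snippet does not need Faraday installed.

```ruby
# Hypothetical stand-in class; not part of Firehose.
class SketchHttp
  # Stand-in for Faraday.default_adapter.
  DEFAULT_ADAPTER = :net_http

  # Explicitly choose an adapter for all instances.
  def self.adapter=(adapter)
    @adapter = adapter
  end

  # Return the chosen adapter, or memoize the default on first read.
  def self.adapter
    @adapter ||= DEFAULT_ADAPTER
  end
end

puts SketchHttp.adapter.inspect # :net_http
SketchHttp.adapter = :em_http   # example value only
puts SketchHttp.adapter.inspect # :em_http
```

Since the setting lives on the class, it should be assigned before any publish calls that are meant to use it.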
Instance Method Details
#error_handler ⇒ Object
Raise an exception if an error occurs when connecting to the Firehose.
# File 'lib/firehose/client/producer.rb', line 83
def error_handler
  @error_handler || Proc.new{ |e| raise e }
end
#on_error(&block) ⇒ Object
Handle errors that could happen while publishing a message.
# File 'lib/firehose/client/producer.rb', line 78
def on_error(&block)
  @error_handler = block
end
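Taken together, #on_error and #error_handler form a replaceable error hook: without a registered block the handler re-raises, and with one the block receives the exception instead. The following sketch reproduces only that pair in a hypothetical stand-in class (SketchProducer), so it runs without Firehose or Faraday.

```ruby
# Hypothetical stand-in that copies only the error-handling pair.
class SketchProducer
  # Register a block that receives any publishing error.
  def on_error(&block)
    @error_handler = block
  end

  # Fall back to a Proc that re-raises when no block was registered.
  def error_handler
    @error_handler || Proc.new { |e| raise e }
  end
end

producer = SketchProducer.new

# Default behaviour: the error propagates to the caller.
begin
  producer.error_handler.call(RuntimeError.new("publish failed"))
rescue RuntimeError => e
  puts "raised: #{e.message}"
end

# Custom behaviour: collect errors instead of raising.
errors = []
producer.on_error { |e| errors << e.message }
producer.error_handler.call(RuntimeError.new("publish failed"))
puts errors.inspect
```

Registering a handler this way lets a caller log or count failed publishes without wrapping every call in begin/rescue.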
#publish(message) ⇒ Object
A DSL for publishing messages.
# File 'lib/firehose/client/producer.rb', line 38
def publish(message)
  Builder.new(self, message)
end
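#publish does not send anything itself; it wraps the producer and the message in a Builder, which is expected to finish the call later. The Builder's interface is not shown on this page, so the sketch below is an assumption: SketchBuilder, its to method, and RecordingProducer are hypothetical stand-ins that only illustrate how such a builder could hand the message on to #put.

```ruby
# Hypothetical builder: holds the producer and message until a channel is given.
class SketchBuilder
  def initialize(producer, message)
    @producer = producer
    @message = message
  end

  # Assumed method name; forwards everything to the producer's #put.
  def to(channel, opts = {}, &block)
    @producer.put(@message, channel, opts, &block)
  end
end

# Hypothetical producer that records calls instead of making HTTP requests.
class RecordingProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same shape as Http#publish: return a builder bound to this producer.
  def publish(message)
    SketchBuilder.new(self, message)
  end

  def put(message, channel, opts, &block)
    @sent << [message, channel, opts]
  end
end

producer = RecordingProducer.new
producer.publish("hello").to("/greetings", ttl: 30)
puts producer.sent.inspect
```

The design keeps #put as the single place where the request is made, while the builder gives callers a chained, sentence-like call.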
#put(message, channel, opts, &block) ⇒ Object
Publish the message via HTTP.
# File 'lib/firehose/client/producer.rb', line 43
def put(message, channel, opts, &block)
  ttl = opts[:ttl]
  timeout = opts[:timeout] || @timeout || DEFAULT_TIMEOUT

  response = conn.put do |req|
    req.options[:timeout] = timeout
    if conn.path_prefix.nil? || conn.path_prefix == '/'
      # This avoids a double / if the channel starts with a / too (which is expected).
      req.path = channel
    else
      if conn.path_prefix =~ /\/\Z/ || channel =~ /\A\//
        req.path = [conn.path_prefix, channel].compact.join
      else
        # Add a / so the prefix and channel aren't just rammed together.
        req.path = [conn.path_prefix, channel].compact.join('/')
      end
    end
    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
  end

  response.on_complete do
    case response.status
    when 202 # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{uri.to_s}/#{channel}': #{response.inspect}")
    end
  end

# Hide Faraday with this Timeout exception, and through the error handler.
rescue Faraday::Error::TimeoutError => e
  error_handler.call TimeoutError.new(e)
end
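The path handling inside #put can be hard to follow inline, so the sketch below pulls the same three branches into a standalone function. request_path is a hypothetical helper name, not a Firehose method; the conditions are copied from the source above so the cases can be tried without a Faraday connection.

```ruby
# Hypothetical helper mirroring the branches #put uses to build req.path.
def request_path(path_prefix, channel)
  if path_prefix.nil? || path_prefix == '/'
    # No meaningful prefix: the channel (expected to start with "/") is the path.
    channel
  elsif path_prefix =~ /\/\Z/ || channel =~ /\A\//
    # One side already supplies a slash, so concatenate directly.
    [path_prefix, channel].compact.join
  else
    # Neither side has a slash, so insert one between them.
    [path_prefix, channel].compact.join('/')
  end
end

puts request_path(nil, '/chat')          # /chat
puts request_path('/', '/chat')          # /chat
puts request_path('/firehose', '/chat')  # /firehose/chat
puts request_path('/firehose', 'chat')   # /firehose/chat
puts request_path('/firehose/', '/chat') # /firehose//chat
```

The last case shows a limit of this logic: when the prefix ends with a slash and the channel also starts with one, the two are joined as they are and the slash is doubled.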