Class: Brow::Transport
- Inherits:
-
Object
- Object
- Brow::Transport
- Defined in:
- lib/brow/transport.rb
Constant Summary collapse
- RETRIES =
Private: Default number of times to retry request.
10
- READ_TIMEOUT =
Private: Default read timeout on requests.
8
- OPEN_TIMEOUT =
Private: Default open timeout on requests.
4
- WRITE_TIMEOUT =
Private: Default write timeout on requests.
4
- VALID_HTTP_SCHEMES =
Private: URL schemes that this transport supports.
Set["http", "https"].freeze
Instance Attribute Summary collapse
-
#backoff_policy ⇒ Object
readonly
Private.
-
#headers ⇒ Object
readonly
Private.
-
#http ⇒ Object
readonly
Private.
-
#logger ⇒ Object
readonly
Private.
-
#retries ⇒ Object
readonly
Private.
-
#url ⇒ Object
readonly
Private.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Transport
constructor
A new instance of Transport.
-
#send_batch(batch) ⇒ Response
Sends a batch of messages to the API.
-
#shutdown ⇒ Object
Closes a persistent connection if it exists.
Constructor Details
#initialize(options = {}) ⇒ Transport
Returns a new instance of Transport.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/brow/transport.rb', line 31
#
# Builds a transport that talks to a single http(s) endpoint.
#
# options - Hash of settings (default: {}):
#           :url            - String endpoint. Falls back to ENV["BROW_URL"];
#                             one of the two must be present.
#           :headers        - Stored as-is and exposed via #headers
#                             (default: {}).
#           :retries        - Number of retries, must be >= 0. Falls back to
#                             ENV["BROW_RETRIES"], then RETRIES.
#           :logger         - Logger to use (default: Brow.logger).
#           :backoff_policy - Backoff policy object
#                             (default: a new Brow::BackoffPolicy).
#           :read_timeout   - Passed to Net::HTTP#read_timeout=. Falls back
#                             to ENV["BROW_READ_TIMEOUT"], then READ_TIMEOUT.
#           :open_timeout   - Passed to Net::HTTP#open_timeout=. Falls back
#                             to ENV["BROW_OPEN_TIMEOUT"], then OPEN_TIMEOUT.
#           :write_timeout  - Passed to Net::HTTP#write_timeout= on Ruby
#                             2.6.0 or later. Falls back to
#                             ENV["BROW_WRITE_TIMEOUT"], then WRITE_TIMEOUT.
#
# Raises ArgumentError if no URL can be found, if the URL scheme is not
# http or https, or if retries is negative.
def initialize(options = {})
  @url = options.fetch(:url) {
    ENV.fetch("BROW_URL") {
      raise ArgumentError, ":url is required to be present so we know where to send batches"
    }
  }
  @uri = URI.parse(@url)

  unless VALID_HTTP_SCHEMES.include?(@uri.scheme)
    raise ArgumentError, ":url must be http(s) scheme but was #{@uri.scheme.inspect}"
  end

  # Default path if people forget a slash.
  if @uri.path.nil? || @uri.path.empty?
    @uri.path = "/"
  end

  @headers = options[:headers] || {}

  @retries = options.fetch(:retries) {
    ENV.fetch("BROW_RETRIES", RETRIES).to_i
  }

  unless @retries >= 0
    raise ArgumentError, ":retries must be >= to 0 but was #{@retries.inspect}"
  end

  @logger = options.fetch(:logger) { Brow.logger }

  # NOTE(review): assumes Brow::BackoffPolicy#initialize accepts this options
  # hash so backoff settings can be supplied through the transport — confirm.
  @backoff_policy = options.fetch(:backoff_policy) {
    Brow::BackoffPolicy.new(options)
  }

  @http = Net::HTTP.new(@uri.host, @uri.port)
  @http.use_ssl = @uri.scheme == "https"

  # An explicit nil for any timeout option leaves Net::HTTP's own default.
  read_timeout = options.fetch(:read_timeout) {
    ENV.fetch("BROW_READ_TIMEOUT", READ_TIMEOUT).to_f
  }
  @http.read_timeout = read_timeout if read_timeout

  open_timeout = options.fetch(:open_timeout) {
    ENV.fetch("BROW_OPEN_TIMEOUT", OPEN_TIMEOUT).to_f
  }
  @http.open_timeout = open_timeout if open_timeout

  if RUBY_VERSION >= '2.6.0'
    write_timeout = options.fetch(:write_timeout) {
      ENV.fetch("BROW_WRITE_TIMEOUT", WRITE_TIMEOUT).to_f
    }
    @http.write_timeout = write_timeout if write_timeout
  else
    # Emitted on older Rubies whether or not :write_timeout was supplied.
    Kernel.warn("Warning: option :write_timeout requires Ruby version 2.6.0 or later")
  end
end
Instance Attribute Details
#backoff_policy ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the backoff policy set in #initialize (the
# :backoff_policy option, or a Brow::BackoffPolicy built by default).
#
# Returns the backoff policy object.
def backoff_policy
  @backoff_policy
end
#headers ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the headers set in #initialize (the :headers option,
# or an empty Hash when none was given).
#
# Returns the stored headers.
def headers
  @headers
end
#http ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the Net::HTTP client that #initialize builds for the
# configured URL's host and port.
#
# Returns the stored HTTP client.
def http
  @http
end
#logger ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the logger set in #initialize (the :logger option, or
# Brow.logger by default).
#
# Returns the stored logger.
def logger
  @logger
end
#retries ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the retry count set in #initialize (the :retries
# option, ENV["BROW_RETRIES"], or RETRIES, in that order).
#
# Returns the stored number of retries.
def retries
  @retries
end
#url ⇒ Object (readonly)
Private
29 30 31 |
# File 'lib/brow/transport.rb', line 29
#
# Private: Reader for the URL set in #initialize (the :url option, or
# ENV["BROW_URL"]), exactly as it was supplied.
#
# Returns the stored URL.
def url
  @url
end
Instance Method Details
#send_batch(batch) ⇒ Response
Sends a batch of messages to the API.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/brow/transport.rb', line 88
#
# Sends a batch of messages to the API.
#
# The request is attempted through retry_with_backoff, using the configured
# number of retries. Whatever the outcome, the backoff policy is reset and
# the batch is cleared before this method returns.
#
# batch - The batch to send. It must respond to #length and #clear, and is
#         emptied by this call.
#
# Returns a Response carrying the last HTTP status code, or a Response with
# status -1 and the error text when retry_with_backoff reports an exception.
def send_batch(batch)
  logger.debug { "#{LOG_PREFIX} Sending request for #{batch.length} items" }

  last_response, exception = retry_with_backoff(retries) do
    response = send_request(batch)
    logger.debug { "#{LOG_PREFIX} Response: status=#{response.code}, body=#{response.body}" }
    [Response.new(response.code.to_i, nil), retry?(response)]
  end

  if exception
    logger.error { "#{LOG_PREFIX} #{exception.message}" }
    # Exception#backtrace is nil for an exception that was never raised.
    Array(exception.backtrace).each { |line| logger.error(line) }
    Response.new(-1, exception.to_s)
  else
    last_response
  end
ensure
  backoff_policy.reset
  batch.clear
end
#shutdown ⇒ Object
Closes a persistent connection if it exists.
110 111 112 113 |
# File 'lib/brow/transport.rb', line 110
#
# Closes a persistent connection if it exists.
#
# Logs the shutdown at info level, then finishes the HTTP session only when
# one has been started.
def shutdown
  logger.info { "#{LOG_PREFIX} Transport shutting down" }
  @http.finish if @http.started?
end