Class: Brow::Transport

Inherits:
Object
  • Object
show all
Defined in:
lib/brow/transport.rb

Constant Summary collapse

RETRIES =

Private: Default number of times to retry request.

10
READ_TIMEOUT =

Private: Default read timeout on requests.

8
OPEN_TIMEOUT =

Private: Default open timeout on requests.

4
WRITE_TIMEOUT =

Private: Default write timeout on requests.

4
VALID_HTTP_SCHEMES =

Private: URL schemes that this transport supports.

Set["http", "https"].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Transport

Returns a new instance of Transport.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/brow/transport.rb', line 31

# Public: Builds a transport that talks to a single HTTP(S) endpoint.
#
# options - The Hash of options (default: {}):
#           :url            - The String endpoint. Falls back to
#                             ENV["BROW_URL"]; one of the two is required.
#           :headers        - Stored in @headers (default: {}).
#           :retries        - Number of retries. Falls back to
#                             ENV["BROW_RETRIES"], then RETRIES.
#           :logger         - Logger to use (default: Brow.logger).
#           :backoff_policy - Backoff policy to use (default:
#                             Brow::BackoffPolicy.new(options)).
#           :read_timeout   - Falls back to ENV["BROW_READ_TIMEOUT"],
#                             then READ_TIMEOUT.
#           :open_timeout   - Falls back to ENV["BROW_OPEN_TIMEOUT"],
#                             then OPEN_TIMEOUT.
#           :write_timeout  - Falls back to ENV["BROW_WRITE_TIMEOUT"],
#                             then WRITE_TIMEOUT. Only applied when
#                             Net::HTTP supports a write timeout.
#
# Raises ArgumentError if no url can be found, if the url scheme is not
# one of VALID_HTTP_SCHEMES, or if retries is negative.
def initialize(options = {})
  @url = options.fetch(:url) {
    ENV.fetch("BROW_URL") {
      raise ArgumentError, ":url is required to be present so we know where to send batches"
    }
  }
  @uri = URI.parse(@url)

  unless VALID_HTTP_SCHEMES.include?(@uri.scheme)
    raise ArgumentError, ":url must be http(s) scheme but was #{@uri.scheme.inspect}"
  end

  # Default path if people forget a slash.
  if @uri.path.nil? || @uri.path.empty?
    @uri.path = "/"
  end

  @headers = options[:headers] || {}
  @retries = options.fetch(:retries) {
    ENV.fetch("BROW_RETRIES", RETRIES).to_i
  }

  unless @retries >= 0
    raise ArgumentError, ":retries must be >= 0 but was #{@retries.inspect}"
  end

  @logger = options.fetch(:logger) { Brow.logger }
  @backoff_policy = options.fetch(:backoff_policy) {
    Brow::BackoffPolicy.new(options)
  }

  @http = Net::HTTP.new(@uri.host, @uri.port)
  @http.use_ssl = @uri.scheme == "https"

  # An explicit nil for any timeout option leaves Net::HTTP's own default
  # in place, hence the `if` guards below.
  read_timeout = options.fetch(:read_timeout) {
    ENV.fetch("BROW_READ_TIMEOUT", READ_TIMEOUT).to_f
  }
  @http.read_timeout = read_timeout if read_timeout

  open_timeout = options.fetch(:open_timeout) {
    ENV.fetch("BROW_OPEN_TIMEOUT", OPEN_TIMEOUT).to_f
  }
  @http.open_timeout = open_timeout if open_timeout

  # Net::HTTP#write_timeout= arrived in Ruby 2.6. Detect the capability
  # directly rather than comparing RUBY_VERSION strings, which compare
  # lexicographically and are not a reliable version check.
  if @http.respond_to?(:write_timeout=)
    write_timeout = options.fetch(:write_timeout) {
      ENV.fetch("BROW_WRITE_TIMEOUT", WRITE_TIMEOUT).to_f
    }
    @http.write_timeout = write_timeout if write_timeout
  else
    Kernel.warn("Warning: option :write_timeout requires Ruby version 2.6.0 or later")
  end
end

Instance Attribute Details

#backoff_policy ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the backoff policy held in @backoff_policy, i.e. the
# :backoff_policy option or the Brow::BackoffPolicy built in #initialize.
def backoff_policy
  @backoff_policy
end

#headers ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the value held in @headers: the :headers option given to
# #initialize, or an empty Hash when that option was nil or missing.
def headers
  @headers
end

#http ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the Net::HTTP instance that #initialize created for the
# host and port of the configured url.
def http
  @http
end

#logger ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the logger held in @logger, i.e. the :logger option or
# Brow.logger when that option was not given to #initialize.
def logger
  @logger
end

#retries ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the retry count held in @retries. #initialize rejects
# negative values, so this is >= 0.
def retries
  @retries
end

#url ⇒ Object (readonly)

Private



29
30
31
# File 'lib/brow/transport.rb', line 29

# Private: Returns the url held in @url, exactly as it was supplied through
# the :url option or ENV["BROW_URL"].
def url
  @url
end

Instance Method Details

#send_batch(batch) ⇒ Response

Sends a batch of messages to the API

Returns:



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/brow/transport.rb', line 88

# Public: Sends a batch of messages to the API.
#
# batch - The collection of messages to send. It must respond to #length
#         and #clear; it is emptied before this method returns, whether
#         or not the request succeeded.
#
# Returns the Response built from the last HTTP reply (numeric status, nil
# error), or a Response with status -1 and the exception text when
# retry_with_backoff hands back an exception.
def send_batch(batch)
  logger.debug { "#{LOG_PREFIX} Sending request for #{batch.length} items" }

  outcome, failure = retry_with_backoff(retries) do
    reply = send_request(batch)
    logger.debug { "#{LOG_PREFIX} Response: status=#{reply.code}, body=#{reply.body}" }
    wrapped = Response.new(reply.code.to_i, nil)
    [wrapped, retry?(reply)]
  end

  # Happy path: nothing was raised, so hand back the last response as-is.
  return outcome unless failure

  logger.error { "#{LOG_PREFIX} #{failure.message}" }
  failure.backtrace.each do |frame|
    logger.error(frame)
  end
  Response.new(-1, failure.to_s)
ensure
  # Always leave the policy and the batch clean for the next send.
  backoff_policy.reset
  batch.clear
end

#shutdown ⇒ Object

Closes a persistent connection if it exists



110
111
112
113
# File 'lib/brow/transport.rb', line 110

# Public: Closes the persistent connection if one is open.
#
# Logs the shutdown at info level first. Returns nil when the connection
# was never started, otherwise whatever @http.finish returns.
def shutdown
  logger.info { "#{LOG_PREFIX} Transport shutting down" }
  return unless @http.started?

  @http.finish
end