Class: BBK::HTTP::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/http/publisher.rb

Defined Under Namespace

Classes: PublishError

Constant Summary collapse

PROTOCOLS =
%w[http https].freeze
CONTENT_TYPE_HEADER =
'Content-Type'.freeze
METHODS =
Faraday::Connection::METHODS

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher

Returns a new instance of Publisher.



38
39
40
41
42
# File 'lib/bbk/http/publisher.rb', line 38

def initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger)
  @domains = domains
  @default_connection_options = default_connection_options
  @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name)
end

Instance Attribute Details

#domainsObject (readonly)

Returns the value of attribute domains.



36
37
38
# File 'lib/bbk/http/publisher.rb', line 36

def domains
  @domains
end

#loggerObject (readonly)

Returns the value of attribute logger.



36
37
38
# File 'lib/bbk/http/publisher.rb', line 36

def logger
  @logger
end

Instance Method Details

#closeObject



48
# File 'lib/bbk/http/publisher.rb', line 48

def close; end

#protocolsObject



44
45
46
# File 'lib/bbk/http/publisher.rb', line 44

def protocols
  PROTOCOLS
end

#publish(result) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/bbk/http/publisher.rb', line 50

def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  result_domain = route.domain

  raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme)
  raise "Unknown domain #{result_domain}" unless domains.has?(result_domain)

  domain = domains[result_domain]
  route_info = domain.call(route)
  logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}"
  headers = { **result.message.headers, **route_info.headers }.transform_values(&:to_s)
  raw_publish(
    route_info.uri,
    method:  route_info.method,
    options: route_info.options,
    body:    result.message.payload,
    headers: headers
  )
end

#raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/bbk/http/publisher.rb', line 71

def raw_publish(uri, method:, options: {}, body: nil, headers: {})
  raise "Unsupported method(#{method.inspect})" unless METHODS.include?(method.to_sym)

  content_type = headers.fetch(CONTENT_TYPE_HEADER, Faraday::Request::Json::MIME_TYPE)
  payload = if content_type == Faraday::Request::Json::MIME_TYPE
    Oj.generate(body)
  else
    body
  end
  headers[CONTENT_TYPE_HEADER] = content_type
  options = @default_connection_options.merge(options)
  logger.debug("Connection options: #{options}") if options.present?
  response = Faraday.new(uri, **options.slice(*Faraday::ConnectionOptions.members)).send(
    method.to_sym, '', payload, headers
  )
  Concurrent::Promises.resolvable_future.tap do |f|
    data = response.to_hash
    if response.success?
      f.fulfill(data)
    else
      logger.error "Get error response #{data.except(:body)}"
      f.reject(PublishError.new(data))
    end
  end
rescue Faraday::ConnectionFailed => e
  logger.error "Faraday connection failed error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
rescue Faraday::Error => e
  logger.error "Faraday error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
end