Class: BBK::HTTP::Publisher
- Inherits: Object
- Inheritance hierarchy: Object → BBK::HTTP::Publisher
- Defined in:
- lib/bbk/http/publisher.rb
Defined Under Namespace
Classes: PublishError
Constant Summary collapse
- PROTOCOLS =
%w[http https].freeze
- CONTENT_TYPE_HEADER =
'Content-Type'.freeze
- METHODS =
Faraday::Connection::METHODS
Instance Attribute Summary collapse
-
#domains ⇒ Object
readonly
Returns the value of attribute domains.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher
constructor
A new instance of Publisher.
- #protocols ⇒ Object
- #publish(result) ⇒ Object
- #raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object
Constructor Details
#initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger) ⇒ Publisher
Returns a new instance of Publisher.
38 39 40 41 42 |
# File 'lib/bbk/http/publisher.rb', line 38
#
# Builds a publisher bound to a set of domains.
#
# @param domains [Object] domain registry; #publish queries it with `has?` and `[]`
# @param default_connection_options [Hash] base connection options; #raw_publish
#   merges per-request options over them
# @param logger [Object] base logger; wrapped so entries are tagged with this class name
def initialize(domains, default_connection_options: {}, logger: BBK::HTTP.logger)
  @domains = domains
  # Store the caller-supplied defaults themselves: #raw_publish calls `merge` on this value.
  @default_connection_options = default_connection_options
  @logger = ActiveSupport::TaggedLogging.new(logger).tagged(self.class.name)
end
Instance Attribute Details
#domains ⇒ Object (readonly)
Returns the value of attribute domains.
36 37 38 |
# Read-only accessor: returns @domains, the value passed as the first constructor argument.
# File 'lib/bbk/http/publisher.rb', line 36 def domains @domains end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
36 37 38 |
# Read-only accessor: returns @logger, which the constructor sets to an
# ActiveSupport::TaggedLogging wrapper tagged with the class name.
# File 'lib/bbk/http/publisher.rb', line 36 def logger @logger end |
Instance Method Details
#close ⇒ Object
48 |
# No-op: the method body is empty, so calling it does nothing and returns nil.
# NOTE(review): presumably exists so this publisher offers the same `close` hook
# as other publishers — confirm against the callers.
# File 'lib/bbk/http/publisher.rb', line 48 def close; end |
#protocols ⇒ Object
44 45 46 |
# Returns the PROTOCOLS constant (%w[http https]), the schemes #publish accepts.
# File 'lib/bbk/http/publisher.rb', line 44 def protocols PROTOCOLS end |
#publish(result) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/bbk/http/publisher.rb', line 50
#
# Publishes a dispatcher result over HTTP.
#
# Resolves the result's route through the matching domain, merges the message
# headers with the route headers and delegates the request to #raw_publish.
#
# @param result [Object] dispatcher result; must respond to `route` and `message`
# @return [Object] whatever #raw_publish returns
# @raise [RuntimeError] when the route scheme is not in PROTOCOLS or the
#   route domain is not registered in #domains
def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  result_domain = route.domain
  raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme)
  raise "Unknown domain #{result_domain}" unless domains.has?(result_domain)

  domain = domains[result_domain]
  # The domain is callable: it turns the abstract route into concrete request info.
  route_info = domain.call(route)
  logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}"

  # Route headers win over message headers on key collisions; values are sent as strings.
  headers = { **result.message.headers, **route_info.headers }.transform_values(&:to_s)
  raw_publish(
    route_info.uri,
    method: route_info.method,
    options: route_info.options,
    body: result.message.payload,
    headers: headers
  )
end
#raw_publish(uri, method:, options: {}, body: nil, headers: {}) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/bbk/http/publisher.rb', line 71
#
# Sends a single HTTP request through Faraday and wraps the outcome in a future.
#
# @param uri [Object] request target, passed straight to Faraday.new
# @param method [Symbol, String] HTTP verb; must be listed in METHODS
# @param options [Hash] per-request connection options, merged over the
#   defaults given to the constructor
# @param body [Object, nil] payload; serialized with Oj when the content type is JSON
# @param headers [Hash] request headers; Content-Type defaults to the JSON MIME type
# @return [Object] a resolvable future: fulfilled with `response.to_hash` on a
#   successful response, rejected with PublishError on an error response or a
#   Faraday failure
# @raise [RuntimeError] when method is not listed in METHODS
def raw_publish(uri, method:, options: {}, body: nil, headers: {})
  raise "Unsupported method(#{method.inspect})" unless METHODS.include?(method.to_sym)

  content_type = headers.fetch(CONTENT_TYPE_HEADER, Faraday::Request::Json::MIME_TYPE)
  # Only JSON bodies are serialized here; any other content type is sent as given.
  payload = if content_type == Faraday::Request::Json::MIME_TYPE
              Oj.generate(body)
            else
              body
            end
  # Build a new hash instead of writing the default back into the caller's headers.
  headers = headers.merge(CONTENT_TYPE_HEADER => content_type)

  options = @default_connection_options.merge(options)
  logger.debug("Connection options: #{options}") if options.present?
  # Faraday.new only receives the keys it declares as connection options.
  response = Faraday.new(uri, **options.slice(*Faraday::ConnectionOptions.members)).send(
    method.to_sym, '', payload, headers
  )

  Concurrent::Promises.resolvable_future.tap do |f|
    data = response.to_hash
    if response.success?
      f.fulfill(data)
    else
      # The body is left out of the log line; the full data still goes into the error.
      logger.error "Get error response #{data.except(:body)}"
      f.reject(PublishError.new(data))
    end
  end
rescue Faraday::ConnectionFailed => e
  logger.error "Faraday connection failed error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
rescue Faraday::Error => e
  logger.error "Faraday error: #{e.inspect}"
  Concurrent::Promises.resolvable_future.reject(PublishError.new({ cause: e }))
end