Class: BBK::AMQP::Publisher
- Inherits:
-
Object
- Object
- BBK::AMQP::Publisher
- Defined in:
- lib/bbk/amqp/publisher.rb
Overview
Publisher send amqp messages
Defined Under Namespace
Classes: PublishedMessage
Constant Summary collapse
- HEADER_PROP_FIELDS =
%i[message_id reply_to correlation_id].freeze
- PROTOCOLS =
%w[mq amqp amqps].freeze
Instance Attribute Summary collapse
-
#ack_map ⇒ Object
readonly
Returns the value of attribute ack_map.
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#domains ⇒ Object
readonly
Returns the value of attribute domains.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#sended_messages ⇒ Object
readonly
Returns the value of attribute sended_messages.
Instance Method Summary collapse
-
#close ⇒ Object
Close publisher - try close amqp channel.
-
#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher
constructor
A new instance of Publisher.
-
#protocols ⇒ Array<Symbol>
Returned supported protocols list.
-
#publish(result) ⇒ Object
Publish dispatcher result.
-
#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object
Publish message.
-
#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object
Publish raw payload.
Constructor Details
#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher
Returns a new instance of Publisher.
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/bbk/amqp/publisher.rb', line 28 def initialize(connection, domains, logger: BBK::AMQP.logger) @connection = connection @channel = connection.channel @domains = domains logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, "Ch##{@channel.id}"]) @ack_map = Concurrent::Map.new @sended_messages = Concurrent::Map.new @configured_exchanges = Set.new initialize_callbacks end |
Instance Attribute Details
#ack_map ⇒ Object (readonly)
Returns the value of attribute ack_map.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def ack_map @ack_map end |
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def connection @connection end |
#domains ⇒ Object (readonly)
Returns the value of attribute domains.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def domains @domains end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def logger @logger end |
#sended_messages ⇒ Object (readonly)
Returns the value of attribute sended_messages.
26 27 28 |
# File 'lib/bbk/amqp/publisher.rb', line 26 def @sended_messages end |
Instance Method Details
#close ⇒ Object
Close publisher - try close amqp channel
49 50 51 52 53 54 55 56 |
# File 'lib/bbk/amqp/publisher.rb', line 49 def close @channel.tap do |c| return nil unless c @channel = nil c.close end end |
#protocols ⇒ Array<Symbol>
Returned supported protocols list
44 45 46 |
# File 'lib/bbk/amqp/publisher.rb', line 44 def protocols PROTOCOLS end |
#publish(result) ⇒ Object
Publish dispatcher result
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/bbk/amqp/publisher.rb', line 60 def publish(result) logger.debug "Try publish dispatcher result #{result.inspect}" route = result.route result_domain = route.domain raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme) raise "Unknown domain #{result_domain}" unless domains.has?(result_domain) domain = domains[result_domain] raise ArgumentError.new("Unknown route domain #{resutl_domain}") if domain.nil? route_info = domain.call(route) logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}" = result. (route_info.routing_key, PublishedMessage.new({**.headers, **route_info.headers}, .payload), exchange: route_info.exchange) end |
#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object
Publish message
81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/bbk/amqp/publisher.rb', line 81 def (routing_key, , exchange:, options: {}) logger.debug "Try publish message #{.headers.inspect}" properties = { persistent: true, mandatory: true, routing_key: routing_key, headers: .headers, # user_id: client_name, **.headers.select {|k| HEADER_PROP_FIELDS.include?(k.to_sym) }.compact }.merge().symbolize_keys properties[:user_id] = client_name if .headers[:user_id].blank? (exchange, routing_key, .payload, properties) end |
#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object
Publish raw payload
100 101 102 103 104 105 106 107 108 |
# File 'lib/bbk/amqp/publisher.rb', line 100 def raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) logger.debug "Publish raw message #{headers.inspect}" properties = properties.deep_dup properties[:headers] = properties.fetch(:headers, {}).merge headers properties = properties.merge(headers.select do |k| HEADER_PROP_FIELDS.include? k end.compact).symbolize_keys (exchange, routing_key, payload, properties) end |