Class: BBK::AMQP::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/amqp/publisher.rb

Overview

Publisher sends AMQP messages.

Defined Under Namespace

Classes: PublishedMessage

Constant Summary collapse

# Header keys that #publish_message and #raw_publish also copy into the
# top-level message properties (in addition to leaving them in :headers).
HEADER_PROP_FIELDS =
%i[message_id reply_to correlation_id].freeze
# Route schemes accepted by #publish; any other scheme makes #publish raise.
PROTOCOLS =
%w[mq amqp amqps].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, domains, logger: BBK::AMQP.logger) ⇒ Publisher

Returns a new instance of Publisher.



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/bbk/amqp/publisher.rb', line 28

# Build a publisher bound to a fresh channel of the given connection.
#
# @param connection [Object] connection that responds to #channel
# @param domains [Object] domain registry consulted by #publish
# @param logger [Object] base logger; wrapped in ActiveSupport::TaggedLogging
#   when it does not already respond to #tagged
def initialize(connection, domains, logger: BBK::AMQP.logger)
  @connection = connection
  @channel = connection.channel
  @domains = domains

  taggable = if logger.respond_to?(:tagged)
               logger
             else
               ActiveSupport::TaggedLogging.new(logger)
             end
  # Every log line is tagged with the publisher class name and the channel id.
  owner_tag = self.class.to_s
  channel_tag = "Ch##{@channel.id}"
  @logger = BBK::Utils::ProxyLogger.new(taggable, tags: [owner_tag, channel_tag])

  @ack_map = Concurrent::Map.new
  @sended_messages = Concurrent::Map.new
  @configured_exchanges = Set.new
  initialize_callbacks
end

Instance Attribute Details

#ack_map ⇒ Object (readonly)

Returns the value of attribute ack_map.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for @ack_map, which #initialize sets to a new Concurrent::Map.
def ack_map
  @ack_map
end

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for the channel obtained from connection.channel in #initialize.
# Returns nil once #close has been called, because #close clears @channel.
def channel
  @channel
end

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for the connection object that was passed to #initialize.
def connection
  @connection
end

#domains ⇒ Object (readonly)

Returns the value of attribute domains.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for the domain registry passed to #initialize; #publish queries it
# with #has? and #[] to resolve a route's domain.
def domains
  @domains
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for the BBK::Utils::ProxyLogger built in #initialize, which is
# tagged with the publisher class name and the channel id.
def logger
  @logger
end

#sended_messages ⇒ Object (readonly)

Returns the value of attribute sended_messages.



26
27
28
# File 'lib/bbk/amqp/publisher.rb', line 26

# Reader for @sended_messages, which #initialize sets to a new Concurrent::Map.
def sended_messages
  @sended_messages
end

Instance Method Details

#close ⇒ Object

Closes the publisher by closing its AMQP channel, if one is still open.



49
50
51
52
53
54
55
56
# File 'lib/bbk/amqp/publisher.rb', line 49

# Close the publisher's channel, if it still has one.
#
# The channel reference is cleared before the channel itself is closed, so a
# repeated call is a no-op.
#
# @return [Object, nil] the channel that was closed, or nil when there was none
def close
  open_channel = @channel
  return nil unless open_channel

  @channel = nil
  open_channel.close
  open_channel
end

#protocols ⇒ Array<String>

Returns the list of supported protocols.

Returns:

  • (Array<String>)


44
45
46
# File 'lib/bbk/amqp/publisher.rb', line 44

# Returns the PROTOCOLS constant: a frozen array of scheme strings
# ("mq", "amqp", "amqps"), not symbols.
def protocols
  PROTOCOLS
end

#publish(result) ⇒ Object

Publish dispatcher result

Parameters:

  • result (BBK::App::Dispatcher::Result)

    the result to send

Raises:

  • (ArgumentError)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/bbk/amqp/publisher.rb', line 60

# Publish a dispatcher result through the domain that matches its route.
#
# The route's domain is looked up in #domains, the domain object converts the
# route into routing information (routing key, headers, exchange), and the
# message is handed to #publish_message.
#
# @param result [BBK::App::Dispatcher::Result] object providing #route and #message
# @return [Object] whatever #publish_message returns
# @raise [RuntimeError] if the route scheme is not listed in PROTOCOLS, or the
#   registry reports that it does not have the route's domain
# @raise [ArgumentError] if the registry has the domain but resolves it to nil
def publish(result)
  logger.debug "Try publish dispatcher result #{result.inspect}"
  route = result.route
  result_domain = route.domain
  raise "Unsupported protocol #{route.scheme}" unless PROTOCOLS.include?(route.scheme)
  raise "Unknown domain #{result_domain}" unless domains.has?(result_domain)

  domain = domains[result_domain]
  # The message must interpolate result_domain; a misspelled variable here
  # would surface as NameError instead of the documented ArgumentError.
  raise ArgumentError, "Unknown route domain #{result_domain}" if domain.nil?

  route_info = domain.call(route)
  logger.debug "Route #{route.inspect} transformed to #{route_info.inspect}"
  message = result.message
  # Route headers are splatted last, so they win over message headers on conflicts.
  outgoing = PublishedMessage.new({ **message.headers, **route_info.headers }, message.payload)
  publish_message(route_info.routing_key, outgoing, exchange: route_info.exchange)
end

#publish_message(routing_key, message, exchange:, options: {}) ⇒ Object

Publish message

Parameters:

  • routing_key (String)

    message routing key

  • message (Object)

    (object with headers and payload method)

  • exchange (String)

    exchange for sending message

  • options (Hash) (defaults to: {})

    message properties



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/bbk/amqp/publisher.rb', line 81

# Publish a message object to an exchange.
#
# Properties are assembled in this order, later entries overriding earlier
# ones: fixed defaults (persistent, mandatory, routing key, full headers),
# then the non-nil headers whose names appear in HEADER_PROP_FIELDS, then the
# caller's options. Keys are symbolized afterwards.
#
# @param routing_key [String] message routing key
# @param message [Object] object responding to #headers and #payload
# @param exchange [String] exchange the message is sent to
# @param options [Hash] extra message properties
# @return [Object] whatever send_message returns
def publish_message(routing_key, message, exchange:, options: {})
  headers = message.headers
  logger.debug "Try publish message #{headers.inspect}"

  promoted = headers.select { |name| HEADER_PROP_FIELDS.include?(name.to_sym) }.compact
  defaults = {
    persistent:  true,
    mandatory:   true,
    routing_key: routing_key,
    headers:     headers
  }
  properties = defaults.merge(promoted).merge(options).symbolize_keys

  # The user_id property is set to the client name only when the headers
  # carry no (symbol-keyed) user_id of their own.
  if headers[:user_id].blank?
    properties[:user_id] = client_name
  end

  send_message(exchange, routing_key, message.payload, properties)
end

#raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {}) ⇒ Object

Publish raw payload

Parameters:

  • routing_key (String)

    routing key for sending data

  • exchange (String)

    exchange name

  • properties (Hash) (defaults to: {})

    amqp message properties

  • headers (Hash) (defaults to: {})

    message headers

  • payload (Object) (defaults to: {})

    message payload


100
101
102
103
104
105
106
107
108
# File 'lib/bbk/amqp/publisher.rb', line 100

# Publish a raw payload with explicit properties and headers.
#
# The given properties are deep-copied, so the caller's hash is not mutated.
# The headers are merged into properties[:headers], and the non-nil headers
# named in HEADER_PROP_FIELDS are additionally copied into the top-level
# properties. Header names are compared after conversion to symbols, as in
# #publish_message, so string-keyed headers such as "message_id" are
# promoted too.
#
# @param routing_key [String] routing key for sending data
# @param exchange [String] exchange name
# @param properties [Hash] AMQP message properties
# @param headers [Hash] message headers
# @param payload [Object] message payload, passed to send_message unchanged
# @return [Object] whatever send_message returns
def raw_publish(routing_key, exchange:, properties: {}, headers: {}, payload: {})
  logger.debug "Publish raw message #{headers.inspect}"
  properties = properties.deep_dup
  properties[:headers] = properties.fetch(:headers, {}).merge headers
  # Keys that cannot be symbolized are simply never promoted.
  promoted = headers.select do |k|
    k.respond_to?(:to_sym) && HEADER_PROP_FIELDS.include?(k.to_sym)
  end.compact
  properties = properties.merge(promoted).symbolize_keys
  send_message(exchange, routing_key, payload, properties)
end