Class: AngieCoreApi::Messaging::Exchange

Inherits:
Object
  • Object
show all
Defined in:
lib/angie-core-api/messaging/exchange.rb

Class Method Summary collapse

Class Method Details

.exchange(exchange_name, type:, routing_key:, queue: nil, exchange_options: {}, queue_options: {}) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/angie-core-api/messaging/exchange.rb', line 6

# Declares the exchange (and, optionally, a queue bound to it) used by later
# calls on this class, storing them in class-level instance variables.
#
# Only +:topic+ is handled; any other +type+ leaves both the stored exchange
# and the stored queue set to nil.
#
# @param exchange_name [Object] name handed to the channel when declaring the exchange
# @param type [Symbol] exchange type; only +:topic+ is implemented
# @param routing_key [Object] routing key used when binding the queue
# @param queue [Object, nil] queue name; no queue is declared when falsy
# @param exchange_options [Hash] merged over +{ durable: true }+
# @param queue_options [Hash] passed through to the channel's +queue+ call
# @return [Array, nil] +[exchange, queue]+ for +:topic+; nil for other types
#   or when there is no connected AMQP connection
def self.exchange(exchange_name, type:, routing_key:, queue: nil, exchange_options: {}, queue_options: {})
  client = AMQPClient.instance
  return unless client.connection&.connected?

  amqp_channel = client.channel
  @exchange_name = exchange_name

  # TODO: implement direct and fanout
  declared = case type
             when :topic
               declaration_options = { durable: true }.merge(exchange_options)
               topic_exchange = amqp_channel.topic(exchange_name, declaration_options)
               bound_queue = queue ? amqp_channel.queue(queue, queue_options).bind(topic_exchange, routing_key:) : nil
               [topic_exchange, bound_queue]
             end

  # Destructuring nil clears both variables, which is what happens for
  # unsupported types.
  @exchange, @queue = declared
end

.publish(payload, options = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/angie-core-api/messaging/exchange.rb', line 33

# Publishes +payload+ through the exchange previously stored on this class.
#
# The caller's +options+ are merged over a set of defaults (a fresh UUID as
# +message_id+, +persistent: true+, JSON content type, UTF-8 encoding), so
# any of them can be overridden.
#
# @param payload [Object] message body handed to the exchange's +publish+
# @param options [Hash] publish options; must contain +:routing_key+
# @return [Object, nil] whatever the exchange's +publish+ returns, or nil when
#   there is no connected AMQP connection
# @raise [RuntimeError] when no exchange has been stored or +:routing_key+ is missing
def self.publish(payload, options = {})
  return unless AMQPClient.instance.connection&.connected?
  # TODO: show usage?
  raise "Exchange is not defined" unless @exchange
  raise "Routing key is not defined" unless options.key?(:routing_key)

  defaults = {
    routing_key: options[:routing_key],
    message_id: SecureRandom.uuid,
    persistent: true,
    content_type: "application/json",
    content_encoding: "UTF-8"
  }

  @exchange.publish(payload, defaults.merge(options))
end

.subscribe(options = { manual_ack: false }) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/angie-core-api/messaging/exchange.rb', line 20

# Subscribes to the queue stored on this class and yields every delivery to
# the given block as a single hash.
#
# @param options [Hash] merged over +{ block: false }+ and passed to the
#   queue's +subscribe+; when +options[:manual_ack]+ is truthy the delivery is
#   acknowledged on the client's channel after the block returns
# @yieldparam message [Hash] hash with +:delivery_info+, +:properties+ and
#   +:payload+ keys
# @return [Object, nil] whatever the queue's +subscribe+ returns, or nil when
#   there is no connected AMQP connection
# @raise [RuntimeError] when no exchange or no queue has been stored
# @raise [ArgumentError] when called without a block
def self.subscribe(options = { manual_ack: false })
  return unless AMQPClient.instance.connection&.connected?
  # TODO: show usage?
  raise "Exchange is not defined" unless @exchange
  raise "Queue is not defined" unless @queue
  # Fail fast: without a block the +yield+ below would only blow up later,
  # inside the delivery callback, once the first message arrives.
  raise ArgumentError, "Block is required" unless block_given?

  @queue.subscribe({ block: false }.merge(options)) do |delivery_info, properties, payload|
    yield({ delivery_info:, properties:, payload: })
    # If the block raises, this line is never reached and the delivery is not
    # acknowledged.
    AMQPClient.instance.channel.acknowledge(delivery_info.delivery_tag) if options[:manual_ack]
  end
end