Class: AngieCoreApi::Message::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/angie-core-api/message/topic.rb

Class Method Summary collapse

Class Method Details

.exchangeObject



10
11
12
# File 'lib/angie-core-api/message/topic.rb', line 10

def self.exchange
  @exchange ||= AMQPClient.instance.channel.topic(topic_name, durable: true, auto_delete: true)
end

.publish(routing_key, message_id, payload) ⇒ Object



26
27
28
29
30
31
32
33
# File 'lib/angie-core-api/message/topic.rb', line 26

def self.publish(routing_key, message_id, payload)
  exchange.publish(payload,
                   message_id:       message_id,
                   routing_key:      routing_key,
                   persistent:       true,
                   content_type:     "application/json",
                   content_encoding: "UTF-8")
end

.subscribe(routing_key, options = { manual_ack: false }) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/angie-core-api/message/topic.rb', line 14

def self.subscribe(routing_key, options = { manual_ack: false })
  queue = AMQPClient.instance.channel.queue(queue_name(routing_key), durable: true)
  queue.bind(exchange, routing_key: routing_key)

  queue.subscribe(options.merge(block: false)) do |delivery_info, properties, payload|
    yield delivery_info, properties, payload
    AMQPClient.instance.channel.acknowledge(delivery_info.delivery_tag) if options[:manual_ack]
  end
rescue Interrupt => _
  AMQPClient.reconnect!
end

.topic_nameObject



6
7
8
# File 'lib/angie-core-api/message/topic.rb', line 6

def self.topic_name
  @topic_name ||= name.underscore.dasherize
end