Class: AngieCoreApi::Message::Topic
- Inherits:
-
Object
- Object
- AngieCoreApi::Message::Topic
- Defined in:
- lib/angie-core-api/message/topic.rb
Class Method Summary collapse
- .exchange ⇒ Object
- .publish(routing_key, message_id, payload) ⇒ Object
- .subscribe(routing_key, options = { manual_ack: false }) ⇒ Object
- .topic_name ⇒ Object
Class Method Details
.exchange ⇒ Object
10 11 12 |
# File 'lib/angie-core-api/message/topic.rb', line 10 def self.exchange @exchange ||= AMQPClient.instance.channel.topic(topic_name, durable: true, auto_delete: true) end |
.publish(routing_key, message_id, payload) ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/angie-core-api/message/topic.rb', line 26 def self.publish(routing_key, , payload) exchange.publish(payload, message_id: , routing_key: routing_key, persistent: true, content_type: "application/json", content_encoding: "UTF-8") end |
.subscribe(routing_key, options = { manual_ack: false }) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/angie-core-api/message/topic.rb', line 14 def self.subscribe(routing_key, = { manual_ack: false }) queue = AMQPClient.instance.channel.queue(queue_name(routing_key), durable: true) queue.bind(exchange, routing_key: routing_key) queue.subscribe(.merge(block: false)) do |delivery_info, properties, payload| yield delivery_info, properties, payload AMQPClient.instance.channel.acknowledge(delivery_info.delivery_tag) if [:manual_ack] end rescue Interrupt => _ AMQPClient.reconnect! end |
.topic_name ⇒ Object
6 7 8 |
# File 'lib/angie-core-api/message/topic.rb', line 6 def self.topic_name @topic_name ||= name.underscore.dasherize end |