Class: AngieCoreApi::Messaging::Exchange
- Inherits:
-
Object
- Object
- AngieCoreApi::Messaging::Exchange
- Defined in:
- lib/angie-core-api/messaging/exchange.rb
Class Method Summary collapse
- .exchange(exchange_name, type:, routing_key:, queue: nil, exchange_options: {}, queue_options: {}) ⇒ Object
- .publish(payload, options = {}) ⇒ Object
- .subscribe(options = { manual_ack: false }) ⇒ Object
Class Method Details
.exchange(exchange_name, type:, routing_key:, queue: nil, exchange_options: {}, queue_options: {}) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/angie-core-api/messaging/exchange.rb', line 6
#
# Declares the exchange (and, optionally, a queue bound to it) that
# .publish and .subscribe operate on, and stores them in @exchange / @queue.
#
# Does nothing and returns nil when the AMQP connection is missing or not
# connected. Only the :topic type is implemented; any other type resets
# both @exchange and @queue to nil and returns nil.
#
# @param exchange_name [String] name of the exchange to declare
# @param type [Symbol] exchange type; only :topic is handled
# @param routing_key [String] key used to bind the queue to the exchange
# @param queue [String, nil] queue name; no queue is declared when nil
# @param exchange_options [Hash] merged over the default { durable: true }
# @param queue_options [Hash] passed through to the queue declaration
# @return [Array, nil] [exchange, queue] (queue may be nil), or nil
def self.exchange(exchange_name, type:, routing_key:, queue: nil, exchange_options: {}, queue_options: {})
  return unless AMQPClient.instance.connection&.connected?

  channel = AMQPClient.instance.channel
  @exchange_name = exchange_name

  # TODO: implement direct and fanout
  @exchange, @queue = case type
                      when :topic
                        # Caller-supplied options win over the durable default.
                        x = channel.topic(exchange_name, { durable: true }.merge(exchange_options))
                        # The queue is optional: publishers only need the exchange.
                        q = channel.queue(queue, queue_options).bind(x, routing_key:) if queue
                        [x, q]
                      end
end
.publish(payload, options = {}) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/angie-core-api/messaging/exchange.rb', line 33
#
# Publishes a payload to the exchange previously declared via .exchange.
#
# Does nothing and returns nil when the AMQP connection is missing or not
# connected. The message is sent with a fresh UUID message id, persistent
# delivery and JSON/UTF-8 content metadata; any key given in +options+
# overrides the corresponding default.
#
# @param payload [String] message body (the content type defaults to JSON)
# @param options [Hash] publish options; must contain :routing_key
# @return [Object, nil] whatever the exchange's #publish returns, or nil
#   when not connected
# @raise [RuntimeError] when no exchange has been declared, or when
#   +options+ has no :routing_key entry
def self.publish(payload, options = {})
  return unless AMQPClient.instance.connection&.connected?

  # TODO: show usage?
  raise "Exchange is not defined" unless @exchange
  raise "Routing key is not defined" unless options.key?(:routing_key)

  defaults = {
    routing_key: options[:routing_key],
    message_id: SecureRandom.uuid,
    persistent: true,
    content_type: "application/json",
    content_encoding: "UTF-8"
  }

  # Caller-supplied options take precedence over the defaults above.
  @exchange.publish(payload, defaults.merge(options))
end
.subscribe(options = { manual_ack: false }) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/angie-core-api/messaging/exchange.rb', line 20
#
# Subscribes to the queue previously declared via .exchange and yields each
# delivery to the given block as a Hash with the keys :delivery_info,
# :properties and :payload.
#
# Does nothing and returns nil when the AMQP connection is missing or not
# connected. +options+ is merged over the default { block: false } and
# passed to the queue's #subscribe. When options[:manual_ack] is truthy the
# delivery is acknowledged after the block returns; if the block raises,
# the acknowledge call is not reached.
#
# @param options [Hash] subscribe options (e.g. :manual_ack, :block)
# @yieldparam message [Hash] { delivery_info:, properties:, payload: }
# @return [Object, nil] whatever the queue's #subscribe returns, or nil
#   when not connected
# @raise [RuntimeError] when no exchange or no queue has been declared
def self.subscribe(options = { manual_ack: false })
  return unless AMQPClient.instance.connection&.connected?

  # TODO: show usage?
  raise "Exchange is not defined" unless @exchange
  raise "Queue is not defined" unless @queue

  @queue.subscribe({ block: false }.merge(options)) do |delivery_info, properties, payload|
    message = { delivery_info:, properties:, payload: }
    yield message
    # Acknowledge only after the caller's block has finished with the message.
    AMQPClient.instance.channel.acknowledge(delivery_info.delivery_tag) if options[:manual_ack]
  end
end