Class: Agent99::AmqpMessageClient
- Inherits:
-
Object
- Object
- Agent99::AmqpMessageClient
- Defined in:
- lib/agent99/amqp_message_client.rb
Constant Summary collapse
- CONFIG =
{
  host: "127.0.0.1",
  port: 5672,
  ssl: false,
  vhost: "/",
  user: "guest",
  pass: "guest",
  heartbeat: :server, # will use RabbitMQ setting
  frame_max: 131072,
  auth_mechanism: "PLAIN"
}
- QUEUE_TTL =
Queue TTL of 60 seconds, expressed in milliseconds.
60_000
Instance Attribute Summary collapse
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
-
#logger ⇒ Object
Returns the value of attribute logger.
Class Method Summary collapse
Instance Method Summary collapse
- #create_queue(agent_id) ⇒ Object
- #delete_queue(queue_name) ⇒ Object
-
#initialize(config: CONFIG, logger: Logger.new($stdout)) ⇒ AmqpMessageClient
constructor
A new instance of AmqpMessageClient.
- #listen_for_messages(queue, request_handler:, response_handler:, control_handler:) ⇒ Object
- #publish(message) ⇒ Object
- #setup(agent_id:, logger:) ⇒ Object
Constructor Details
#initialize(config: CONFIG, logger: Logger.new($stdout)) ⇒ AmqpMessageClient
Returns a new instance of AmqpMessageClient.
33 34 35 36 37 38 39 40 41 |
# File 'lib/agent99/amqp_message_client.rb', line 33
#
# Builds the AMQP connection, opens a channel on it and caches that
# channel's default exchange.
#
# @param config [Hash] connection settings; defaults to CONFIG
# @param logger [Logger] log destination; defaults to a Logger writing to $stdout
def initialize(config: CONFIG, logger: Logger.new($stdout))
  @config     = config
  # create_amqp_connection is defined elsewhere in this class (not shown here);
  # presumably it reads @config, which is why @config is assigned first.
  @connection = create_amqp_connection
  @channel    = @connection.create_channel
  @exchange   = @channel.default_exchange
  @logger     = logger
end
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
31 32 33 |
# File 'lib/agent99/amqp_message_client.rb', line 31
#
# @return [Object] the channel stored in @channel by #initialize
def channel
  @channel
end
#exchange ⇒ Object
Returns the value of attribute exchange.
31 32 33 |
# File 'lib/agent99/amqp_message_client.rb', line 31
#
# @return [Object] the exchange stored in @exchange by #initialize
def exchange
  @exchange
end
#logger ⇒ Object
Returns the value of attribute logger.
31 32 33 |
# File 'lib/agent99/amqp_message_client.rb', line 31
#
# @return [Object] the logger stored in @logger by #initialize
def logger
  @logger
end
Class Method Details
.instance ⇒ Object
26 27 28 |
# File 'lib/agent99/amqp_message_client.rb', line 26
#
# Returns the shared client, building it with `new` (default arguments) on
# the first call and reusing it afterwards. The `||=` memoization uses no lock.
#
# @return [AmqpMessageClient] the memoized instance
def instance
  @instance ||= new
end
Instance Method Details
#create_queue(agent_id) ⇒ Object
52 53 54 55 |
# File 'lib/agent99/amqp_message_client.rb', line 52
#
# Declares the queue named after the agent on the client's channel.
#
# The TTL is passed as the "x-expires" queue argument. Bunny does not
# recognise a top-level `expires:` option on Channel#queue, so the previous
# call never applied QUEUE_TTL.
# NOTE(review): a queue that already exists without this argument will be
# rejected by the broker on redeclaration — confirm no long-lived queues
# share agent ids.
#
# @param agent_id [#to_s] identifier used verbatim as the queue name
# @return [Object] whatever @channel.queue returns (the declared queue)
def create_queue(agent_id)
  queue_name = agent_id.to_s
  @channel.queue(queue_name, arguments: { "x-expires" => QUEUE_TTL })
end
#delete_queue(queue_name) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/agent99/amqp_message_client.rb', line 124
#
# Deletes the named queue if it exists. Failures are logged, never raised:
# a nil name and a missing queue produce warnings, any other StandardError
# produces an error log entry.
#
# @param queue_name [String, nil] name of the queue to delete
# @return [Object] the return value of the logger call that was made
def delete_queue(queue_name)
  return logger.warn("Attempted to delete queue with nil name") if queue_name.nil?

  begin
    # passive: true looks the queue up without creating it when it is absent.
    queue = @channel.queue(queue_name, passive: true)
    queue.delete
    logger.info "Queue #{queue_name} was deleted"
  rescue Bunny::NotFound
    logger.warn "Queue #{queue_name} not found"
  rescue StandardError => e
    logger.error "Error deleting queue #{queue_name}: #{e.message}"
  end
end
#listen_for_messages(queue, request_handler:, response_handler:, control_handler:) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/agent99/amqp_message_client.rb', line 57
#
# Subscribes to the queue with block: true and dispatches every delivery to
# the handler matching the message's header type.
#
# Each body is parsed as JSON with symbolized keys; the type is read from
# message[:header][:type].
#
# @param queue [Object] queue responding to #subscribe
# @param request_handler [#call] receives messages of type "request"
# @param response_handler [#call] receives messages of type "response"
# @param control_handler [#call] receives messages of type "control"
# @raise [NotImplementedError] for any other type (a ScriptError, so a bare
#   `rescue` will not catch it)
def listen_for_messages(
  queue,
  request_handler:,
  response_handler:,
  control_handler:
)
  queue.subscribe(block: true) do |_delivery_info, _properties, body|
    message = JSON.parse(body, symbolize_names: true)
    logger.debug "Received message: #{message.inspect}"
    type = message.dig(:header, :type)

    case type
    when "request"
      request_handler.call(message)
    when "response"
      response_handler.call(message)
    when "control"
      control_handler.call(message)
    else
      raise NotImplementedError, "Unsupported message type: #{type}"
    end
  end
end
#publish(message) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/agent99/amqp_message_client.rb', line 83
#
# Serializes the message to JSON and publishes it through the exchange,
# using message[:header][:to_uuid] as the routing key.
#
# Errors raised while serializing or publishing are logged and reported in
# the returned hash rather than raised.
#
# @param message [Hash] message carrying :header with :to_uuid and :type
# @return [Hash] { success: true, message: String } on success, or
#   { success: false, error: String } on failure
def publish(message)
  queue_name = message.dig(:header, :to_uuid)

  begin
    # FIXME: message.to_json
    json_payload = JSON.generate(message)

    exchange.publish(json_payload, routing_key: queue_name)

    logger.info "#{message.dig(:header,:type).to_s.upcase} message published successfully to queue: #{queue_name}"

    # Return a success status
    { success: true, message: "Message published successfully" }
  rescue JSON::GeneratorError => e
    logger.error "Failed to convert payload to JSON: #{e.message}"
    { success: false, error: "JSON conversion error: #{e.message}" }
  rescue Bunny::ConnectionClosedError, Bunny::ChannelAlreadyClosed => e
    logger.error "Failed to publish message: #{e.message}"
    { success: false, error: "Publishing error: #{e.message}" }
  rescue StandardError => e
    logger.error "Unexpected error while publishing message: #{e.message}"
    { success: false, error: "Unexpected error: #{e.message}" }
  end
end
#setup(agent_id:, logger:) ⇒ Object
43 44 45 46 47 48 49 50 |
# File 'lib/agent99/amqp_message_client.rb', line 43
#
# Creates the queue for the agent via #create_queue, logs the creation and
# hands the queue back to the caller.
#
# @param agent_id [Object] identifier passed to #create_queue
# @param logger [#info] logger used for the confirmation message; this
#   keyword shadows the instance's own logger reader inside the method
# @return [Object] the queue returned by #create_queue
def setup(agent_id:, logger:)
  queue = create_queue(agent_id)

  logger.info "Created queue for agent_id: #{agent_id}"

  # Returning the queue to be used in the Base class
  queue
end