Class: Agent99::NatsMessageClient

Inherits:
Object
  • Object
show all
Defined in:
lib/agent99/nats_message_client.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger: Logger.new($stdout)) ⇒ NatsMessageClient

Returns a new instance of NatsMessageClient.



18
19
20
21
# File 'lib/agent99/nats_message_client.rb', line 18

# Builds a client by storing the result of +create_nats_connection+
# (defined elsewhere in this class) in @nats and the given logger in @logger.
#
# @param logger [Logger] object stored in @logger; when omitted, a new
#   Logger writing to $stdout is created
#
# NOTE(review): @nats is assigned before @logger, so @logger is still unset
# while create_nats_connection runs — confirm that helper does not rely on it.
def initialize(logger: Logger.new($stdout))
  @nats = create_nats_connection
  @logger = logger
end

Instance Attribute Details

#logger ⇒ Object

Returns the value of attribute logger.



16
17
18
# File 'lib/agent99/nats_message_client.rb', line 16

# Reader for the @logger instance variable.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#nats ⇒ Object

Returns the value of attribute nats.



16
17
18
# File 'lib/agent99/nats_message_client.rb', line 16

# Reader for the @nats instance variable.
#
# @return [Object] the current value of @nats
def nats
  @nats
end

Class Method Details

.instance ⇒ Object



11
12
13
# File 'lib/agent99/nats_message_client.rb', line 11

# Returns the memoized instance, building it with +new+ (no arguments) the
# first time it is requested and caching it in @instance on the receiver.
#
# @return [Object] the cached instance
def instance
  return @instance if @instance

  @instance = new
end

Instance Method Details

#delete_queue(queue_name) ⇒ Object



84
85
86
87
88
# File 'lib/agent99/nats_message_client.rb', line 84

# Logs that no deletion is needed; nothing is actually removed.
#
# NATS has no notion of deleting a queue: subjects are cleaned up
# automatically once they have no subscribers, so this method only emits an
# informational log line naming +queue_name+.
#
# @param queue_name [#to_s] subject name interpolated into the log message
# @return [Object] whatever +logger.info+ returns
def delete_queue(queue_name)
  notice = "NATS doesn't require explicit queue deletion. Subject #{queue_name} will be automatically cleaned up."
  logger.info(notice)
end

#listen_for_messages(queue, request_handler:, response_handler:, control_handler:) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/agent99/nats_message_client.rb', line 29

# Subscribes to +queue+ on @nats and routes each incoming message to the
# handler matching its +header.type+, then blocks the caller in a sleep loop
# so the subscription stays alive.
#
# Each message's +data+ is parsed as JSON with symbolized keys, logged at
# debug level, and passed to exactly one handler.
#
# @param queue [String] subject passed to +@nats.subscribe+
# @param request_handler [#call] invoked with the parsed message when
#   +header.type+ is "request"
# @param response_handler [#call] invoked when +header.type+ is "response"
# @param control_handler [#call] invoked when +header.type+ is "control"
# @raise [NotImplementedError] from inside the subscription block when
#   +header.type+ is none of the three supported values
def listen_for_messages(
  queue,
  request_handler:,
  response_handler:,
  control_handler:
)
  handlers_by_type = {
    "request" => request_handler,
    "response" => response_handler,
    "control" => control_handler
  }

  @nats.subscribe(queue) do |raw|
    payload = JSON.parse(raw.data, symbolize_names: true)
    logger.debug "Received message: #{payload.inspect}"

    kind = payload.dig(:header, :type)
    # fetch (rather than []) so that only an unknown type hits the fallback.
    handler = handlers_by_type.fetch(kind) do
      raise NotImplementedError, "Unsupported message type: #{kind}"
    end
    handler.call(payload)
  end

  # Keep the connection open
  loop { sleep 1 }
end

#publish(message) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/agent99/nats_message_client.rb', line 57

# Serializes +message+ to JSON and publishes it on the NATS subject named by
# the message's +header.to_uuid+.
#
# Failures that are StandardErrors are logged and reported through the
# returned Hash rather than raised. That includes a nil or non-Hash
# +message+ and a missing destination.
#
# @param message [Hash] message with symbol keys; +message[:header][:to_uuid]+
#   names the destination subject
# @return [Hash] +{ success: true, message: String }+ on success, or
#   +{ success: false, error: String }+ on any handled failure
def publish(message)
  # The header lookup sits inside the method-level rescue so a malformed
  # message is reported through the error hash instead of escaping as
  # NoMethodError.
  queue_name = message.dig(:header, :to_uuid)

  # Refuse to hand a nil or blank subject to the NATS client.
  if queue_name.to_s.empty?
    logger.error "Failed to publish message: missing :to_uuid in message header"
    return { success: false, error: "Publishing error: missing :to_uuid in message header" }
  end

  json_payload = JSON.generate(message)
  @nats.publish(queue_name, json_payload)

  logger.info "Message published successfully to queue: #{queue_name}"

  # Return a success status
  { success: true, message: "Message published successfully" }
rescue JSON::GeneratorError => e
  logger.error "Failed to convert payload to JSON: #{e.message}"
  { success: false, error: "JSON conversion error: #{e.message}" }
rescue NATS::IO::TimeoutError => e
  logger.error "Failed to publish message: #{e.message}"
  { success: false, error: "Publishing error: #{e.message}" }
rescue StandardError => e
  logger.error "Unexpected error while publishing message: #{e.message}"
  { success: false, error: "Unexpected error: #{e.message}" }
end

#setup(agent_id:, logger:) ⇒ Object



23
24
25
26
27
# File 'lib/agent99/nats_message_client.rb', line 23

# Replaces the client's logger and hands +agent_id+ straight back.
#
# NATS doesn't require explicit queue creation, so there is nothing to
# provision here; the agent id itself is returned unchanged.
#
# @param agent_id [Object] identifier returned as-is
# @param logger [Object] object stored in @logger
# @return [Object] +agent_id+, unchanged
def setup(agent_id:, logger:)
  agent_id.tap { @logger = logger }
end