Class: Bosh::Director::NatsRpc
Overview
Remote procedure call client wrapping NATS
Instance Method Summary
- #cancel_request(request_id) ⇒ Object
  Stops listening for a response.
- #generate_request_id ⇒ Object
- #initialize(nats_uri) ⇒ NatsRpc (constructor)
  A new instance of NatsRpc.
- #nats ⇒ Object
  Returns a lazily connected NATS client.
- #send_message(client, payload) ⇒ Object
  Publishes a payload (encoded as JSON) without expecting a response.
- #send_request(client, request, &callback) ⇒ Object
  Sends a request (encoded as JSON) and listens for the response.
Constructor Details
#initialize(nats_uri) ⇒ NatsRpc
Returns a new instance of NatsRpc.
    # File 'lib/bosh/director/nats_rpc.rb', line 5
    def initialize(nats_uri)
      @nats_uri = nats_uri
      @logger = Config.logger
      @lock = Mutex.new
      @inbox_name = "director.#{Config.process_uuid}"
      @requests = {}
    end
Instance Method Details
#cancel_request(request_id) ⇒ Object
Stops listening for a response.
    # File 'lib/bosh/director/nats_rpc.rb', line 44
    def cancel_request(request_id)
      @lock.synchronize { @requests.delete(request_id) }
    end
#generate_request_id ⇒ Object
    # File 'lib/bosh/director/nats_rpc.rb', line 48
    def generate_request_id
      SecureRandom.uuid
    end
#nats ⇒ Object
Returns a lazily connected NATS client.
    # File 'lib/bosh/director/nats_rpc.rb', line 14
    def nats
      @nats ||= connect
    end
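The `@nats ||= connect` idiom can be illustrated in isolation. The following is a minimal sketch, not the director's code: `LazyClient`, its `connect` stand-in, and the URI are hypothetical names introduced only to show that the connection is created on first use and reused afterwards.

```ruby
# LazyClient is a hypothetical class that mirrors the `@nats ||= connect` idiom.
class LazyClient
  attr_reader :connect_count

  def initialize(uri)
    @uri = uri
    @connect_count = 0
  end

  # Connects on first use and returns the cached connection afterwards.
  def connection
    @connection ||= connect
  end

  private

  # Stand-in for opening a real connection; returns a plain Hash instead.
  def connect
    @connect_count += 1
    { uri: @uri }
  end
end

client = LazyClient.new('nats://127.0.0.1:4222') # example URI, an assumption
first = client.connection
second = client.connection
puts client.connect_count
puts first.equal?(second)
```

Note that `||=` on its own is not synchronized, so two threads calling the accessor at the same moment could both run the stand-in `connect`.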
#send_message(client, payload) ⇒ Object
Publishes a payload (encoded as JSON) without expecting a response.
    # File 'lib/bosh/director/nats_rpc.rb', line 19
    def send_message(client, payload)
      message = JSON.generate(payload)
      @logger.debug("SENT: #{client} #{message}")
      EM.schedule do
        nats.publish(client, message)
      end
    end
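As a small illustration of the encoding step only, the sketch below shows what `JSON.generate` produces for a Hash payload. The subject and payload values are hypothetical, and nothing is published to NATS here.

```ruby
require 'json'

# Hypothetical subject and payload, for illustration only.
subject = 'agent.example-id'
payload = { 'method' => 'ping', 'arguments' => [] }

# JSON.generate returns a compact JSON string suitable for publishing.
message = JSON.generate(payload)
decoded = JSON.parse(message)

puts "SENT: #{subject} #{message}"
```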
#send_request(client, request, &callback) ⇒ Object
Sends a request (encoded as JSON) and listens for the response.
    # File 'lib/bosh/director/nats_rpc.rb', line 28
    def send_request(client, request, &callback)
      request_id = generate_request_id
      request["reply_to"] = "#{@inbox_name}.#{request_id}"
      @lock.synchronize do
        @requests[request_id] = callback
      end
      message = JSON.generate(request)
      @logger.debug("SENT: #{client} #{message}")
      EM.schedule do
        subscribe_inbox
        nats.publish(client, message)
      end
      request_id
    end
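The bookkeeping done by `send_request` and `cancel_request` can be sketched without NATS or EventMachine. `RequestRegistry` below is a hypothetical in-memory stand-in, and its `dispatch` method is an assumption about how a reply might be matched to its callback; the listing above does not show how `subscribe_inbox` handles responses.

```ruby
require 'json'
require 'securerandom'

# RequestRegistry is a hypothetical stand-in for the request bookkeeping;
# it does not talk to NATS.
class RequestRegistry
  def initialize(inbox_name)
    @inbox_name = inbox_name
    @lock = Mutex.new
    @requests = {}
  end

  # Registers the callback and returns [request_id, encoded message].
  def prepare(request, &callback)
    request_id = SecureRandom.uuid
    request['reply_to'] = "#{@inbox_name}.#{request_id}"
    @lock.synchronize { @requests[request_id] = callback }
    [request_id, JSON.generate(request)]
  end

  # Forgets the callback so that a late response is ignored.
  def cancel(request_id)
    @lock.synchronize { @requests.delete(request_id) }
  end

  # Assumed dispatch step: finds the callback by the last segment of the
  # reply subject and calls it with the decoded response, if still registered.
  def dispatch(subject, raw_response)
    request_id = subject.split('.').last
    callback = @lock.synchronize { @requests.delete(request_id) }
    callback&.call(JSON.parse(raw_response))
  end
end

registry = RequestRegistry.new('director.example-uuid') # example inbox name
responses = []

# A request that receives its reply.
id, message = registry.prepare({ 'method' => 'ping' }) { |r| responses << r }
reply_to = JSON.parse(message)['reply_to']
registry.dispatch(reply_to, '{"value":"pong"}')

# A request that is cancelled before the reply arrives.
cancelled_id, cancelled_message = registry.prepare({ 'method' => 'ping' }) { |r| responses << r }
registry.cancel(cancelled_id)
registry.dispatch(JSON.parse(cancelled_message)['reply_to'], '{"value":"late"}')

puts responses.inspect
```

In this sketch the returned request id plays the same role as the return value of `send_request`: it is the handle a caller would pass to `cancel_request` to stop waiting.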