Class: Bosh::Director::NatsRpc
Instance Method Summary
- #cancel_request(request_id) ⇒ Object
- #generate_request_id ⇒ Object
- #initialize ⇒ NatsRpc (constructor): A new instance of NatsRpc.
- #send_request(client, request, &block) ⇒ Object
- #subscribe_inbox ⇒ Object
Constructor Details
Instance Method Details
#cancel_request(request_id) ⇒ Object
45 46 47 |
# File 'lib/bosh/director/nats_rpc.rb', line 45
# Forgets the pending entry stored under +request_id+.
#
# The removal runs inside +@lock.synchronize+, the same lock the other
# methods of this class take before touching +@requests+.
#
# @param request_id [Object] key previously returned by #send_request
# @return [Object, nil] whatever +@requests.delete+ returns; presumably the
#   removed callback, or nil when no entry existed (assumes +@requests+ is a
#   Hash -- its construction is not visible here, TODO confirm)
def cancel_request(request_id)
  @lock.synchronize { @requests.delete(request_id) }
end
#generate_request_id ⇒ Object
49 50 51 |
# File 'lib/bosh/director/nats_rpc.rb', line 49
# Produces a fresh identifier for an outgoing request.
#
# @return [String] the value of +SecureRandom.uuid+ (a random UUID string)
def generate_request_id
  SecureRandom.uuid
end
#send_request(client, request, &block) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/bosh/director/nats_rpc.rb', line 31
# Registers +block+ as the reply callback for a new request id and publishes
# the encoded +request+ to +client+.
#
# Note that +request+ is mutated: its "reply_to" key is set to
# "<@inbox_name>.<request_id>" before encoding.
#
# @param client [Object] subject passed as the first argument to +@nats.publish+
# @param request [Hash] payload; receives a "reply_to" entry, then is encoded
#   with +Yajl::Encoder.encode+
# @yield stored in +@requests+ under the new request id; #subscribe_inbox
#   looks it up there when a reply subject ends with that id
# @return [Object] the id returned by #generate_request_id, usable with
#   #cancel_request
def send_request(client, request, &block)
  request_id = generate_request_id
  request["reply_to"] = "#{@inbox_name}.#{request_id}"

  # Record the callback before publishing so it is already present when a
  # reply arrives.
  @lock.synchronize do
    @requests[request_id] = block
  end

  message = Yajl::Encoder.encode(request)
  @logger.debug("SENT: #{client} #{message}")

  # The publish itself is handed to EM.next_tick rather than called inline.
  EM.next_tick do
    @nats.publish(client, message)
  end

  request_id
end
#subscribe_inbox ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/bosh/director/nats_rpc.rb', line 15
# Subscribes to every subject under "<@inbox_name>." and dispatches each
# incoming message to the callback registered for its request id.
#
# For each message the handler:
# 1. logs the subject and raw message at debug level;
# 2. takes the text after the last "." of the subject as the request id;
# 3. removes the callback stored under that id from +@requests+ (under
#    +@lock+), so a callback is invoked at most once per registration;
# 4. if a callback was found, parses the message with +Yajl::Parser+ and
#    calls the callback with the parsed value.
# Messages whose id has no registered callback are only logged.
#
# @return [Object] whatever +@nats.subscribe+ returns
def subscribe_inbox
  @nats.subscribe("#{@inbox_name}.>") do |message, _, subject|
    @logger.debug("RECEIVED: #{subject} #{message}")
    begin
      request_id = subject.split(".").last
      callback = @lock.synchronize { @requests.delete(request_id) }
      if callback
        message = Yajl::Parser.new.parse(message)
        callback.call(message)
      end
    # NOTE(review): rescuing Exception also traps SystemExit/Interrupt raised
    # by parsing or by the callback. Narrowing to StandardError would change
    # which errors escape this handler -- confirm before changing.
    rescue Exception => e
      @logger.warn(e.message)
    end
  end
end