Method: NATS#request
- Defined in:
- lib/nats/client.rb
#request(subject, data = nil, opts = {}, &cb) ⇒ Object
Send a request and have the response delivered to the supplied callback.
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 |
# File 'lib/nats/client.rb', line 623 def request(subject, data=nil, opts={}, &cb) return unless subject # In case of using async request then fallback to auto unsubscribe # based request/response and not break compatibility too much since # new request/response style can only be used with fibers. if cb inbox = "_INBOX.#{@nuid.next}" s = subscribe(inbox, opts) { |msg, reply| case cb.arity when 0 then cb.call when 1 then cb.call(msg) else cb.call(msg, reply) end } publish(subject, data, inbox) return s end # If this is the first request being made, then need to start # the responses mux handler that handles the responses. start_resp_mux_sub! unless @resp_sub_prefix # Generate unique token for the reply subject. token = @nuid.next inbox = "#{@resp_sub_prefix}.#{token}" # Synchronous request/response requires using a Fiber # to be able to await the response. f = Fiber.current @resp_map[token][:fiber] = f # If awaiting more than a single response then use array # to include all that could be gathered before the deadline. expected = opts[:max] ||= 1 @resp_map[token][:expected] = expected @resp_map[token][:msgs] = [] if expected > 1 # Announce the request with the inbox using the token. publish(subject, data, inbox) # If deadline expires, then discard the token and resume fiber opts[:timeout] ||= 0.5 t = EM.add_timer(opts[:timeout]) do if expected > 1 f.resume @resp_map[token][:msgs] else f.resume end @resp_map.delete(token) end # Wait for the response and cancel timeout callback if received. if expected > 1 # Wait to receive all replies that can get before deadline. msgs = Fiber.yield EM.cancel_timer(t) # Slice and throwaway responses that are not needed. return msgs.slice(0, expected) else msg = Fiber.yield EM.cancel_timer(t) return msg end end |