Method: NATS#request

Defined in:
lib/nats/client.rb

#request(subject, data = nil, opts = {}, &cb) ⇒ Object

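Send a request and have the response delivered to the supplied callback. When no callback is given, the call suspends the current Fiber until the response arrives or the timeout expires, and returns the response directly.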

Parameters:

  • subject (String)
  • data (Object) (defaults to: nil)
  • opts (Hash) (defaults to: {})
  • cb (Block)

Returns:

  • (Object)

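    sid when a callback is given; otherwise the response (an Array of responses when opts[:max] > 1), or nil if the request timed out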



623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
# File 'lib/nats/client.rb', line 623

# Publishes a request on +subject+ and collects the reply (or replies).
#
# Two modes are supported:
#
# * With a block: a subscription on a fresh "_INBOX.<nuid>" subject is
#   created, the request is published with that inbox as the reply subject,
#   and the block is invoked for each message delivered to it. +opts+ is
#   forwarded untouched to +subscribe+.
# * Without a block: the request is published with a reply subject under the
#   shared response-mux prefix and the current Fiber is suspended with
#   +Fiber.yield+ until something resumes it (a response, or the timeout
#   timer). This mode therefore has to run inside a Fiber that can yield.
#
# The caller's +opts+ hash is never modified.
#
# @param subject [String] subject to publish the request on; when +nil+ (or
#   +false+) nothing is sent and +nil+ is returned
# @param data [Object, nil] payload handed to +publish+
# @param opts [Hash] request options
# @option opts [Integer] :max (1) number of responses to wait for in the
#   synchronous mode; values greater than 1 make the method return an Array
# @option opts [Numeric] :timeout (0.5) deadline handed to +EM.add_timer+ in
#   the synchronous mode
# @yield [msg, reply] called per response in callback mode; the block may
#   take zero, one (+msg+) or two (+msg+, +reply+) parameters
# @return [Object] callback mode: whatever +subscribe+ returns.
#   Synchronous mode: the value the Fiber was resumed with -- +nil+ on
#   timeout when a single response is expected, or at most +:max+ collected
#   messages when several are expected
def request(subject, data=nil, opts={}, &cb)
  return unless subject

  # In case of using async request then fallback to auto unsubscribe
  # based request/response and not break compatibility too much since
  # new request/response style can only be used with fibers.
  if cb
    inbox = "_INBOX.#{@nuid.next}"
    s = subscribe(inbox, opts) { |msg, reply|
      # Dispatch on arity so callbacks declared with fewer parameters
      # are only handed the arguments they asked for.
      case cb.arity
      when 0 then cb.call
      when 1 then cb.call(msg)
      else cb.call(msg, reply)
      end
    }
    publish(subject, data, inbox)
    return s
  end

  # If this is the first request being made, then need to start
  # the responses mux handler that handles the responses.
  start_resp_mux_sub! unless @resp_sub_prefix

  # Generate unique token for the reply subject.
  token = @nuid.next
  inbox = "#{@resp_sub_prefix}.#{token}"

  # Synchronous request/response requires using a Fiber
  # to be able to await the response.
  f = Fiber.current
  @resp_map[token][:fiber] = f

  # Read the defaults into locals instead of writing them back with `||=`,
  # so the caller's hash is left alone (and may even be frozen).
  expected = opts[:max] || 1
  timeout = opts[:timeout] || 0.5

  # If awaiting more than a single response then use array
  # to include all that could be gathered before the deadline.
  @resp_map[token][:expected] = expected
  @resp_map[token][:msgs] = [] if expected > 1

  # Announce the request with the inbox using the token.
  publish(subject, data, inbox)

  # If deadline expires, then discard the token and resume fiber.
  t = EM.add_timer(timeout) do
    gathered = @resp_map[token][:msgs] if expected > 1

    # Drop the bookkeeping entry *before* resuming: an exception raised by
    # the resumed fiber propagates out of #resume and would otherwise skip
    # the cleanup, leaking the token.
    @resp_map.delete(token)

    if expected > 1
      f.resume gathered
    else
      f.resume
    end
  end

  # Wait for the response and cancel timeout callback if received.
  response = Fiber.yield
  EM.cancel_timer(t)

  # Slice and throwaway responses that are not needed.
  expected > 1 ? response.slice(0, expected) : response
end