Class: NATS::Client
- Inherits:
-
Object
- Object
- NATS::Client
- Includes:
- MonitorMixin, Status
- Defined in:
- lib/nats/io/client.rb
Overview
Client creates a connection to the NATS Server.
Constant Summary collapse
- DEFAULT_PORT =
{ nats: 4222, ws: 80, wss: 443 }.freeze
- DEFAULT_URI =
("nats://localhost:#{DEFAULT_PORT[:nats]}".freeze)
- CR_LF =
("\r\n".freeze)
- CR_LF_SIZE =
(CR_LF.bytesize)
- PING_REQUEST =
("PING#{CR_LF}".freeze)
- PONG_RESPONSE =
("PONG#{CR_LF}".freeze)
- NATS_HDR_LINE =
("NATS/1.0#{CR_LF}".freeze)
- STATUS_MSG_LEN =
3
- STATUS_HDR =
("Status".freeze)
- DESC_HDR =
("Description".freeze)
- NATS_HDR_LINE_SIZE =
(NATS_HDR_LINE.bytesize)
- SUB_OP =
('SUB'.freeze)
- EMPTY_MSG =
(''.freeze)
Constants included from Status
Status::CLOSED, Status::CONNECTED, Status::CONNECTING, Status::DISCONNECTED, Status::DRAINING_PUBS, Status::DRAINING_SUBS, Status::RECONNECTING
Class Attribute Summary collapse
Instance Attribute Summary collapse
-
#connected_server ⇒ Object
readonly
Returns the value of attribute connected_server.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#reloader ⇒ Object
readonly
Returns the value of attribute reloader.
-
#server_info ⇒ Object
readonly
Returns the value of attribute server_info.
-
#server_pool ⇒ Object
(also: #servers)
readonly
Returns the value of attribute server_pool.
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#subscription_executor ⇒ Object
readonly
Returns the value of attribute subscription_executor.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
-
.after_fork ⇒ Object
Re-establish connection in a new process after forking to start new threads.
Instance Method Summary collapse
-
#close ⇒ Object
Close connection to NATS, flushing first if the connection is alive and there are pending messages. Should not be used while holding the lock.
- #closed? ⇒ Boolean
-
#connect(uri = nil, opts = {}) ⇒ Object
Prepare connecting to NATS, but postpone real connection until first usage.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean
- #disconnected? ⇒ Boolean
-
#discovered_servers ⇒ Object
discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.
-
#drain ⇒ Object
drain will put a connection into a drain state.
- #draining? ⇒ Boolean
-
#flush(timeout = 10) ⇒ Object
Send a ping and wait for a pong back within a timeout.
-
#initialize(uri = nil, opts = {}) ⇒ Client
constructor
A new instance of Client.
-
#jetstream(opts = {}) ⇒ NATS::JetStream
(also: #JetStream, #jsm)
Create a JetStream context.
- #last_error ⇒ Object
-
#new_inbox ⇒ String
new_inbox returns a unique inbox used for subscriptions.
-
#old_request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline.
- #on_close(&callback) ⇒ Object
- #on_disconnect(&callback) ⇒ Object
- #on_error(&callback) ⇒ Object
- #on_reconnect(&callback) ⇒ Object
- #publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object
-
#publish_msg(msg) ⇒ Object
Publishes a NATS::Msg that may include headers.
- #reconnecting? ⇒ Boolean
-
#request(subject, payload = "", **opts, &blk) ⇒ Object
Sends a request expecting a single response, using a single subscription per connection for receiving the responses.
-
#request_msg(msg, **opts) ⇒ Object
request_msg makes a NATS request using a NATS::Msg that may include headers.
-
#subscribe(subject, opts = {}, &callback) ⇒ Object
Create a subscription whose messages are dispatched asynchronously to a callback.
Constructor Details
#initialize(uri = nil, opts = {}) ⇒ Client
Returns a new instance of Client.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/nats/io/client.rb', line 152
# Builds a new client and initializes all of its internal state.
# No IO is opened here: @io, the queues and the worker threads all start as nil.
#
# @param uri  [Object, nil] initial server URI, stored as @initial_uri
# @param opts [Hash] initial options, stored as @initial_options;
#   opts[:reloader] overrides the class-level default reloader
def initialize(uri = nil, opts = {})
  super() # required to initialize monitor
  @initial_uri = uri
  @initial_options = opts

  # Read/Write IO
  @io = nil

  # Queues for coalescing writes of commands we need to send to server.
  @flush_queue = nil
  @pending_queue = nil

  # Parser with state
  @parser = NATS::Protocol::Parser.new(self)

  # Threads for both reading and flushing command
  @flusher_thread = nil
  @read_loop_thread = nil
  @ping_interval_thread = nil

  # Info that we get from the server
  @server_info = { }

  # URI from server to which we are currently connected
  @uri = nil
  @server_pool = []

  @status = nil

  # Subscriptions
  @subs = { }
  @ssid = 0

  # Ping interval
  @pings_outstanding = 0
  @pongs_received = 0
  @pongs = []
  @pongs.extend(MonitorMixin)

  # Accounting
  @pending_size = 0
  @stats = {
    in_msgs: 0,
    out_msgs: 0,
    in_bytes: 0,
    out_bytes: 0,
    reconnects: 0
  }

  # Sticky error
  @last_err = nil

  # Async callbacks, no ops by default.
  @err_cb = proc { }
  @close_cb = proc { }
  @disconnect_cb = proc { }
  @reconnect_cb = proc { }

  # Secure TLS options
  @tls = nil

  # Hostname of current server; used for when TLS host
  # verification is enabled.
  @hostname = nil
  @single_url_connect_used = false

  # Track whether connect has already been called.
  @connect_called = false

  # New style request/response implementation.
  @resp_sub = nil
  @resp_map = nil
  @resp_sub_prefix = nil
  @nuid = NATS::NUID.new

  # NKEYS
  @user_credentials = nil
  @nkeys_seed = nil
  @user_nkey_cb = nil
  @user_jwt_cb = nil
  @signature_cb = nil

  # Tokens
  @auth_token = nil

  @inbox_prefix = "_INBOX"

  # Draining
  @drain_t = nil

  # Prepare for calling connect or automatic delayed connection
  # NOTE(review): the extracted listing kept only the `if uri || opts.any?`
  # modifier and lost the statement it guards; the call below is presumed to be
  # the private option-parsing helper — confirm against lib/nats/io/client.rb.
  parse_and_validate_options if uri || opts.any?

  # Keep track of all client instances to handle them after process forking in Ruby 3.1+
  INSTANCES[self] = self if !defined?(Ractor) || Ractor.current == Ractor.main # Ractors doesn't work in forked processes

  @reloader = opts.fetch(:reloader, self.class.default_reloader)
end
Class Attribute Details
.default_reloader ⇒ Object
131 132 133 |
# File 'lib/nats/io/client.rb', line 131
# Returns the memoized default reloader: a proc that simply calls the block
# it is given. When Ractor is defined the proc is made shareable once, at
# creation time.
#
# @return [Proc]
def default_reloader
  @default_reloader ||=
    proc { |&block| block.call }.tap do |reloader|
      Ractor.make_shareable(reloader) if defined?(Ractor)
    end
end
Instance Attribute Details
#connected_server ⇒ Object (readonly)
Returns the value of attribute connected_server.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @connected_server
def connected_server
  @connected_server
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @options
def options
  @options
end
#reloader ⇒ Object (readonly)
Returns the value of attribute reloader.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @reloader
def reloader
  @reloader
end
#server_info ⇒ Object (readonly)
Returns the value of attribute server_info.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @server_info
def server_info
  @server_info
end
#server_pool ⇒ Object (readonly) Also known as: servers
Returns the value of attribute server_pool.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @server_pool
def server_pool
  @server_pool
end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @stats
def stats
  @stats
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @status
def status
  @status
end
#subscription_executor ⇒ Object (readonly)
Returns the value of attribute subscription_executor.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @subscription_executor
def subscription_executor
  @subscription_executor
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
103 104 105 |
# File 'lib/nats/io/client.rb', line 103
# Read-only accessor.
#
# @return [Object] the current value of @uri
def uri
  @uri
end
Class Method Details
.after_fork ⇒ Object
Re-establish connection in a new process after forking to start new threads.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/nats/io/client.rb', line 136
# Handles every client tracked in INSTANCES after the process has forked.
#
# For each client:
# * if its :reconnect option is truthy, the connection is closed with
#   Status::DISCONNECTED and `connect` is called again when the client was
#   not already disconnected;
# * otherwise the error callback is invoked with NATS::IO::ForkDetectedError
#   and the client is closed.
#
# An error raised while handling one client is reported with `warn` and does
# not prevent the remaining clients from being handled.
def after_fork
  INSTANCES.each do |client|
    if client.options[:reconnect]
      was_connected = !client.disconnected?
      client.send(:close_connection, Status::DISCONNECTED, true)
      client.connect if was_connected
    else
      # Hand the affected client to the error callback rather than the
      # receiver of after_fork.
      # NOTE(review): err_cb_call is not shown here; its first argument is
      # presumed to be the connection — confirm.
      client.send(:err_cb_call, client, NATS::IO::ForkDetectedError, nil)
      client.close
    end
  rescue => e
    warn "nats: Error during handling after_fork callback: #{e}" # TODO: Report as async error via error callback?
  end
end
Instance Method Details
#close ⇒ Object
Close connection to NATS, flushing first if the connection is alive and there are pending messages. Should not be used while holding the lock.
748 749 750 |
# File 'lib/nats/io/client.rb', line 748
# Closes the connection by delegating to close_connection with the CLOSED
# status and `true` as the second argument.
def close
  close_connection(CLOSED, true)
end
#closed? ⇒ Boolean
778 779 780 |
# File 'lib/nats/io/client.rb', line 778
# @return [Boolean] true when @status equals CLOSED
def closed?
  @status == CLOSED
end
#connect(uri = nil, opts = {}) ⇒ Object
Prepare connecting to NATS, but postpone real connection until first usage.
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/nats/io/client.rb', line 252
# Records the connection parameters and establishes the connection once.
#
# @param uri  [Object, nil] when given (or when opts is non-empty) replaces @initial_uri
# @param opts [Hash] when non-empty (or when uri is given) replaces @initial_options
# @return [self, nil] self after establish_connection! has run; nil when
#   connect had already been called on this client
def connect(uri=nil, opts={})
  if uri || opts.any?
    @initial_uri = uri
    @initial_options = opts
  end

  synchronize do
    # In case it has been connected already, then do not need to call this again.
    return if @connect_called
    @connect_called = true
  end

  establish_connection!

  self
end
#connected? ⇒ Boolean
766 767 768 |
# File 'lib/nats/io/client.rb', line 766
# @return [Boolean] true when @status equals CONNECTED
def connected?
  @status == CONNECTED
end
#connecting? ⇒ Boolean
770 771 772 |
# File 'lib/nats/io/client.rb', line 770
# @return [Boolean] true when @status equals CONNECTING
def connecting?
  @status == CONNECTING
end
#disconnected? ⇒ Boolean
762 763 764 |
# File 'lib/nats/io/client.rb', line 762
# A client with no status yet (nil/false) counts as disconnected.
#
# @return [Boolean] true when @status is unset or equals DISCONNECTED
def disconnected?
  !@status || @status == DISCONNECTED
end
#discovered_servers ⇒ Object
discovered_servers returns the NATS Servers that have been discovered via INFO protocol updates.
741 742 743 |
# File 'lib/nats/io/client.rb', line 741
# Filters `servers` down to the entries whose :discovered value is truthy.
#
# @return [Array] the selected server entries
def discovered_servers
  servers.select { |server| server[:discovered] }
end
#drain ⇒ Object
drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the `on_close` callback option to know when the connection has moved from draining to closed.
822 823 824 825 826 827 828 |
# File 'lib/nats/io/client.rb', line 822
# Starts draining in a background thread.
#
# Does nothing when the client is already draining. Otherwise, under the
# monitor, spawns a thread running do_drain and remembers it in @drain_t
# (an existing @drain_t is never replaced).
#
# @return [Thread, nil] the drain thread, or nil when already draining
def drain
  return if draining?

  synchronize do
    @drain_t ||= Thread.new { do_drain }
  end
end
#draining? ⇒ Boolean
782 783 784 785 786 787 788 789 790 791 792 793 |
# File 'lib/nats/io/client.rb', line 782
# Reports whether the client is draining.
#
# @return [Boolean] true when @status is DRAINING_PUBS or DRAINING_SUBS, or
#   when a drain thread (@drain_t) has been recorded
def draining?
  return true if @status == DRAINING_PUBS || @status == DRAINING_SUBS

  # @drain_t is read while holding the monitor.
  synchronize { @drain_t ? true : false }
end
#flush(timeout = 10) ⇒ Object
Send a ping and wait for a pong back within a timeout.
721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 |
# File 'lib/nats/io/client.rb', line 721
# Sends a PING and blocks until the matching PONG is signaled or the
# timeout elapses.
#
# @param timeout [Numeric] seconds passed both to
#   MonotonicTime.with_nats_timeout and to the condition wait (default 10)
def flush(timeout=10)
  # Schedule sending a PING, and block until we receive PONG back,
  # or raise a timeout in case the response is past the deadline.
  pong = @pongs.new_cond
  @pongs.synchronize do
    @pongs << pong

    # Flush once pong future has been prepared
    @pending_queue << PING_REQUEST
    @flush_queue << :ping
    MonotonicTime::with_nats_timeout(timeout) do
      pong.wait(timeout)
    end
  end
end
#jetstream(opts = {}) ⇒ NATS::JetStream Also known as: JetStream, jsm
Create a JetStream context.
836 837 838 |
# File 'lib/nats/io/client.rb', line 836
# Creates a JetStream context bound to this client.
#
# @param opts [Hash] passed through unchanged to NATS::JetStream.new
# @return [NATS::JetStream]
def jetstream(opts={})
  ::NATS::JetStream.new(self, opts)
end
#last_error ⇒ Object
811 812 813 814 815 |
# File 'lib/nats/io/client.rb', line 811
# Reads @last_err while holding the monitor.
#
# @return [Object] the current value of @last_err
def last_error
  synchronize { @last_err }
end
#new_inbox ⇒ String
new_inbox returns a unique inbox used for subscriptions.
754 755 756 |
# File 'lib/nats/io/client.rb', line 754
# Builds an inbox subject of the form "<@inbox_prefix>.<@nuid.next>".
#
# @return [String]
def new_inbox
  "#{@inbox_prefix}.#{@nuid.next}"
end
#old_request(subject, payload, opts = {}, &blk) ⇒ Object
Sends a request creating an ephemeral subscription for the request, expecting a single response or raising a timeout in case the request is not retrieved within the specified deadline. If given a callback, then the request happens asynchronously.
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 |
# File 'lib/nats/io/client.rb', line 657
# Request/response using a dedicated subscription on a fresh inbox.
#
# With a block: subscribes to the inbox (opts[:max] defaults to 1), publishes
# the request and returns the subscription; each reply is handed to the block
# according to the block's arity (nothing / msg / data, reply / data, reply,
# subject / data, reply, subject, header).
#
# Without a block: registers a one-shot subscription, publishes the request
# and waits up to opts[:timeout] seconds (default 0.5) for the response.
#
# Note that opts is modified in place (:max and :timeout are written to it).
#
# @param subject [String, nil] request subject; nil makes the call a no-op
# @param payload [String] request body
# @param opts    [Hash] :timeout, :max, plus anything forwarded to subscribe
# @return [Object, nil] the subscription (block form), the response
#   (synchronous form), or nil when subject is nil
# @raise [NATS::IO::NoRespondersError] when the response header carries status "503"
def old_request(subject, payload, opts={}, &blk)
  return unless subject
  inbox = new_inbox

  # If a callback was passed, then have it process
  # the messages asynchronously and return the sid.
  if blk
    opts[:max] ||= 1
    s = subscribe(inbox, opts) do |msg|
      case blk.arity
      when 0 then blk.call
      when 1 then blk.call(msg)
      when 2 then blk.call(msg.data, msg.reply)
      when 3 then blk.call(msg.data, msg.reply, msg.subject)
      else blk.call(msg.data, msg.reply, msg.subject, msg.header)
      end
    end
    publish(subject, payload, inbox)

    return s
  end

  # In case block was not given, handle synchronously
  # with a timeout and only allow a single response.
  timeout = opts[:timeout] ||= 0.5
  opts[:max] = 1

  sub = Subscription.new
  sub.subject = inbox
  sub.received = 0
  future = sub.new_cond
  sub.future = future
  sub.nc = self

  sid = nil
  synchronize do
    sid = (@ssid += 1)
    sub.sid = sid
    @subs[sid] = sub
  end

  send_command("SUB #{inbox} #{sid}#{CR_LF}")
  @flush_queue << :sub
  unsubscribe(sub, 1)

  sub.synchronize do
    # Publish the request and then wait for the response...
    publish(subject, payload, inbox)

    MonotonicTime::with_nats_timeout(timeout) do
      future.wait(timeout)
    end
  end
  response = sub.response

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end
#on_close(&callback) ⇒ Object
807 808 809 |
# File 'lib/nats/io/client.rb', line 807
# Stores the given block as the close callback (@close_cb).
def on_close(&callback)
  @close_cb = callback
end
#on_disconnect(&callback) ⇒ Object
799 800 801 |
# File 'lib/nats/io/client.rb', line 799
# Stores the given block as the disconnect callback (@disconnect_cb).
def on_disconnect(&callback)
  @disconnect_cb = callback
end
#on_error(&callback) ⇒ Object
795 796 797 |
# File 'lib/nats/io/client.rb', line 795
# Stores the given block as the error callback (@err_cb).
def on_error(&callback)
  @err_cb = callback
end
#on_reconnect(&callback) ⇒ Object
803 804 805 |
# File 'lib/nats/io/client.rb', line 803
# Stores the given block as the reconnect callback (@reconnect_cb).
def on_reconnect(&callback)
  @reconnect_cb = callback
end
#publish(subject, msg = EMPTY_MSG, opt_reply = nil, **options, &blk) ⇒ Object
452 453 454 455 456 457 458 459 460 461 462 463 464 465 |
# File 'lib/nats/io/client.rb', line 452
# Publishes a message on a subject.
#
# When options[:header] is given the call is delegated to publish_msg with a
# NATS::Msg carrying that header; otherwise a PUB command is sent directly.
#
# @param subject   [String] destination subject
# @param msg       [String] payload (defaults to EMPTY_MSG)
# @param opt_reply [String, nil] optional reply subject
# @param options   [Hash] :header => headers to publish with the message
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
def publish(subject, msg=EMPTY_MSG, opt_reply=nil, **options, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?

  if options[:header]
    return publish_msg(NATS::Msg.new(subject: subject, data: msg, reply: opt_reply, header: options[:header]))
  end

  # Accounting
  msg_size = msg.bytesize
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  send_command("PUB #{subject} #{opt_reply} #{msg_size}\r\n#{msg}\r\n")
  # Only wake the flusher when nothing is already queued for it.
  @flush_queue << :pub if @flush_queue.empty?
end
#publish_msg(msg) ⇒ Object
Publishes a NATS::Msg that may include headers.
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'lib/nats/io/client.rb', line 468
# Publishes a NATS::Msg, using HPUB when the message has headers and PUB
# otherwise. Nil reply/data on the message are replaced with ''.
#
# @param msg [NATS::Msg] message to publish (mutated: reply/data defaults)
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when msg.subject is nil/false or empty
def publish_msg(msg)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject || msg.subject.empty?

  msg.reply ||= ''
  msg.data ||= ''
  msg_size = msg.data.bytesize

  # Accounting
  @stats[:out_msgs] += 1
  @stats[:out_bytes] += msg_size

  if msg.header
    # Start from an unfrozen copy of the header line so it can be appended to.
    hdr = NATS_HDR_LINE.dup
    msg.header.each do |k, v|
      hdr << "#{k}: #{v}#{CR_LF}"
    end
    hdr << CR_LF
    hdr_len = hdr.bytesize
    total_size = msg_size + hdr_len
    send_command("HPUB #{msg.subject} #{msg.reply} #{hdr_len} #{total_size}\r\n#{hdr}#{msg.data}\r\n")
  else
    send_command("PUB #{msg.subject} #{msg.reply} #{msg_size}\r\n#{msg.data}\r\n")
  end

  # Only wake the flusher when nothing is already queued for it.
  @flush_queue << :pub if @flush_queue.empty?
end
#reconnecting? ⇒ Boolean
774 775 776 |
# File 'lib/nats/io/client.rb', line 774
# @return [Boolean] true when @status equals RECONNECTING
def reconnecting?
  @status == RECONNECTING
end
#request(subject, payload = "", **opts, &blk) ⇒ Object
Sends a request expecting a single response, using a single subscription per connection for receiving the responses. It times out in case the response is not received within the specified deadline. If given a callback, then the request happens asynchronously.
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 |
# File 'lib/nats/io/client.rb', line 542
# Sends a request and waits for a single response on the shared response
# subscription (@resp_sub).
#
# Delegates to old_request when a block is given or opts[:old_style] is set,
# and to request_msg when opts[:header] is set.
#
# @param subject [String] request subject
# @param payload [String] request body
# @param opts    [Hash] :timeout (seconds, default 0.5), :old_style, :header
# @return [Object] the response stored for this request (nil if none arrived)
# @raise [NATS::IO::BadSubject] when subject is nil/false or empty
# @raise [NATS::IO::NoRespondersError] when the response header carries status "503"
def request(subject, payload="", **opts, &blk)
  raise NATS::IO::BadSubject if !subject || subject.empty?

  # If a block was given then fallback to method using auto unsubscribe.
  return old_request(subject, payload, opts, &blk) if blk
  return old_request(subject, payload, opts) if opts[:old_style]

  if opts[:header]
    return request_msg(NATS::Msg.new(subject: subject, data: payload, header: opts[:header]), **opts)
  end

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create a future for the request that will
    # get signaled when the response arrives.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end

  # Publish request and wait for reply.
  publish(subject, payload, inbox)
  begin
    MonotonicTime::with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout => e
    # Drop the pending entry so a late reply is not kept around.
    synchronize { @resp_map.delete(token) }
    raise e
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end
#request_msg(msg, **opts) ⇒ Object
request_msg makes a NATS request using a NATS::Msg that may include headers.
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 |
# File 'lib/nats/io/client.rb', line 600
# Sends a request built from a NATS::Msg (which may carry headers) and waits
# for a single response on the shared response subscription (@resp_sub).
#
# The message is mutated: its reply is set to the per-request inbox and a
# nil data is replaced with ''.
#
# @param msg  [NATS::Msg] request message
# @param opts [Hash] :timeout (seconds, default 0.5)
# @return [Object] the response stored for this request (nil if none arrived)
# @raise [TypeError] when msg is not a Msg
# @raise [NATS::IO::BadSubject] when msg.subject is nil/false or empty
# @raise [NATS::IO::NoRespondersError] when the response header carries status "503"
def request_msg(msg, **opts)
  raise TypeError, "nats: expected NATS::Msg, got #{msg.class.name}" unless msg.is_a?(Msg)
  raise NATS::IO::BadSubject if !msg.subject || msg.subject.empty?

  token = nil
  inbox = nil
  future = nil
  response = nil
  timeout = opts[:timeout] ||= 0.5
  synchronize do
    start_resp_mux_sub! unless @resp_sub_prefix

    # Create token for this request.
    token = @nuid.next
    inbox = "#{@resp_sub_prefix}.#{token}"

    # Create a future for the request that will
    # get signaled when the response arrives.
    future = @resp_sub.new_cond
    @resp_map[token][:future] = future
  end
  msg.reply = inbox
  msg.data ||= ''

  # Publish request and wait for reply.
  publish_msg(msg)
  begin
    MonotonicTime::with_nats_timeout(timeout) do
      @resp_sub.synchronize do
        future.wait(timeout)
      end
    end
  rescue NATS::Timeout => e
    # Drop the pending entry so a late reply is not kept around.
    synchronize { @resp_map.delete(token) }
    raise e
  end

  # Check if there is a response already.
  synchronize do
    result = @resp_map[token]
    response = result[:response]
    @resp_map.delete(token)
  end

  if response and response.header
    status = response.header[STATUS_HDR]
    raise NATS::IO::NoRespondersError if status == "503"
  end

  response
end
#subscribe(subject, opts = {}, &callback) ⇒ Object
Create a subscription whose messages are dispatched asynchronously to a callback.
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 |
# File 'lib/nats/io/client.rb', line 499
# Registers a new subscription and sends the SUB command for it.
#
# opts is modified in place: the pending limits are filled in with the
# NATS::IO defaults when absent. When no callback is given, a condition
# variable is attached to the subscription as wait_for_msgs_cond.
#
# @param subject  [String] subject to subscribe to
# @param opts     [Hash] :queue, :max, :pending_msgs_limit,
#   :pending_bytes_limit, :processing_concurrency
# @param callback [Proc] optional handler stored on the subscription
# @return [Subscription] the newly registered subscription
# @raise [NATS::IO::ConnectionDrainingError] when the client is draining
def subscribe(subject, opts={}, &callback)
  raise NATS::IO::ConnectionDrainingError.new("nats: connection draining") if draining?

  sid = nil
  sub = nil
  synchronize do
    sid = (@ssid += 1)
    sub = @subs[sid] = Subscription.new
    sub.nc = self
    sub.sid = sid
  end
  opts[:pending_msgs_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT
  opts[:pending_bytes_limit] ||= NATS::IO::DEFAULT_SUB_PENDING_BYTES_LIMIT
  sub.subject = subject
  sub.callback = callback
  sub.received = 0
  sub.queue = opts[:queue] if opts[:queue]
  sub.max = opts[:max] if opts[:max]
  sub.pending_msgs_limit = opts[:pending_msgs_limit]
  sub.pending_bytes_limit = opts[:pending_bytes_limit]
  sub.pending_queue = SizedQueue.new(sub.pending_msgs_limit)
  sub.processing_concurrency = opts[:processing_concurrency] if opts.key?(:processing_concurrency)

  send_command("SUB #{subject} #{opts[:queue]} #{sid}#{CR_LF}")
  @flush_queue << :sub

  # Setup server support for auto-unsubscribe when receiving enough messages
  sub.unsubscribe(opts[:max]) if opts[:max]

  unless callback
    cond = sub.new_cond
    sub.wait_for_msgs_cond = cond
  end

  sub
end