Method: NATS#drain
- Defined in:
- lib/nats/client.rb
#drain(&blk) ⇒ Object
Drain gracefully closes the connection.
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 |
# File 'lib/nats/client.rb', line 551

# Gracefully drains the connection and then closes it.
#
# The sequence is:
# 1. Send an UNSUB for every entry in @subs so the server stops delivering
#    messages.
# 2. Issue a flush roundtrip; once it completes, arm two timers:
#    * a one-shot timeout timer that reports a "Drain Timeout" error through
#      +err_cb+ and closes anyway, and
#    * a periodic (0.1s) timer that waits for the inbound buffer and the
#      pending outbound data to be empty before closing.
# 3. Whichever timer finishes first clears the draining flag, closes the
#    connection unless it is already closing, and invokes the given block.
#
# Does nothing (returns nil) when a drain or close is already in progress.
#
# NOTE(review): the timeout is read from +options[:drain_timeout]+. This
# assumes the client exposes its connection options through an +options+
# reader and that the value is a number of seconds - confirm against the
# rest of the class.
#
# @yield called once after the connection has been closed, on both the
#   successful and the timed-out path.
# @return [Object] not meaningful; nil when the call is a no-op.
def drain(&blk)
  return if draining? || closing?

  @draining = true

  # Remove interest in all subjects to stop receiving messages.
  @subs.each do |sid, _|
    send_command("UNSUB #{sid} #{CR_LF}")
  end

  # Roundtrip to ensure no more messages are received.
  flush do
    # Both timers must be visible to both callbacks so that each one can
    # cancel the other, hence they are declared before either is created.
    drain_timeout_timer = nil
    draining_timer = nil

    # Shared tail of both paths: leave draining state, close, notify caller.
    finish = lambda do
      @draining = false
      close unless closing?
      blk.call if blk
    end

    # The interval must be numeric; passing the bare Array literal
    # [:drain_timeout] would not be a valid timer interval.
    drain_timeout_timer = EM.add_timer(options[:drain_timeout]) do
      EM.cancel_timer(draining_timer)

      # Report the timeout via the error callback and just close.
      err_cb.call(NATS::ClientError.new("Drain Timeout"))
      finish.call
    end

    # Periodically check for the pending data to be empty.
    draining_timer = EM.add_periodic_timer(0.1) do
      # Keep waiting while there is still unprocessed inbound data, unless
      # the connection is already being closed.
      next unless closing? || @buf.nil? || @buf.empty?

      # Subscriptions have been drained already so disallow publishing.
      @drained_subs = true
      next unless pending_data_size == 0

      EM.cancel_timer(draining_timer)
      EM.cancel_timer(drain_timeout_timer)

      # We're done draining and can close now.
      finish.call
    end
  end
end