Method: NATS#drain

Defined in:
lib/nats/client.rb

#drain(&blk) ⇒ Object

Drain gracefully closes the connection.

Parameters:

  • blk (Block)

    Called once the drain has finished and the connection is closed.



551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
# File 'lib/nats/client.rb', line 551

# Gracefully shuts the connection down: an UNSUB is sent for every entry in
# @subs, a flush roundtrip is issued, and the connection is closed once no
# pending data remains or the drain timeout fires, whichever comes first.
#
# Does nothing (and never invokes the block) when draining? or closing? is
# already true at call time.
#
# @param blk [Proc] optional block invoked after the drain has finished
#   (or timed out) and the connection has been closed.
def drain(&blk)
  return if draining? || closing?

  @draining = true

  # Common tail of both exit paths: leave the draining state, close the
  # connection unless it is already closing, then notify the caller.
  finish = lambda do
    @draining = false
    close unless closing?
    blk.call unless blk.nil?
  end

  # Drop interest in every subject so no further messages are delivered.
  @subs.each { |sid, _sub| send_command("UNSUB #{sid} #{CR_LF}") }

  # After the roundtrip completes no more messages should be received.
  flush do
    # Declared up front so the timeout handler below can reference it.
    poller = nil

    # Upper bound on how long draining is allowed to take.
    deadline = EM.add_timer(options[:drain_timeout]) do
      EM.cancel_timer(poller)

      # A timeout is surfaced through the error callback before closing.
      err_cb.call(NATS::ClientError.new("Drain Timeout"))
      finish.call
    end

    # Re-check on a 0.1 interval until the pending data has been emptied.
    poller = EM.add_periodic_timer(0.1) do
      next unless closing? || @buf.nil? || @buf.empty?

      # Subscriptions have been drained by now, so publishing is disallowed.
      @drained_subs = true
      next unless pending_data_size == 0

      [poller, deadline].each { |timer| EM.cancel_timer(timer) }

      # Draining is complete; it is safe to close.
      finish.call
    end
  end
end