Method: NATS#process_connect

Defined in:
lib/nats/client.rb

#process_connectObject

:nodoc:

[View source]

1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
# File 'lib/nats/client.rb', line 1062

def process_connect #:nodoc:
  # Reset reconnect attempts since TCP connection has been successful at this point.
  current = server_pool.first
  current[:was_connected] = true
  current[:reconnect_attempts] ||= 0
  cancel_reconnect_timer if reconnecting?

  # Whip through any pending SUB commands since we replay
  # all subscriptions already done anyway.
  @pending.delete_if { |sub| sub[0..2] == SUB_OP } if @pending
  @subs.each_pair { |k, v| send_command("SUB #{v[:subject]} #{v[:queue]} #{k}#{CR_LF}") }

  unless user_err_cb? or reconnecting?
    @err_cb = proc { |e| raise e }
  end

  # We have validated the connection at this point so send CONNECT
  # and any other pending commands which we need to the server.
  flush_pending

  if (connect_cb and not @conn_cb_called)
    # We will round trip the server here to make sure all state from any pending commands
    # has been processed before calling the connect callback.
    queue_server_rt do
      connect_cb.call(self)
      @conn_cb_called = true
    end
  end

  # Notify via reconnect callback that we are again plugged again into the system.
  if reconnecting?
    @reconnecting = false
    @reconnect_cb.call(self) unless @reconnect_cb.nil?
  end

  # Initialize ping timer and processing
  @pings_outstanding = 0
  @pongs_received = 0
  @ping_timer = EM.add_periodic_timer(@options[:ping_interval]) do
    send_ping
  end
end