Method: NATS#process_connect
- Defined in:
- lib/nats/client.rb
permalink #process_connect ⇒ Object
:nodoc:
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 |
# File 'lib/nats/client.rb', line 1062 def process_connect #:nodoc: # Reset reconnect attempts since TCP connection has been successful at this point. current = server_pool.first current[:was_connected] = true current[:reconnect_attempts] ||= 0 cancel_reconnect_timer if reconnecting? # Whip through any pending SUB commands since we replay # all subscriptions already done anyway. @pending.delete_if { |sub| sub[0..2] == SUB_OP } if @pending @subs.each_pair { |k, v| send_command("SUB #{v[:subject]} #{v[:queue]} #{k}#{CR_LF}") } unless user_err_cb? or reconnecting? @err_cb = proc { |e| raise e } end # We have validated the connection at this point so send CONNECT # and any other pending commands which we need to the server. flush_pending if (connect_cb and not @conn_cb_called) # We will round trip the server here to make sure all state from any pending commands # has been processed before calling the connect callback. queue_server_rt do connect_cb.call(self) @conn_cb_called = true end end # Notify via reconnect callback that we are again plugged again into the system. if reconnecting? @reconnecting = false @reconnect_cb.call(self) unless @reconnect_cb.nil? end # Initialize ping timer and processing @pings_outstanding = 0 @pongs_received = 0 @ping_timer = EM.add_periodic_timer(@options[:ping_interval]) do send_ping end end |