Class: ZooKeeper::Session
- Inherits:
-
Object
- Object
- ZooKeeper::Session
- Includes:
- Slf4r::Logger
- Defined in:
- lib/zkruby/session.rb
Overview
Represents an session that may span connections
Constant Summary collapse
- DEFAULT_TIMEOUT =
4
- DEFAULT_CONNECT_DELAY =
0.2
- DEFAULT_PORT =
2181
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
-
#ping_interval ⇒ Object
readonly
Returns the value of attribute ping_interval.
-
#ping_logger ⇒ Object
readonly
Returns the value of attribute ping_logger.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#watcher ⇒ Object
Returns the value of attribute watcher.
Instance Method Summary collapse
- #chroot(path) ⇒ Object
- #close(&callback) ⇒ Object
-
#closed? ⇒ Boolean
close won’t run your block if the connection is already closed, so this is how you can check.
-
#connected? ⇒ Boolean
Connection API - testing whether to send a ping.
-
#disconnected ⇒ Object
Connection API - called when the connection has dropped from either end.
-
#initialize(binding, addresses, options = nil) ⇒ Session
constructor
A new instance of Session.
-
#ping ⇒ Object
Connection API - called when no data has been received for #ping_interval.
-
#prime_connection(conn) ⇒ Object
Connection API - Injects a new connection that is ready to receive records.
- #queue_request(request, op, opcode, response = nil, watch_type = nil, watcher = nil, ptype = Packet, &callback) ⇒ Object
-
#receive_records(io) ⇒ Object
Connection API - called when data is available, reads and processes one packet/event.
-
#start ⇒ Object
Start the session - called by the ProtocolBinding.
- #unchroot(path) ⇒ Object
Constructor Details
#initialize(binding, addresses, options = nil) ⇒ Session
Returns a new instance of Session.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/zkruby/session.rb', line 19 def initialize(binding,addresses,=nil) @binding = binding @addresses = parse_addresses(addresses) () # These are the server states # :disconnected, :connected, :auth_failed, :expired @keeper_state = nil # Client state is # :ready, :closing, :closed @client_state = :ready @xid=0 @pending_queue = [] # Create the watch list # hash by watch type of hashes by path of set of watchers @watches = [ :children, :data, :exists ].inject({}) do |ws,wtype| ws[wtype] = Hash.new() { |h,k| h[k] = Set.new() } ws end @watcher = nil @ping_logger = Slf4r::LoggerFacade.new("ZooKeeper::Session::Ping") end |
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
16 17 18 |
# File 'lib/zkruby/session.rb', line 16 def conn @conn end |
#ping_interval ⇒ Object (readonly)
Returns the value of attribute ping_interval.
13 14 15 |
# File 'lib/zkruby/session.rb', line 13 def ping_interval @ping_interval end |
#ping_logger ⇒ Object (readonly)
Returns the value of attribute ping_logger.
14 15 16 |
# File 'lib/zkruby/session.rb', line 14 def ping_logger @ping_logger end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
15 16 17 |
# File 'lib/zkruby/session.rb', line 15 def timeout @timeout end |
#watcher ⇒ Object
Returns the value of attribute watcher.
17 18 19 |
# File 'lib/zkruby/session.rb', line 17 def watcher @watcher end |
Instance Method Details
#chroot(path) ⇒ Object
50 51 52 |
# File 'lib/zkruby/session.rb', line 50 def chroot(path) return @chroot + path end |
#close(&callback) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/zkruby/session.rb', line 146 def close(&callback) case @client_state when :ready # we keep the requested block in a close packet @close_packet = ClosePacket.new(next_xid(),:close,-11,nil,nil,nil,nil,callback) close_packet = @close_packet @client_state = :closing # If there are other requests in flight, then we wait for them to finish # before sending the close packet since it immediately causes the socket # to close. queue_close_packet_if_necessary() @close_packet when :closed, :closing raise ProtocolError, "Already closed" else raise ProtocolError, "Unexpected state #{@client_state}" end end |
#closed? ⇒ Boolean
close won’t run your block if the connection is already closed, so this is how you can check
61 62 63 |
# File 'lib/zkruby/session.rb', line 61 def closed? @client_state == :closed end |
#connected? ⇒ Boolean
Connection API - testing whether to send a ping
66 67 68 |
# File 'lib/zkruby/session.rb', line 66 def connected?() @keeper_state == :connected end |
#disconnected ⇒ Object
Connection API - called when the connection has dropped from either end
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/zkruby/session.rb', line 103 def disconnected() @conn = nil logger.info { "Disconnected id=#{@session_id}, keeper=:#{@keeper_state}, client=:#{@client_state}" } # We keep trying to reconnect until the session expiration time is reached @disconnect_time = Time.now if @keeper_state == :connected time_since_first_disconnect = (Time.now - @disconnect_time) if @client_state == :closed || time_since_first_disconnect > timeout session_expired() else # if we are connected then everything in the pending queue has been sent so # we must clear # if not, then we'll keep them and hope the next reconnect works if @keeper_state == :connected clear_pending_queue(:disconnected) invoke_watch(@watcher,KeeperState::DISCONNECTED,nil,WatchEvent::NONE) if @watcher end @keeper_state = :disconnected reconnect() end end |
#ping ⇒ Object
Connection API - called when no data has been received for #ping_interval
94 95 96 97 98 99 100 |
# File 'lib/zkruby/session.rb', line 94 def ping() if @keeper_state == :connected ping_logger.debug { "Ping send" } hdr = Proto::RequestHeader.new(:xid => -2, :_type => 11) conn.send_records(hdr) end end |
#prime_connection(conn) ⇒ Object
Connection API - Injects a new connection that is ready to receive records
72 73 74 75 76 77 |
# File 'lib/zkruby/session.rb', line 72 def prime_connection(conn) @conn = conn send_session_connect() send_auth_data() reset_watches() end |
#queue_request(request, op, opcode, response = nil, watch_type = nil, watcher = nil, ptype = Packet, &callback) ⇒ Object
135 136 137 138 139 140 141 142 143 144 |
# File 'lib/zkruby/session.rb', line 135 def queue_request(request,op,opcode,response=nil,watch_type=nil,watcher=nil,ptype=Packet,&callback) raise Error.SESSION_EXPIRED, "Session expired due to client state #{@client_state}" unless @client_state == :ready watch_type, watcher = resolve_watcher(watch_type,watcher) xid = next_xid packet = ptype.new(xid,op,opcode,request,response,watch_type,watcher, callback) queue_packet(packet) end |
#receive_records(io) ⇒ Object
Connection API - called when data is available, reads and processes one packet/event
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/zkruby/session.rb', line 82 def receive_records(io) case @keeper_state when :disconnected complete_connection(io) when :connected process_reply(io) else logger.warn { "Receive packet for closed session #{@keeper_state}" } end end |
#start ⇒ Object
Start the session - called by the ProtocolBinding
127 128 129 130 131 132 133 |
# File 'lib/zkruby/session.rb', line 127 def start() raise ProtocolError, "Already started!" unless @keeper_state.nil? @keeper_state = :disconnected @disconnect_time = Time.now logger.debug("Starting new zookeeper client session") reconnect() end |
#unchroot(path) ⇒ Object
54 55 56 57 |
# File 'lib/zkruby/session.rb', line 54 def unchroot(path) return path unless path path.slice(@chroot.length..-1) end |