Class: ZK::Client::Threaded
- Includes:
- Conveniences, StateMixin, Unixisms, Logger
- Defined in:
- lib/zk/client/threaded.rb
Overview
This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.
If you want to register on_* callbacks (see StateMixin), pass a block to the constructor. The block is called before the connection is set up, which is what lets you receive the on_connected event. See the 'Register on_connected callback' example.
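The ordering matters because a callback registered after the connection is already up will not see the initial event. The following is a minimal sketch of that ordering using only the Ruby standard library. It is a toy stand-in, not the ZK implementation: `ToyClient` and its `connect`, `on_connected`, and `connected?` methods are hypothetical and only mimic the shape of the constructor shown further down (yield self first, connect afterwards).

```ruby
# Toy model of "yield before connect"; not the real ZK::Client::Threaded.
class ToyClient
  def initialize(connect: true)
    @connected_callbacks = []
    @connected = false
    # The block runs first, so callbacks registered here exist
    # before the (simulated) connection is established.
    yield self if block_given?
    self.connect if connect
  end

  # Register a block to run when the connection comes up.
  def on_connected(&blk)
    @connected_callbacks << blk
    self
  end

  # Simulated connection: flips the flag and fires the callbacks.
  def connect
    @connected = true
    @connected_callbacks.each { |cb| cb.call(self) }
    self
  end

  def connected?
    @connected
  end
end

# Registered inside the block: the callback sees the connect event.
events = []
ToyClient.new { |c| c.on_connected { events << :connected } }
puts events.inspect   # prints [:connected]

# Registered after construction: the event has already passed.
late = []
ToyClient.new.on_connected { late << :connected }
puts late.inspect     # prints []
```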
A note on event delivery. There has been some confusion, caused by incorrect documentation (which I'm very sorry about), about how many threads are delivering events. The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.
If you use the threadpool/event callbacks to perform work, you may be interested in registering an on_exception callback, which will receive all exceptions that occur on the threadpool and are not handled (i.e. that bubble up to the top of a block).
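To illustrate what "bubble up to the top of a block" means, here is a small sketch built on `Thread` and `Queue` from the standard library. It is an assumption-laden model, not ZK's Threadpool: `ToyPool`, `defer`, and `shutdown` are hypothetical names, and only the `on_exception` hook mirrors the method documented on this page.

```ruby
# Toy single-threaded pool with an on_exception hook; not ZK's Threadpool.
class ToyPool
  def initialize
    @queue = Queue.new
    @handlers = []
    @thread = Thread.new { run }
  end

  # Register a block that receives exceptions no callback rescued.
  def on_exception(&blk)
    @handlers << blk
  end

  # Queue a block to run on the pool thread.
  def defer(&blk)
    @queue << blk
  end

  # Let queued work finish, then stop the worker thread.
  def shutdown
    @queue << :stop
    @thread.join
  end

  private

  def run
    while (job = @queue.pop) != :stop
      begin
        job.call
      rescue StandardError => e
        # The exception escaped the callback: hand it to the hooks.
        @handlers.each { |h| h.call(e) }
      end
    end
  end
end

pool = ToyPool.new
errors = []
pool.on_exception { |e| errors << e.message }
pool.defer { raise ArgumentError, "boom" }  # unhandled, reaches the hook
pool.defer { :fine }                        # still runs afterwards
pool.shutdown
puts errors.inspect   # prints ["boom"]
```

In this model a failing callback does not stop later callbacks from running; whether the real threadpool behaves identically in every edge case is not something this sketch establishes.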
Constant Summary
- DEFAULT_THREADPOOL_SIZE = 1
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #associating? ⇒ Boolean
- #close ⇒ Object
  ZK::Client::Base#close.
- #close! ⇒ Object
  close the underlying connection and clear all pending events.
- #closed? ⇒ Boolean
- #connect(opts = {}) ⇒ Object
- #connected? ⇒ Boolean
  this overrides the implementation in StateMixin.
- #connecting? ⇒ Boolean
- #create(path, *args) ⇒ Object
  this is where the :on option is implemented for Base#create.
- #expired_session? ⇒ Boolean
- #initialize(host, opts = {}) {|self| ... } ⇒ Threaded (constructor)
  Construct a new threaded client.
- #on_exception(&blk) ⇒ Object
  register a block to be called back with unhandled exceptions that occur in the threadpool.
- #on_threadpool? ⇒ Boolean
  returns true if the current thread is one of the threadpool threads.
- #reopen(timeout = nil) ⇒ Symbol
  reopen the underlying connection.
- #state ⇒ Object
Methods included from Logger
#logger, wrapped_logger, wrapped_logger=
Methods included from Conveniences
#election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock
Methods included from Unixisms
#block_until_node_deleted, #find, #mkdir_p, #rm_rf
Methods included from StateMixin
#on_connected, #on_connecting, #on_expired_session, #on_state_change
Methods inherited from Base
#add_auth, #children, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher
Constructor Details
#initialize(host, opts = {}) {|self| ... } ⇒ Threaded
The :timeout argument here is not the session_timeout for the connection. Rather, it is the amount of time we wait for the connection to be established. The session timeout exchanged with the server is set to 10s by default in the C implementation, and as of version 0.8.0 of slyphon-zookeeper has yet to be exposed as an option. That feature is planned.
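A connection timeout of this kind is a bounded wait for a "connected" signal, which is separate from any session timeout negotiated with the server. The sketch below models that bounded wait with `Mutex` and `ConditionVariable`. It is illustrative only: `ToyConnection`, `mark_connected`, and `wait_until_connected` are hypothetical names and do not describe how ZK implements the wait internally.

```ruby
# Toy model of a connection timeout: wait up to `timeout` seconds for a flag.
class ToyConnection
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @connected = false
  end

  # Called by whatever delivers the "connected" event.
  def mark_connected
    @mutex.synchronize do
      @connected = true
      @cond.broadcast
    end
  end

  # Returns true if connected within `timeout` seconds, false otherwise.
  def wait_until_connected(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @connected
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        # Re-check the flag after every wakeup; wakeups can be spurious.
        @cond.wait(@mutex, remaining)
      end
      true
    end
  end
end

cnx = ToyConnection.new
Thread.new { sleep 0.05; cnx.mark_connected }
puts cnx.wait_until_connected(1)                    # prints true
puts ToyConnection.new.wait_until_connected(0.05)   # prints false
```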
The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1, and users are discouraged from adjusting the value due to the complexity this introduces. In 1.1 there is a better option for achieving higher concurrency (see the :thread option).
The Management apologizes for any confusion this may have caused.
Construct a new threaded client.
Pay close attention to the :thread option, and have a look at the EventDeliveryModel page in the wiki for a discussion of the relative advantages and disadvantages of the choices available. The default is safe, but the alternative will likely provide better performance.
# File 'lib/zk/client/threaded.rb', line 153
def initialize(host, opts={}, &b)
  super(host, opts)
  tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
  @threadpool = Threadpool.new(tp_size)
  @connection_timeout = opts[:timeout] || DEFAULT_TIMEOUT # maybe move this into superclass?
  @event_handler = EventHandler.new(self, opts)
  @reconnect = opts.fetch(:reconnect, true)
  setup_locks
  @client_state = RUNNING # this is to distinguish between *our* state and the underlying connection state
  # this is the last status update we've received from the underlying connection
  @last_cnx_state = nil
  @retry_duration = opts.fetch(:retry_duration, nil).to_i
  yield self if block_given?
  @fork_subs = [
    ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)),
    ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)),
    ForkHook.after_fork_in_child(method(:reopen)),
  ]
  ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs))
  connect(opts) if opts.fetch(:connect, true)
end
Instance Method Details
#associating? ⇒ Boolean
# File 'lib/zk/client/threaded.rb', line 367
def associating?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_ASSOCIATING_STATE }
end
#close ⇒ Object
ZK::Client::Base#close
# File 'lib/zk/client/threaded.rb', line 394
def close
  super
  subs, @fork_subs = @fork_subs, []
  subs.each(&:unsubscribe)
  nil
end
#close! ⇒ Object
We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.
close the underlying connection and clear all pending events.
# File 'lib/zk/client/threaded.rb', line 324
def close!
  @mutex.synchronize do
    return if [:closed, :close_requested].include?(@client_state)
    logger.debug { "moving to :close_requested state" }
    @client_state = CLOSE_REQ
    @cond.broadcast
  end
  join_and_clear_reconnect_thread
  on_tpool = on_threadpool?
  # Ok, so the threadpool will wait up to N seconds while joining each thread.
  # If _we're on a threadpool thread_, have it wait until we're ready to jump
  # out of this method, and tell it to wait up to 5 seconds to let us get
  # clear, then do the rest of the shutdown of the connection
  #
  # if the user *doesn't* hate us, then we just join the shutdown_thread immediately
  # and wait for it to exit
  #
  shutdown_thread = Thread.new do
    Thread.current[:name] = 'shutdown'
    @threadpool.shutdown(10)
    # this will call #close
    super
    @mutex.synchronize do
      logger.debug { "moving to :closed state" }
      @client_state = CLOSED
      @last_cnx_state = nil
      @cond.broadcast
    end
  end
  on_tpool ? shutdown_thread : shutdown_thread.join(30)
end
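The last line of the listing is the interesting part: the shutdown thread is joined only when the caller is not itself a threadpool thread, because a pool thread waiting on a shutdown that is in turn waiting on that pool thread would stall until the timeouts expire. A stripped-down sketch of the same decision follows. `ToyWorker`, `defer`, and `on_worker?` are hypothetical names, the 5-second limits are arbitrary, and the sketch omits the state tracking and connection handling of the real method.

```ruby
# Toy model: the shutdown thread is joined only when the caller is not
# the worker thread that the shutdown itself has to wait for.
class ToyWorker
  attr_reader :thread

  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end

  # Queue a block to run on the worker thread.
  def defer(&blk)
    @queue << blk
  end

  # True when the calling thread is the worker thread.
  def on_worker?
    Thread.current == @thread
  end

  # From the worker: return the shutdown thread without joining it.
  # From any other thread: block until the shutdown has finished.
  def close!
    called_on_worker = on_worker?
    shutdown = Thread.new do
      @queue << :stop
      @thread.join(5)
    end
    called_on_worker ? shutdown : shutdown.join(5)
  end
end

# Closing from the main thread blocks until the worker has exited.
outside = ToyWorker.new
outside.close!
puts outside.thread.alive?   # prints false

# Closing from inside a job returns immediately; the worker exits
# once that job has finished.
inside = ToyWorker.new
inside.defer { inside.close! }
inside.thread.join(5)
puts inside.thread.alive?    # prints false
```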
#closed? ⇒ Boolean
# File 'lib/zk/client/threaded.rb', line 411
def closed?
  return true if @mutex.synchronize { @client_state == CLOSED }
  super
end
#connect(opts = {}) ⇒ Object
# File 'lib/zk/client/threaded.rb', line 203
def connect(opts={})
  @mutex.synchronize { unlocked_connect(opts) }
end
#connected? ⇒ Boolean
this overrides the implementation in StateMixin
# File 'lib/zk/client/threaded.rb', line 363
def connected?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE }
end
#connecting? ⇒ Boolean
# File 'lib/zk/client/threaded.rb', line 371
def connecting?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTING_STATE }
end
#create(path, *args) ⇒ Object
this is where the :on option is implemented for Base#create
# File 'lib/zk/client/threaded.rb', line 417
def create(path, *args)
  opts = args.
  or_opt = opts.delete(:or)
  args << opts
  if or_opt
    hash = parse_create_args(path, *args)
    raise ArgumentError, "valid options for :or are nil or :set, not #{or_opt.inspect}" unless or_opt == :set
    raise ArgumentError, "you cannot create an ephemeral node when using the :or option" if hash[:ephemeral]
    raise ArgumentError, "you cannot create an sequence node when using the :or option" if hash[:sequence]
    mkdir_p(path, :data => hash[:data])
    path
  else
    # ok, none of our business, hand it up to management
    super(path, *args)
  end
end
#expired_session? ⇒ Boolean
# File 'lib/zk/client/threaded.rb', line 375
def expired_session?
  @mutex.synchronize do
    return false unless @cnx and running?
    if defined?(::JRUBY_VERSION)
      !@cnx.state.alive?
    else
      @last_cnx_state == Zookeeper::ZOO_EXPIRED_SESSION_STATE
    end
  end
end
#on_exception(&blk) ⇒ Object
if your exception callback block itself raises an exception, I will make fun of you.
register a block to be called back with unhandled exceptions that occur in the threadpool.
# File 'lib/zk/client/threaded.rb', line 407
def on_exception(&blk)
  @threadpool.on_exception(&blk)
end
#on_threadpool? ⇒ Boolean
returns true if the current thread is one of the threadpool threads
# File 'lib/zk/client/threaded.rb', line 402
def on_threadpool?
  @threadpool and @threadpool.on_threadpool?
end
#reopen(timeout = nil) ⇒ Symbol
reopen the underlying connection
The timeout param is here mainly for legacy support.
# File 'lib/zk/client/threaded.rb', line 208
def reopen(timeout=nil)
  # Clear outstanding watch restrictions
  @event_handler.clear_outstanding_watch_restrictions!
  # If we've forked, then we can call all sorts of normally dangerous
  # stuff because we're the only thread.
  if forked?
    # ok, just to sanity check here
    raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)
    logger.debug { "reopening everything, fork detected!" }
    setup_locks
    @pid = Process.pid
    @client_state = RUNNING # reset state to running if we were paused
    old_cnx, @cnx = @cnx, nil
    old_cnx.close! if old_cnx # && !old_cnx.closed?
    join_and_clear_reconnect_thread
    @mutex.synchronize do
      # it's important that we're holding the lock, as access to 'cnx' is
      # synchronized, and we want to avoid a race where event handlers
      # might see a nil connection. I've seen this exception occur *once*
      # so it's pretty rare (it was on 1.8.7 too), but just to be double
      # extra paranoid
      @event_handler.reopen_after_fork!
      @threadpool.reopen_after_fork! # prune dead threadpool threads after a fork()
      unlocked_connect
    end
  else
    @mutex.synchronize do
      if @client_state == PAUSED
        # XXX: what to do in this case? does it matter?
      end
      logger.debug { "reopening, no fork detected" }
      @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE
      @client_state = RUNNING # reset state to running if we were paused or closed
      timeout ||= @connection_timeout # or @connection_timeout here is the documented behavior on Base#reopen
      @cnx.reopen(timeout) # ok, we weren't forked, so just reopen
      # this is a bit of a hack, because we need to wait until the event thread
      # delivers the connected event, which we used to be able to rely on just the
      # connection doing. since we don't want to call the @cnx.state method to check
      # (rather use the cached @last_cnx_state), we wait for consistency's sake
      (timeout)
    end
  end
  state
end
#state ⇒ Object
# File 'lib/zk/client/threaded.rb', line 387
def state
  @mutex.synchronize do
    STATE_SYM_MAP.fetch(@last_cnx_state) { |k| raise IndexError, "unrecognized state: #{k.inspect}" }
  end
end
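The lookup relies on the block form of `Hash#fetch`: when the key is missing, the block receives the key and here turns the miss into an `IndexError` with a descriptive message. A short standalone sketch of that idiom follows. The table `TOY_STATE_SYM_MAP`, its integer keys, and the helper `toy_state` are made up for illustration and are not the values of the Zookeeper constants.

```ruby
# Hypothetical state table; the keys are arbitrary, not real ZOO_* values.
TOY_STATE_SYM_MAP = {
  10 => :connecting,
  20 => :connected,
}.freeze

# Translate a raw state code to a symbol, raising on unknown codes.
def toy_state(raw)
  TOY_STATE_SYM_MAP.fetch(raw) { |k| raise IndexError, "unrecognized state: #{k.inspect}" }
end

puts toy_state(20)   # prints connected

begin
  toy_state(99)
rescue IndexError => e
  puts e.message     # prints unrecognized state: 99
end
```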