Class: ZK::Client::Threaded

Inherits:
Base
  • Object
Includes:
Conveniences, StateMixin, Unixisms, Logger
Defined in:
lib/zk/client/threaded.rb

Overview

This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.

If you want to register on_* callbacks (see StateMixin) then you should pass a block, which will be called before the connection is set up (this way you can get the on_connected event). See the 'Register on_connected callback' example.

A note on event delivery: incorrect documentation (which I'm very sorry about) has caused some confusion about how many threads deliver events. The documentation for 0.9.0 misstated the number of threads used to deliver events; there was a single, unconfigurable event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.

If you use the threadpool/event callbacks to perform work, you may want to register an on_exception callback, which receives every exception raised on the threadpool that is not handled (i.e. that bubbles up to the top of a block).

Examples:

Register on_connected callback.


# the nice thing about this pattern is that in the case of a call to #reopen
# all your watches will be re-established

ZK::Client::Threaded.new('localhost:2181') do |zk|
  # do not do anything in here except register callbacks

  zk.on_connected do |event|
    zk.stat('/foo/bar', watch: true)
    zk.stat('/baz', watch: true)
  end
end

Constant Summary

DEFAULT_THREADPOOL_SIZE = 1

Instance Attribute Summary

Attributes inherited from Base

#event_handler

Instance Method Summary

Methods included from Logger

#logger, wrapped_logger, wrapped_logger=

Methods included from Conveniences

#election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock

Methods included from Unixisms

#block_until_node_deleted, #find, #mkdir_p, #rm_rf

Methods included from StateMixin

#on_connected, #on_connecting, #on_expired_session, #on_state_change

Methods inherited from Base

#add_auth, #children, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher

Constructor Details

#initialize(host, opts = {}) {|self| ... } ⇒ Threaded

Note:

The :timeout argument here is not the session_timeout for the connection. Rather, it is the amount of time we wait for the connection to be established. The session timeout exchanged with the server is set to 10s by default in the C implementation, and as of version 0.8.0 of slyphon-zookeeper has yet to be exposed as an option. That feature is planned.

Note:

The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1, and users are discouraged from adjusting the value due to the complexity this introduces. In 1.1 there is a better option for achieving higher concurrency (see the :thread option).

The Management apologizes for any confusion this may have caused.

Construct a new threaded client.

Pay close attention to the :thread option, and have a look at the EventDeliveryModel page in the wiki for a discussion of the relative advantages and disadvantages of the choices available. The default is safe, but the alternative will likely provide better performance.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

  • host (String)

    should be a string of comma-separated host:port pairs. You can also supply an optional "chroot" suffix that will act as an implicit prefix to all paths supplied.
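To make the shape of the host argument concrete, here is a minimal sketch that splits a connection string into its host:port pairs and its optional chroot suffix. The helper name split_connection_string and the host names are hypothetical and exist only for illustration; the client does its own parsing.

```ruby
# Hypothetical helper (not part of ZK): split a connection string into
# its host:port pairs and its optional chroot suffix.
def split_connection_string(str)
  hosts_part, slash, chroot = str.partition('/')
  hosts = hosts_part.split(',').map(&:strip)
  [hosts, slash.empty? ? nil : "/#{chroot}"]
end

hosts, chroot = split_connection_string('zk1:2181,zk2:2181,zk3:2181/myapp')
# hosts  is ["zk1:2181", "zk2:2181", "zk3:2181"]
# chroot is "/myapp"
```

With a chroot of "/myapp", a path such as "/foo/bar" supplied to the client refers to "/myapp/foo/bar" on the server.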

Options Hash (opts):

  • :reconnect (true, false) — default: true

    if true, we will register the equivalent of on_expired_session { zk.reopen } so that in the case of an expired session, we will keep trying to reestablish the connection. You almost definitely want to leave this at the default. The only reason not to is if you already have a handler registered that does something application-specific, and you want to avoid a conflict.

  • :retry_duration (Fixnum) — default: nil

    how long (in seconds) we should wait to re-attempt a synchronous operation after we receive a ZK::Exceptions::Retryable error. This exception (or really, group of exceptions) is raised when there has been an unintentional network connection or session loss, so retrying an operation in this situation is like asking "If we are disconnected, how long should we wait for the connection to become available before attempting this operation?"

    The default nil means automatic retry is not attempted.

    This is a global option, and will be used for all operations on this connection, however it can be overridden for any individual operation.

  • :thread (:single, :per_callback) — default: :single

    choose your event delivery model:

    • :single: There is one thread, and only one callback is called at a time. This is the default mode (for now), and will provide the most safety for your app. All events will be delivered as received, to callbacks in the order they were registered. This safety has the tradeoff that if one of your callbacks performs some action that blocks the delivery thread, you will not receive other events until it returns. You're also limiting the concurrency of your app. This should be fine for most simple apps, and is a good choice to start with when developing your application.

    • :per_callback: This option will use a new-style Actor model (inspired by Celluloid) that uses a per-callback queue and thread to allow for greater concurrency in your app, while still maintaining some kind of sanity. By choosing this option your callbacks will receive events in order, and will receive only one at a time, but in parallel with other callbacks. This model has the advantage that all of your callbacks can make progress in parallel, and if one of them happens to block, it will not affect the others.

    • see the wiki for a discussion and demonstration of the effect of this setting.

  • :timeout (Fixnum)

    used as a default for calls to #reopen and #connect (including the initial default immediate connection)

  • :connect (true, false) — default: true

    Immediately connect to the server. It may be useful to pass false if you wish to do callback setup without passing a block. You must then call #connect explicitly.
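The idea behind :retry_duration can be sketched in plain Ruby: keep re-attempting an operation that fails with a retryable error until a deadline passes. This is not the library's implementation. RetryableError stands in for ZK::Exceptions::Retryable so the sketch runs without the gem, and with_retries and its pause parameter are hypothetical names.

```ruby
# Stand-in for ZK::Exceptions::Retryable, so the sketch runs without the gem.
class RetryableError < StandardError; end

# Hypothetical helper: re-run the block until it succeeds or until
# retry_duration seconds have elapsed. A nil duration means a single
# attempt, mirroring "automatic retry is not attempted".
def with_retries(retry_duration, pause: 0.01)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = now.call + retry_duration.to_f
  begin
    yield
  rescue RetryableError
    raise if now.call >= deadline  # out of time: surface the error
    sleep pause                    # give the connection a moment to recover
    retry
  end
end

attempts = 0
result = with_retries(1) do
  attempts += 1
  raise RetryableError, 'connection lost' if attempts < 3
  :ok
end
```

Here the block fails twice and succeeds on the third attempt, well inside the one-second budget.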

Yields:

  • (self)

    calls the block with the new instance after the event handler and threadpool have been set up, but before any connections have been made. This allows the client to register watchers for session events like connected. You cannot perform any other operations with the client as you will get a NoMethodError (the underlying connection is nil).

Since:

  • 1.1: Instead of adjusting the threadpool, users are strongly encouraged to use the :thread => :per_callback option to increase the parallelism of event delivery safely and sanely. Please see this wiki article for more information and a demonstration.
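The per-callback model described for :thread => :per_callback can be illustrated with a small stand-alone sketch: each callback gets its own queue and thread, so a given callback sees events in order and one at a time, while a slow callback does not hold up the others. PerCallbackDispatcher is a hypothetical class written for this illustration; it is not how ZK implements the option.

```ruby
# Hypothetical sketch of per-callback delivery: every callback owns a queue
# and a thread, so events reach it in order, one at a time, while different
# callbacks run in parallel.
class PerCallbackDispatcher
  def initialize
    @workers = []
  end

  def register(&callback)
    queue = Queue.new
    thread = Thread.new do
      while (event = queue.pop) != :stop
        callback.call(event)
      end
    end
    @workers << [queue, thread]
  end

  # Fan the event out to every callback's private queue.
  def deliver(event)
    @workers.each { |queue, _| queue << event }
  end

  def shutdown
    @workers.each { |queue, _| queue << :stop }
    @workers.each { |_, thread| thread.join }
  end
end

dispatcher = PerCallbackDispatcher.new
slow_seen = Queue.new
fast_seen = Queue.new

dispatcher.register { |ev| sleep 0.05; slow_seen << ev } # a slow callback
dispatcher.register { |ev| fast_seen << ev }             # not delayed by the slow one

3.times { |i| dispatcher.deliver(i) }
dispatcher.shutdown
```

With a single delivery thread, the second callback would wait behind every sleep in the first. Here both callbacks receive 0, 1, 2 in order, independently of each other.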



# File 'lib/zk/client/threaded.rb', line 153

def initialize(host, opts={}, &b)
  super(host, opts)

  tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
  @threadpool = Threadpool.new(tp_size)

  @connection_timeout = opts[:timeout] || DEFAULT_TIMEOUT # maybe move this into superclass?
  @event_handler   = EventHandler.new(self, opts)

  @reconnect = opts.fetch(:reconnect, true)

  setup_locks

  @client_state = RUNNING # this is to distinguish between *our* state and the underlying connection state

  # this is the last status update we've received from the underlying connection
  @last_cnx_state = nil

  @retry_duration = opts.fetch(:retry_duration, nil).to_i

  yield self if block_given?

  @fork_subs = [
    ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)),
    ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)),
    ForkHook.after_fork_in_child(method(:reopen)),
  ]

  ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs))

  connect(opts) if opts.fetch(:connect, true)
end

Instance Method Details

#associating? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 367

def associating?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_ASSOCIATING_STATE }
end

#close ⇒ Object

ZK::Client::Base#close



# File 'lib/zk/client/threaded.rb', line 394

def close
  super
  subs, @fork_subs = @fork_subs, []
  subs.each(&:unsubscribe)
  nil
end

#close! ⇒ Object

Note:

We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.

close the underlying connection and clear all pending events.



# File 'lib/zk/client/threaded.rb', line 324

def close!
  @mutex.synchronize do 
    return if [:closed, :close_requested].include?(@client_state)
    logger.debug { "moving to :close_requested state" }
    @client_state = CLOSE_REQ
    @cond.broadcast
  end

  join_and_clear_reconnect_thread

  on_tpool = on_threadpool?

  # Ok, so the threadpool will wait up to N seconds while joining each thread.
  # If _we're on a threadpool thread_, have it wait until we're ready to jump
  # out of this method, and tell it to wait up to 5 seconds to let us get
  # clear, then do the rest of the shutdown of the connection 
  #
  # if the user *doesn't* hate us, then we just join the shutdown_thread immediately
  # and wait for it to exit
  #
  shutdown_thread = Thread.new do
    Thread.current[:name] = 'shutdown'
    @threadpool.shutdown(10)

    # this will call #close
    super

    @mutex.synchronize do
      logger.debug { "moving to :closed state" }
      @client_state = CLOSED
      @last_cnx_state = nil
      @cond.broadcast
    end
  end

  on_tpool ? shutdown_thread : shutdown_thread.join(30)
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 411

def closed?
  return true if @mutex.synchronize { @client_state == CLOSED }
  super
end

#connect(opts = {}) ⇒ Object

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :timeout (Fixnum)

    how long we will wait for the connection to be established. If timeout is nil, we will wait forever: use carefully.



# File 'lib/zk/client/threaded.rb', line 203

def connect(opts={})
  @mutex.synchronize { unlocked_connect(opts) }
end

#connected? ⇒ Boolean

this overrides the implementation in StateMixin

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 363

def connected?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTED_STATE }
end

#connecting? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 371

def connecting?
  @mutex.synchronize { running? && @last_cnx_state == Zookeeper::ZOO_CONNECTING_STATE }
end

#create(path, *args) ⇒ Object

this is where the :or option is implemented for Base#create



# File 'lib/zk/client/threaded.rb', line 417

def create(path, *args)
  opts = args.extract_options!

  or_opt = opts.delete(:or)
  args << opts

  if or_opt
    hash = parse_create_args(path, *args)
 
    raise ArgumentError, "valid options for :or are nil or :set, not #{or_opt.inspect}" unless or_opt == :set 
    raise ArgumentError, "you cannot create an ephemeral node when using the :or option" if hash[:ephemeral]
    raise ArgumentError, "you cannot create an sequence node when using the :or option"  if hash[:sequence]
   
    mkdir_p(path, :data => hash[:data])
    path
  else
    # ok, none of our business, hand it up to management
    super(path, *args)
  end
end
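The following is a rough in-memory model of what the :or => :set path above appears to do: reject ephemeral and sequence nodes, make sure the parents exist, then leave the target holding the given data. FakeTree and create_or_set are hypothetical names. The assumption that an existing node's data is overwritten is inferred from the option name and the delegation to mkdir_p, not confirmed by the listing.

```ruby
# Hypothetical in-memory stand-in for a znode tree; not the ZK API.
class FakeTree
  attr_reader :nodes

  def initialize
    @nodes = { '/' => '' }
  end

  # Create the node, or overwrite its data if it already exists (assumed
  # behavior). Missing parents are created with empty data.
  def create_or_set(path, data, ephemeral: false, sequence: false)
    raise ArgumentError, 'cannot create an ephemeral node here' if ephemeral
    raise ArgumentError, 'cannot create a sequence node here' if sequence

    parts = path.split('/').reject(&:empty?)
    parts.each_index do |i|
      @nodes['/' + parts[0..i].join('/')] ||= ''
    end
    @nodes[path] = data
    path
  end
end

tree = FakeTree.new
tree.create_or_set('/a/b', 'one')  # creates /a and /a/b
tree.create_or_set('/a/b', 'two')  # node exists, so only the data changes
```

Like the real method, the sketch returns the path it was given.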

#expired_session? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 375

def expired_session?
  @mutex.synchronize do
    return false unless @cnx and running?

    if defined?(::JRUBY_VERSION)
      !@cnx.state.alive?
    else
      @last_cnx_state == Zookeeper::ZOO_EXPIRED_SESSION_STATE
    end
  end
end

#on_exception(&blk) ⇒ Object

Note:

if your exception callback block itself raises an exception, I will make fun of you.

register a block to be called back with unhandled exceptions that occur in the threadpool.



# File 'lib/zk/client/threaded.rb', line 407

def on_exception(&blk)
  @threadpool.on_exception(&blk)
end
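As an illustration of the on_exception contract, the sketch below uses a tiny single-thread pool that hands any unhandled exception from a job to the registered block and then keeps working. TinyPool, defer and shutdown here are hypothetical and stand in for the library's Threadpool, whose internals are not shown on this page.

```ruby
# Hypothetical single-thread pool: an unhandled exception raised by a job is
# passed to the on_exception block instead of killing the worker thread.
class TinyPool
  def initialize
    @jobs = Queue.new
    @handler = nil
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        begin
          job.call
        rescue StandardError => e
          @handler&.call(e)
        end
      end
    end
  end

  def on_exception(&blk)
    @handler = blk
  end

  def defer(&job)
    @jobs << job
  end

  def shutdown
    @jobs << :stop
    @thread.join
  end
end

errors = Queue.new
done   = Queue.new

pool = TinyPool.new
pool.on_exception { |e| errors << e }
pool.defer { raise ArgumentError, 'boom' }
pool.defer { done << :fine } # still runs: the worker survived the exception
pool.shutdown
```

A handler like this is a natural place to log or report failures that would otherwise pass unnoticed on a background thread.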

#on_threadpool? ⇒ Boolean

returns true if the current thread is one of the threadpool threads

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 402

def on_threadpool?
  @threadpool and @threadpool.on_threadpool?
end

#reopen(timeout = nil) ⇒ Symbol

reopen the underlying connection

The timeout param is here mainly for legacy support.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    how long we should wait for the connection to reach a connected state before returning. Note that the method will not raise; it returns whether or not the connection reaches the 'connected' state. If timeout is nil, the value passed to the constructor for :timeout is used.

Returns:

  • (Symbol)

    state of connection after operation



# File 'lib/zk/client/threaded.rb', line 208

def reopen(timeout=nil)
  # Clear outstanding watch restrictions
  @event_handler.clear_outstanding_watch_restrictions!

  # If we've forked, then we can call all sorts of normally dangerous 
  # stuff because we're the only thread. 
  if forked?
    # ok, just to sanity check here
    raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)

    logger.debug { "reopening everything, fork detected!" }

    setup_locks

    @pid           = Process.pid
    @client_state  = RUNNING                     # reset state to running if we were paused

    old_cnx, @cnx = @cnx, nil
    old_cnx.close! if old_cnx # && !old_cnx.closed?

    join_and_clear_reconnect_thread

    @mutex.synchronize do
      # it's important that we're holding the lock, as access to 'cnx' is
      # synchronized, and we want to avoid a race where event handlers
      # might see a nil connection. I've seen this exception occur *once*
      # so it's pretty rare (it was on 1.8.7 too), but just to be double
      # extra paranoid

      @event_handler.reopen_after_fork!
      @threadpool.reopen_after_fork!          # prune dead threadpool threads after a fork()

      unlocked_connect
    end
  else
    @mutex.synchronize do
      if @client_state == PAUSED
        # XXX: what to do in this case? does it matter?
      end

      logger.debug { "reopening, no fork detected" }
      @last_cnx_state = Zookeeper::ZOO_CONNECTING_STATE
      
      @client_state  = RUNNING # reset state to running if we were paused or closed

      timeout ||= @connection_timeout     # or @connection_timeout here is the documented behavior on Base#reopen

      @cnx.reopen(timeout)                # ok, we weren't forked, so just reopen

      # this is a bit of a hack, because we need to wait until the event thread
      # delivers the connected event, which we used to be able to rely on just the
      # connection doing. since we don't want to call the @cnx.state method to check
      # (rather use the cached @last_cnx_state), we wait for consistency's sake
      wait_until_connected_or_dying(timeout)
    end
  end

  state
end

#state ⇒ Object



# File 'lib/zk/client/threaded.rb', line 387

def state
  @mutex.synchronize do
    STATE_SYM_MAP.fetch(@last_cnx_state) { |k| raise IndexError, "unrecognized state: #{k.inspect}" }
  end
end
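The lookup in #state relies on Hash#fetch with a block, which is called with the missing key. A minimal sketch follows, assuming integer state codes like those of the ZooKeeper C client. The table below is an assumption for illustration; the real STATE_SYM_MAP is defined elsewhere in the library and may differ.

```ruby
# Assumed values, modeled on the ZooKeeper C client's state constants.
# The library's real STATE_SYM_MAP may contain other entries.
STATE_SYM_MAP_SKETCH = {
  1    => :connecting,
  2    => :associating,
  3    => :connected,
  -112 => :expired_session,
}.freeze

# Translate a raw state code to a symbol, raising on anything unknown.
def state_symbol(raw_state)
  STATE_SYM_MAP_SKETCH.fetch(raw_state) do |k|
    raise IndexError, "unrecognized state: #{k.inspect}"
  end
end

state_symbol(3) # the symbol mapped to code 3 in the sketch table
```

Passing the block to fetch lets the error message name the offending value instead of failing with a generic missing-key error.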