Class: ZK::Client::Threaded

Inherits:
Base
  • Object
Includes:
Conveniences, StateMixin, Unixisms, Logging
Defined in:
lib/zk/client/threaded.rb

Overview

This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.

If you want to register on_* callbacks (see StateMixin) then you should pass a block, which will be called before the connection is set up (this way you can get the on_connected event). See the 'Register on_connected callback' example.

A note on event delivery: the documentation for 0.9.0 misstated the number of threads used to deliver events, which caused some confusion (and which I'm very sorry about). In 0.9.0 there was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.

If you use the threadpool/event callbacks to perform work, you may be interested in registering an on_exception callback that will receive all exceptions that occur on the threadpool and are not handled (i.e. that bubble up to the top of a block).
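As a minimal sketch of the on_exception idea above: the helper below collects unhandled threadpool exceptions on a thread-safe queue so another thread can log them. The name `install_exception_collector` is hypothetical, and the only thing assumed about `zk` is that it responds to #on_exception as documented on this page.

```ruby
require 'thread'

# Hypothetical helper (not part of ZK): registers an on_exception callback
# that pushes every unhandled threadpool exception onto a thread-safe queue.
# `zk` is only assumed to respond to #on_exception.
def install_exception_collector(zk, errors = Queue.new)
  zk.on_exception do |exception|
    errors << exception
  end
  errors
end

# Usage with a real client (needs the zk gem and a reachable server):
#   zk     = ZK::Client::Threaded.new('localhost:2181')
#   errors = install_exception_collector(zk)
#   Thread.new { loop { warn errors.pop.inspect } }
```

Keeping the callback itself trivial (a queue push) makes it unlikely that the exception handler raises in turn.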

Examples:

Register on_connected callback.


# the nice thing about this pattern is that in the case of a call to #reopen
# all your watches will be re-established

ZK::Client::Threaded.new('localhost:2181') do |zk|
  # do not do anything in here except register callbacks

  zk.on_connected do |event|
    zk.stat('/foo/bar', watch: true)
    zk.stat('/baz', watch: true)
  end
end

Constant Summary

DEFAULT_THREADPOOL_SIZE =
1

Instance Attribute Summary

Attributes inherited from Base

#event_handler

Instance Method Summary

Methods included from Logging

#logger, set_default

Methods included from Conveniences

#election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock

Methods included from Unixisms

#block_until_node_deleted, #find, #mkdir_p, #rm_rf

Methods included from StateMixin

#associating?, #connected?, #connecting?, #expired_session?, #on_connected, #on_connecting, #on_expired_session, #on_state_change, #state

Methods inherited from Base

#children, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher

Constructor Details

#initialize(host, opts = {}) {|self| ... } ⇒ Threaded

Note:

The :timeout argument here is not the session_timeout for the connection. Rather, it is the amount of time we wait for the connection to be established. The session timeout exchanged with the server is set to 10s by default in the C implementation, and as of version 0.8.0 of slyphon-zookeeper has yet to be exposed as an option. That feature is planned.

Note:

The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1, and users are discouraged from adjusting the value due to the complexity this introduces. In 1.1 there is a better option for achieving higher concurrency (see the :thread option).

The Management apologizes for any confusion this may have caused.

Construct a new threaded client.

Pay close attention to the :thread option, and have a look at the EventDeliveryModel page in the wiki for a discussion of the relative advantages and disadvantages of the choices available. The default is safe, but the alternative will likely provide better performance.
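To make the trade-off concrete, here is a toy model of the per-callback idea in plain Ruby: every callback gets its own queue and thread, so it sees events in order and one at a time, while a slow callback does not hold up the others. This is only a sketch and is not ZK's implementation; `PerCallbackDispatcher` and its methods are hypothetical names.

```ruby
# Toy model of per-callback delivery. NOT ZK's implementation; every name
# here is hypothetical and exists only to illustrate the idea.
class PerCallbackDispatcher
  STOP = Object.new # sentinel telling a worker thread to exit

  def initialize
    @workers = []
  end

  # Give the callback its own queue and its own thread.
  def register(&callback)
    queue = Queue.new
    thread = Thread.new do
      until (event = queue.pop).equal?(STOP)
        callback.call(event)
      end
    end
    @workers << [queue, thread]
    self
  end

  # Fan the event out to every callback's private queue.
  def deliver(event)
    @workers.each { |queue, _thread| queue << event }
  end

  # Ask every worker to stop, then wait for queued events to drain.
  def shutdown
    @workers.each { |queue, _thread| queue << STOP }
    @workers.each { |_queue, thread| thread.join }
  end
end

slow_seen = []
fast_seen = []

dispatcher = PerCallbackDispatcher.new
dispatcher.register { |event| sleep 0.05; slow_seen << event } # blocks for a while
dispatcher.register { |event| fast_seen << event }             # returns at once

3.times { |i| dispatcher.deliver(i) }
dispatcher.shutdown
```

Each callback still receives 0, 1, 2 in order. With a single delivery thread, the fast callback would instead have to wait for the slow one before seeing the next event.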

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

  • host (String)

    should be a string of comma-separated host:port pairs. You can also supply an optional "chroot" suffix that will act as an implicit prefix to all paths supplied.
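For illustration, a small helper that assembles a host string of the shape described above. The helper name `zk_connection_string` and the host names are hypothetical; only the resulting format (comma-separated host:port pairs followed by an optional chroot path) comes from the description.

```ruby
# Hypothetical helper (not part of ZK): builds a connection string from
# [host, port] pairs plus an optional chroot suffix.
def zk_connection_string(hosts, chroot = nil)
  servers = hosts.map { |host, port| "#{host}:#{port}" }.join(',')
  return servers if chroot.nil? || chroot.empty?
  raise ArgumentError, "chroot must start with '/'" unless chroot.start_with?('/')
  servers + chroot
end

with_chroot    = zk_connection_string([['zk1', 2181], ['zk2', 2181]], '/myapp')
without_chroot = zk_connection_string([['localhost', 2181]])
# with_chroot    is "zk1:2181,zk2:2181/myapp"
# without_chroot is "localhost:2181"
```

With the chroot in place, a path such as '/foo' passed to the client would refer to '/myapp/foo' on the server.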

Options Hash (opts):

  • :reconnect (true, false) — default: true

    if true, we will register the equivalent of on_expired_session { zk.reopen } so that in the case of an expired session, we will keep trying to reestablish the connection. You almost definitely want to leave this at the default. The only reason not to is if you already have a handler registered that does something application specific, and you want to avoid a conflict.

  • :retry_duration (Fixnum) — default: nil

    how long (in seconds) we should wait to re-attempt a synchronous operation after we receive a ZK::Exceptions::Retryable error. This exception (or really, group of exceptions) is raised when there has been an unintentional network connection or session loss, so retrying an operation in this situation is like asking "If we are disconnected, how long should we wait for the connection to become available before attempting this operation?"

    The default nil means automatic retry is not attempted.

    This is a global option, and will be used for all operations on this connection, however it can be overridden for any individual operation.

  • :thread (:single, :per_callback) — default: :single

    choose your event delivery model:

    • :single: There is one thread, and only one callback is called at a time. This is the default mode (for now), and will provide the most safety for your app. All events will be delivered as received, to callbacks in the order they were registered. This safety has the tradeoff that if one of your callbacks performs some action that blocks the delivery thread, you will not receive other events until it returns. You're also limiting the concurrency of your app. This should be fine for most simple apps, and is a good choice to start with when developing your application.

    • :per_callback: This option will use a new-style Actor model (inspired by Celluloid) that uses a per-callback queue and thread to allow for greater concurrency in your app, while still maintaining some kind of sanity. By choosing this option your callbacks will receive events in order, and will receive only one at a time, but in parallel with other callbacks. This model has the advantage that you can have all of your callbacks making progress in parallel, and if one of them happens to block, it will not affect the others.

    • see the wiki for a discussion and demonstration of the effect of this setting.

  • :timeout (Fixnum)

    used as a default for calls to #reopen and #connect (including the initial default immediate connection)

  • :connect (true, false) — default: true

    Immediately connect to the server. It may be useful to pass false if you wish to do callback setup without passing a block. You must then call #connect explicitly.
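The :retry_duration behaviour described in the options above can be pictured with a toy model in plain Ruby: re-run an operation on a retryable error until a deadline passes, and do not retry at all when the duration is nil. This is an illustration only. `RetryableError` stands in for ZK::Exceptions::Retryable, `with_retry` is a hypothetical name, and the real client may wait on connection state rather than sleep.

```ruby
# Stand-in for ZK::Exceptions::Retryable; hypothetical, for illustration only.
class RetryableError < StandardError; end

# Re-run the block until it succeeds or `duration` seconds have elapsed.
# A nil duration means no automatic retry: the first error propagates.
def with_retry(duration, pause = 0.01)
  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  deadline = duration && (now + duration)
  begin
    yield
  rescue RetryableError
    raise if deadline.nil?
    raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep pause
    retry
  end
end

attempts = 0
result = with_retry(1) do
  attempts += 1
  raise RetryableError, 'connection lost' if attempts < 3 # fail twice, then succeed
  :ok
end
```

Here the block fails twice and succeeds on the third attempt, well inside the one-second budget.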

Yields:

  • (self)

    calls the block with the new instance after the event handler and threadpool have been set up, but before any connections have been made. This allows the client to register watchers for session events like connected. You cannot perform any other operations with the client as you will get a NoMethodError (the underlying connection is nil).
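An alternative to the block form is the :connect => false option listed above: construct the client, register callbacks, then call #connect yourself. The sketch below uses only calls that appear on this page (new with :connect, #on_connected, #stat with :watch, #connect). The helper name `build_client_with_watches` is hypothetical, and `client_class` is assumed to behave like ZK::Client::Threaded.

```ruby
# Hypothetical helper (not part of ZK): build a client without connecting,
# register a session callback, then connect explicitly.
# `client_class` is assumed to behave like ZK::Client::Threaded.
def build_client_with_watches(client_class, host, paths)
  zk = client_class.new(host, :connect => false)

  # Registered before any connection exists, so the first connected event
  # is not missed, and the watches are set again after a reopen.
  zk.on_connected do |_event|
    paths.each { |path| zk.stat(path, :watch => true) }
  end

  zk.connect
  zk
end

# Usage (needs the zk gem and a reachable server):
#   zk = build_client_with_watches(ZK::Client::Threaded, 'localhost:2181', ['/foo/bar', '/baz'])
```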

Since:

  • 1.1: Instead of adjusting the threadpool, users are strongly encouraged to use the :thread => :per_callback option to increase the parallelism of event delivery safely and sanely. Please see this wiki article for more information and a demonstration.



# File 'lib/zk/client/threaded.rb', line 153

def initialize(host, opts={}, &b)
  super(host, opts)

  tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
  @threadpool = Threadpool.new(tp_size)

  @connection_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
  @event_handler   = EventHandler.new(self, opts)

  @reconnect = opts.fetch(:reconnect, true)

  setup_locks

  @client_state = RUNNING # this is to distinguish between *our* state and the underlying connection state

  # this is the last status update we've received from the underlying connection
  @last_cnx_state = nil

  @retry_duration = opts.fetch(:retry_duration, nil).to_i

  yield self if block_given?

  @fork_subs = [
    ForkHook.prepare_for_fork(method(:pause_before_fork_in_parent)),
    ForkHook.after_fork_in_parent(method(:resume_after_fork_in_parent)),
    ForkHook.after_fork_in_child(method(:reopen)),
  ]

  ObjectSpace.define_finalizer(self, self.class.finalizer(@fork_subs))

  connect if opts.fetch(:connect, true)
end

Instance Method Details

#close ⇒ Object

ZK::Client::Base#close



# File 'lib/zk/client/threaded.rb', line 351

def close
  super
  subs, @fork_subs = @fork_subs, []
  subs.each(&:unsubscribe)
  nil
end

#close! ⇒ Object

Note:

We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.

close the underlying connection and clear all pending events.



# File 'lib/zk/client/threaded.rb', line 312

def close!
  @mutex.synchronize do 
    return if [:closed, :close_requested].include?(@client_state)
    logger.debug { "moving to :close_requested state" }
    @client_state = CLOSE_REQ
    @cond.broadcast
  end

  join_and_clear_reconnect_thread

  on_tpool = on_threadpool?

  # Ok, so the threadpool will wait up to N seconds while joining each thread.
  # If _we're on a threadpool thread_, have it wait until we're ready to jump
  # out of this method, and tell it to wait up to 5 seconds to let us get
  # clear, then do the rest of the shutdown of the connection 
  #
  # if the user *doesn't* hate us, then we just join the shutdown_thread immediately
  # and wait for it to exit
  #
  shutdown_thread = Thread.new do
    Thread.current[:name] = 'shutdown'
    @threadpool.shutdown(10)

    # this will call #close
    super

    @mutex.synchronize do
      logger.debug { "moving to :closed state" }
      @client_state = CLOSED
      @last_cnx_state = nil
      @cond.broadcast
    end
  end

  on_tpool ? shutdown_thread : shutdown_thread.join(30)
end
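The reason for the note above is that a thread cannot join itself (Ruby raises ThreadError), so a pool thread cannot simply wait for its own pool to shut down. The toy pool below sketches the same shape as close!: the shutdown work is handed to a dedicated thread, and the caller only blocks on it when it is not a pool thread. `TinyPool` is a hypothetical stand-in, not ZK's Threadpool.

```ruby
# Toy worker pool; hypothetical stand-in, not ZK's Threadpool.
class TinyPool
  def initialize(size)
    @queue = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # A nil job is the stop signal for one worker.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  def defer(&job)
    @queue << job
    self
  end

  def on_pool?
    @threads.include?(Thread.current)
  end

  def alive?
    @threads.any?(&:alive?)
  end

  # Hand the shutdown to a separate thread. A pool thread gets the thread
  # back without waiting; any other caller waits up to 5 seconds for it.
  def close!
    on_pool = on_pool?
    shutdown_thread = Thread.new do
      @threads.size.times { @queue << nil }
      @threads.each(&:join)
    end
    on_pool ? shutdown_thread : shutdown_thread.join(5)
  end
end

pool = TinyPool.new(2)
handoff = Queue.new
pool.defer { handoff << pool.close! } # close from inside a worker thread
shutdown_thread = handoff.pop
shutdown_thread.join                  # safe here: we are not a pool thread
```

The worker that called close! returns immediately, finishes its job, and is then reaped by the shutdown thread along with the other worker.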

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 368

def closed?
  return true if @mutex.synchronize { @client_state == CLOSED }
  super
end

#connect(opts = {}) ⇒ Object

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :timeout (Fixnum)

    how long we will wait for the connection to be established. If timeout is nil, we will wait forever: use carefully.



# File 'lib/zk/client/threaded.rb', line 203

def connect(opts={})
  @mutex.synchronize { unlocked_connect(opts) }
end

#create(path, *args) ⇒ Object

This is where the :or option is implemented for Base#create.



# File 'lib/zk/client/threaded.rb', line 374

def create(path, *args)
  opts = args.extract_options!

  or_opt = opts.delete(:or)
  args << opts

  if or_opt
    hash = parse_create_args(path, *args)
 
    raise ArgumentError, "valid options for :or are nil or :set, not #{or_opt.inspect}" unless or_opt == :set 
    raise ArgumentError, "you cannot create an ephemeral node when using the :or option" if hash[:ephemeral]
    raise ArgumentError, "you cannot create a sequence node when using the :or option"  if hash[:sequence]
   
    mkdir_p(path, :data => hash[:data])
    path
  else
    # ok, none of our business, hand it up to management
    super(path, *args)
  end
end
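A minimal usage sketch of the :or => :set option handled above. The helper name `upsert_node` is hypothetical, and passing the data as the second positional argument is an assumption about the create signature (the source only shows create(path, *args)).

```ruby
# Hypothetical helper (not part of ZK): write `data` to `path` whether or not
# the node already exists, via the :or => :set option.
# Assumption: data may be passed as the second positional argument.
def upsert_node(zk, path, data)
  zk.create(path, data, :or => :set)
end

# Usage (needs the zk gem and a reachable server):
#   zk = ZK::Client::Threaded.new('localhost:2181')
#   upsert_node(zk, '/config/feature_flag', 'on')
#
# Per the source above, combining :or with an ephemeral or sequence node
# raises ArgumentError, as does any :or value other than :set.
```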

#on_exception(&blk) ⇒ Object

Note:

if your exception callback block itself raises an exception, I will make fun of you.

register a block to be called back with unhandled exceptions that occur in the threadpool.



# File 'lib/zk/client/threaded.rb', line 364

def on_exception(&blk)
  @threadpool.on_exception(&blk)
end

#on_threadpool? ⇒ Boolean

returns true if the current thread is one of the threadpool threads

Returns:

  • (Boolean)


# File 'lib/zk/client/threaded.rb', line 359

def on_threadpool?
  @threadpool and @threadpool.on_threadpool?
end

#reopen(timeout = nil) ⇒ Symbol

reopen the underlying connection

The timeout param is here mainly for legacy support.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    how long we should wait for the connection to reach a connected state before returning. Note that the method will not raise, and will return whether or not the connection reaches the 'connected' state. When timeout is nil, the value passed to the constructor for 'timeout' is used.

Returns:

  • (Symbol)

    state of connection after operation
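Because #reopen does not raise when the connection fails to come up in time, callers that need a live connection have to check for it themselves. A minimal sketch, assuming only #reopen and #connected? as listed on this page; `reopen_or_raise` and `NotConnectedError` are hypothetical names.

```ruby
# Hypothetical error class and helper (not part of ZK).
class NotConnectedError < StandardError; end

# Reopen the connection and turn "not connected after the timeout" into an
# exception, since #reopen itself returns normally either way.
def reopen_or_raise(zk, timeout)
  state = zk.reopen(timeout)
  return state if zk.connected?
  raise NotConnectedError, "still #{state.inspect} after waiting #{timeout}s"
end

# Usage (needs the zk gem and a reachable server):
#   reopen_or_raise(zk, 5)
```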



# File 'lib/zk/client/threaded.rb', line 208

def reopen(timeout=nil)
  # If we've forked, then we can call all sorts of normally dangerous 
  # stuff because we're the only thread. 
  if forked?
    # ok, just to sanity check here
    raise "[BUG] we hit the fork-reopening code in JRuby!!" if defined?(::JRUBY_VERSION)

    logger.debug { "reopening everything, fork detected!" }

    setup_locks

    @pid        = Process.pid
    @client_state  = RUNNING                     # reset state to running if we were paused

    old_cnx, @cnx = @cnx, nil
    old_cnx.close! if old_cnx # && !old_cnx.closed?

    join_and_clear_reconnect_thread

    @last_cnx_state = nil

    @mutex.synchronize do
      # it's important that we're holding the lock, as access to 'cnx' is
      # synchronized, and we want to avoid a race where event handlers
      # might see a nil connection. I've seen this exception occur *once*
      # so it's pretty rare (it was on 1.8.7 too), but just to be double
      # extra paranoid

      @event_handler.reopen_after_fork!
      @threadpool.reopen_after_fork!          # prune dead threadpool threads after a fork()

      unlocked_connect
    end
  else
    @mutex.synchronize do
      if @client_state == PAUSED
        # XXX: what to do in this case? does it matter?
      end

      logger.debug { "reopening, no fork detected" }
      @last_cnx_state = nil
      @cnx.reopen(timeout)                # ok, we weren't forked, so just reopen
    end
  end

  state
end