Class: ZK::Client::Threaded
- Inherits:
-
Base
- Object
- Base
- ZK::Client::Threaded
- Includes:
- Conveniences, StateMixin, Unixisms, Logging
- Defined in:
- lib/zk/client/threaded.rb
Overview
This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.
If you want to register on_* callbacks (see StateMixin)
then you should pass a block, which will be called before the
connection is set up (this way you can get the on_connected event). See
the 'Register on_connected callback' example.
A note on event delivery. There has been some confusion, caused by incorrect documentation (which I'm very sorry about), about how many threads are delivering events. The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.
If you use the threadpool/event callbacks to perform work, you may be
interested in registering an on_exception callback that will receive
all exceptions that occur on the threadpool that are not handled (i.e.
that bubble up to top of a block).
Constant Summary
- DEFAULT_THREADPOOL_SIZE =
1
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary (collapse)
-
- (Object) close!
close the underlying connection and clear all pending events.
-
- (Threaded) initialize(host, opts = {}) {|self| ... }
constructor
Construct a new threaded client.
-
- (Object) on_exception(&blk)
register a block to be called back with unhandled exceptions that occur in the threadpool.
-
- (Boolean) on_threadpool?
returns true if the current thread is one of the threadpool threads.
-
- (Symbol) reopen(timeout = nil)
reopen the underlying connection.
Methods included from Conveniences
#election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock
Methods included from Unixisms
#block_until_node_deleted, #find, #mkdir_p, #rm_rf
Methods included from StateMixin
#associating?, #connected?, #connecting?, #expired_session?, #on_connected, #on_connecting, #on_expired_session, #state, #wrap_state_closed_error
Methods inherited from Base
#children, #closed?, #create, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #safe_session_id, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher
Constructor Details
- (Threaded) initialize(host, opts = {}) {|self| ... }
The :timeout argument here is not the session_timeout for the
connection. rather it is the amount of time we wait for the connection
to be established. The session timeout exchanged with the server is
set to 10s by default in the C implemenation, and as of version 0.8.0
of slyphon-zookeeper has yet to be exposed as an option. That feature
is planned.
The documentation for 0.9.0 was incorrect in stating the number
of threads used to deliver events. There was one, unconfigurable,
event dispatch thread. In 1.0 the number of event delivery threads is
configurable, but still defaults to 1 and users are discouraged from
adjusting the value due to the complexity this introduces. In 1.1
there is a better option for achieving higher concurrency (see the
:thread option)
The Management apologizes for any confusion this may have caused.
Construct a new threaded client.
Pay close attention to the :threaded option, and have a look at the
EventDeliveryModel
page in the wiki for a discussion of the relative advantages and
disadvantages of the choices available. The default is safe, but the
alternative will likely provide better performance.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/zk/client/threaded.rb', line 127 def initialize(host, opts={}, &b) super(host, opts) tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE) @threadpool = Threadpool.new(tp_size) @session_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass? @event_handler = EventHandler.new(self, opts) @reconnect = opts.fetch(:reconnect, true) @mutex = Mutex.new @close_requested = false yield self if block_given? @cnx = create_connection(host, @session_timeout, @event_handler.get_default_watcher_block) end |
Instance Method Details
- (Object) close!
We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.
close the underlying connection and clear all pending events.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/zk/client/threaded.rb', line 161 def close! @mutex.synchronize do return if @close_requested @close_requested = true end on_tpool = on_threadpool? # Ok, so the threadpool will wait up to N seconds while joining each thread. # If _we're on a threadpool thread_, have it wait until we're ready to jump # out of this method, and tell it to wait up to 5 seconds to let us get # clear, then do the rest of the shutdown of the connection # # if the user *doesn't* hate us, then we just join the shutdown_thread immediately # and wait for it to exit # shutdown_thread = Thread.new do @threadpool.shutdown(2) super end shutdown_thread.join unless on_tpool nil end |
- (Object) on_exception(&blk)
if your exception callback block itself raises an exception, I will make fun of you.
register a block to be called back with unhandled exceptions that occur in the threadpool.
193 194 195 |
# File 'lib/zk/client/threaded.rb', line 193 def on_exception(&blk) @threadpool.on_exception(&blk) end |
- (Boolean) on_threadpool?
returns true if the current thread is one of the threadpool threads
188 189 190 |
# File 'lib/zk/client/threaded.rb', line 188 def on_threadpool? @threadpool and @threadpool.on_threadpool? end |
- (Symbol) reopen(timeout = nil)
reopen the underlying connection
The timeout param is here mainly for legacy support.
148 149 150 151 |
# File 'lib/zk/client/threaded.rb', line 148 def reopen(timeout=nil) @mutex.synchronize { @close_requested = false } super end |