Class: ZK::Client::Base

Inherits:
Object
Includes:
Conveniences, StateMixin, Unixisms
Defined in:
lib/z_k/client/base.rb

Instance Attribute Summary

Instance Method Summary

Methods included from Conveniences

#defer, #election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock

Methods included from Unixisms

#find, #mkdir_p, #rm_rf

Methods included from StateMixin

#associating?, #connected?, #connecting?, #expired_session?, #on_connecting, #on_expired_session, #state, #wrap_state_closed_error

Constructor Details

#initialize(host, opts = {}) {|_self| ... } ⇒ Base

Create a new client and connect to the zookeeper server.

host should be a string of comma-separated host:port pairs. You can also supply an optional “chroot” suffix that will act as an implicit prefix to all paths supplied.

example:

ZK::Client.new("zk01:2181,zk02:2181/chroot/path")
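As a rough illustration of how such a connection string breaks down, the sketch below splits it into its host:port pairs and the optional chroot. `split_connection_string` is a hypothetical helper written for this example; it is not part of ZK or the zookeeper gem, which deal with the string internally.

```ruby
# Hypothetical helper (not part of ZK): split a connection string into
# its host:port pairs and the optional chroot suffix.
def split_connection_string(str)
  # Everything from the first "/" onward is treated as the chroot.
  hosts_part, slash, rest = str.partition('/')
  chroot = slash.empty? ? nil : "/#{rest}"
  { :hosts => hosts_part.split(','), :chroot => chroot }
end

parsed = split_connection_string("zk01:2181,zk02:2181/chroot/path")
p parsed[:hosts]   # the comma-separated host:port pairs
p parsed[:chroot]  # the implicit prefix applied to all supplied paths
```

With a chroot of "/chroot/path", a call such as zk.create("/foo") would operate on "/chroot/path/foo" on the server.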

Yields:

  • (_self)

Yield Parameters:



# File 'lib/z_k/client/base.rb', line 24

def initialize(host, opts={})
  @event_handler = EventHandler.new(self)
  yield self if block_given?
  @cnx = ::Zookeeper.new(host, DEFAULT_TIMEOUT, @event_handler.get_default_watcher_block)
  @threadpool = Threadpool.new
end

Instance Attribute Details

#event_handler ⇒ Object (readonly)

Returns the value of attribute event_handler.



# File 'lib/z_k/client/base.rb', line 8

def event_handler
  @event_handler
end

Instance Method Details

#check_rc(hash, inputs = nil) ⇒ Object (protected)



# File 'lib/z_k/client/base.rb', line 677

def check_rc(hash, inputs=nil)
  hash.tap do |h|
    if code = h[:rc]
      msg = inputs ? "inputs: #{inputs.inspect}" : nil
      raise Exceptions::KeeperException.by_code(code), msg unless code == Zookeeper::ZOK
    end
  end
end

#children(path, opts = {}) ⇒ Object

Return the list of the children of the node of the given path.

If :watch is true and the call is successful (no exception is thrown), registered watchers of the children of the node will be enabled. The watch will be triggered by a successful operation that deletes the node of the given path or creates/deletes a child under the node. See watcher for documentation on how to register blocks to be called when a watch event is fired.

Examples:

get children for path


zk.create("/path", :data => "foo")
zk.create("/path/child_0", :data => "child0")
zk.create("/path/child_1", :data => "child1")
zk.children("/path")
# => ["child_0", "child_1"]

get children and set watch


# same setup as above

zk.children("/path", :watch => true)
# => ["child_0", "child_1"]

Parameters:

  • path (String)

    absolute path of the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :watch (bool) — default: false

    set to true if you want your registered callbacks for this node to be called on change

  • :callback (ZookeeperCallbacks::StringsCallback)

    to make this call asynchronously

  • :context (Object)

    an object passed to the :callback given as the context param
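The :callback and :context options can be pictured with a small stand-in. The callback shape (`process_result(return_code, path, context, children)`) follows the comment in the source listing for this method; `fake_async_children` is a hypothetical replacement for the real client so the sketch runs without a server, and a real call may require a ZookeeperCallbacks::StringsCallback instance rather than a plain object.

```ruby
require 'thread'

# Callback object with the method shape shown in the source comment.
class ChildrenCallback
  attr_reader :results

  def initialize
    @results = Queue.new
  end

  def process_result(return_code, path, context, children)
    @results << [return_code, path, context, children]
  end
end

# Hypothetical stand-in for zk.children(path, :callback => ..., :context => ...).
# It returns nil immediately and delivers the result on another thread.
def fake_async_children(path, opts)
  Thread.new do
    opts[:callback].process_result(0, path, opts[:context], ["child_0", "child_1"])
  end
  nil
end

callback = ChildrenCallback.new
rv = fake_async_children("/path", :callback => callback, :context => :my_context)

# Queue#pop blocks until the callback has delivered its result.
return_code, path, context, children = callback.results.pop
```

The synchronous return value is nil here, matching the `opts[:callback] ? nil : rv[:children]` line in the source; the children arrive only through the callback.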

Raises:



# File 'lib/z_k/client/base.rb', line 497

def children(path, opts={})
  # ===== get children asynchronously
  #
  #   class ChildrenCallback
  #     def process_result(return_code, path, context, children)
  #       # do processing here
  #     end
  #   end
  #  
  #   callback = ChildrenCallback.new
  #   context = Object.new
  #   zk.children("/path", :callback => callback, :context => context)


  h = { :path => path }.merge(opts)

  setup_watcher!(:child, h)

  rv = check_rc(@cnx.get_children(h), h)
  opts[:callback] ? nil : rv[:children]
end

#close! ⇒ Object

closes the underlying connection and deregisters all callbacks



# File 'lib/z_k/client/base.rb', line 73

def close!
  @event_handler.clear!
  wrap_state_closed_error { @cnx.close }
  @threadpool.shutdown
  nil
end

#closed? ⇒ Boolean

returns true if the connection has been closed – XXX: should this be our idea of closed or ZOO_CLOSED_STATE ?

Returns:

  • (Boolean)


# File 'lib/z_k/client/base.rb', line 39

def closed?
  defined?(::JRUBY_VERSION) ? jruby_closed? : mri_closed?
end

#create(path, data = '', opts = {}) ⇒ String

TODO:

clean up the verbiage around watchers

TODO:

Document the asynchronous methods

Create a node with the given path. The node data will be the given data. The path is returned.

If the ephemeral option is given, the znode created will be removed by the server automatically when the session associated with the creation of the node expires. Note that ephemeral nodes cannot have children.

The sequence option, if true, will cause the server to create a sequential node. The actual path name of a sequential node will be the given path plus a suffix “_i” where i is the current sequential number of the node. Once such a node is created, the sequential number for the path will be incremented by one (i.e. the generated path will be unique across all clients).

Note that since a different actual path is used for each invocation that creates a sequential node with the same path argument, the call will never throw a NodeExists exception.

This operation, if successful, will trigger all the watches left on the node of the given path by exists and get API calls, and the watches left on the parent node by children API calls.

If a node is created successfully, the ZooKeeper server will trigger the watches on the path left by exists calls, and the watches on the parent of the node by children calls.

Examples:

create node, no data, persistent


zk.create("/path")
# => "/path"

create node, ACL will default to ACL::OPEN_ACL_UNSAFE


zk.create("/path", "foo")
# => "/path"

create ephemeral node


zk.create("/path", '', :mode => :ephemeral)
# => "/path"

create sequential node


zk.create("/path", '', :sequence => true)
# => "/path0"

# or you can also do:

zk.create("/path", '', :mode => :persistent_sequential)
# => "/path0"

create ephemeral and sequential node


zk.create("/path", '', :sequence => true, :ephemeral => true)
# => "/path0"

# or you can also do:

zk.create("/path", "foo", :mode => :ephemeral_sequential)
# => "/path0"

create a child path


zk.create("/path/child", "bar")
# => "/path/child"

create a sequential child path


zk.create("/path/child", "bar", :sequence => true, :ephemeral => true)
# => "/path/child0"

# or you can also do:

zk.create("/path/child", "bar", :mode => :ephemeral_sequential)
# => "/path/child0"

Parameters:

  • path (String)

    absolute path of the znode

  • data (String) (defaults to: '')

    the data to create the znode with

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :acl (Integer)

    defaults to ZookeeperACLs::ZOO_OPEN_ACL_UNSAFE, otherwise the ACL for the node. Should be a ZOO_* constant defined under the ZookeeperACLs module in the zookeeper gem.

  • :ephemeral (bool) — default: false

    if true, the created node will be ephemeral

  • :sequence (bool) — default: false

    if true, the created node will be sequential

  • :callback (ZookeeperCallbacks::StringCallback) — default: nil

    provide a callback object that will be called when the znode has been created

  • :context (Object) — default: nil

    an object passed to the :callback given as the context param

  • :mode (:ephemeral_sequential, :persistent_sequential, :persistent, :ephemeral) — default: nil

    may be specified instead of the :ephemeral and :sequence options. If :mode and either of the :ephemeral or :sequence options are given, the :mode option will win
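How :mode interacts with :ephemeral and :sequence can be seen in isolation with a small sketch. `resolve_create_flags` is a hypothetical stand-alone function that mirrors the mode handling in the source listing for this method; it is not a method of the client.

```ruby
# Hypothetical stand-alone version of the :mode handling, for illustration.
def resolve_create_flags(opts = {})
  h = { :ephemeral => false, :sequence => false }.merge(opts)

  mode = h.delete(:mode)
  if mode
    case mode.to_sym
    when :ephemeral_sequential
      h[:ephemeral] = h[:sequence] = true
    when :persistent_sequential
      h[:ephemeral] = false
      h[:sequence] = true
    when :persistent
      h[:ephemeral] = false
    when :ephemeral
      h[:ephemeral] = true
    else
      raise ArgumentError, "Unknown mode: #{mode.inspect}"
    end
  end

  h
end

# :mode overrides an explicit :ephemeral flag.
p resolve_create_flags(:ephemeral => true, :mode => :persistent)

# Both flags are switched on by the combined mode.
p resolve_create_flags(:mode => :ephemeral_sequential)
```

Mode names outside the four listed above raise ArgumentError in this sketch, as they do in the listing.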

Returns:

  • (String)

    the path created on the server

Raises:



# File 'lib/z_k/client/base.rb', line 214

def create(path, data='', opts={})
  h = { :path => path, :data => data, :ephemeral => false, :sequence => false }.merge(opts)

  if mode = h.delete(:mode)
    mode = mode.to_sym

    case mode
    when :ephemeral_sequential
      h[:ephemeral] = h[:sequence] = true
    when :persistent_sequential
      h[:ephemeral] = false
      h[:sequence] = true
    when :persistent
      h[:ephemeral] = false
    when :ephemeral
      h[:ephemeral] = true
    else
      raise ArgumentError, "Unknown mode: #{mode.inspect}"
    end
  end

  rv = check_rc(@cnx.create(h), h)

  h[:callback] ? rv : rv[:path]
end

#delete(path, opts = {}) ⇒ Object

Delete the node with the given path. The call will succeed if such a node exists, and the given version matches the node’s version (if the given version is -1, it matches any node’s versions), and the node has no children.

This operation, if successful, will trigger all the watches on the node of the given path left by exists API calls, and the watches on the parent node left by children API calls.

Can be called with just the path, or with a hash of additional arguments. Supports being executed asynchronously by passing a callback object.

A KeeperException with error code KeeperException::NotEmpty will be thrown if the node has children.

Examples:

delete a node

zk.delete("/path")

delete a node with a specific version

zk.delete("/path", :version => 5)

Parameters:

  • path (String)

    absolute path of the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :version (Integer) — default: -1

    matches all versions of a node if the default is used, otherwise acts as an assertion that the znode has the supplied version.

  • :callback (ZookeeperCallbacks::VoidCallback)

    will be called asynchronously when the operation is complete

  • :context (Object)

    an object passed to the :callback given as the context param

Raises:



# File 'lib/z_k/client/base.rb', line 558

def delete(path, opts={})
  # ===== delete node asynchronously
  #
  #   class VoidCallback
  #     def process_result(return_code, path, context)
  #       # do processing here
  #     end
  #   end
  #  
  #   callback = VoidCallback.new
  #   context = Object.new
  #
  #   zk.delete("/path", :callback => callback, :context => context)


  h = { :path => path, :version => -1 }.merge(opts)
  rv = check_rc(@cnx.delete(h), h)
  nil
end

#exists?(path, opts = {}) ⇒ Boolean

sugar around stat

only works for the synchronous version of stat. for async version, this method will act exactly like stat

Examples:


# instead of:

zk.stat('/path').exists?
# => true

# you can do:

zk.exists?('/path')
# => true

Returns:

  • (Boolean)


# File 'lib/z_k/client/base.rb', line 455

def exists?(path, opts={})
  rv = stat(path, opts)
  opts[:callback] ? rv : rv.exists?
end

#get(path, opts = {}) ⇒ Array

TODO:

fix references to Watcher documentation

Return the data and stat of the node of the given path.

If :watch is true and the call is successful (no exception is raised), registered watchers on the node will be ‘armed’. The watch will be triggered by a successful operation that sets data on the node, or deletes the node. See watcher for documentation on how to register blocks to be called when a watch event is fired.
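The difference between a registered watcher and an armed one can be modelled in a few lines. The sketch assumes ZooKeeper's usual one-shot watch behaviour (an armed watch is delivered once and must be re-armed by another read with :watch => true). `FakeWatchRegistry` and its methods are hypothetical and exist only for this example; they are not the ZK event handler API.

```ruby
# Hypothetical in-memory model of "registered" versus "armed" watchers.
class FakeWatchRegistry
  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
    @armed = {}
  end

  # Register a block for a path; it stays registered across events.
  def register(path, &block)
    @callbacks[path] << block
  end

  # Plays the role of passing :watch => true on a read.
  def arm(path)
    @armed[path] = true
  end

  # Deliver an event only if the path is armed, then disarm it.
  def fire(path, event)
    return false unless @armed.delete(path)
    @callbacks[path].each { |callback| callback.call(event) }
    true
  end
end

seen = []
registry = FakeWatchRegistry.new
registry.register("/path") { |event| seen << event }

registry.fire("/path", :changed)  # not armed yet: nothing is delivered
registry.arm("/path")
registry.fire("/path", :changed)  # delivered once
registry.fire("/path", :changed)  # disarmed again: nothing is delivered
```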

Supports being executed asynchronously by passing a callback object.

Examples:

get data for path


zk.get("/path")
# => ['this is the data', #<ZookeeperStat::Stat>]

get data and set watch on node


zk.get("/path", :watch => true)
# => ['this is the data', #<ZookeeperStat::Stat>]

Parameters:

  • path (String)

    absolute path of the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :watch (bool) — default: false

    set to true if you want your registered callbacks for this node to be called on change

  • :callback (ZookeeperCallbacks::DataCallback)

    to make this call asynchronously

  • :context (Object)

    an object passed to the :callback given as the context param

Returns:

  • (Array)

    a two-element array of [‘node data’, #<ZookeeperStat::Stat>]

Raises:



# File 'lib/z_k/client/base.rb', line 292

def get(path, opts={})
  h = { :path => path }.merge(opts)

  setup_watcher!(:data, h)

  rv = check_rc(@cnx.get(h), h)

  opts[:callback] ? rv : rv.values_at(:data, :stat)
end

#get_acl(path, opts = {}) ⇒ Object

TODO:

this method is pretty much untested, YMMV

Return the ACL and stat of the node of the given path.

Examples:

get acl


zk.get_acl("/path")
# => [ACL]

get acl with stat


stat = ZK::Stat.new
zk.get_acl("/path", :stat => stat)
# => [ACL]

Parameters:

  • path (String)

    absolute path of the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :stat (ZookeeperStat::Stat) (defaults to: nil)

    provide a Stat object that will be set with the Stat information of the node path

  • :callback (ZookeeperCallbacks::AclCallback) (defaults to: nil)

    callback for an asynchronous call to occur

  • :context (Object) — default: nil

    an object passed to the :callback given as the context param

Raises:



# File 'lib/z_k/client/base.rb', line 606

def get_acl(path, opts={})
  # ===== get acl asynchronously
  #
  #   class AclCallback
  #     def processResult(return_code, path, context, acl, stat)
  #       # do processing here
  #     end
  #   end
  #  
  #   callback = AclCallback.new
  #   context = Object.new
  #   zk.get_acl("/path", :callback => callback, :context => context)

  h = { :path => path }.merge(opts)
  rv = check_rc(@cnx.get_acl(h), h)
  opts[:callback] ? nil : rv.values_at(:acl, :stat)
end

#inspect ⇒ Object



# File 'lib/z_k/client/base.rb', line 43

def inspect
  "#<#{self.class.name}:#{object_id} ...>"
end

#reopen(timeout = 10) ⇒ Object

reopen the underlying connection; returns the state of the connection after the operation



# File 'lib/z_k/client/base.rb', line 66

def reopen(timeout=10)
  @cnx.reopen(timeout, @event_handler.get_default_watcher_block)
  @threadpool.start!  # restart the threadpool if previously stopped by close!
  state
end

#set(path, data, opts = {}) ⇒ Object

Set the data for the node of the given path if such a node exists and the given version matches the version of the node (if the given version is -1, it matches any node’s versions). Passing the version allows you to perform optimistic locking, in that if someone changes the node’s data “behind your back”, your update will fail. Since #create does not return a ZookeeperStat::Stat object, you should be aware that nodes are created with version == 0.

This operation, if successful, will trigger all the watches on the node of the given path left by get calls.

Optional arguments are passed as a hash. Supports being executed asynchronously by passing a callback object.
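The optimistic-locking behaviour described above can be sketched with an in-memory stand-in. `FakeNode` and `BadVersionError` are hypothetical names invented for this example; a real client raises a KeeperException subclass instead, and the version would normally come from a stat or get call.

```ruby
# Everything here is a hypothetical in-memory stand-in, not the ZK API.
class BadVersionError < StandardError; end

class FakeNode
  attr_reader :data, :version

  def initialize(data)
    @data = data
    @version = 0  # nodes are created with version == 0, as noted above
  end

  # Mimics the documented rule: -1 matches any version, otherwise the
  # supplied version must equal the node's current version.
  def set(data, version = -1)
    unless version == -1 || version == @version
      raise BadVersionError, "expected #{version}, node is at #{@version}"
    end
    @data = data
    @version += 1
  end
end

node = FakeNode.new("a")
node.set("b", 0)  # succeeds: the version matches, node moves to version 1

begin
  node.set("c", 0)  # stale version: the node was changed "behind our back"
rescue BadVersionError
  node.set("c", node.version)  # re-read the current version and retry
end
```

Passing -1 (the default) skips the check entirely, which corresponds to the unconditional zk.set("/path", "foo") form.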

Examples:

unconditionally set the data of “/path”


zk.set("/path", "foo")

set the data of “/path” only if the version is 0


zk.set("/path", "foo", :version => 0)

Parameters:

  • path (String)

    absolute path of the znode

  • data (String)

    the data to be set on the znode. Note that setting the data to the exact same value currently on the node still increments the node’s version and causes watches to be fired.

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :version (Integer) — default: -1

    matches all versions of a node if the default is used, otherwise acts as an assertion that the znode has the supplied version.

  • :callback (ZookeeperCallbacks::StatCallback)

    will receive the ZookeeperStat::Stat object asynchronously

  • :context (Object)

    an object passed to the :callback given as the context param

Raises:



# File 'lib/z_k/client/base.rb', line 345

def set(path, data, opts={})
  # ===== set data asynchronously
  #
  #   class StatCallback
  #     def process_result(return_code, path, context, stat)
  #       # do processing here
  #     end
  #   end
  #  
  #   callback = StatCallback.new
  #   context = Object.new
  #
  #   zk.set("/path", "foo", :callback => callback, :context => context)

  h = { :path => path, :data => data }.merge(opts)

  rv = check_rc(@cnx.set(h), h)

  opts[:callback] ? nil : rv[:stat]
end

#set_acl(path, acls, opts = {}) ⇒ Object

Set the ACL for the node of the given path if such a node exists and the given version matches the version of the node. Return the stat of the node.

@todo: TBA - waiting on clarification of method use

Parameters:

  • path (String)

    absolute path of the znode

  • acls (ZookeeperACLs)

    the acls to set on the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :version (Integer) — default: -1

    matches all versions of a node if the default is used, otherwise acts as an assertion that the znode has the supplied version.

  • :callback (ZookeeperCallbacks::VoidCallback)

    will be called asynchronously when the operation is complete

  • :context (Object)

    an object passed to the :callback given as the context param

Raises:



# File 'lib/z_k/client/base.rb', line 649

def set_acl(path, acls, opts={})
  h = { :path => path, :acl => acls }.merge(opts)
  rv = check_rc(@cnx.set_acl(h), h)
  opts[:callback] ? nil : rv[:stat]
end

#setup_watcher!(watch_type, opts) ⇒ Object (protected)



# File 'lib/z_k/client/base.rb', line 686

def setup_watcher!(watch_type, opts)
  event_handler.setup_watcher!(watch_type, opts)
end

#stat(path, opts = {}) ⇒ ZookeeperStat::Stat

Return the stat of the node of the given path. If the node doesn’t exist, the returned stat reports exists? as false.

If :watch is true and the call is successful (no exception is thrown), a watch will be left on the node with the given path. The watch will be triggered by a successful operation that creates/deletes the node or sets the data on the node.

Can be called with just the path, or with a hash of additional arguments. Supports being executed asynchronously by passing a callback object.

Examples:

get stat for path

>> zk.stat("/path")
# => ZK::Stat

get stat for path and enable watchers

>> zk.stat("/path", :watch => true)
# => ZK::Stat

exists for non existent path


>> stat = zk.stat("/non_existent_path")
# => #<ZookeeperStat::Stat:0x000001eb54 @exists=false>
>> stat.exists?
# => false
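The non-existent-path example above works because a "no node" return code is treated as a normal result rather than an error. The sketch below isolates that dispatch; it mirrors the case statement in the source listing for this method, but `FakeCodes`, `FakeStat`, `FakeKeeperError`, and `stat_from_result` are all hypothetical stand-ins introduced for illustration.

```ruby
# Stand-ins for illustration only; none of these names come from ZK.
module FakeCodes
  OK      = 0     # plays the role of Zookeeper::ZOK
  NO_NODE = -101  # plays the role of Zookeeper::ZNONODE
end

class FakeKeeperError < StandardError; end

FakeStat = Struct.new(:exists) do
  def exists?
    exists
  end
end

# Return the stat for "ok" and "no node"; raise for any other code.
def stat_from_result(rv)
  case rv[:rc]
  when FakeCodes::OK, FakeCodes::NO_NODE
    rv[:stat]
  else
    raise FakeKeeperError, "return code #{rv[:rc]}"
  end
end

present = stat_from_result(:rc => FakeCodes::OK, :stat => FakeStat.new(true))
missing = stat_from_result(:rc => FakeCodes::NO_NODE, :stat => FakeStat.new(false))

p present.exists?
p missing.exists?
```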

Parameters:

  • path (String)

    absolute path of the znode

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :watch (bool) — default: false

    set to true if you want to enable registered watches on this node

  • :callback (ZookeeperCallbacks::StatCallback)

    will receive the ZookeeperStat::Stat object asynchronously

  • :context (Object)

    an object passed to the :callback given as the context param

Returns:

  • (ZookeeperStat::Stat)

    a stat object of the specified node



# File 'lib/z_k/client/base.rb', line 406

def stat(path, opts={})
  # ===== exist node asynchronously
  #
  #   class StatCallback
  #     def process_result(return_code, path, context, stat)
  #       # do processing here
  #     end
  #   end
  #  
  #   callback = StatCallback.new
  #   context = Object.new
  #
  #   zk.exists?("/path", :callback => callback, :context => context)


  h = { :path => path }.merge(opts)

  setup_watcher!(:data, h)

  rv = @cnx.stat(h)

  return rv if opts[:callback] 

  case rv[:rc] 
  when Zookeeper::ZOK, Zookeeper::ZNONODE
    rv[:stat]
  else
    check_rc(rv, h) # throws the appropriate error
  end
end

#watcher ⇒ Object

@deprecated: for backwards compatibility only



# File 'lib/z_k/client/base.rb', line 32

def watcher
  event_handler
end