Class: ZooKeeper::Client
- Inherits: Object
  - Object
  - ZooKeeper::Client
- Includes: Operations
- Defined in: lib/zkruby/client.rb, lib/zkruby/util.rb
Overview
Client API
All calls operate asynchronously or synchronously, depending on whether a block is supplied.
Without a block, requests are executed synchronously and either return results directly or raise an Error.
With a block, the request returns immediately with an AsyncOp. When the server responds, the block is passed the results. Errors are sent to the error callback, if one is registered on the AsyncOp.
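The block/no-block convention described above can be illustrated without a server. The following is a minimal sketch using stand-in classes: ToyOp and ToyClient are hypothetical and are not part of zkruby, and a real AsyncOp is completed by the server response rather than by an explicit run call.

```ruby
# Hypothetical stand-in for AsyncOp: holds deferred work and an optional error callback.
class ToyOp
  def initialize(&work)
    @work = work
    @errback = nil
  end

  # Register an error callback on the op.
  def errback(&blk)
    @errback = blk
    self
  end

  # Run the deferred work; errors go to the errback if one is registered.
  def run
    @work.call
  rescue StandardError => e
    raise unless @errback
    @errback.call(e)
  end
end

# Hypothetical stand-in for the client, backed by a plain Hash of path => data.
class ToyClient
  def initialize(data)
    @data = data
  end

  def get(path, &blk)
    unless block_given?
      # No block: synchronous, return the result or raise.
      raise KeyError, "no node #{path}" unless @data.key?(path)
      return @data[path]
    end
    # Block given: return an op immediately; the block sees the result later.
    ToyOp.new do
      raise KeyError, "no node #{path}" unless @data.key?(path)
      blk.call(@data[path])
    end
  end
end

zk = ToyClient.new({ "/a" => "hello" })
zk.get("/a")                          # synchronous form
op = zk.get("/a") { |data| data }     # asynchronous form, returns a ToyOp
op.run
```

In this sketch an unhandled error in the asynchronous form is re-raised from run; how zkruby treats errors with no registered error callback is not stated in this section.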
Requests that take a watch argument can be passed one of:
* An object that quacks like a Watcher
* A Proc, which will be invoked with the arguments state, path, event
* The literal value true, which refers to the default watcher registered with the session
Registered watches will be fired exactly once for a given path, either with the expected event or, when the session is finalised, with state :expired and event :none.
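The fire-once behaviour of watches can be sketched with a small registry. ToyWatches below is a hypothetical illustration, not zkruby's implementation; it covers only the Proc and literal true forms (Watcher objects are omitted because their interface is not shown in this section), and the :connected and :changed symbols are placeholders.

```ruby
# Hypothetical sketch of a fire-once watch registry.
class ToyWatches
  def initialize(default_watcher = nil)
    @default = default_watcher
    @watches = Hash.new { |h, k| h[k] = [] }
  end

  # watch may be a Proc, or the literal true to use the default watcher.
  def register(path, watch)
    watch = @default if watch == true
    @watches[path] << watch if watch
  end

  # Remove the watches for path before invoking them, so each fires once.
  # Returns the number of watches fired.
  def fire(state, path, event)
    fired = @watches.delete(path) || []
    fired.each { |w| w.call(state, path, event) }
    fired.size
  end

  # On session end, every remaining watch sees state :expired and event :none.
  def expire
    @watches.keys.sum { |path| fire(:expired, path, :none) }
  end
end

calls = []
watches = ToyWatches.new(proc { |s, p, e| calls << [:default, s, p, e] })
watches.register("/a", proc { |s, p, e| calls << [s, p, e] })
watches.register("/b", true)
watches.fire(:connected, "/a", :changed)  # fires the Proc registered on /a
watches.fire(:connected, "/a", :changed)  # nothing left to fire on /a
watches.expire                            # /b's default watcher sees :expired, :none
```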
Constant Summary
Constants included from Operations
Instance Method Summary
-
#children(path, watch = nil, &callback) ⇒ Object
Retrieve the list of children at the given path.
-
#close(&blk) ⇒ Object
Close the session.
-
#create(path, data, acl, *modeopts, &callback) ⇒ Object
Create a node.
-
#delete(path, version, &callback) ⇒ Object
Delete path.
-
#exists(path, watch = nil, &blk) ⇒ Object
(also: #exists?, #stat)
Retrieve the Data::Stat of a path, or nil if the path does not exist.
-
#get(path, watch = nil, &blk) ⇒ Object
Retrieve data.
-
#get_acl(path, &blk) ⇒ Object
Get ACL.
-
#initialize(binding) ⇒ Client
constructor
private
See connect.
-
#mkpath(path, acl = ZK::ACL_OPEN_UNSAFE, &callback) ⇒ Object
Recursive make path.
-
#multi(ops, &callback) ⇒ Object
private
See #transaction.
-
#rmpath(path, version = -1, &callback) ⇒ Object
Recursive delete.
-
#set(path, data, version, &callback) ⇒ Object
Set Data.
-
#set_acl(path, acl, version, &blk) ⇒ Object
Set ACL.
-
#sync(path, &blk) ⇒ Object
Synchronise path between session and leader.
-
#timeout ⇒ Object
Session timeout, initially as supplied, but once connected is the negotiated timeout with the server.
-
#transaction {|txn| ... } ⇒ Object
Perform multiple operations in a transaction.
-
#watcher ⇒ Object
The currently registered default watcher.
-
#watcher=(watcher) ⇒ Object
Assign the watcher to the session.
Constructor Details
#initialize(binding) ⇒ Client
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/zkruby/client.rb', line 263
def initialize(binding)
    @binding = binding
end
Instance Method Details
#children(path, watch = nil) ⇒ Data::Stat, Array<String>
#children(path, watch = nil) {|stat, children| ... } ⇒ AsyncOp
Retrieve the list of children at the given path
# File 'lib/zkruby/client.rb', line 295
def children(path,watch=nil,&callback)
    return synchronous_call(:children,path,watch) unless block_given?
    path = chroot(path)

    req = Proto::GetChildren2Request.new(:path => path, :watch => watch)
    queue_request(req,:get_children2,12,Proto::GetChildren2Response,:children,watch) do | response |
        callback.call(response.stat, response.children.to_a)
    end
end
#close ⇒ Object
#close { ... } ⇒ AsyncOp
Close the session
# File 'lib/zkruby/client.rb', line 452
def close(&blk)
    return synchronous_call(:close) unless block_given?
    @binding.close(&blk)
end
#create(path, data, acl, *modeopts) ⇒ String
#create(path, data, acl, *modeopts) {|path| ... } ⇒ AsyncOp
Create a node
# File 'lib/zkruby/client.rb', line 317
def create(path,data,acl,*modeopts,&callback)
    op_create(path,data,acl,*modeopts,&callback)
end
#delete(path, version) ⇒ Object
#delete(path, version) { ... } ⇒ AsyncOp
Delete path
# File 'lib/zkruby/client.rb', line 372
def delete(path,version,&callback)
    op_delete(path,version,&callback)
end
#exists(path, watch = nil) ⇒ Data::Stat
#exists(path, watch = nil) {|stat| ... } ⇒ AsyncOp
Also known as: exists?, stat
Retrieve the Data::Stat of a path, or nil if the path does not exist
# File 'lib/zkruby/client.rb', line 351
def exists(path,watch=nil,&blk)
    return synchronous_call(:exists,path,watch)[0] unless block_given?
    path = chroot(path)

    req = Proto::ExistsRequest.new(:path => path, :watch => watch)
    queue_request(req,:exists,3,Proto::ExistsResponse,:exists,watch,ExistsPacket) do | response |
        blk.call( response.nil? ? nil : response.stat )
    end
end
#get(path, watch = nil) ⇒ Data::Stat, String
#get(path, watch = nil) {|stat, data| ... } ⇒ AsyncOp
Retrieve data
# File 'lib/zkruby/client.rb', line 331
def get(path,watch=nil,&blk)
    return synchronous_call(:get,path,watch) unless block_given?
    path = chroot(path)

    req = Proto::GetDataRequest.new(:path => path, :watch => watch)

    queue_request(req,:get,4,Proto::GetDataResponse,:data,watch) do | response |
        blk.call( response.stat, response.data.to_s)
    end
end
#get_acl(path) ⇒ Array<Data::ACL>
#get_acl(path) {|list| ... } ⇒ AsyncOp
Get ACL
# File 'lib/zkruby/client.rb', line 398
def get_acl(path,&blk)
    return synchronous_call(:get_acl,path)[0] unless block_given?
    path = chroot(path)

    req = Proto::GetACLRequest.new(:path => path)
    queue_request(req,:get_acl,6,Proto::GetACLResponse) do | response |
        blk.call( response.acl )
    end
end
#mkpath(path, acl = ZK::ACL_OPEN_UNSAFE, &callback) ⇒ Object
Recursive make path
This will send parallel creates for ALL nodes up to the root and then ignore any NODE_EXISTS errors.
You generally want to call this after receiving a NO_NODE error from a simple #create
# File 'lib/zkruby/util.rb', line 17
def mkpath(path,acl=ZK::ACL_OPEN_UNSAFE,&callback)

    return synchronous_call(:mkpath,path,acl) unless block_given?

    connection_lost = false

    path_comp = path.split("/")

    # we only care about the result of the final create op
    last_op = nil

    (1-path_comp.length..-1).each do |i|

        sub_path = path_comp[0..i].join("/")

        op = create(sub_path,"",acl) { if i == -1 then callback.call() else :true end }

        op.errback do |err|
            if i == -1
                if ZK::Error::NODE_EXISTS === err
                    callback.call()
                elsif ZK::Error::CONNECTION_LOST === err || ( ZK::Error::NO_NODE && connection_lost )
                    # try again
                    mkpath(path,acl)
                    callback.call()
                else
                    raise err
                end
            elsif ZK::Error::CONNECTION_LOST === err
                connection_lost = true
                :connection_lost
            else
                # we don't care about any other errors, but we will log them
                logger.warn { "Error creating #{sub_path}, #{err}" }
            end
        end
        last_op = op if (i == -1)
    end

    return WrappedOp.new(last_op)
end
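To see which nodes mkpath issues creates for, the sub-path enumeration from its loop can be run on its own. The helper name sub_paths is hypothetical and exists only for this illustration; the split and range expressions are the ones used in the mkpath source.

```ruby
# Hypothetical helper: the same enumeration as the loop in mkpath, without any ZooKeeper calls.
def sub_paths(path)
  path_comp = path.split("/")
  # For "/a/b/c" the components are ["", "a", "b", "c"], so i runs from -3 to -1.
  (1 - path_comp.length..-1).map { |i| path_comp[0..i].join("/") }
end

sub_paths("/a/b/c")  # => ["/a", "/a/b", "/a/b/c"]
```

Only the last entry (i == -1) is the path whose result the callback depends on; the earlier entries are the ancestors whose errors are logged or ignored.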
#multi(ops, &callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
See #transaction
# File 'lib/zkruby/client.rb', line 459
def multi(ops,&callback)
    return synchronous_call(:multi,ops) unless block_given?

    req = Proto::MultiRequest.new()

    ops.each do |op|
        req.requests << { :header => { :_type => op.opcode, :done => false, :err=> 0 }, :request => op.request }
    end

    req.requests << { :header => { :_type => -1 , :done => true, :err => -1 } }

    logger.debug("Multi #{req}")
    queue_request(req,:multi,14,Proto::MultiResponse) do |response|
        exception = nil
        response.responses.each_with_index() do |multi_response,index|
            next if multi_response.done?
            op = ops[index]
            if multi_response.header._type == -1
                errcode = multi_response.header.err.to_i
                if (errcode != 0)
                    exception = Error.lookup(errcode).exception("Transaction error for op ##{index} - #{op.op} (#{op.path})")
                    #TODO just raises the first exception
                    raise exception
                end
            else
                callback_args = if multi_response.has_response? then [ multi_response.response ] else [] end
                ops[index].callback.call(*callback_args)
            end
        end
    end
end
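The request framing used by multi (one header per operation, followed by a terminating header marked done) can be sketched with plain hashes. QueuedOp and multi_requests are hypothetical names for this illustration, and the opcodes and request payloads are placeholder values; the real code appends to a Proto::MultiRequest instead of building an Array.

```ruby
# Hypothetical stand-in for a queued operation; only opcode and request are used here.
QueuedOp = Struct.new(:opcode, :request)

# Build the list of entries in the same shape multi pushes onto req.requests.
def multi_requests(ops)
  requests = ops.map do |op|
    { :header => { :_type => op.opcode, :done => false, :err => 0 }, :request => op.request }
  end
  # Terminator entry: type -1 and done => true mark the end of the batch.
  requests << { :header => { :_type => -1, :done => true, :err => -1 } }
end

ops = [QueuedOp.new(1, { :path => "/a" }), QueuedOp.new(2, { :path => "/b" })]
multi_requests(ops).length  # => 3
```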
#rmpath(path, version) ⇒ Object
#rmpath(path, version) { ... } ⇒ AsyncOp
Recursive delete
Although this method itself can be called synchronously, all the ZooKeeper activity is asynchronous, i.e. all subnodes are removed in parallel.
Will retry on connection loss, or if some other activity is adding nodes in parallel. If you get a session expiry in the middle of this, you will end up with a partially completed recursive delete. In all other circumstances it will eventually complete.
# File 'lib/zkruby/util.rb', line 78
def rmpath(path,version = -1, &callback)

    return synchronous_call(:rmpath,path,version) unless block_given?

    del_op = delete(path,version) { callback.call() }

    del_op.errback do |err|
        # we don't leave this method unless we get an exception
        # or have completed and called the callback
        case err
        when ZK::Error::NO_NODE
        when ZK::Error::CONNECTION_LOST
            rmpath(path,version)
        when ZK::Error::NOT_EMPTY

            stat, child_list = children(path)

            unless child_list.empty?

                child_ops = {}
                child_list.each do |child|
                    child_path = "#{path}/#{child}"

                    rm_op = rmpath(child_path,-1) { :success }

                    rm_op.errback { |err| child_results[child_path] = err }

                    child_ops[child_path] = rm_op
                end

                # Wait until all our children are done (or error'd)
                child_ops.each { |child_path,op| op.value }
                rmpath(path,version)
            end
        else
            raise err
        end
        callback.call()
    end

    return WrappedOp.new(del_op)
end
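The delete, recurse on NOT_EMPTY, then retry flow of rmpath can be followed on an in-memory tree. ToyTree is a hypothetical stand-in (an Array of absolute paths, not a ZooKeeper session), and it removes children sequentially where the real method removes them in parallel; connection loss and versions are not modelled.

```ruby
# Hypothetical in-memory stand-in for a znode tree.
class ToyTree
  class NotEmpty < StandardError; end
  class NoNode < StandardError; end

  attr_reader :paths

  def initialize(paths)
    @paths = paths.dup
  end

  # Names of the direct children of path.
  def children(path)
    prefix = "#{path}/"
    @paths.select { |p| p.start_with?(prefix) }
          .map { |p| p[prefix.length..-1] }
          .reject { |name| name.include?("/") }
  end

  # Plain delete: fails if the node is missing or still has children.
  def delete(path)
    raise NoNode, path unless @paths.include?(path)
    raise NotEmpty, path unless children(path).empty?
    @paths.delete(path)
  end

  # Recursive delete: on NotEmpty remove the children, then retry the delete.
  def rmpath(path)
    delete(path)
  rescue NoNode
    nil # already gone, nothing to do
  rescue NotEmpty
    children(path).each { |child| rmpath("#{path}/#{child}") }
    retry
  end
end

tree = ToyTree.new(["/a", "/a/b", "/a/b/c", "/a/d", "/x"])
tree.rmpath("/a")
tree.paths  # => ["/x"]
```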
#set(path, data, version) ⇒ Data::Stat
#set(path, data, version) {|stat| ... } ⇒ AsyncOp
Set Data
# File 'lib/zkruby/client.rb', line 386
def set(path,data,version,&callback)
    op_set(path,data,version,&callback)
end
#set_acl(path, acl, version) ⇒ Object
#set_acl(path, acl, version) {|new| ... } ⇒ AsyncOp
Set ACL
# File 'lib/zkruby/client.rb', line 418
def set_acl(path,acl,version,&blk)
    return synchronous_call(:set_acl,acl,version)[0] unless block_given?
    path = chroot(path)

    req = Proto::SetACLRequest.new(:path => path, :acl => acl, :version => version)
    queue_request(req,:set_acl,7,Proto::SetACLResponse) do | response |
        blk.call( response.stat )
    end

end
#sync(path) ⇒ String
#sync(path) {|path| ... } ⇒ AsyncOp
Synchronise path between session and leader
# File 'lib/zkruby/client.rb', line 437
def sync(path,&blk)
    return synchronous_call(:sync,path)[0] unless block_given?
    path = chroot(path)
    req = Proto::SyncRequest.new(:path => path)
    queue_request(req,:sync,9,Proto::SyncResponse) do | response |
        blk.call(unchroot(response.path))
    end
end
#timeout ⇒ Object
Session timeout, initially as supplied, but once connected is the negotiated timeout with the server.
# File 'lib/zkruby/client.rb', line 269
def timeout
    @binding.session.timeout
end
#transaction ⇒ Transaction
#transaction {|txn| ... } ⇒ Object
Perform multiple operations in a transaction
# File 'lib/zkruby/client.rb', line 497
def transaction(&block)
    txn = Transaction.new(self,session)
    return txn unless block_given?

    yield txn
    txn.commit
end
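The two calling styles of transaction (return the transaction object when no block is given, otherwise yield it and commit) can be sketched as follows. ToyTransaction, its record method, and toy_transaction are hypothetical names for this illustration; the operations a real Transaction accepts are not listed in this section.

```ruby
# Hypothetical stand-in for Transaction: collects operations until commit.
class ToyTransaction
  def initialize
    @ops = []
    @committed = false
  end

  # Queue an operation; returns self so calls can be chained.
  def record(*op)
    @ops << op
    self
  end

  # Mark the transaction committed and return the queued operations.
  def commit
    @committed = true
    @ops.dup
  end

  def committed?
    @committed
  end
end

# Same control flow as Client#transaction.
def toy_transaction
  txn = ToyTransaction.new
  return txn unless block_given?

  yield txn
  txn.commit
end

# Block form: operations are committed when the block returns.
toy_transaction { |txn| txn.record(:create, "/a") }  # => [[:create, "/a"]]

# Non-block form: the caller commits explicitly.
txn = toy_transaction
txn.record(:delete, "/a")
txn.commit
```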
#watcher ⇒ Object
The currently registered default watcher
# File 'lib/zkruby/client.rb', line 274
def watcher
    @binding.session.watcher
end
#watcher=(watcher) ⇒ Object
Assign the watcher to the session. This watcher will receive session connect/disconnect/expired events, as well as any path-based watches registered in API calls using the literal value true.
# File 'lib/zkruby/client.rb', line 281
def watcher=(watcher)
    @binding.session.watcher=watcher
end