Class: Zookeeper::Continuation
- Inherits:
-
Object
- Object
- Zookeeper::Continuation
- Defined in:
- lib/zookeeper/continuation.rb
Overview
sigh, slightly different than the userland callbacks, the continuation provides sync call semantics around an async api
Defined Under Namespace
Classes: Registry
Constant Summary collapse
- OPERATION_TIMEOUT =
seconds
30
- CALLBACK_ARG_IDX =
sigh what is the index in the *args array of the ‘callback’ param
{ :get => 2, :set => 3, :exists => 2, :create => 3, :delete => 3, :get_acl => 2, :set_acl => 3, :get_children => 2, :state => 0, :add_auth => 2 }
- METH_TO_ASYNC_RESULT_KEYS =
maps the method name to the async return hash keys it should use to deliver the results
{ :get => [:rc, :data, :stat], :set => [:rc, :stat], :exists => [:rc, :stat], :create => [:rc, :string], :delete => [:rc], :get_acl => [:rc, :acl, :stat], :set_acl => [:rc], :get_children => [:rc, :strings, :stat], :add_auth => [:rc] }
Constants included from Constants
Zookeeper::Constants::CONNECTED_EVENT_VALUES, Zookeeper::Constants::EVENT_TYPE_NAMES, Zookeeper::Constants::STATE_NAMES, Zookeeper::Constants::ZAPIERROR, Zookeeper::Constants::ZAUTHFAILED, Zookeeper::Constants::ZBADARGUMENTS, Zookeeper::Constants::ZBADVERSION, Zookeeper::Constants::ZCLOSING, Zookeeper::Constants::ZCONNECTIONLOSS, Zookeeper::Constants::ZDATAINCONSISTENCY, Zookeeper::Constants::ZINVALIDACL, Zookeeper::Constants::ZINVALIDCALLBACK, Zookeeper::Constants::ZINVALIDSTATE, Zookeeper::Constants::ZKRB_ASYNC_CONTN_ID, Zookeeper::Constants::ZKRB_GLOBAL_CB_REQ, Zookeeper::Constants::ZMARSHALLINGERROR, Zookeeper::Constants::ZNOAUTH, Zookeeper::Constants::ZNOCHILDRENFOREPHEMERALS, Zookeeper::Constants::ZNODEEXISTS, Zookeeper::Constants::ZNONODE, Zookeeper::Constants::ZNOTEMPTY, Zookeeper::Constants::ZNOTHING, Zookeeper::Constants::ZOK, Zookeeper::Constants::ZOO_ASSOCIATING_STATE, Zookeeper::Constants::ZOO_AUTH_FAILED_STATE, Zookeeper::Constants::ZOO_CHANGED_EVENT, Zookeeper::Constants::ZOO_CHILD_EVENT, Zookeeper::Constants::ZOO_CLOSED_STATE, Zookeeper::Constants::ZOO_CONNECTED_STATE, Zookeeper::Constants::ZOO_CONNECTING_STATE, Zookeeper::Constants::ZOO_CREATED_EVENT, Zookeeper::Constants::ZOO_DELETED_EVENT, Zookeeper::Constants::ZOO_EPHEMERAL, Zookeeper::Constants::ZOO_EXPIRED_SESSION_STATE, Zookeeper::Constants::ZOO_LOG_LEVEL_DEBUG, Zookeeper::Constants::ZOO_LOG_LEVEL_ERROR, Zookeeper::Constants::ZOO_LOG_LEVEL_INFO, Zookeeper::Constants::ZOO_LOG_LEVEL_WARN, Zookeeper::Constants::ZOO_NOTWATCHING_EVENT, Zookeeper::Constants::ZOO_SEQUENCE, Zookeeper::Constants::ZOO_SESSION_EVENT, Zookeeper::Constants::ZOPERATIONTIMEOUT, Zookeeper::Constants::ZRUNTIMEINCONSISTENCY, Zookeeper::Constants::ZSESSIONEXPIRED, Zookeeper::Constants::ZSESSIONMOVED, Zookeeper::Constants::ZSYSTEMERROR, Zookeeper::Constants::ZUNIMPLEMENTED
Constants included from ACLs::Constants
ACLs::Constants::ZOO_ANYONE_ID_UNSAFE, ACLs::Constants::ZOO_AUTH_IDS, ACLs::Constants::ZOO_CREATOR_ALL_ACL, ACLs::Constants::ZOO_OPEN_ACL_UNSAFE, ACLs::Constants::ZOO_PERM_ADMIN, ACLs::Constants::ZOO_PERM_ALL, ACLs::Constants::ZOO_PERM_CREATE, ACLs::Constants::ZOO_PERM_DELETE, ACLs::Constants::ZOO_PERM_READ, ACLs::Constants::ZOO_PERM_WRITE, ACLs::Constants::ZOO_READ_ACL_UNSAFE
Instance Attribute Summary collapse
-
#args ⇒ Object
readonly
Returns the value of attribute args.
-
#block ⇒ Object
Returns the value of attribute block.
-
#meth ⇒ Object
Returns the value of attribute meth.
-
#rval ⇒ Object
Returns the value of attribute rval.
Instance Method Summary collapse
-
#call(hash) ⇒ Object
receive the response from the server, set @rval, notify caller.
-
#initialize(meth, *args) ⇒ Continuation
constructor
A new instance of Continuation.
- #req_id ⇒ Object
-
#shutdown! ⇒ Object
interrupt the sleeping thread with a NotConnected error.
- #state_call? ⇒ Boolean
-
#submit(czk) ⇒ Object
this method is called by the event thread to submit the request passed the CZookeeper instance, makes the async call and deals with the results.
- #user_callback? ⇒ Boolean
-
#value ⇒ Object
the caller calls this method and receives the response from the async loop this method has a hard-coded 30 second timeout as a safety feature.
Methods included from Logger
included, wrapped_logger, wrapped_logger=
Methods included from Constants
#event_by_value, #state_by_value
Constructor Details
#initialize(meth, *args) ⇒ Continuation
Returns a new instance of Continuation.
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/zookeeper/continuation.rb', line 85 def initialize(meth, *args) @meth = meth @args = args.freeze @mutex = Monitor.new @cond = @mutex.new_cond @rval = nil # make this error reporting more robust if necessary, right now, just set to state @error = nil end |
Instance Attribute Details
#args ⇒ Object (readonly)
Returns the value of attribute args.
83 84 85 |
# File 'lib/zookeeper/continuation.rb', line 83 def args @args end |
#block ⇒ Object
Returns the value of attribute block.
81 82 83 |
# File 'lib/zookeeper/continuation.rb', line 81 def block @block end |
#meth ⇒ Object
Returns the value of attribute meth.
81 82 83 |
# File 'lib/zookeeper/continuation.rb', line 81 def meth @meth end |
#rval ⇒ Object
Returns the value of attribute rval.
81 82 83 |
# File 'lib/zookeeper/continuation.rb', line 81 def rval @rval end |
Instance Method Details
#call(hash) ⇒ Object
receive the response from the server, set @rval, notify caller
141 142 143 144 145 146 |
# File 'lib/zookeeper/continuation.rb', line 141 def call(hash) logger.debug { "continuation req_id #{req_id}, got hash: #{hash.inspect}" } @rval = hash.values_at(*METH_TO_ASYNC_RESULT_KEYS.fetch(meth)) logger.debug { "delivering result #{@rval.inspect}" } deliver! end |
#req_id ⇒ Object
180 181 182 |
# File 'lib/zookeeper/continuation.rb', line 180 def req_id @args.first end |
#shutdown! ⇒ Object
interrupt the sleeping thread with a NotConnected error
189 190 191 192 193 194 195 |
# File 'lib/zookeeper/continuation.rb', line 189 def shutdown! @mutex.synchronize do return if @rval or @error @error = :shutdown @cond.broadcast end end |
#state_call? ⇒ Boolean
184 185 186 |
# File 'lib/zookeeper/continuation.rb', line 184 def state_call? @meth == :state end |
#submit(czk) ⇒ Object
this method is called by the event thread to submit the request passed the CZookeeper instance, makes the async call and deals with the results
BTW: in case you were wondering this is a completely stupid implementation, but it’s more important to get something working and passing specs, then refactor to make everything sane
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/zookeeper/continuation.rb', line 160 def submit(czk) state = czk.zkrb_state # check the state of the connection if @meth == :state # if the method is a state call @rval = [state] # we're done, no error return deliver! elsif state != ZOO_CONNECTED_STATE # otherwise, we must be connected @error = state # so set the error return deliver! # and we're out end rc, *_ = czk.__send__(:"zkrb_#{@meth}", *async_args) if user_callback? or (rc != ZOK) # async call, or we failed to submit it @rval = [rc] # create the repsonse deliver! # wake the caller and we're out end end |
#user_callback? ⇒ Boolean
148 149 150 |
# File 'lib/zookeeper/continuation.rb', line 148 def user_callback? !!@args.at(callback_arg_idx) end |
#value ⇒ Object
the caller calls this method and receives the response from the async loop this method has a hard-coded 30 second timeout as a safety feature. No call should take more than 20s (as the session timeout is set to 20s) so if any call takes longer than that, something has gone horribly wrong.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/zookeeper/continuation.rb', line 103 def value time_to_stop = Time.now + OPERATION_TIMEOUT now = nil @mutex.synchronize do while true now = Time.now break if @rval or @error or (now > time_to_stop) deadline = time_to_stop.to_f - now.to_f @cond.wait(deadline) end if (now > time_to_stop) and !@rval and !@error raise Exceptions::ContinuationTimeoutError, "response for meth: #{meth.inspect}, args: #{@args.inspect}, not received within #{OPERATION_TIMEOUT} seconds" end case @error when nil # ok, nothing to see here, carry on when :shutdown raise Exceptions::NotConnected, "the connection is shutting down" when ZOO_EXPIRED_SESSION_STATE raise Exceptions::SessionExpired, "connection has expired" else raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}" end case @rval.length when 1 return @rval.first else return @rval end end end |