Class: ZK::ZKEventMachine::Client

Inherits:
Client::Base
  • Object
show all
Includes:
Deferred::Accessors, Logging, Unixisms
Defined in:
lib/z_k/z_k_event_machine/client.rb

Overview

Examples:

use of on_connecting


# Example handler for the one-shot on_connecting event.
#
# Called without an argument it only performs the initial registration;
# called with an event it re-registers itself and then reacts to the event.
#
# @param event [Object, nil] the connecting event, or nil for the initial
#   registration call
def handle_connecting_event(event=nil)

  # this (re-)registers this hook for the next time this event is called.
  # The hook must be a bound Method object: a bare &:handle_connecting_event
  # is Symbol#to_proc, which would send the message to the yielded event
  # object instead of invoking this method on self.
  @zkem.on_connecting(&method(:handle_connecting_event))

  return unless event     # nil is the initial registration case

  logger.warn { "Oh no! got a connecting event, taking evasive action!" }

  # do stuff
end

# your setup method would then look like
# Example setup method: connects @zkem and, inside the connect block,
# installs the connecting hook before doing the remaining work.
def run
  @zkem.connect do
    # called without an event, so this only performs the initial registration
    handle_connecting_event

    do_the_rest_of_your_stuff
  end
end

Constant Summary collapse

# Timeout value handed to ZookeeperEM::Client.new by #connect.
# NOTE(review): the unit is not visible here (presumably seconds) -- confirm.
DEFAULT_TIMEOUT =
10

Instance Method Summary collapse

Methods included from Unixisms

#mkdir_p, #rm_rf

Constructor Details

#initialize(host, opts = {}) ⇒ Client

Takes same options as ZK::Client::Base



93
94
95
96
97
98
# File 'lib/z_k/z_k_event_machine/client.rb', line 93

# Records the host and prepares event handling. No connection is opened
# here; @cnx is created later by #connect.
#
# NOTE(review): opts is accepted but never referenced in this body, and
# super is not called here -- confirm whether the options are meant to be
# forwarded somewhere.
#
# @param host the host value later passed to ZookeeperEM::Client.new
# @param opts [Hash] currently unused in this method
def initialize(host, opts={})
  @host = host
  @event_handler  = EventHandlerEM.new(self)
  # consulted by close! so that a second close is not started
  @closing        = false
  register_default_event_handlers!
end

Instance Method Details

#children(path, opts = {}, &block) ⇒ Object



197
198
199
200
201
202
203
# File 'lib/z_k/z_k_event_machine/client.rb', line 197

# Runs the superclass #children call through a children-style Callback.
#
# The callback is created by Callback.new_children_cb from the caller's
# block, gets connection_lost_hook attached as an errback, records the call
# details in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_children_cb returns
def children(path, opts={}, &block)
  Callback.new_children_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#close!(&blk) ⇒ Object Also known as: close



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/z_k/z_k_event_machine/client.rb', line 121

# Closes the underlying connection (when there is one) and fires the
# on_close deferred.
#
# The given block is registered on on_close before anything else happens.
# When a close has already been started (@closing is set) no second close is
# initiated and on_close is returned right away.
#
# @yield registered through on_close(&blk)
# @return the value of on_close
def close!(&blk)
  on_close(&blk)
  return on_close if @closing
  @closing = true

  if @cnx
    logger.debug { "#{self.class.name}: in close! clearing event_handler" }
    event_handler.clear!

    logger.debug { "#{self.class.name}: calling @cnx.close" }
    @cnx.close do
      logger.debug { "firing on_close handler" }
      # on_close is fired first, so @cnx is still set while succeed runs
      on_close.succeed
      @cnx = nil
    end
  else
    # @cnx is not set, so there is nothing to shut down: fire on_close now
    on_close.succeed
  end

  on_close
end

#connect(&blk) ⇒ Object

Opens a ZK connection and attaches it to the reactor. Returns an EM::Deferrable that will be called back when the connection is ready for use.



108
109
110
111
112
113
114
# File 'lib/z_k/z_k_event_machine/client.rb', line 108

# Creates the ZookeeperEM::Client on first use (later calls reuse @cnx) and
# registers the given block through the connection's on_attached.
#
# @yield passed to @cnx.on_attached
# @return whatever @cnx.on_attached returns
def connect(&blk)
  # XXX: maybe move this into initialize, need to figure out how to schedule it properly
  unless @cnx
    watcher = event_handler.get_default_watcher_block
    @cnx = ZookeeperEM::Client.new(@host, DEFAULT_TIMEOUT, watcher)
  end

  @cnx.on_attached(&blk)
end

#create(path, data = '', opts = {}, &block) ⇒ Object



157
158
159
160
161
162
163
# File 'lib/z_k/z_k_event_machine/client.rb', line 157

# Runs the superclass #create call through a create-style Callback.
#
# The callback is created by Callback.new_create_cb from the caller's block,
# gets connection_lost_hook attached as an errback, records the call details
# in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param data the data, forwarded unchanged to super (defaults to '')
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_create_cb returns
def create(path, data='', opts={}, &block)
  Callback.new_create_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :data => data, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, data, request_opts)
  end
end

#delete(path, opts = {}, &block) ⇒ Object



189
190
191
192
193
194
195
# File 'lib/z_k/z_k_event_machine/client.rb', line 189

# Runs the superclass #delete call through a delete-style Callback.
#
# The callback is created by Callback.new_delete_cb from the caller's block,
# gets connection_lost_hook attached as an errback, records the call details
# in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_delete_cb returns
def delete(path, opts={}, &block)
  Callback.new_delete_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#exists?(path, opts = {}, &block) ⇒ Boolean

Returns:

  • (Boolean)


185
186
187
# File 'lib/z_k/z_k_event_machine/client.rb', line 185

# Delegates to #stat with :cb_style set to 'exists' in a merged copy of the
# options, so the caller's hash is left untouched.
#
# @param path the path, forwarded unchanged to #stat
# @param opts [Hash] options forwarded to #stat with :cb_style merged in
# @return whatever #stat returns
def exists?(path, opts={}, &block)
  exists_opts = opts.merge(:cb_style => 'exists')
  stat(path, exists_opts, &block)
end

#get(path, opts = {}, &block) ⇒ Callback

get data at path, optionally enabling a watch on the node

Returns:

  • (Callback)

    returns a Callback which is an EM::Deferred (so you can assign callbacks/errbacks) see Callback::Base for discussion



149
150
151
152
153
154
155
# File 'lib/z_k/z_k_event_machine/client.rb', line 149

# Runs the superclass #get call through a get-style Callback.
#
# The callback is created by Callback.new_get_cb from the caller's block,
# gets connection_lost_hook attached as an errback, records the call details
# in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_get_cb returns
def get(path, opts={}, &block)
  Callback.new_get_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#get_acl(path, opts = {}, &block) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/z_k/z_k_event_machine/client.rb', line 205

# Runs the superclass #get_acl call through a get_acl-style Callback.
#
# The callback is created by Callback.new_get_acl_cb from the caller's
# block, gets connection_lost_hook attached as an errback, records the call
# details in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_get_acl_cb returns
def get_acl(path, opts={}, &block)
  Callback.new_get_acl_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#on_close ⇒ Deferred::Default

called back once the connection has been closed.

Returns:

  • (Deferred::Default)


90
# File 'lib/z_k/z_k_event_machine/client.rb', line 90

# Declares the :close deferred event; close! registers blocks on and fires
# the corresponding on_close deferred.
deferred_event :close

#on_connected ⇒ Deferred::Default

Note:

this is experimental currently. This may or may not fire for the initial connection.

Registers a one-shot callback for the ZOO_CONNECTED_STATE event.

Its purpose is to warn an already-existing client with watches that a connection has been re-established (with session information saved). From the ZooKeeper Programmers’ Guide:

If you are using watches, you must look for the connected watch event.
When a ZooKeeper client disconnects from a server, you will not receive
notification of changes until reconnected. If you are watching for a
znode to come into existence, you will miss the event if the znode is
created and deleted while you are disconnected.

once this deferred has been fired, it will be replaced with a new deferred, so callbacks must be re-registered, and should be re-registered within the callback to avoid missing events

Returns:

  • (Deferred::Default)


68
# File 'lib/z_k/z_k_event_machine/client.rb', line 68

# Declares the :connected deferred event, documented on this page as
# #on_connected.
deferred_event :connected

#on_connecting ⇒ Deferred::Default Also known as: on_disconnection

Note:

This would more accurately be called `on_disconnection`, but because it’s also fired when the client is starting up, it has the name it does. There’s an alias for this: `on_disconnection`. (It’s not `on_disconnected` because the ‘-ion’ seems to indicate that the condition may be temporary.)

Registers a one-shot callback for the ZOO_CONNECTING_STATE event

This event is triggered when we have become disconnected from the cluster and are in the process of reconnecting.

Returns:

  • (Deferred::Default)


82
# File 'lib/z_k/z_k_event_machine/client.rb', line 82

# Declares the :connecting deferred event, documented on this page as
# #on_connecting (also known as on_disconnection).
deferred_event :connecting

#on_connection_lost ⇒ Deferred::Default

Note:

if you want to be notified when the connection state has not become invalid but you are in a possibly recoverable state, then you should hook the #on_connecting method

If we get a ZK::Exceptions::ConnectionLoss exception back from any call, or an EXPIRED_SESSION_STATE event, we will call back any handlers registered here with the exception instance as the argument.

once this deferred has been fired, it will be replaced with a new deferred, so callbacks must be re-registered, and should be re-registered within the callback to avoid missing events

Returns:

  • (Deferred::Default)


48
# File 'lib/z_k/z_k_event_machine/client.rb', line 48

# Declares the :connection_lost deferred event, documented on this page as
# #on_connection_lost.
deferred_event :connection_lost

#session_id ⇒ Fixnum

Returns The underlying connection’s session_id.

Returns:

  • (Fixnum)

    The underlying connection’s session_id



222
223
224
225
# File 'lib/z_k/z_k_event_machine/client.rb', line 222

# Session id of the underlying connection.
#
# @return the value of @cnx.session_id, or nil when @cnx is not set
def session_id
  @cnx.session_id if @cnx
end

#session_passwd ⇒ String

Returns The underlying connection’s session passwd (an opaque value).

Returns:

  • (String)

    The underlying connection’s session passwd (an opaque value)



228
229
230
231
# File 'lib/z_k/z_k_event_machine/client.rb', line 228

# Session passwd of the underlying connection.
#
# @return the value of @cnx.session_passwd, or nil when @cnx is not set
def session_passwd
  @cnx.session_passwd if @cnx
end

#set(path, data, opts = {}, &block) ⇒ Object



165
166
167
168
169
170
171
# File 'lib/z_k/z_k_event_machine/client.rb', line 165

# Runs the superclass #set call through a set-style Callback.
#
# The callback is created by Callback.new_set_cb from the caller's block,
# gets connection_lost_hook attached as an errback, records the call details
# in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param data the data, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_set_cb returns
def set(path, data, opts={}, &block)
  Callback.new_set_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :data => data, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, data, request_opts)
  end
end

#set_acl(path, acls, opts = {}, &block) ⇒ Object



213
214
215
216
217
218
219
# File 'lib/z_k/z_k_event_machine/client.rb', line 213

# Runs the superclass #set_acl call through a set_acl-style Callback.
#
# The callback is created by Callback.new_set_acl_cb from the caller's
# block, gets connection_lost_hook attached as an errback, records the call
# details in its context and is handed to super via the :callback option.
#
# @param path the path, forwarded unchanged to super
# @param acls the acls, forwarded unchanged to super
# @param opts [Hash] options forwarded to super with :callback merged in
# @return whatever Callback.new_set_acl_cb returns
def set_acl(path, acls, opts={}, &block)
  Callback.new_set_acl_cb(block) do |callback|
    lost_hook = method(:connection_lost_hook)
    callback.errback(&lost_hook)

    callback.context = { :method => __method__, :path => path, :acls => acls, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, acls, request_opts)
  end
end

#stat(path, opts = {}, &block) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
# File 'lib/z_k/z_k_event_machine/client.rb', line 173

# Runs the superclass #stat call through a Callback whose flavour is chosen
# by the :cb_style option ('stat' when the option is absent; #exists? passes
# 'exists').
#
# The callback is created by Callback.new_<cb_style>_cb from the caller's
# block, gets connection_lost_hook attached as an errback, records the call
# details in its context and is handed to super via the :callback option.
# The :cb_style key is consumed here and is not forwarded to super.
#
# @param path the path, forwarded unchanged to super
# @param opts [Hash] options forwarded to super (minus :cb_style) with
#   :callback merged in; the caller's hash is not modified
# @return whatever the selected Callback constructor returns
def stat(path, opts={}, &block)
  # Work on a copy: removing :cb_style from the hash that was passed in would
  # silently alter the caller's object (and raise if that hash is frozen).
  opts = opts.dup
  cb_style = opts.delete(:cb_style) { 'stat' }

  meth = :"new_#{cb_style}_cb"

  Callback.__send__(meth, block) do |cb|
    cb.errback(&method(:connection_lost_hook))
    cb.context = { :method => __method__, :path => path, :meth => meth, :opts => opts }
    super(path, opts.merge(:callback => cb))
  end
end