Module: Fraggle::Client

Includes:
Logger, Protocol
Included in:
Test::TestClient
Defined in:
lib/fraggle/client.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

MinTag =
0
MaxTag =
(1<<32)

Constants included from Logger

Logger::DEBUG, Logger::ERROR, Logger::INFO

Instance Attribute Summary

Attributes included from Logger

#level, #writer

Attributes included from Protocol

#last_received

Instance Method Summary collapse

Methods included from Logger

#debug, #error, #info, #log

Methods included from Protocol

#receive_data, #send_data, #send_request

Instance Method Details

#__cancel__(what, &blk) ⇒ Object

Be careful with this. It is recommended you use #cancel on the Request returned to ensure you don’t run into a race-condition where you cancel an operation you may have thought was something else.



180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fraggle/client.rb', line 180

def __cancel__(what, &blk)
  req = Request.new
  req.verb = Request::Verb::CANCEL
  req.id = what.tag

  # Hold on to the tag as unavaiable for reuse until the cancel succeeds.
  @cbx[what.tag] = nil

  send(req) do |res|
    # Do not send any more responses from the server to this request.
    @cbx.delete(what.tag)
    blk.call(res) if blk
  end
end

#cancelable(req) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/fraggle/client.rb', line 228

def cancelable(req)
  c   = self
  can = true

  req.metadef :cancel do
    if can
      can = false
      c.__cancel__(self)
    end
  end

  req.metadef :canceled? do
    !can
  end

  req
end

#casify(cas) ⇒ Object



315
316
317
318
319
320
321
# File 'lib/fraggle/client.rb', line 315

def casify(cas)
  case cas
  when :missing then Response::Missing
  when :clobber then Response::Clobber
  else cas
  end
end

#checkin(path, cas, &blk) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/fraggle/client.rb', line 59

def checkin(path, cas, &blk)
  req = Request.new
  req.verb  = Request::Verb::CHECKIN
  req.path  = path
  req.cas   = casify(cas)

  send(req, &blk)
end

#del(path, cas, &blk) ⇒ Object



129
130
131
132
133
134
135
136
# File 'lib/fraggle/client.rb', line 129

def del(path, cas, &blk)
  req = Request.new
  req.verb  = Request::Verb::DEL
  req.path  = path
  req.cas   = casify(cas)

  send(req, &blk)
end

#delsnap(sid, &blk) ⇒ Object



162
163
164
165
166
167
168
# File 'lib/fraggle/client.rb', line 162

def delsnap(sid, &blk)
  req = Request.new
  req.verb = Request::Verb::DELSNAP
  req.id = sid

  send(req, &blk)
end

#get(sid, path, &blk) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/fraggle/client.rb', line 90

def get(sid, path, &blk)
  req = Request.new
  req.verb = Request::Verb::GET
  req.id   = sid if sid != 0 # wire optimization
  req.path = path

  send(req, &blk)
end

#getdir(sid, path, offset, limit, &blk) ⇒ Object



108
109
110
111
112
113
114
115
116
117
# File 'lib/fraggle/client.rb', line 108

def getdir(sid, path, offset, limit, &blk)
  req = Request.new
  req.verb   = Request::Verb::GETDIR
  req.id     = sid    if sid != 0
  req.offset = offset if offset != 0
  req.limit  = limit  if limit  != 0
  req.path   = path

  send(req, &blk)
end

#initialize(uri) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fraggle/client.rb', line 21

def initialize(uri)
  # Simplied for now.  Later we'll take a real uri
  # and disect it to init the addrs list
  uri = URI(uri.to_s)

  @addr  = [uri.host, uri.port] * ":"
  @addrs = {}
  @shun  = {}
  @cbx   = {}

  # Logging
  @level   = ERROR
  @writer  = $stderr
end

#noop(&blk) ⇒ Object



170
171
172
173
174
175
# File 'lib/fraggle/client.rb', line 170

def noop(&blk)
  req = Request.new
  req.verb = Request::Verb::NOOP

  send(req, &blk)
end

#post_initObject



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/fraggle/client.rb', line 246

def post_init
  info "successfully connected to #{@addr}"

  @last_received = Time.now

  EM.add_periodic_timer(2) do
    if (n = Time.now - last_received) >= 3
      error("timeout talking to #{@addr}")
      close_connection
    else
      debug("ping")
      get(0, "/ping") { debug("pong") }
    end
  end

  waw = Proc.new do |e|
    if e.value == ""
      addr = @addrs.delete(e.path)
      if addr
        error "noticed #{addr} is gone; removing"
      end
    else
      get 0, "/doozer/info/#{e.value}/public-addr" do |a|
        if @shun.has_key?(a.value)
          if (n = Time.now - @shun[a.value]) > 3
            info "pardoning #{a.value} after #{n} secs"
            @shun.delete(a.value)
          else
            info "ignoring shunned addr #{a.value}"
            next
          end
        end
        # TODO: Be defensive and check the addr value is valid
        @addrs[e.path] = a.value
        info("added #{e.path} addr #{a.value}")
      end
    end
  end

  watch    "/doozer/slot/*", &waw
  walk  0, "/doozer/slot/*", &waw
end

#receive_response(res) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fraggle/client.rb', line 36

def receive_response(res)
  debug "received response: #{res.inspect}"

  if res.err_code
    if req = @cbx.delete(res.tag)
      req.emit(:error, res)
      return
    end
  end

  if (res.flags & Response::Flag::VALID) > 0
    if req = @cbx[res.tag]
      req.emit(:valid, res)
    end
  end

  if (res.flags & Response::Flag::DONE) > 0
    if req = @cbx.delete(res.tag)
      req.emit(:done)
    end
  end
end

#send(req, &blk) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/fraggle/client.rb', line 195

def send(req, &blk)
  tag = MinTag

  while @cbx.has_key?(tag)
    tag += 1
    if tag > MaxTag
      tag = MinTag
    end
  end

  req.tag = tag

  if blk
    req.valid(&blk)
  end

  # Setup a default error handler that gives useful information
  req.error do |e|
    raise Error.new("'error (%d) (%s)' for: %s" % [
      e.err_code,
      e.err_detail.inspect,
      req.inspect
    ])
  end

  @cbx[req.tag] = req

  debug "sending request:   #{req.inspect}"
  send_request(req)

  req
end

#session(prefix = nil, &blk) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fraggle/client.rb', line 68

def session(prefix=nil, &blk)
  nibbles = "0123456789abcdef"
  postfix = (0...16).map { nibbles[rand(nibbles.length)].chr }.join
  name    = prefix ? prefix+"."+postfix : postfix
  estab   = false

  f = Proc.new do |e|
    # If this is the first response from the server, it's go-time.
    if ! estab
      blk.call
    end

    # We've successfully established a session.  Say so.
    estab = true

    # Get back to the server ASAP
    checkin(name, e.cas, &f)
  end

  checkin(name, 0, &f)
end

#set(path, value, cas, &blk) ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/fraggle/client.rb', line 119

def set(path, value, cas, &blk)
  req = Request.new
  req.verb  = Request::Verb::SET
  req.path  = path
  req.value = value
  req.cas   = casify(cas)

  send(req, &blk)
end

#snap(&blk) ⇒ Object



155
156
157
158
159
160
# File 'lib/fraggle/client.rb', line 155

def snap(&blk)
  req = Request.new
  req.verb = Request::Verb::SNAP

  send(req, &blk)
end

#stat(sid, path, &blk) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/fraggle/client.rb', line 99

def stat(sid, path, &blk)
  req = Request.new
  req.verb = Request::Verb::STAT
  req.id   = sid if sid != 0 # wire optimization
  req.path = path

  send(req, &blk)
end

#unbindObject

What happens when a connection is closed for any reason.



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/fraggle/client.rb', line 290

def unbind
  error "disconnected from #{@addr}"

  # Shun the address we were currently attempting/connected to.
  @shun[@addr] = Time.now
  @addrs.delete_if {|_, v| v == @addr }

  # We don't want the timer to race us while
  # we're trying to reconnect.  Once the reconnect
  # has been complete, we'll start the timer again.
  EM.cancel_timer(@timer)

  _, @addr = @addrs.shift rescue nil

  if ! @addr
    # We are all out of addresses to try
    raise "No more doozers!"
  end

  host, port = @addr.split(":")
  info "attempting reconnect to #{host}:#{port}"
  reconnect(host, port.to_i)
  post_init
end

#walk(sid, glob, &blk) ⇒ Object



138
139
140
141
142
143
144
145
# File 'lib/fraggle/client.rb', line 138

def walk(sid, glob, &blk)
  req = Request.new
  req.verb = Request::Verb::WALK
  req.id   = sid if sid != 0 # wire optimization
  req.path = glob

  cancelable(send(req, &blk))
end

#watch(glob, &blk) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/fraggle/client.rb', line 147

def watch(glob, &blk)
  req = Request.new
  req.verb = Request::Verb::WATCH
  req.path = glob

  cancelable(send(req, &blk))
end