Class: MogileFS::Backend

Inherits:
Object
  • Object
show all
Defined in:
lib/mogilefs/backend.rb

Overview

This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS.

Constant Summary collapse

BACKEND_ERRORS =

:nodoc:

{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Backend

Creates a new MogileFS::Backend.

:hosts is a required argument and must be an Array containing one or more ‘hostname:port’ pairs as Strings.

:timeout adjusts the request timeout before an error is returned.

Raises:

  • (ArgumentError)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/mogilefs/backend.rb', line 71

# Sets up a backend from the +args+ Hash.
#
# Recognised keys:
# :hosts::           required, non-empty list whose entries must all match
#                    the 'host:port' pattern
# :timeout::         request timeout, defaults to 3
# :connect_timeout:: connect timeout, defaults to the value of :timeout
# :fail_timeout::    defaults to 5; #socket skips a host whose last recorded
#                    failure is newer than this
#
# Raises ArgumentError when :hosts is missing, empty, or contains an entry
# without a trailing ':<digits>'.
def initialize(args)
  @hosts = args[:hosts]
  @fail_timeout = args[:fail_timeout] || 5

  if !@hosts || @hosts.empty?
    raise ArgumentError, "must specify at least one host"
  end

  # every entry must survive the 'host:port' filter
  with_port = @hosts.select { |host| host =~ /:\d+$/ }
  if @hosts != with_port
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end

  @mutex = Mutex.new
  @timeout = args[:timeout] || 3
  @connect_timeout = args[:connect_timeout] || @timeout

  # no connection, no recorded error and nothing in flight yet
  @socket = @lasterr = @lasterrstr = nil
  @pending = []

  # host => [ time of failure, exception ]
  @dead = {}
end

Instance Attribute Details

#lasterr ⇒ Object (readonly)

The last error



56
57
58
# File 'lib/mogilefs/backend.rb', line 56

# Reader for the error code captured from the most recent "ERR" response
# (nil until one has been parsed).
def lasterr; @lasterr; end

#lasterrstr ⇒ Object (readonly)

The string attached to the last error



61
62
63
# File 'lib/mogilefs/backend.rb', line 61

# Reader for the message text that accompanied the most recent "ERR"
# response (nil until one has been parsed).
def lasterrstr; @lasterrstr; end

Class Method Details

.add_command(*names) ⇒ Object

Adds MogileFS commands by name: each name becomes an instance method that sends that command as a non-idempotent request.



10
11
12
13
14
15
16
# File 'lib/mogilefs/backend.rb', line 10

# Defines one instance method per entry in +names+. Each generated method
# takes an optional argument Hash (an empty Hash when omitted) and passes
# it to #do_request with the idempotent flag set to false.
def self.add_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      params = args.first || {}
      do_request(name, params, false)
    end
  end
end

.add_error(err_snake) ⇒ Object

this converts an error code from a mogilefsd tracker to an exception:

Examples of some exceptions that get created:

class AfterMismatchError < MogileFS::Error; end
class DomainNotFoundError < MogileFS::Error; end
class InvalidCharsError < MogileFS::Error; end


36
37
38
39
40
41
42
43
# File 'lib/mogilefs/backend.rb', line 36

# Maps the snake_case tracker error code +err_snake+ to an exception class
# and records it in BACKEND_ERRORS.
#
# The class name is the CamelCase form of the code with an "Error" suffix
# appended unless it already ends in "Error" (e.g. "unknown_key" becomes
# UnknownKeyError). If no such constant exists on the receiver, a new
# subclass of MogileFS::Error is created and registered under that name.
#
# Returns the exception class.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  # inherit = false: only constants defined directly on the receiver count.
  # With the default inherited lookup an error code such as "no_memory" or
  # "name" would resolve to Ruby's own top-level NoMemoryError/NameError,
  # which are not MogileFS::Error subclasses.
  unless const_defined?(err_camel, false)
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel, false)
end

.add_idempotent_command(*names) ⇒ Object

Adds idempotent MogileFS commands by name. These commands may be retried transparently on a different tracker if there is a network/server error.



20
21
22
23
24
25
26
# File 'lib/mogilefs/backend.rb', line 20

# Defines one instance method per entry in +names+. Each generated method
# takes an optional argument Hash (an empty Hash when omitted) and passes
# it to #do_request with the idempotent flag set to true.
def self.add_idempotent_command(*names)
  names.each do |name|
    define_method(name) do |*args|
      params = args.first || {}
      do_request(name, params, true)
    end
  end
end

.const_missing(name) ⇒ Object

:nodoc:



45
46
47
48
49
50
51
# File 'lib/mogilefs/backend.rb', line 45

# Creates exception classes on demand: a missing constant whose name ends
# in "Error" is defined as a new subclass of MogileFS::Error and returned.
# Any other missing constant is handed to the default handler via super.
def self.const_missing(name) # :nodoc:
  return super(name) unless name.to_s.end_with?('Error')

  const_set(name, Class.new(MogileFS::Error))
end

Instance Method Details

#clear_cache(types = %w(all)) ⇒ Object

this command is special since the cache is per-tracker, so we connect to all backends and not just one



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/mogilefs/backend.rb', line 312

# Sends a "clear_cache" request to every host in @hosts instead of only the
# currently connected one.
#
# +types+ lists the caches to clear; each entry becomes a "<type>=1"
# request argument.
#
# Best effort: hosts that cannot be connected to, written to or read from
# are silently skipped. Always returns nil and closes every socket it
# opened before returning.
def clear_cache(types = %w(all))
  opts = {}
  types.each { |type| opts[type] = 1 }

  # any error from MogileFS::Socket.start is swallowed; such hosts map to
  # nil and are dropped by compact! below
  sockets = @hosts.map do |host|
    MogileFS::Socket.start(*(host.split(':'.freeze))) rescue nil
  end
  sockets.compact!

  # wpending: sockets that still need the request written to them
  # rpending: sockets that got the request and still owe us a response
  wpending = sockets
  rpending = []
  request = make_request("clear_cache", opts)
  while wpending[0] || rpending[0]
    # give up (returning nil) if nothing becomes ready within @timeout;
    # the ensure clause still closes the sockets
    r = IO.select(rpending, wpending, nil, @timeout) or return
    # Array#- builds new arrays, so +sockets+ keeps the full list for ensure
    rpending -= r[0]
    wpending -= r[1]
    # the response content is not inspected; read errors are ignored
    r[0].each { |io| io.timed_gets(0) rescue nil }
    r[1].each do |io|
      begin
        io.timed_write(request, 0)
        rpending << io
      rescue
        # write failed: the socket is in neither list anymore, so it is
        # simply not waited on
      end
    end
  end
  nil
ensure
  sockets.each { |io| io.close }
end

#dispatch_unlocked(request, timeout = @timeout) ⇒ Object

:nodoc:



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/mogilefs/backend.rb', line 141

# Writes +request+ to the tracker socket and returns that socket so the
# caller can read the response from it. The "_unlocked" suffix reflects
# that callers in this file invoke it while already holding @mutex.
#
# On a write failure (SystemCallError or MogileFS::RequestTruncatedError)
# the connection is shut down and the write is retried on whatever #socket
# returns next. A host that fails twice within one call is recorded in
# @dead. The loop ends when a write succeeds or when #socket raises.
def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
  # per-host failure counter; created lazily outside the begin block so it
  # survives each +retry+
  tries = nil
  begin
    io = socket
    io.timed_write(request, timeout)
    io
  rescue SystemCallError, MogileFS::RequestTruncatedError => err
    tries ||= Hash.new { |hash,host| hash[host] = 0 }
    nr = tries[@active_host] += 1
    # second failure on the same host: mark it dead with time and cause
    if nr >= 2
      @dead[@active_host] = [ MogileFS.now, err ]
    end
    # drop the broken connection so #socket reconnects on retry
    shutdown_unlocked
    retry
  end
end

#do_request(cmd, args, idempotent = false) ⇒ Object

Performs the cmd request with args.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/mogilefs/backend.rb', line 237

# Sends the +cmd+ request with +args+ to a tracker and returns the parsed
# response (see #parse_response).
#
# +args+ is modified: the :ruby_no_raise key is removed from it. When that
# key held a true value, the request string is passed on to #parse_response,
# which then returns an "ERR" response as an exception object instead of
# raising it.
#
# When +idempotent+ is true, a failed or empty read is retried; otherwise
# the connection is shut down and the error is re-raised.
def do_request(cmd, args, idempotent = false)
  no_raise = args.delete(:ruby_no_raise)
  request = make_request(cmd, args)
  line = nil
  # set once an idempotent request had to be retried after a read error
  failed = false
  @mutex.synchronize do
    begin
      io = dispatch_unlocked(request)
      line = io.timed_gets(@timeout)
      # a complete, newline-terminated line: leave the retry loop
      break if /\n\z/ =~ line

      # got data, but not a full line
      line and raise MogileFS::InvalidResponseError,
                     "Invalid response from server: #{line.inspect}"

      # line is nil here (nothing was read)
      idempotent or
        raise EOFError, "end of file reached after: #{request.inspect}"
      # fall through to retry in loop
    rescue SystemCallError,
           MogileFS::InvalidResponseError # truncated response
      # we got a successful timed_write, but not a timed_gets
      if idempotent
        failed = true
        shutdown_unlocked(false)
        retry
      end
      # not idempotent: close the socket and re-raise the current error
      shutdown_unlocked(true)
    rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
      # close the socket and re-raise, even for idempotent requests
      shutdown_unlocked(true)
    rescue
      # we DO NOT want the response we timed out waiting for, to crop up later
      # on, on the same socket, interspersed with a subsequent request!  we
      # close the socket if there's any error.
      shutdown_unlocked(true)
    end while idempotent
    # after a retried request the connection is closed again
    shutdown_unlocked if failed
  end # @mutex.synchronize
  # passing the request switches parse_response to returning (not raising)
  # the exception for an ERR response
  parse_response(line, no_raise ? request : nil)
end

#error(err_snake) ⇒ Object

This converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but since the MogileFS server code is liable to change and we may not always be able to keep up with the changes, exception classes for unknown error codes are defined on the fly.



285
286
287
# File 'lib/mogilefs/backend.rb', line 285

# Returns the exception class registered for the tracker error code
# +err_snake+. Codes missing from BACKEND_ERRORS are delegated to the
# class-level add_error, whose result is returned.
def error(err_snake)
  known = BACKEND_ERRORS[err_snake]
  return known if known

  self.class.add_error(err_snake)
end

#make_request(cmd, args) ⇒ Object

Makes a new request string for cmd and args.



277
278
279
# File 'lib/mogilefs/backend.rb', line 277

# Builds the request line for +cmd+: the command, a single space, the
# url-encoded +args+, and a terminating CRLF.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end

#parse_response(line, request = nil) ⇒ Object

Turns the line response from the server into a Hash of options, an error, or raises, as appropriate.



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/mogilefs/backend.rb', line 291

# Interprets one response +line+ from a tracker.
#
# "OK ..." lines:: the payload is url-decoded and returned as a Hash.
# "ERR <code> <message>" lines:: @lasterr and @lasterrstr are updated and
#   the exception class for <code> is instantiated. Without +request+ the
#   exception is raised; with +request+ the stripped request is appended to
#   the message and the exception object is returned instead.
# anything else (including nil):: raises MogileFS::InvalidResponseError.
def parse_response(line, request = nil)
  case line
  when /\AOK\s+\d*\s*(\S*)\r?\n\z/
    url_decode(Regexp.last_match(1))
  when /\AERR\s+(\w+)\s*([^\r\n]*)/
    code, detail = Regexp.last_match(1), Regexp.last_match(2)
    @lasterr = code
    @lasterrstr = detail ? url_unescape(detail) : nil
    raise error(@lasterr).new(@lasterrstr) unless request

    # caller asked for the error to be returned; tag it with the request
    suffix = " request=#{request.strip}"
    @lasterrstr = @lasterrstr ? (@lasterrstr << suffix) : suffix
    error(@lasterr).new(@lasterrstr)
  else
    raise MogileFS::InvalidResponseError,
          "Invalid response from server: #{line.inspect}"
  end
end

#pipeline_dispatch(cmd, args, &block) ⇒ Object

dispatch a request like do_request, but queue block for execution upon receiving a response. It is the users’ responsibility to ensure &block is executed in the correct order. Trackers with multiple queryworkers are not guaranteed to return responses in the same order they were requested.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/mogilefs/backend.rb', line 198

# Queues a +cmd+ request with +args+ on the tracker connection without
# waiting for its response; +block+ is stored in @pending next to the
# request so it can be called when the response is read.
#
# Responses that are already readable are drained first (see
# #pipeline_drain_unlocked), and the time that takes is deducted from the
# write timeout.
#
# Returns the number of requests pending after this one was queued.
def pipeline_dispatch(cmd, args, &block) # :nodoc:
  request = make_request(cmd, args)
  timeout = @timeout

  @mutex.synchronize do
    io = socket
    timeout = pipeline_drain_unlocked(io, timeout)

    # send the request out...
    begin
      io.timed_write(request, timeout)
      @pending << [ request, block ]
    rescue SystemCallError, MogileFS::RequestTruncatedError => err
      @dead[@active_host] = [ MogileFS.now, err ]
      # @pending[0] is read before shutdown_unlocked empties @pending: if
      # any request was still in flight the error is re-raised here,
      # otherwise we reconnect and retry the write
      shutdown_unlocked(@pending[0])
      io = socket
      retry
    end

    @pending.size
  end
end

#pipeline_drain_unlocked(io, timeout) ⇒ Object

Try to read any responses we have pending already before filling the pipeline with more requests. This usually takes very little time, but trackers may return huge responses and we could be on a slow network.



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/mogilefs/backend.rb', line 175

# Consumes responses for pending pipelined requests for as long as +io+
# is readable, charging the time spent against +timeout+.
#
# Stops as soon as @pending is empty or IO.select reports +io+ as not
# readable (timed out, or only writable). Returns the remaining timeout.
def pipeline_drain_unlocked(io, timeout) # :nodoc:
  watched = [ io ]
  until @pending.empty?
    started = MogileFS.now
    ready = IO.select(watched, watched, nil, timeout)
    timeout = timeout_update(timeout, started)

    # nothing to read right now: hand back whatever time is left
    break unless ready && ready[0][0]

    started = MogileFS.now
    pipeline_gets_unlocked(io, timeout)
    timeout = timeout_update(timeout, started)
  end
  timeout
end

#pipeline_gets_unlocked(io, timeout) ⇒ Object

:nodoc:



158
159
160
161
162
163
164
# File 'lib/mogilefs/backend.rb', line 158

# Reads one response line from +io+ and feeds it to the callback of the
# oldest pending request.
#
# The request is passed along to #parse_response, so an "ERR" response
# reaches the callback as an exception object rather than being raised.
# Raises MogileFS::PipelineError when no line could be read.
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout)
  unless line
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  end

  # entries are [ request, callback ] pairs, oldest first
  entry = @pending.shift
  callback = entry[1]
  callback.call(parse_response(line, entry[0]))
end

#pipeline_wait(count = nil) ⇒ Object

:nodoc:



221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/mogilefs/backend.rb', line 221

# Blocks until +count+ pipelined responses have been read and handed to
# their callbacks; +count+ defaults to the number of pending requests.
#
# Raises MogileFS::Error when fewer than +count+ requests are pending. Any
# error while reading shuts the connection down and is re-raised.
def pipeline_wait(count = nil) # :nodoc:
  @mutex.synchronize do
    io = socket
    in_flight = @pending.size
    count ||= in_flight
    if in_flight < count
      raise MogileFS::Error,
            "pending=#{in_flight} < expected=#{count} failed"
    end

    begin
      count.times { pipeline_gets_unlocked(io, @timeout) }
    rescue
      shutdown_unlocked(true)
    end
  end
end

#shutdown ⇒ Object

Closes this backend’s socket.



94
95
96
# File 'lib/mogilefs/backend.rb', line 94

# Closes this backend's socket while holding @mutex; the actual work is
# done by #shutdown_unlocked.
def shutdown
  @mutex.synchronize do
    shutdown_unlocked
  end
end

#shutdown_unlocked(do_raise = false) ⇒ Object

:nodoc:



132
133
134
135
136
137
138
139
# File 'lib/mogilefs/backend.rb', line 132

# Forgets all pending pipelined requests and closes the current socket, if
# any; errors raised by the close itself are ignored.
#
# When +do_raise+ is true the exception currently being handled by the
# caller is re-raised afterwards.
def shutdown_unlocked(do_raise = false) # :nodoc:
  @pending = []
  if @socket
    begin
      @socket.close
    rescue StandardError
      # ignore errors
    end
    @socket = nil
  end
  raise if do_raise
end

#socket ⇒ Object

Returns a socket connected to a MogileFS tracker.



343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/mogilefs/backend.rb', line 343

# Returns the current tracker socket, connecting first if there is none or
# it has been closed.
#
# Hosts are tried in random order. A host whose last recorded failure is
# newer than @fail_timeout is skipped; a host that fails to connect is
# recorded in @dead together with the time and the exception.
#
# Raises MogileFS::UnreachableBackendError, listing the recorded failures,
# when no host yields a connection.
def socket
  return @socket if @socket && !@socket.closed?

  @hosts.shuffle.each do |host|
    failure = @dead[host]
    next if failure && failure[0] > (MogileFS.now - @fail_timeout)

    begin
      addr, port = host.split(':'.freeze)
      @socket = MogileFS::Socket.tcp(addr, port, @connect_timeout)
      @active_host = host
      return @socket
    rescue SystemCallError, MogileFS::Timeout => err
      # remember the failure and move on to the next host
      @dead[host] = [ MogileFS.now, err ]
    end
  end

  errors = @dead.map do |host, (_, cause)|
    "#{host} - #{cause.message} (#{cause.class})"
  end
  raise MogileFS::UnreachableBackendError,
        "couldn't connect to any tracker: #{errors.join(', ')}"
end

#timeout_update(timeout, t0) ⇒ Object

:nodoc:



166
167
168
169
# File 'lib/mogilefs/backend.rb', line 166

# Returns what is left of +timeout+ after subtracting the time elapsed
# since +t0+ (as measured by MogileFS.now), never going below zero.
def timeout_update(timeout, t0) # :nodoc:
  elapsed = MogileFS.now - t0
  remaining = timeout - elapsed
  if remaining < 0
    0
  else
    remaining
  end
end

#url_decode(str) ⇒ Object

Turns a url params string into a Hash.



367
368
369
370
371
372
373
374
# File 'lib/mogilefs/backend.rb', line 367

# Parses a "k1=v1&k2=v2" parameter string into a Hash.
#
# Each pair is split on its first "=" only, both halves are url-unescaped,
# and keys are frozen before insertion. A pair without "=" maps its key to
# nil.
def url_decode(str) # :nodoc:
  str.split('&'.freeze).each_with_object({}) do |pair, decoded|
    key, value = pair.split('='.freeze, 2).map! { |part| url_unescape(part) }
    decoded[key.freeze] = value
  end
end

#url_encode(params) ⇒ Object

Turns a Hash (or Array of pairs) into a url params string.



383
384
385
386
387
# File 'lib/mogilefs/backend.rb', line 383

# Serialises +params+ (a Hash or an Array of pairs) as "k1=v1&k2=v2".
# Keys and values are converted with to_s and url-escaped individually.
def url_encode(params) # :nodoc:
  pairs = params.map do |key, value|
    escaped_key = url_escape(key.to_s)
    escaped_value = url_escape(value.to_s)
    "#{escaped_key}=#{escaped_value}"
  end
  pairs.join('&'.freeze)
end

#url_escape(str) ⇒ Object

Ruby 1.8



391
392
393
394
395
# File 'lib/mogilefs/backend.rb', line 391

# Percent-encodes +str+ for use in a tracker request and returns the result
# as a new String (+str+ itself is not modified).
#
# Word characters and , - . / \ : are kept as they are, spaces become "+",
# and every other character is replaced by "%xx" (lowercase hex) for each
# of its bytes.
def url_escape(str) # :nodoc:
  str = str.gsub(/([^\w\,\-.\/\\\: ])/) do
    # escape byte by byte: String#ord yields a code point, which would
    # turn a multi-byte character such as U+20AC into the bogus "%20ac"
    $1.unpack('C*'.freeze).map! { |byte| "%%%02x".freeze % byte }.join
  end
  str.tr!(' '.freeze, '+'.freeze)
  str
end

#url_unescape(str) ⇒ Object

Unescapes naughty URL characters.



403
404
405
406
407
# File 'lib/mogilefs/backend.rb', line 403

# Reverses url escaping: "+" becomes a space and each "%xx" hex escape
# (either case) becomes the byte it denotes. Returns a new String; +str+
# is left untouched.
def url_unescape(str) # :nodoc:
  decoded = str.tr('+'.freeze, ' '.freeze)
  # gsub! returns nil when nothing matched, so return +decoded+ explicitly
  decoded.gsub!(/%([a-f0-9][a-f0-9])/i) { [$1.hex].pack('C'.freeze) }
  decoded
end