Class: MogileFS::Backend

Inherits:
Object
  • Object
show all
Defined in:
lib/mogilefs/backend.rb

Overview

This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS.

Constant Summary collapse

BACKEND_ERRORS =

:nodoc:

{}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args) ⇒ Backend

Creates a new MogileFS::Backend.

:hosts is a required argument and must be an Array containing one or more ‘hostname:port’ pairs as Strings.

:timeout adjusts the request timeout before an error is returned.

Raises:

  • (ArgumentError)


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/mogilefs/backend.rb', line 71

# Creates a new MogileFS::Backend.
#
# +args+ is a Hash of options:
#
# :hosts::        required; a non-empty collection of 'hostname:port' Strings
# :timeout::      request timeout, defaults to 3
# :fail_timeout:: how long (in seconds) a tracker that failed is skipped
#                 when a new socket is needed, defaults to 5
#
# Raises ArgumentError when :hosts is missing or empty, or when any entry
# does not end in ":<digits>".
def initialize(args)
  @hosts = args[:hosts]
  @fail_timeout = args[:fail_timeout] || 5
  raise ArgumentError, "must specify at least one host" unless @hosts
  raise ArgumentError, "must specify at least one host" if @hosts.empty?

  # \z anchors at the very end of the String; "$" would also match before an
  # embedded newline and let entries such as "host:80\njunk" through.
  unless @hosts.all? { |h| h =~ /:\d+\z/ }
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end

  @mutex = Mutex.new
  @timeout = args[:timeout] || 3
  @socket = nil
  @lasterr = nil
  @lasterrstr = nil
  @pending = [] # [ request, block ] pairs awaiting a pipelined response

  @dead = {} # host => [ time of failure, exception ]
end

Instance Attribute Details

#lasterrObject (readonly)

The last error



56
57
58
# File 'lib/mogilefs/backend.rb', line 56

# Reader for @lasterr: the error code captured from the most recent ERR
# response (nil until one has been parsed).
def lasterr; @lasterr; end

#lasterrstrObject (readonly)

The string attached to the last error



61
62
63
# File 'lib/mogilefs/backend.rb', line 61

# Reader for @lasterrstr: the message text that accompanied the most recent
# ERR response (nil until one has been parsed).
def lasterrstr; @lasterrstr; end

Class Method Details

.add_command(*names) ⇒ Object

Adds one MogileFS command method for each of the given names.



10
11
12
13
14
15
16
# File 'lib/mogilefs/backend.rb', line 10

# Defines one instance method per entry in +names+. Each generated method
# forwards its first argument (an empty Hash when omitted) to #do_request
# with the idempotent flag set to false.
def self.add_command(*names)
  names.each do |command|
    define_method(command) { |*argv| do_request(command, argv.first || {}, false) }
  end
end

.add_error(err_snake) ⇒ Object

this converts an error code from a mogilefsd tracker to an exception:

Examples of some exceptions that get created:

class AfterMismatchError < MogileFS::Error; end
class DomainNotFoundError < MogileFS::Error; end
class InvalidCharsError < MogileFS::Error; end


36
37
38
39
40
41
42
43
# File 'lib/mogilefs/backend.rb', line 36

# Registers the exception class for the tracker error code +err_snake+
# (e.g. "unknown_key") in BACKEND_ERRORS and returns that class.
#
# The code is camel-cased and given an "Error" suffix unless it already
# ends in one ("unknown_key" => UnknownKeyError). An existing constant of
# that name is reused only when it is a MogileFS::Error subclass; otherwise
# a new subclass of MogileFS::Error is defined on the receiver.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  # const_defined?/const_get also see constants inherited from Object, so a
  # code such as "name" would otherwise resolve to the core ::NameError,
  # which callers rescuing MogileFS::Error would never catch.
  klass = const_get(err_camel) if const_defined?(err_camel)
  unless klass.is_a?(Class) && klass <= MogileFS::Error
    klass = const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = klass
end

.add_idempotent_command(*names) ⇒ Object

Adds one idempotent MogileFS command method for each of the given names. These commands may be retried transparently on a different tracker if there is a network/server error.



20
21
22
23
24
25
26
# File 'lib/mogilefs/backend.rb', line 20

# Defines one instance method per entry in +names+. Each generated method
# forwards its first argument (an empty Hash when omitted) to #do_request
# with the idempotent flag set to true.
def self.add_idempotent_command(*names)
  names.each do |command|
    define_method(command) { |*argv| do_request(command, argv.first || {}, true) }
  end
end

.const_missing(name) ⇒ Object

:nodoc:



45
46
47
48
49
50
51
# File 'lib/mogilefs/backend.rb', line 45

# Lazily defines any missing constant whose name ends in "Error" as a new
# MogileFS::Error subclass; every other name is left to the default
# const_missing handling.
def self.const_missing(name) # :nodoc:
  return super(name) unless name.to_s.end_with?('Error')

  const_set(name, Class.new(MogileFS::Error))
end

Instance Method Details

#clear_cache(types = %w(all)) ⇒ Object

this command is special since the cache is per-tracker, so we connect to all backends and not just one



306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/mogilefs/backend.rb', line 306

# Sends a "clear_cache" request to every tracker listed in @hosts (the
# cache is per-tracker, so a single connection is not enough).
#
# +types+ lists the cache types to clear; each becomes a request parameter
# with the value 1. Trackers that cannot be connected to, written to or
# read from are silently skipped. Gives up as soon as IO.select reports
# nothing ready within @timeout. Always returns nil and closes every socket
# it opened.
def clear_cache(types = %w(all))
  opts = types.each_with_object({}) { |type, flags| flags[type] = 1 }

  # best effort: hosts we cannot connect to are simply left out
  sockets = @hosts.map { |host| MogileFS::Socket.start(*host.split(/:/)) rescue nil }.compact

  request = make_request("clear_cache", opts)
  to_write = sockets # still need the request written
  to_read = []       # request written, response not yet consumed
  until to_write.empty? && to_read.empty?
    ready = IO.select(to_read, to_write, nil, @timeout)
    return unless ready

    readable, writable = ready
    to_read -= readable
    to_write -= writable
    readable.each { |io| io.timed_gets(0) rescue nil }
    writable.each do |io|
      begin
        io.timed_write(request, 0)
        to_read << io
      rescue
        # a failed write just drops this tracker from both sets
      end
    end
  end
  nil
  ensure
    sockets.each { |io| io.close }
end

#dispatch_unlocked(request, timeout = @timeout) ⇒ Object

:nodoc:



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/mogilefs/backend.rb', line 140

# Writes +request+ to the current tracker socket and returns that socket.
#
# On SystemCallError or MogileFS::RequestTruncatedError the active host is
# recorded in @dead, the socket is shut down and the write is attempted
# again on whatever #socket returns next. The "_unlocked" suffix suggests
# the caller is expected to hold @mutex.
def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
  conn = socket
  conn.timed_write(request, timeout)
  conn
rescue SystemCallError, MogileFS::RequestTruncatedError => failure
  @dead[@active_host] = [ Time.now, failure ]
  shutdown_unlocked
  retry
end

#do_request(cmd, args, idempotent = false) ⇒ Object

Performs the cmd request with args.



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/mogilefs/backend.rb', line 231

# Sends +cmd+ with +args+ to a tracker and returns the parsed response.
#
# cmd::        command name, turned into the request line by #make_request
# args::       request parameters; the :ruby_no_raise key, if present, is
#              deleted from this Hash (the caller's object is mutated)
# idempotent:: when true, the request is sent again after a
#              SystemCallError, an InvalidResponseError or an EOF while
#              reading the response
#
# The whole exchange runs under @mutex. Returns whatever #parse_response
# returns; when :ruby_no_raise was given the request is handed to
# #parse_response, which then returns (instead of raising) the exception
# built from an ERR response. Errors other than the retried ones close the
# socket and are re-raised.
def do_request(cmd, args, idempotent = false)
  no_raise = args.delete(:ruby_no_raise)
  request = make_request(cmd, args)
  line = nil
  # set once a retry has happened; the socket is then closed after the loop
  failed = false
  @mutex.synchronize do
    begin
      io = dispatch_unlocked(request)
      line = io.timed_gets(@timeout)
      # a newline-terminated line is a complete response: leave the loop
      break if /\r?\n\z/ =~ line

      # a non-nil line without a trailing newline is a truncated response
      line and raise MogileFS::InvalidResponseError,
                     "Invalid response from server: #{line.inspect}"

      # nil line means EOF: fatal unless the command may be repeated
      idempotent or
        raise EOFError, "end of file reached after: #{request.inspect}"
      # fall through to retry in loop
    rescue SystemCallError,
           MogileFS::InvalidResponseError # truncated response
      # we got a successful timed_write, but not a timed_gets
      if idempotent
        failed = true
        shutdown_unlocked(false)
        retry
      end
      # not idempotent: close the socket and re-raise the current error
      shutdown_unlocked(true)
    rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
      # never retried, even for idempotent commands
      shutdown_unlocked(true)
    rescue
      # we DO NOT want the response we timed out waiting for, to crop up later
      # on, on the same socket, interspersed with a subsequent request!  we
      # close the socket if there's any error.
      shutdown_unlocked(true)
    end while idempotent
    shutdown_unlocked if failed
  end # @mutex.synchronize
  parse_response(line, no_raise ? request : nil)
end

#error(err_snake) ⇒ Object

Converts an error code from a mogilefsd tracker to an exception class. Most of these exceptions should already be defined, but since the MogileFS server code is liable to change and we may not always be able to keep up with the changes, unknown error codes are registered on the fly.



279
280
281
# File 'lib/mogilefs/backend.rb', line 279

# Returns the exception class registered in BACKEND_ERRORS for the tracker
# error code +err_snake+, asking the class-level add_error to create and
# register one when the code has not been seen before.
def error(err_snake)
  known = BACKEND_ERRORS[err_snake]
  return known if known

  self.class.add_error(err_snake)
end

#make_request(cmd, args) ⇒ Object

Makes a new request string for cmd and args.



271
272
273
# File 'lib/mogilefs/backend.rb', line 271

# Builds the request line sent to a tracker: the command name, a space, the
# url-encoded +args+, and a terminating CRLF.
def make_request(cmd, args)
  encoded_args = url_encode(args)
  "#{cmd} #{encoded_args}\r\n"
end

#parse_response(line, request = nil) ⇒ Object

Turns the line response from the server into a Hash of options, an error, or raises, as appropriate.



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/mogilefs/backend.rb', line 285

# Interprets one response +line+ from a tracker.
#
# * "OK ..." lines: returns the url-decoded payload (see #url_decode).
# * "ERR <code> <message>" lines: stores the code in @lasterr and the
#   unescaped message in @lasterrstr, then raises the exception class
#   returned by #error. When +request+ is given, " request=<request>" is
#   appended to the message and the exception is returned, not raised.
# * anything else: raises MogileFS::InvalidResponseError.
def parse_response(line, request = nil)
  case line
  when /\AOK\s+\d*\s*(\S*)\r?\n\z/
    url_decode(Regexp.last_match(1))
  when /\AERR\s+(\w+)\s*([^\r\n]*)/
    code, detail = Regexp.last_match.captures
    @lasterr = code
    @lasterrstr = detail ? url_unescape(detail) : nil
    raise error(@lasterr).new(@lasterrstr) unless request

    suffix = " request=#{request.strip}"
    @lasterrstr = @lasterrstr ? (@lasterrstr << suffix) : suffix
    error(@lasterr).new(@lasterrstr)
  else
    raise MogileFS::InvalidResponseError,
          "Invalid response from server: #{line.inspect}"
  end
end

#pipeline_dispatch(cmd, args, &block) ⇒ Object

dispatch a request like do_request, but queue block for execution upon receiving a response. It is the users’ responsibility to ensure &block is executed in the correct order. Trackers with multiple queryworkers are not guaranteed to return responses in the same order they were requested.



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/mogilefs/backend.rb', line 192

# Queues a pipelined request: under @mutex, first reads whatever responses
# are already available (#pipeline_drain_unlocked), then writes the request
# for +cmd+/+args+ and records [ request, block ] in @pending so the block
# can be invoked with the response later. Returns the number of requests
# now pending.
#
# A failed write marks the active host dead and is retried on a new
# socket; #shutdown_unlocked re-raises instead when a request was already
# pending.
def pipeline_dispatch(cmd, args, &block) # :nodoc:
  request = make_request(cmd, args)
  remaining = @timeout

  @mutex.synchronize do
    conn = socket
    remaining = pipeline_drain_unlocked(conn, remaining)

    begin
      conn.timed_write(request, remaining)
      @pending.push([ request, block ])
    rescue SystemCallError, MogileFS::RequestTruncatedError => failure
      @dead[@active_host] = [ Time.now, failure ]
      shutdown_unlocked(@pending[0])
      conn = socket
      retry
    end

    @pending.size
  end
end

#pipeline_drain_unlocked(io, timeout) ⇒ Object

Try to read any responses we already have pending before filling the pipeline with more requests. This usually takes very little time, but trackers may return huge responses and we could be on a slow network.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/mogilefs/backend.rb', line 169

# Consumes responses that are already readable on +io+ while requests are
# still listed in @pending, and returns what is left of +timeout+.
#
# +io+ is passed to IO.select in both the read and the write set; the loop
# stops as soon as select does not report it readable.
def pipeline_drain_unlocked(io, timeout) # :nodoc:
  watched = [ io ]
  until @pending.empty?
    started = Time.now
    ready = IO.select(watched, watched, nil, timeout)
    timeout = timeout_update(timeout, started)
    return timeout unless ready && ready[0][0]

    started = Time.now
    pipeline_gets_unlocked(io, timeout)
    timeout = timeout_update(timeout, started)
  end
  timeout
end

#pipeline_gets_unlocked(io, timeout) ⇒ Object

:nodoc:



152
153
154
155
156
157
158
# File 'lib/mogilefs/backend.rb', line 152

# Reads one response line from +io+, removes the oldest entry from @pending
# and calls that entry's block with the parsed response. The request is
# passed to #parse_response, so an ERR response reaches the block as an
# exception object rather than being raised.
#
# Raises MogileFS::PipelineError when the read returns nothing (EOF).
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout)
  unless line
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  end

  entry = @pending.shift # [ request, block ]
  entry[1].call(parse_response(line, entry[0]))
end

#pipeline_wait(count = nil) ⇒ Object

:nodoc:



215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/mogilefs/backend.rb', line 215

# Reads +count+ pipelined responses (all pending ones when +count+ is nil)
# under @mutex, invoking each queued block via #pipeline_gets_unlocked.
#
# Raises MogileFS::Error when fewer than +count+ requests are pending. Any
# error while reading closes the socket and is re-raised.
def pipeline_wait(count = nil) # :nodoc:
  @mutex.synchronize do
    conn = socket
    count ||= @pending.size
    if @pending.size < count
      raise MogileFS::Error,
            "pending=#{@pending.size} < expected=#{count} failed"
    end

    begin
      count.times { pipeline_gets_unlocked(conn, @timeout) }
    rescue
      shutdown_unlocked(true)
    end
  end
end

#shutdownObject

Closes this backend’s socket.



93
94
95
# File 'lib/mogilefs/backend.rb', line 93

# Closes this backend's socket while holding @mutex.
def shutdown
  @mutex.synchronize do
    shutdown_unlocked
  end
end

#shutdown_unlocked(do_raise = false) ⇒ Object

:nodoc:



131
132
133
134
135
136
137
138
# File 'lib/mogilefs/backend.rb', line 131

# Forgets all pending pipelined requests and closes the current socket, if
# any, ignoring errors from the close itself.
#
# When +do_raise+ is truthy a bare +raise+ follows, which re-raises the
# exception currently being handled by the caller.
def shutdown_unlocked(do_raise = false) # :nodoc:
  @pending = []
  if @socket
    begin
      @socket.close
    rescue StandardError
      # ignore errors
    end
    @socket = nil
  end
  raise if do_raise
end

#socketObject

Returns a socket connected to a MogileFS tracker.



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# File 'lib/mogilefs/backend.rb', line 337

# Returns a socket connected to a MogileFS tracker.
#
# The current socket is reused while it is open. Otherwise the hosts are
# tried in random order, skipping any host whose last recorded failure is
# more recent than @fail_timeout seconds ago; hosts that fail to connect
# are recorded in @dead. The host that was connected to is remembered in
# @active_host.
#
# Raises MogileFS::UnreachableBackendError, listing the recorded failures,
# when no host could be connected to.
def socket
  return @socket if @socket && !@socket.closed?

  @hosts.shuffle.each do |host|
    failure = @dead[host]
    next if failure && failure[0] > (Time.now - @fail_timeout)

    addr, port = host.split(/:/)
    begin
      @socket = MogileFS::Socket.tcp(addr, port, @timeout)
      @active_host = host
      return @socket
    rescue SystemCallError, MogileFS::Timeout => err
      @dead[host] = [ Time.now, err ]
    end
  end

  errors = @dead.map do |host, entry|
    cause = entry[1]
    "#{host} - #{cause.message} (#{cause.class})"
  end
  raise MogileFS::UnreachableBackendError,
        "couldn't connect to any tracker: #{errors.join(', ')}"
end

#timeout_update(timeout, t0) ⇒ Object

:nodoc:



160
161
162
163
# File 'lib/mogilefs/backend.rb', line 160

# Returns +timeout+ reduced by the time elapsed since +t0+, never going
# below zero.
def timeout_update(timeout, t0) # :nodoc:
  remaining = timeout - (Time.now - t0)
  return 0 if remaining < 0

  remaining
end

#url_decode(str) ⇒ Object

Turns a url params string into a Hash.



361
362
363
364
365
366
367
368
# File 'lib/mogilefs/backend.rb', line 361

# Turns a url params string ("k1=v1&k2=v2") into a Hash. Keys and values
# are unescaped with #url_unescape; a pair without "=" gets a nil value.
# Keys are frozen before being stored.
def url_decode(str) # :nodoc:
  str.split(/&/).each_with_object({}) do |pair, decoded|
    key, value = pair.split(/=/, 2).map! { |part| url_unescape(part) }
    decoded[key.freeze] = value
  end
end

#url_encode(params) ⇒ Object

Turns a Hash (or Array of pairs) into a url params string.



377
378
379
380
381
# File 'lib/mogilefs/backend.rb', line 377

# Turns a Hash (or Array of pairs) into a url params string: every key and
# value is converted with to_s, escaped with #url_escape, joined as
# "key=value", and the pairs are joined with "&".
def url_encode(params) # :nodoc:
  pairs = params.map do |key, value|
    [ key, value ].map { |part| url_escape(part.to_s) }.join("=")
  end
  pairs.join("&")
end

#url_escape(str) ⇒ Object

Escapes naughty URL characters (the variant shown relies on String#ord, i.e. Ruby 1.9 or later).



385
386
387
# File 'lib/mogilefs/backend.rb', line 385

# Escapes +str+ for use in a tracker request.
#
# Word characters and , - . / \ : are kept as they are, spaces become "+",
# and every other character is replaced by one lowercase "%xx" sequence per
# byte. Escaping per byte (rather than per code point) keeps multi-byte
# characters intact: "\u20ac" becomes "%e2%82%ac", which #url_unescape
# turns back into the same three bytes.
def url_escape(str) # :nodoc:
  escaped = str.gsub(/([^\w\,\-.\/\\\: ])/) do |char|
    char.bytes.map { |byte| "%%%02x" % byte }.join
  end
  escaped.tr(' ', '+')
end

#url_unescape(str) ⇒ Object

Unescapes naughty URL characters.



395
396
397
# File 'lib/mogilefs/backend.rb', line 395

# Reverses #url_escape: "+" becomes a space and every "%xx" sequence (hex
# digits in either case) becomes the single byte it encodes.
def url_unescape(str) # :nodoc:
  spaced = str.tr('+', ' ')
  spaced.gsub(/%([a-f0-9][a-f0-9])/i) { [Regexp.last_match(1).to_i(16)].pack('C') }
end