Class: MogileFS::Backend
- Inherits:
-
Object
- Object
- MogileFS::Backend
- Defined in:
- lib/mogilefs/backend.rb
Overview
This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS
Constant Summary collapse
- BACKEND_ERRORS =
:nodoc:
{}
Instance Attribute Summary collapse
-
#lasterr ⇒ Object
readonly
The last error.
-
#lasterrstr ⇒ Object
readonly
The string attached to the last error.
Class Method Summary collapse
-
.add_command(*names) ⇒ Object
Adds MogileFS commands
names
. -
.add_error(err_snake) ⇒ Object
this converts an error code from a mogilefsd tracker to an exception.
-
.add_idempotent_command(*names) ⇒ Object
adds idempotent MogileFS commands
names
, these commands may be retried transparently on a different tracker if there is a network/server error. -
.const_missing(name) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#clear_cache(types = %w(all)) ⇒ Object
this command is special since the cache is per-tracker, so we connect to all backends and not just one.
-
#dispatch_unlocked(request, timeout = @timeout) ⇒ Object
:nodoc:.
-
#do_request(cmd, args, idempotent = false) ⇒ Object
Performs the cmd request with args.
-
#error(err_snake) ⇒ Object
this converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but the MogileFS server code is liable to change and we may not always be able to keep up with the changes, so an exception class is added on demand for any error code that is not yet known.
-
#initialize(args) ⇒ Backend
constructor
Creates a new MogileFS::Backend.
-
#make_request(cmd, args) ⇒ Object
Makes a new request string for cmd and args.
-
#parse_response(line, request = nil) ⇒ Object
Turns the
line
response from the server into a Hash of options, an error, or raises, as appropriate. -
#pipeline_dispatch(cmd, args, &block) ⇒ Object
dispatch a request like do_request, but queue
block
for execution upon receiving a response. -
#pipeline_drain_unlocked(io, timeout) ⇒ Object
try to read any responses we have pending already before filling the pipeline with more requests.
-
#pipeline_gets_unlocked(io, timeout) ⇒ Object
:nodoc:.
-
#pipeline_wait(count = nil) ⇒ Object
:nodoc:.
-
#shutdown ⇒ Object
Closes this backend’s socket.
-
#shutdown_unlocked(do_raise = false) ⇒ Object
:nodoc:.
-
#socket ⇒ Object
Returns a socket connected to a MogileFS tracker.
-
#timeout_update(timeout, t0) ⇒ Object
:nodoc:.
-
#url_decode(str) ⇒ Object
Turns a url params string into a Hash.
-
#url_encode(params) ⇒ Object
Turns a Hash (or Array of pairs) into a url params string.
-
#url_escape(str) ⇒ Object
Ruby 1.8.
-
#url_unescape(str) ⇒ Object
Unescapes naughty URL characters.
Constructor Details
#initialize(args) ⇒ Backend
Creates a new MogileFS::Backend.
:hosts is a required argument and must be an Array containing one or more ‘hostname:port’ pairs as Strings.
:timeout adjusts the request timeout before an error is returned.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/mogilefs/backend.rb', line 71
#
# Creates a new MogileFS::Backend.
#
# +args+ is a Hash of options:
#
# :hosts::           required; a non-empty Array of 'hostname:port' Strings
# :timeout::         request timeout, defaults to 3
# :connect_timeout:: connect timeout, defaults to the value of :timeout
# :fail_timeout::    stored as @fail_timeout, defaults to 5
#
# (The timeouts are presumably in seconds -- TODO confirm against
# MogileFS::Socket.)
#
# Raises ArgumentError if :hosts is missing, empty, or contains an entry
# that does not end in ':<digits>'.
def initialize(args)
  @hosts = args[:hosts]
  @fail_timeout = args[:fail_timeout] || 5
  raise ArgumentError, "must specify at least one host" unless @hosts
  raise ArgumentError, "must specify at least one host" if @hosts.empty?

  # \z instead of $: "$" also matches before a newline, which would let a
  # value such as "host:80\njunk" through validation.
  unless @hosts.all? { |h| h =~ /:\d+\z/ }
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end

  @mutex = Mutex.new
  @timeout = args[:timeout] || 3
  @connect_timeout = args[:connect_timeout] || @timeout
  @socket = nil
  @lasterr = nil
  @lasterrstr = nil
  @pending = []  # in-flight pipelined requests
  @dead = {}     # host => [ time of failure, exception ]
end
Instance Attribute Details
#lasterr ⇒ Object (readonly)
The last error
56 57 58 |
# File 'lib/mogilefs/backend.rb', line 56
#
# The last error: returns the current value of @lasterr (read-only).
def lasterr
  @lasterr
end
#lasterrstr ⇒ Object (readonly)
The string attached to the last error
61 62 63 |
# File 'lib/mogilefs/backend.rb', line 61
#
# The string attached to the last error: returns the current value of
# @lasterrstr (read-only).
def lasterrstr
  @lasterrstr
end
Class Method Details
.add_command(*names) ⇒ Object
Adds MogileFS commands names
.
10 11 12 13 14 15 16 |
# File 'lib/mogilefs/backend.rb', line 10
#
# Adds MogileFS commands +names+.
#
# For every name an instance method of that name is defined.  The method
# takes an optional first argument (defaulting to an empty Hash) and
# forwards it to #do_request with idempotent = false.
def self.add_command(*names)
  names.each do |name|
    define_method name do |*args|
      do_request(name, args[0] || {}, false)
    end
  end
end
.add_error(err_snake) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/mogilefs/backend.rb', line 36
#
# Converts an error code from a mogilefsd tracker to an exception class.
#
# +err_snake+ is the snake_case error code (e.g. "unknown_key").  It is
# camel-cased, given an "Error" suffix unless it already has one, and a
# subclass of MogileFS::Error with that name is created on the receiver
# if it does not exist yet.  The class is recorded in BACKEND_ERRORS under
# +err_snake+ and returned.
def self.add_error(err_snake)
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  err_camel << 'Error' unless /Error\z/ =~ err_camel

  # inherit = false: only look at constants defined directly on the
  # receiver.  With the default (inherited) lookup an error code such as
  # "name" would resolve to the top-level ::NameError instead of a
  # MogileFS::Error subclass.
  unless const_defined?(err_camel, false)
    const_set(err_camel, Class.new(MogileFS::Error))
  end
  BACKEND_ERRORS[err_snake] = const_get(err_camel, false)
end
.add_idempotent_command(*names) ⇒ Object
adds idempotent MogileFS commands names
, these commands may be retried transparently on a different tracker if there is a network/server error.
20 21 22 23 24 25 26 |
# File 'lib/mogilefs/backend.rb', line 20
#
# Adds idempotent MogileFS commands +names+; these commands may be retried
# transparently on a different tracker if there is a network/server error.
#
# For every name an instance method of that name is defined.  The method
# takes an optional first argument (defaulting to an empty Hash) and
# forwards it to #do_request with idempotent = true.
def self.add_idempotent_command(*names)
  names.each do |name|
    define_method name do |*args|
      do_request(name, args[0] || {}, true)
    end
  end
end
.const_missing(name) ⇒ Object
:nodoc:
45 46 47 48 49 50 51 |
# File 'lib/mogilefs/backend.rb', line 45
#
# Hook for references to undefined constants on the receiver.  A missing
# constant whose name ends in "Error" is defined on the fly as a subclass
# of MogileFS::Error and returned; any other name is passed on to +super+.
def self.const_missing(name) # :nodoc:
  if /Error\z/ =~ name.to_s
    const_set(name, Class.new(MogileFS::Error))
  else
    super(name)
  end
end
Instance Method Details
#clear_cache(types = %w(all)) ⇒ Object
this command is special since the cache is per-tracker, so we connect to all backends and not just one
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/mogilefs/backend.rb', line 312
#
# this command is special since the cache is per-tracker, so we connect to
# all backends and not just one
#
# +types+ is the list of cache types to clear; a single value is accepted
# as well and treated as a one-element list.  Each entry becomes a
# "type=1" argument of one "clear_cache" request, which is written to every
# tracker in @hosts that could be connected to.  Connection, write and read
# failures are ignored (best effort).
#
# Always returns nil; returns early if IO.select reports nothing ready
# within @timeout.  Every socket opened here is closed before returning.
def clear_cache(types = %w(all))
  # assigned before anything can raise so the ensure clause never sees nil
  sockets = []
  opts = {}
  Array(types).each { |type| opts[type] = 1 }

  @hosts.each do |host|
    begin
      io = MogileFS::Socket.start(*host.split(':'.freeze))
      sockets << io if io
    rescue StandardError
      # unreachable tracker: skip it
    end
  end

  wpending = sockets.dup  # still waiting to send the request
  rpending = []           # request sent, waiting for the response
  request = make_request("clear_cache", opts)
  while wpending[0] || rpending[0]
    ready = IO.select(rpending, wpending, nil, @timeout) or return
    rpending -= ready[0]
    wpending -= ready[1]

    ready[0].each do |io|
      begin
        io.timed_gets(0)
      rescue StandardError
        # the response content is not needed
      end
    end

    ready[1].each do |io|
      begin
        io.timed_write(request, 0)
        rpending << io
      rescue StandardError
        # write failed: do not wait for a response from this tracker
      end
    end
  end
  nil
ensure
  sockets.each do |io|
    begin
      io.close
    rescue StandardError
      # keep going so the remaining sockets are closed too
    end
  end
end
#dispatch_unlocked(request, timeout = @timeout) ⇒ Object
:nodoc:
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/mogilefs/backend.rb', line 141
#
# Writes +request+ to the current tracker socket (see #socket) and returns
# that socket.
#
# On SystemCallError or MogileFS::RequestTruncatedError the socket is shut
# down and the write is retried on whatever #socket returns next.  A host
# that fails twice within one call is recorded in @dead together with the
# time and the exception.  Exceptions raised by #socket itself are not
# rescued here.
#
# The "_unlocked" suffix suggests the caller is expected to hold @mutex
# already -- this method does not take it.
def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
  tries = nil
  begin
    io = socket
    io.timed_write(request, timeout)
    io
  rescue SystemCallError, MogileFS::RequestTruncatedError => err
    # per-host failure counter, created lazily on the first failure
    tries ||= Hash.new { |hash, host| hash[host] = 0 }
    nr = tries[@active_host] += 1
    @dead[@active_host] = [ MogileFS.now, err ] if nr >= 2
    shutdown_unlocked
    retry
  end
end
#do_request(cmd, args, idempotent = false) ⇒ Object
Performs the cmd
request with args
.
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/mogilefs/backend.rb', line 237
#
# Performs the +cmd+ request with +args+ and returns the parsed response
# (see #parse_response).
#
# +args+ is a Hash of request parameters.  The :ruby_no_raise key is
# removed from it (the Hash is modified in place) and is not sent; when it
# was truthy the request string is handed to #parse_response as its second
# argument.
#
# When +idempotent+ is true a failed or truncated exchange is retried in a
# loop; otherwise the socket is shut down and the error is re-raised.  The
# whole exchange runs while holding @mutex.
def do_request(cmd, args, idempotent = false)
  no_raise = args.delete(:ruby_no_raise)
  request = make_request(cmd, args)
  line = nil
  failed = false
  @mutex.synchronize do
    begin
      io = dispatch_unlocked(request)
      line = io.timed_gets(@timeout)
      break if /\n\z/ =~ line # complete line received, we are done

      if line
        raise MogileFS::InvalidResponseError,
              "Invalid response from server: #{line.inspect}"
      end

      unless idempotent
        raise EOFError, "end of file reached after: #{request.inspect}"
      end
      # fall through to retry in loop
    rescue SystemCallError, MogileFS::InvalidResponseError
      # truncated response
      # we got a successful timed_write, but not a timed_gets
      if idempotent
        failed = true
        shutdown_unlocked(false)
        retry
      end
      shutdown_unlocked(true)
    rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
      shutdown_unlocked(true)
    rescue
      # we DO NOT want the response we timed out waiting for, to crop up later
      # on, on the same socket, interspersed with a subsequent request!  we
      # close the socket if there's any error.
      shutdown_unlocked(true)
    end while idempotent

    shutdown_unlocked if failed
  end # @mutex.synchronize
  parse_response(line, no_raise ? request : nil)
end
#error(err_snake) ⇒ Object
this converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but the MogileFS server code is liable to change and we may not always be able to keep up with the changes, so an exception class is added on demand for any error code that is not yet known.
285 286 287 |
# File 'lib/mogilefs/backend.rb', line 285
#
# Converts an error code from a mogilefsd tracker to an exception class.
#
# Returns the entry registered in BACKEND_ERRORS for +err_snake+; if there
# is none, the result of the class-level add_error(err_snake) is returned
# instead.
def error(err_snake)
  BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
end
#make_request(cmd, args) ⇒ Object
Makes a new request string for cmd
and args
.
277 278 279 |
# File 'lib/mogilefs/backend.rb', line 277
#
# Makes a new request string for +cmd+ and +args+: the command, one space,
# the url-encoded arguments (see #url_encode) and a trailing "\r\n".
def make_request(cmd, args)
  "#{cmd} #{url_encode args}\r\n"
end
#parse_response(line, request = nil) ⇒ Object
Turns the line
response from the server into a Hash of options, an error, or raises, as appropriate.
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/mogilefs/backend.rb', line 291
#
# Turns the +line+ response from the server into a Hash of options, an
# error, or raises, as appropriate.
#
# * "OK ..." lines: the argument string is url-decoded and returned.
# * "ERR code message" lines: @lasterr and @lasterrstr are updated.  If
#   +request+ is given, " request=<request>" is appended to the message and
#   the exception object is *returned*; otherwise the exception is raised.
# * anything else (including nil): raises MogileFS::InvalidResponseError.
def parse_response(line, request = nil)
  case line
  when /\AOK\s+\d*\s*(\S*)\r?\n\z/
    url_decode($1)
  when /\AERR\s+(\w+)\s*([^\r\n]*)/
    @lasterr = $1
    @lasterrstr = $2 ? url_unescape($2) : nil
    if request
      request = " request=#{request.strip}"
      @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
      return error(@lasterr).new(@lasterrstr)
    end
    raise error(@lasterr).new(@lasterrstr)
  else
    raise MogileFS::InvalidResponseError,
          "Invalid response from server: #{line.inspect}"
  end
end
#pipeline_dispatch(cmd, args, &block) ⇒ Object
dispatch a request like do_request, but queue block
for execution upon receiving a response. It is the users’ responsibility to ensure &block is executed in the correct order. Trackers with multiple queryworkers are not guaranteed to return responses in the same order they were requested.
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/mogilefs/backend.rb', line 198
#
# dispatch a request like do_request, but queue +block+ for execution
# upon receiving a response.  It is the users' responsibility to ensure
# +block+ is executed in the correct order.  Trackers with multiple
# queryworkers are not guaranteed to return responses in the same order
# they were requested.
#
# Returns the number of requests pending after this one was queued.
#
# If the write fails, the active host is recorded in @dead, the socket is
# shut down and the write is retried on a new socket.  When other requests
# were already pending at that point the shutdown re-raises the error
# instead (their responses are lost with the old socket).
def pipeline_dispatch(cmd, args, &block) # :nodoc:
  request = make_request(cmd, args)
  timeout = @timeout

  @mutex.synchronize do
    io = socket
    # read whatever responses are already available first
    timeout = pipeline_drain_unlocked(io, timeout)

    # send the request out...
    begin
      io.timed_write(request, timeout)
      @pending << [ request, block ]
    rescue SystemCallError, MogileFS::RequestTruncatedError => err
      @dead[@active_host] = [ MogileFS.now, err ]
      # truthy argument (something was pending) makes this re-raise
      shutdown_unlocked(@pending[0])
      io = socket
      retry
    end

    @pending.size
  end
end
#pipeline_drain_unlocked(io, timeout) ⇒ Object
try to read any responses we have pending already before filling the pipeline with more requests. This usually takes very little time, but trackers may return huge responses and we could be on a slow network.
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/mogilefs/backend.rb', line 175
#
# try to read any responses we have pending already before filling the
# pipeline with more requests.  This usually takes very little time, but
# trackers may return huge responses and we could be on a slow network.
#
# Loops while requests are pending and +io+ is readable, handing each
# response to #pipeline_gets_unlocked.  Stops as soon as +io+ is not
# readable.  Returns what is left of +timeout+ after the time spent here
# (as computed by #timeout_update).
def pipeline_drain_unlocked(io, timeout) # :nodoc:
  set = [ io ]

  while @pending.size > 0
    t0 = MogileFS.now
    # io is in the write set too, so select returns as soon as it is
    # writable even when there is nothing to read yet
    r = IO.select(set, set, nil, timeout)
    timeout = timeout_update(timeout, t0)

    return timeout unless r && r[0][0]

    t0 = MogileFS.now
    pipeline_gets_unlocked(io, timeout)
    timeout = timeout_update(timeout, t0)
  end
  timeout
end
#pipeline_gets_unlocked(io, timeout) ⇒ Object
:nodoc:
158 159 160 161 162 163 164 |
# File 'lib/mogilefs/backend.rb', line 158
#
# Reads one response line from +io+ (waiting up to +timeout+), removes the
# oldest entry from @pending and calls its block with the parsed response.
# The entry's request string is passed to #parse_response as second
# argument.  Returns whatever the block returns.
#
# Raises MogileFS::PipelineError if no line could be read (EOF); @pending
# is left untouched in that case.
def pipeline_gets_unlocked(io, timeout) # :nodoc:
  line = io.timed_gets(timeout)
  unless line
    raise MogileFS::PipelineError,
          "EOF with #{@pending.size} requests in-flight"
  end
  ready = @pending.shift # [ request, block ]
  ready[1].call(parse_response(line, ready[0]))
end
#pipeline_wait(count = nil) ⇒ Object
:nodoc:
221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/mogilefs/backend.rb', line 221
#
# Reads +count+ pending pipelined responses (all of them when +count+ is
# nil), running the queued block for each one via #pipeline_gets_unlocked.
# Runs while holding @mutex.
#
# Raises MogileFS::Error if fewer than +count+ requests are pending.  If
# reading fails, the socket is shut down and the error is re-raised.
def pipeline_wait(count = nil) # :nodoc:
  @mutex.synchronize do
    io = socket
    count ||= @pending.size
    if @pending.size < count
      raise MogileFS::Error,
            "pending=#{@pending.size} < expected=#{count} failed"
    end

    begin
      count.times { pipeline_gets_unlocked(io, @timeout) }
    rescue
      shutdown_unlocked(true)
    end
  end
end
#shutdown ⇒ Object
Closes this backend’s socket.
94 95 96 |
# File 'lib/mogilefs/backend.rb', line 94
#
# Closes this backend's socket: runs #shutdown_unlocked while holding
# @mutex.
def shutdown
  @mutex.synchronize { shutdown_unlocked }
end
#shutdown_unlocked(do_raise = false) ⇒ Object
:nodoc:
132 133 134 135 136 137 138 139 |
# File 'lib/mogilefs/backend.rb', line 132
#
# Drops all pending pipelined requests and closes the current socket, if
# any; errors from closing are ignored.
#
# When +do_raise+ is truthy, finishes with a bare +raise+, i.e. re-raises
# the exception currently being handled by the caller's +rescue+.
def shutdown_unlocked(do_raise = false) # :nodoc:
  @pending = []
  if @socket
    begin
      @socket.close
    rescue StandardError
      # ignore errors
    end
    @socket = nil
  end
  raise if do_raise
end
#socket ⇒ Object
Returns a socket connected to a MogileFS tracker.
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
# File 'lib/mogilefs/backend.rb', line 343
#
# Returns a socket connected to a MogileFS tracker.
#
# The current socket is reused while it is open.  Otherwise the hosts are
# tried in random order, skipping any host that failed less than
# @fail_timeout ago; the first successful connection becomes @socket and
# its host becomes @active_host.  Hosts that fail to connect are recorded
# in @dead with the time and the exception.
#
# Raises MogileFS::UnreachableBackendError, listing the recorded failures,
# if no tracker could be connected to.
def socket
  return @socket if @socket and not @socket.closed?

  @hosts.shuffle.each do |host|
    dead = @dead[host]
    next if dead and dead[0] > (MogileFS.now - @fail_timeout)

    begin
      addr, port = host.split(':'.freeze)
      @socket = MogileFS::Socket.tcp(addr, port, @connect_timeout)
      @active_host = host
    rescue SystemCallError, MogileFS::Timeout => err
      @dead[host] = [ MogileFS.now, err ]
      next
    end

    return @socket
  end

  errors = @dead.map { |host, (_, e)| "#{host} - #{e.message} (#{e.class})" }
  raise MogileFS::UnreachableBackendError,
        "couldn't connect to any tracker: #{errors.join(', ')}"
end
#timeout_update(timeout, t0) ⇒ Object
:nodoc:
166 167 168 169 |
# File 'lib/mogilefs/backend.rb', line 166
#
# Returns +timeout+ reduced by the time elapsed since +t0+ (measured with
# MogileFS.now), but never less than 0.
def timeout_update(timeout, t0) # :nodoc:
  timeout -= (MogileFS.now - t0)
  timeout < 0 ? 0 : timeout
end
#url_decode(str) ⇒ Object
Turns a url params string into a Hash.
367 368 369 370 371 372 373 374 |
# File 'lib/mogilefs/backend.rb', line 367
#
# Turns a url params string into a Hash.
#
# +str+ is split on "&" into pairs and each pair on its first "=".  Keys
# and values are unescaped with #url_unescape; keys are frozen.  A pair
# without "=" yields a nil value.
def url_decode(str) # :nodoc:
  rv = {}
  str.split('&'.freeze).each do |pair|
    k, v = pair.split('='.freeze, 2).map! { |x| url_unescape(x) }
    rv[k.freeze] = v
  end
  rv
end
#url_encode(params) ⇒ Object
Turns a Hash (or Array of pairs) into a url params string.
383 384 385 386 387 |
# File 'lib/mogilefs/backend.rb', line 383
#
# Turns a Hash (or Array of pairs) into a url params string: every key and
# value is converted with +to_s+, escaped with #url_escape, joined as
# "key=value", and the pairs are joined with "&".
def url_encode(params) # :nodoc:
  params.map do |k, v|
    "#{url_escape k.to_s}=#{url_escape v.to_s}"
  end.join('&'.freeze)
end
#url_escape(str) ⇒ Object
Ruby 1.8
391 392 393 394 395 |
# File 'lib/mogilefs/backend.rb', line 391
#
# Returns a copy of +str+ in which every character other than ASCII word
# characters, ",", "-", ".", "/", "\", ":" and space is replaced by
# "%xx" escapes (lower-case hex), and every space by "+".
#
# Escaping is done per byte, so a multibyte character becomes one "%xx"
# per byte (e.g. U+263A -> "%e2%98%ba").  Formatting the code point
# instead would emit sequences such as "%263a", which do not decode back
# to the original value.
def url_escape(str) # :nodoc:
  str = str.gsub(/([^\w\,\-.\/\\\: ])/) do
    $1.each_byte.map { |byte| "%%%02x".freeze % byte }.join
  end
  str.tr!(' '.freeze, '+'.freeze)
  str
end
#url_unescape(str) ⇒ Object
Unescapes naughty URL characters.
403 404 405 406 407 |
# File 'lib/mogilefs/backend.rb', line 403
#
# Unescapes naughty URL characters: returns a copy of +str+ with every "+"
# turned into a space and every "%xx" escape (hex digits in either case)
# turned into the byte it encodes.  +str+ itself is not modified.
def url_unescape(str) # :nodoc:
  str = str.tr('+'.freeze, ' '.freeze)
  str.gsub!(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack('C'.freeze) }
  str
end