Class: MogileFS::Backend
- Inherits: Object
- Defined in: lib/mogilefs/backend.rb
Overview
This class communicates with the MogileFS trackers. You should not have to use this directly unless you are developing support for new commands or plugins for MogileFS.
Constant Summary
- BACKEND_ERRORS = {} # :nodoc:
Instance Attribute Summary
-
#lasterr ⇒ Object
readonly
The last error.
-
#lasterrstr ⇒ Object
readonly
The string attached to the last error.
Class Method Summary
-
.add_command(*names) ⇒ Object
Adds MogileFS commands names.
-
.add_error(err_snake) ⇒ Object
Converts an error code from a mogilefsd tracker to an exception.
-
.add_idempotent_command(*names) ⇒ Object
Adds idempotent MogileFS commands names. These commands may be retried transparently on a different tracker if there is a network/server error.
-
.const_missing(name) ⇒ Object
:nodoc:.
Instance Method Summary
-
#clear_cache(types = %w(all)) ⇒ Object
This command is special: the cache is per-tracker, so we connect to all backends and not just one.
-
#dispatch_unlocked(request, timeout = @timeout) ⇒ Object
:nodoc:.
-
#do_request(cmd, args, idempotent = false) ⇒ Object
Performs the cmd request with args.
-
#error(err_snake) ⇒ Object
Converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but the MogileFS server code is liable to change and we may not always be able to keep up with the changes, so unknown error codes get an exception class defined on demand.
-
#initialize(args) ⇒ Backend
constructor
Creates a new MogileFS::Backend.
-
#make_request(cmd, args) ⇒ Object
Makes a new request string for cmd and args.
-
#parse_response(line, request = nil) ⇒ Object
Turns the line response from the server into a Hash of options, an error, or raises, as appropriate.
-
#pipeline_dispatch(cmd, args, &block) ⇒ Object
Dispatches a request like do_request, but queues block for execution upon receiving a response.
-
#pipeline_drain_unlocked(io, timeout) ⇒ Object
Tries to read any responses we already have pending before filling the pipeline with more requests.
-
#pipeline_gets_unlocked(io, timeout) ⇒ Object
:nodoc:.
-
#pipeline_wait(count = nil) ⇒ Object
:nodoc:.
-
#shutdown ⇒ Object
Closes this backend’s socket.
-
#shutdown_unlocked(do_raise = false) ⇒ Object
:nodoc:.
-
#socket ⇒ Object
Returns a socket connected to a MogileFS tracker.
-
#timeout_update(timeout, t0) ⇒ Object
:nodoc:.
-
#url_decode(str) ⇒ Object
Turns a url params string into a Hash.
-
#url_encode(params) ⇒ Object
Turns a Hash (or Array of pairs) into a url params string.
-
#url_escape(str) ⇒ Object
Escapes naughty URL characters.
-
#url_unescape(str) ⇒ Object
Unescapes naughty URL characters.
Constructor Details
#initialize(args) ⇒ Backend
Creates a new MogileFS::Backend.
:hosts is a required argument and must be an Array containing one or more ‘hostname:port’ pairs as Strings.
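:timeout adjusts the request timeout before an error is returned (default: 3 seconds).
:fail_timeout sets how long a tracker marked dead after a connection or write error is skipped before it is tried again (default: 5 seconds).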
    # File 'lib/mogilefs/backend.rb', line 71
    def initialize(args)
      @hosts = args[:hosts]
      @fail_timeout = args[:fail_timeout] || 5
      raise ArgumentError, "must specify at least one host" unless @hosts
      raise ArgumentError, "must specify at least one host" if @hosts.empty?
      unless @hosts == @hosts.select { |h| h =~ /:\d+$/ } then
        raise ArgumentError, ":hosts must be in 'host:port' form"
      end

      @mutex = Mutex.new
      @timeout = args[:timeout] || 3
      @socket = nil
      @lasterr = nil
      @lasterrstr = nil
      @pending = []
      @dead = {}
    end
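The :hosts checks in the constructor can be tried without a tracker. The following is a minimal sketch that copies only the validation logic into a hypothetical helper, `validate_hosts`, which is not part of MogileFS::Backend; the host names and ports are made up for illustration.

```ruby
# Hypothetical helper mirroring the :hosts checks in #initialize.
# It is not part of the library; it only reproduces the three guards.
def validate_hosts(hosts)
  raise ArgumentError, "must specify at least one host" unless hosts
  raise ArgumentError, "must specify at least one host" if hosts.empty?
  # Every entry must end in ":<digits>", otherwise the list is rejected.
  unless hosts == hosts.select { |h| h =~ /:\d+$/ }
    raise ArgumentError, ":hosts must be in 'host:port' form"
  end
  hosts
end

# Well-formed list: returned unchanged.
validate_hosts(%w(10.0.0.1:7001 tracker.example.com:7001))

# Missing port: rejected with an ArgumentError.
begin
  validate_hosts(%w(tracker.example.com))
rescue ArgumentError => e
  puts e.message
end
```

Note that the check is all-or-nothing: a single entry without a port causes the whole list to be rejected.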
Instance Attribute Details
#lasterr ⇒ Object (readonly)
The last error
    # File 'lib/mogilefs/backend.rb', line 56
    def lasterr
      @lasterr
    end
#lasterrstr ⇒ Object (readonly)
The string attached to the last error
    # File 'lib/mogilefs/backend.rb', line 61
    def lasterrstr
      @lasterrstr
    end
Class Method Details
.add_command(*names) ⇒ Object
Adds MogileFS commands names.
    # File 'lib/mogilefs/backend.rb', line 10
    def self.add_command(*names)
      names.each do |name|
        define_method name do |*args|
          do_request(name, args[0] || {}, false)
        end
      end
    end
.add_error(err_snake) ⇒ Object
    # File 'lib/mogilefs/backend.rb', line 36
    def self.add_error(err_snake)
      err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
      err_camel << 'Error' unless /Error\z/ =~ err_camel
      unless const_defined?(err_camel)
        const_set(err_camel, Class.new(MogileFS::Error))
      end
      BACKEND_ERRORS[err_snake] = const_get(err_camel)
    end
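The naming rule in add_error can be checked in isolation. This sketch extracts just the string conversion into a hypothetical helper, `error_class_name` (not a library method); the error codes passed in are illustrative inputs, not a list of codes the tracker is known to send.

```ruby
# Hypothetical helper reproducing only the name conversion from add_error.
def error_class_name(err_snake)
  # Upcase the first letter and each letter that follows an underscore;
  # the underscores themselves are dropped by the substitution.
  err_camel = err_snake.gsub(/(?:^|_)([a-z])/) { $1.upcase }
  # Append "Error" unless the name already ends with it.
  err_camel << 'Error' unless /Error\z/ =~ err_camel
  err_camel
end

puts error_class_name('unknown_key')        # UnknownKeyError
puts error_class_name('size_verify_error')  # SizeVerifyError
```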
.add_idempotent_command(*names) ⇒ Object
Adds idempotent MogileFS commands names. These commands may be retried transparently on a different tracker if there is a network/server error.
    # File 'lib/mogilefs/backend.rb', line 20
    def self.add_idempotent_command(*names)
      names.each do |name|
        define_method name do |*args|
          do_request(name, args[0] || {}, true)
        end
      end
    end
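Both class methods generate instance methods that differ only in the idempotent flag handed to do_request. The sketch below shows the pattern on a hypothetical stand-in class, `FakeBackend`, whose do_request records calls instead of contacting a tracker; the command names are used for illustration only.

```ruby
# Hypothetical stand-in for MogileFS::Backend. Only the define_method
# pattern mirrors the source; nothing here touches the network.
class FakeBackend
  attr_reader :calls

  def initialize
    @calls = []
  end

  def self.add_command(*names)
    names.each do |name|
      define_method(name) { |*args| do_request(name, args[0] || {}, false) }
    end
  end

  def self.add_idempotent_command(*names)
    names.each do |name|
      define_method(name) { |*args| do_request(name, args[0] || {}, true) }
    end
  end

  # Records the request instead of sending it.
  def do_request(cmd, args, idempotent = false)
    @calls << [cmd, args, idempotent]
  end

  add_command :create_open           # must not be retried blindly
  add_idempotent_command :get_paths  # safe to retry on another tracker
end

backend = FakeBackend.new
backend.create_open(:domain => 'test', :key => 'k')
backend.get_paths                    # no arguments: args defaults to {}
backend.calls.each { |cmd, args, idem| puts "#{cmd} idempotent=#{idem}" }
```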
.const_missing(name) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 45
    def self.const_missing(name) # :nodoc:
      if /Error\z/ =~ name.to_s
        const_set(name, Class.new(MogileFS::Error))
      else
        super name
      end
    end
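The effect of this hook is that referencing an undefined constant whose name ends in "Error" creates the exception class on the spot. A minimal sketch of the same technique, using a hypothetical `Demo` namespace in place of MogileFS:

```ruby
# Hypothetical namespace standing in for MogileFS.
module Demo
  class Error < StandardError; end

  class Backend
    def self.const_missing(name)
      if /Error\z/ =~ name.to_s
        # Define the missing exception class under Backend and return it.
        const_set(name, Class.new(Demo::Error))
      else
        # Anything else falls through to the usual NameError.
        super name
      end
    end
  end
end

klass = Demo::Backend::NoSuchThingError  # defined on first reference
puts klass.name
puts klass.superclass
```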
Instance Method Details
#clear_cache(types = %w(all)) ⇒ Object
This command is special: the cache is per-tracker, so we connect to all backends and not just one.
    # File 'lib/mogilefs/backend.rb', line 306
    def clear_cache(types = %w(all))
      opts = {}
      types.each { |type| opts[type] = 1 }

      sockets = @hosts.map do |host|
        MogileFS::Socket.start(*(host.split(/:/))) rescue nil
      end
      sockets.compact!

      wpending = sockets
      rpending = []
      request = make_request("clear_cache", opts)
      while wpending[0] || rpending[0]
        r = IO.select(rpending, wpending, nil, @timeout) or return
        rpending -= r[0]
        wpending -= r[1]
        r[0].each { |io| io.timed_gets(0) rescue nil }
        r[1].each do |io|
          begin
            io.timed_write(request, 0)
            rpending << io
          rescue
          end
        end
      end
      nil
    ensure
      sockets.each { |io| io.close }
    end
#dispatch_unlocked(request, timeout = @timeout) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 140
    def dispatch_unlocked(request, timeout = @timeout) # :nodoc:
      begin
        io = socket
        io.timed_write(request, timeout)
        io
      rescue SystemCallError, MogileFS::RequestTruncatedError => err
        @dead[@active_host] = [ Time.now, err ]
        shutdown_unlocked
        retry
      end
    end
#do_request(cmd, args, idempotent = false) ⇒ Object
Performs the cmd request with args.
    # File 'lib/mogilefs/backend.rb', line 231
    def do_request(cmd, args, idempotent = false)
      no_raise = args.delete(:ruby_no_raise)
      request = make_request(cmd, args)
      line = nil
      failed = false
      @mutex.synchronize do
        begin
          io = dispatch_unlocked(request)
          line = io.timed_gets(@timeout)
          break if /\r?\n\z/ =~ line

          line and raise MogileFS::InvalidResponseError,
                         "Invalid response from server: #{line.inspect}"

          idempotent or
            raise EOFError, "end of file reached after: #{request.inspect}"
          # fall through to retry in loop
        rescue SystemCallError,
               MogileFS::InvalidResponseError # truncated response
          # we got a successful timed_write, but not a timed_gets
          if idempotent
            failed = true
            shutdown_unlocked(false)
            retry
          end
          shutdown_unlocked(true)
        rescue MogileFS::UnreadableSocketError, MogileFS::Timeout
          shutdown_unlocked(true)
        rescue
          # we DO NOT want the response we timed out waiting for, to crop up later
          # on, on the same socket, interspersed with a subsequent request! we
          # close the socket if there's any error.
          shutdown_unlocked(true)
        end while idempotent
        shutdown_unlocked if failed
      end # @mutex.synchronize
      parse_response(line, no_raise ? request : nil)
    end
#error(err_snake) ⇒ Object
Converts an error code from a mogilefsd tracker to an exception. Most of these exceptions should already be defined, but the MogileFS server code is liable to change and we may not always be able to keep up with the changes, so unknown error codes get an exception class defined on demand.
    # File 'lib/mogilefs/backend.rb', line 279
    def error(err_snake)
      BACKEND_ERRORS[err_snake] || self.class.add_error(err_snake)
    end
#make_request(cmd, args) ⇒ Object
Makes a new request string for cmd and args.
    # File 'lib/mogilefs/backend.rb', line 271
    def make_request(cmd, args)
      "#{cmd} #{url_encode args}\r\n"
    end
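To see the request line this produces, the sketch below copies make_request together with the url_encode and url_escape helpers from this page into a hypothetical module, `RequestSketch`, so it runs without the library. The command and argument values are illustrative.

```ruby
# Hypothetical wrapper module; the method bodies are copied from the
# listings on this page.
module RequestSketch
  module_function

  def url_escape(str)
    str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
  end

  def url_encode(params)
    params.map do |k, v|
      "#{url_escape k.to_s}=#{url_escape v.to_s}"
    end.join("&")
  end

  # One request is a single line: command, space, encoded args, CRLF.
  def make_request(cmd, args)
    "#{cmd} #{url_encode args}\r\n"
  end
end

p RequestSketch.make_request("get_paths", :domain => "test", :key => "my key")
# prints "get_paths domain=test&key=my+key\r\n"
```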
#parse_response(line, request = nil) ⇒ Object
Turns the line response from the server into a Hash of options, an error, or raises, as appropriate.
    # File 'lib/mogilefs/backend.rb', line 285
    def parse_response(line, request = nil)
      case line
      when /\AOK\s+\d*\s*(\S*)\r?\n\z/
        url_decode($1)
      when /\AERR\s+(\w+)\s*([^\r\n]*)/
        @lasterr = $1
        @lasterrstr = $2 ? url_unescape($2) : nil
        if request
          request = " request=#{request.strip}"
          @lasterrstr = @lasterrstr ? (@lasterrstr << request) : request
          return error(@lasterr).new(@lasterrstr)
        end
        raise error(@lasterr).new(@lasterrstr)
      else
        raise MogileFS::InvalidResponseError,
              "Invalid response from server: #{line.inspect}"
      end
    end
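The two regular expressions decide which branch a response line takes. The following sketch applies the same patterns through a hypothetical `classify` method that returns tagged arrays instead of decoding or raising; the sample lines are invented for illustration and are not captured tracker output.

```ruby
# Patterns copied from parse_response.
OK_RE  = /\AOK\s+\d*\s*(\S*)\r?\n\z/
ERR_RE = /\AERR\s+(\w+)\s*([^\r\n]*)/

# Hypothetical classifier: reports which branch a line would take
# and what the capture groups contain.
def classify(line)
  case line
  when OK_RE  then [:ok, $1]         # $1 is the still-encoded params string
  when ERR_RE then [:err, $1, $2]    # error code, then escaped message
  else             [:invalid, line]  # parse_response raises here
  end
end

p classify("OK fid=5&devcount=1\r\n")          # [:ok, "fid=5&devcount=1"]
p classify("ERR unknown_key unknown+key\r\n")  # [:err, "unknown_key", "unknown+key"]
p classify("garbage")                          # [:invalid, "garbage"]
```

An OK line must end in a newline to match, while the ERR pattern does not require one.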
#pipeline_dispatch(cmd, args, &block) ⇒ Object
Dispatches a request like do_request, but queues block for execution upon receiving a response. It is the user's responsibility to ensure &block is executed in the correct order. Trackers with multiple queryworkers are not guaranteed to return responses in the same order they were requested.
    # File 'lib/mogilefs/backend.rb', line 192
    def pipeline_dispatch(cmd, args, &block) # :nodoc:
      request = make_request(cmd, args)
      timeout = @timeout

      @mutex.synchronize do
        io = socket
        timeout = pipeline_drain_unlocked(io, timeout)

        # send the request out...
        begin
          io.timed_write(request, timeout)
          @pending << [ request, block ]
        rescue SystemCallError, MogileFS::RequestTruncatedError => err
          @dead[@active_host] = [ Time.now, err ]
          shutdown_unlocked(@pending[0])
          io = socket
          retry
        end

        @pending.size
      end
    end
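The ordering caveat follows from how blocks are paired with responses: each incoming line is handed to the oldest queued block. Here is a network-free sketch of that pairing, using a hypothetical `replay` method and made-up request and response strings.

```ruby
# Hypothetical model of the @pending queue. Requests are queued with a
# block, and each response is given to the block at the head of the queue.
def replay(keys, responses)
  pending = []
  results = []

  # "Dispatch": queue one [request, block] pair per key.
  keys.each do |key|
    request = "get_paths key=#{key}\r\n"
    pending << [request, lambda { |response| results << [key, response] }]
  end

  # "Receive": responses are matched to blocks strictly in FIFO order.
  responses.each do |response|
    _request, block = pending.shift
    block.call(response)
  end

  results
end

p replay(%w(a b c), ["OK n=1", "OK n=2", "OK n=3"])
# prints [["a", "OK n=1"], ["b", "OK n=2"], ["c", "OK n=3"]]
```

If the responses arrived in a different order than the requests were sent, the same loop would attach them to the wrong keys, which is the situation the description warns about.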
#pipeline_drain_unlocked(io, timeout) ⇒ Object
Tries to read any responses we already have pending before filling the pipeline with more requests. This usually takes very little time, but trackers may return huge responses and we could be on a slow network.
    # File 'lib/mogilefs/backend.rb', line 169
    def pipeline_drain_unlocked(io, timeout) # :nodoc:
      set = [ io ]

      while @pending.size > 0
        t0 = Time.now
        r = IO.select(set, set, nil, timeout)
        timeout = timeout_update(timeout, t0)

        if r && r[0][0]
          t0 = Time.now
          pipeline_gets_unlocked(io, timeout)
          timeout = timeout_update(timeout, t0)
        else
          return timeout
        end
      end
      timeout
    end
#pipeline_gets_unlocked(io, timeout) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 152
    def pipeline_gets_unlocked(io, timeout) # :nodoc:
      line = io.timed_gets(timeout) or
        raise MogileFS::PipelineError,
              "EOF with #{@pending.size} requests in-flight"
      ready = @pending.shift
      ready[1].call(parse_response(line, ready[0]))
    end
#pipeline_wait(count = nil) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 215
    def pipeline_wait(count = nil) # :nodoc:
      @mutex.synchronize do
        io = socket
        count ||= @pending.size
        @pending.size < count and
          raise MogileFS::Error,
                "pending=#{@pending.size} < expected=#{count} failed"
        begin
          count.times { pipeline_gets_unlocked(io, @timeout) }
        rescue
          shutdown_unlocked(true)
        end
      end
    end
#shutdown ⇒ Object
Closes this backend’s socket.
    # File 'lib/mogilefs/backend.rb', line 93
    def shutdown
      @mutex.synchronize { shutdown_unlocked }
    end
#shutdown_unlocked(do_raise = false) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 131
    def shutdown_unlocked(do_raise = false) # :nodoc:
      @pending = []
      if @socket
        @socket.close rescue nil # ignore errors
        @socket = nil
      end
      raise if do_raise
    end
#socket ⇒ Object
Returns a socket connected to a MogileFS tracker.
    # File 'lib/mogilefs/backend.rb', line 337
    def socket
      return @socket if @socket and not @socket.closed?

      @hosts.shuffle.each do |host|
        next if dead = @dead[host] and dead[0] > (Time.now - @fail_timeout)

        begin
          addr, port = host.split(/:/)
          @socket = MogileFS::Socket.tcp(addr, port, @timeout)
          @active_host = host
        rescue SystemCallError, MogileFS::Timeout => err
          @dead[host] = [ Time.now, err ]
          next
        end

        return @socket
      end

      errors = @dead.map { |host,(_,e)| "#{host} - #{e.message} (#{e.class})" }
      raise MogileFS::UnreachableBackendError,
            "couldn't connect to any tracker: #{errors.join(', ')}"
    end
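The skip condition means a tracker that failed recently is not retried until fail_timeout seconds have passed. The sketch below isolates that test in a hypothetical `usable_hosts` method with fixed timestamps, so it runs without sockets; the addresses and the IOError placeholders are illustrative.

```ruby
# Hypothetical helper applying the same test as the `next if dead = ...`
# line: a host is skipped while its failure is newer than fail_timeout.
def usable_hosts(hosts, dead, fail_timeout, now = Time.now)
  hosts.reject do |host|
    entry = dead[host]                       # [failure time, exception] or nil
    entry && entry[0] > (now - fail_timeout)
  end
end

now = Time.now
dead = {
  "10.0.0.1:7001" => [now - 2,  IOError.new("refused")],  # failed 2 s ago
  "10.0.0.2:7001" => [now - 30, IOError.new("refused")],  # failed 30 s ago
}
hosts = %w(10.0.0.1:7001 10.0.0.2:7001 10.0.0.3:7001)

p usable_hosts(hosts, dead, 5, now)
# prints ["10.0.0.2:7001", "10.0.0.3:7001"]
```

Unlike this sketch, the real method shuffles the host list first, so the tracker that ends up connected varies between calls.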
#timeout_update(timeout, t0) ⇒ Object
:nodoc:
    # File 'lib/mogilefs/backend.rb', line 160
    def timeout_update(timeout, t0) # :nodoc:
      timeout -= (Time.now - t0)
      timeout < 0 ? 0 : timeout
    end
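This helper shrinks a shared time budget across consecutive blocking calls and never lets it go negative. A small sketch, reusing the method body as a top-level function and simulating elapsed time by back-dating t0 rather than sleeping:

```ruby
# Same body as the listing above, defined at top level for the demo.
def timeout_update(timeout, t0)
  timeout -= (Time.now - t0)
  timeout < 0 ? 0 : timeout
end

# Pretend the previous step started 0.1 s ago: the budget shrinks.
remaining = timeout_update(0.5, Time.now - 0.1)
puts remaining <= 0.4   # true

# Pretend the previous step started 10 s ago: the budget is clamped at 0.
puts timeout_update(0.5, Time.now - 10)   # 0
```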
#url_decode(str) ⇒ Object
Turns a url params string into a Hash.
    # File 'lib/mogilefs/backend.rb', line 361
    def url_decode(str) # :nodoc:
      rv = {}
      str.split(/&/).each do |pair|
        k, v = pair.split(/=/, 2).map! { |x| url_unescape(x) }
        rv[k.freeze] = v
      end
      rv
    end
#url_encode(params) ⇒ Object
Turns a Hash (or Array of pairs) into a url params string.
    # File 'lib/mogilefs/backend.rb', line 377
    def url_encode(params) # :nodoc:
      params.map do |k,v|
        "#{url_escape k.to_s}=#{url_escape v.to_s}"
      end.join("&")
    end
#url_escape(str) ⇒ Object
Escapes naughty URL characters.
    # File 'lib/mogilefs/backend.rb', line 385
    def url_escape(str) # :nodoc:
      str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
    end
#url_unescape(str) ⇒ Object
Unescapes naughty URL characters.
    # File 'lib/mogilefs/backend.rb', line 395
    def url_unescape(str) # :nodoc:
      str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
    end
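The four URL helpers are inverses in pairs, which a round trip makes visible. The sketch below copies them into a hypothetical module, `UrlSketch`, so they can be run outside the class; the parameter values are illustrative.

```ruby
# Hypothetical wrapper module; method bodies follow the listings above.
module UrlSketch
  module_function

  def url_escape(str)
    str.gsub(/([^\w\,\-.\/\\\: ])/) { "%%%02x" % $1.ord }.tr(' ', '+')
  end

  def url_unescape(str)
    str.tr('+', ' ').gsub(/%([a-f0-9][a-f0-9])/i) { [$1.to_i(16)].pack 'C' }
  end

  def url_encode(params)
    params.map { |k, v| "#{url_escape k.to_s}=#{url_escape v.to_s}" }.join("&")
  end

  def url_decode(str)
    rv = {}
    str.split(/&/).each do |pair|
      k, v = pair.split(/=/, 2).map! { |x| url_unescape(x) }
      rv[k.freeze] = v
    end
    rv
  end
end

encoded = UrlSketch.url_encode(:key => "a b&c", :domain => "test")
puts encoded                      # key=a+b%26c&domain=test

decoded = UrlSketch.url_decode(encoded)
puts decoded["key"]               # a b&c
puts decoded.keys.first.class     # String
```

Keys come back as Strings even when Symbols were passed in, because url_encode calls to_s on each key before escaping.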