Class: Visor::Image::Store::HDFS
- Inherits: Object
- Includes: Common::Exception
- Defined in: lib/image/store/hdfs.rb
Overview
The Apache Hadoop Distributed File System (HDFS) backend store.
This class handles the management of image files located in the HDFS storage system, based on a URI like *hdfs://username@<host>:<port>/<path>/<bucket>/<image>*.
Requests are sent over the WebHDFS REST interface; reading an image, for example, issues a request such as *http://10.0.3.12:50075/webhdfs/v1/foo/1.iso?op=OPEN&user.name=hadoop&offset=0*.
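As a rough illustration of how such a store URI breaks down, the sketch below uses Ruby's standard `uri` library and the same path splitting the constructor applies. The address, port, username and image name are invented values, and `split_store_uri` is a hypothetical helper, not part of Visor.

```ruby
require 'uri'

# Hypothetical helper: splits a store URI into the pieces this class keeps
# as attributes. Assumes the path has the form /webhdfs/v1/<bucket>/<image>.
def split_store_uri(string)
  uri = URI(string)
  segments = uri.path.split('/') # ["", "webhdfs", "v1", "<bucket>", "<image>"]
  {
    username: uri.user,
    host:     uri.host,
    port:     uri.port,
    base:     segments[1..2].join('/'),
    bucket:   segments[3],
    file:     segments[4]
  }
end

# Invented example values.
parts = split_store_uri('hdfs://hadoop@10.0.3.12:50070/webhdfs/v1/foo/1.iso')
puts parts
```

Because the split is positional, a URI whose path is shorter or longer than four segments would yield `nil` or misplaced values for `bucket` and `file`.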
Constant Summary
- CONTEXT_ROOT = "webhdfs/v1"
Instance Attribute Summary
-
#base ⇒ Object
Returns the value of attribute base.
-
#bucket ⇒ Object
Returns the value of attribute bucket.
-
#config ⇒ Object
Returns the value of attribute config.
-
#file ⇒ Object
Returns the value of attribute file.
-
#host ⇒ Object
Returns the value of attribute host.
-
#port ⇒ Object
Returns the value of attribute port.
-
#uri ⇒ Object
Returns the value of attribute uri.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary
-
#delete ⇒ Object
Deletes the image file from its location.
-
#file_exists?(raise_exc = true) ⇒ True, False
Checks whether the image file exists.
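-
#generate_uri(params) ⇒ Object
Builds the WebHDFS HTTP URI for the current image file, appending the given query parameters and the user name.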
-
#get ⇒ Object
Returns the image file to clients, streamed in chunks.
-
#initialize(uri, config) ⇒ Object
constructor
Initializes a new HDFS store client object.
-
#save(id, tmp_file, format) ⇒ String, Integer
Saves the image file to its final destination, based on the temporary file created by the server at data reception time.
Constructor Details
#initialize(uri, config) ⇒ Object
Initializes a new HDFS store client object. HDFS credentials are loaded from the URI on GET and DELETE operations, or from the configuration file on POST and PUT operations.
# File 'lib/image/store/hdfs.rb', line 32
def initialize(uri, config)
  @uri = URI(uri)
  @config = config[:hdfs]

  if @uri.scheme
    @username = @uri.user
    @base = @uri.path.split('/')[1..2].join('/')
    @bucket = @uri.path.split('/')[3]
    @file = @uri.path.split('/')[4]
    @host = @uri.host
    @port = @uri.port
  else
    @username = @config[:username]
    @bucket = @config[:bucket]
    @base = CONTEXT_ROOT
    @host = @config[:host]
    @port = @config[:port]
  end
end
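The two branches of the constructor can be sketched in isolation as follows. This is only an approximation under stated assumptions: `connection_details` is a hypothetical stand-in, the shape of the configuration hash is inferred from the keys the constructor reads, all values are invented, and an empty string is assumed to represent "no URI supplied".

```ruby
require 'uri'

# Hypothetical stand-in for the constructor's branch: a URI with a scheme
# supplies the connection details, otherwise the :hdfs config section does.
def connection_details(uri_string, config)
  uri = URI(uri_string)
  hdfs = config[:hdfs]
  if uri.scheme
    { username: uri.user, host: uri.host, port: uri.port }
  else
    { username: hdfs[:username], host: hdfs[:host], port: hdfs[:port],
      bucket: hdfs[:bucket], base: 'webhdfs/v1' } # base mirrors CONTEXT_ROOT
  end
end

# Assumed shape of the configuration hash; the values are invented.
config = { hdfs: { username: 'hadoop', bucket: 'foo', host: '10.0.3.12', port: 50070 } }

from_uri    = connection_details('hdfs://alice@10.0.3.20:50070/webhdfs/v1/foo/1.iso', config)
from_config = connection_details('', config)
```

Note that in the configuration branch no `file` is set; in the class it is only assigned later, by `#save`.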
Instance Attribute Details
#base ⇒ Object
Returns the value of attribute base.
# File 'lib/image/store/hdfs.rb', line 21
def base
  @base
end
#bucket ⇒ Object
Returns the value of attribute bucket.
# File 'lib/image/store/hdfs.rb', line 21
def bucket
  @bucket
end
#config ⇒ Object
Returns the value of attribute config.
# File 'lib/image/store/hdfs.rb', line 21
def config
  @config
end
#file ⇒ Object
Returns the value of attribute file.
# File 'lib/image/store/hdfs.rb', line 21
def file
  @file
end
#host ⇒ Object
Returns the value of attribute host.
# File 'lib/image/store/hdfs.rb', line 21
def host
  @host
end
#port ⇒ Object
Returns the value of attribute port.
# File 'lib/image/store/hdfs.rb', line 21
def port
  @port
end
#uri ⇒ Object
Returns the value of attribute uri.
# File 'lib/image/store/hdfs.rb', line 21
def uri
  @uri
end
#username ⇒ Object
Returns the value of attribute username.
# File 'lib/image/store/hdfs.rb', line 21
def username
  @username
end
Instance Method Details
#delete ⇒ Object
Deletes the image file from its location.
# File 'lib/image/store/hdfs.rb', line 118
def delete
  uri = generate_uri('op=DELETE&recursive=true')
  EventMachine::HttpRequest.new(uri).delete
end
#file_exists?(raise_exc = true) ⇒ True, False
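Checks whether the image file exists. With raise_exc set to true (the default), a missing file raises NotFound instead of returning false.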
# File 'lib/image/store/hdfs.rb', line 133
def file_exists?(raise_exc=true)
  uri = generate_uri('op=GETFILESTATUS')
  req = Net::HTTP::Get.new(uri.request_uri)
  res = Net::HTTP.new(uri.hostname, uri.port).request(req)
  exist = res.is_a? Net::HTTPSuccess
  raise NotFound, "No image file found at #{uri}" if raise_exc && !exist
  exist
end
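The decision made once the GETFILESTATUS response arrives can be tried out without a Hadoop cluster. In the sketch below, `exists_from_response?` is a hypothetical helper, the `NotFound` class is a local stand-in for the error the real class raises (presumably supplied by Common::Exception), and the responses are built by hand rather than received over the network.

```ruby
require 'net/http'

# Local stand-in for the NotFound error used by the real class.
class NotFound < StandardError; end

# Hypothetical helper mirroring the check in file_exists?: any 2xx response
# counts as "the file exists"; anything else raises or returns false.
def exists_from_response?(res, location, raise_exc = true)
  exist = res.is_a?(Net::HTTPSuccess)
  raise NotFound, "No image file found at #{location}" if raise_exc && !exist
  exist
end

# Hand-built responses, so no request is actually sent.
ok      = Net::HTTPOK.new('1.1', '200', 'OK')
missing = Net::HTTPNotFound.new('1.1', '404', 'Not Found')

found     = exists_from_response?(ok, 'hdfs://example/foo/1.iso')
not_found = exists_from_response?(missing, 'hdfs://example/foo/1.iso', false)
```

Passing `false` as the last argument is how `#save` uses the check: it wants a boolean answer rather than an exception.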
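#generate_uri(params) ⇒ Object
Builds the WebHDFS HTTP URI for the current image file, appending the given query parameters and the user name.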
# File 'lib/image/store/hdfs.rb', line 142
def generate_uri(params)
  URI("http://#{host}:#{port}/#{base}/#{bucket}/#{file}?#{params}&user.name=#{username}")
end
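The method interpolates its arguments into the URL as-is, so callers must pass an already well-formed query string. A possible variant, shown here only as a sketch, takes the parameters as a hash and lets `URI.encode_www_form` handle the escaping. `build_webhdfs_uri` and its keyword arguments are hypothetical, and the values are invented.

```ruby
require 'uri'

# Hypothetical variant of generate_uri that takes the query as a hash and
# lets URI.encode_www_form escape each key and value.
def build_webhdfs_uri(host:, port:, base:, bucket:, file:, username:, params:)
  query = URI.encode_www_form(params.merge('user.name' => username))
  URI("http://#{host}:#{port}/#{base}/#{bucket}/#{file}?#{query}")
end

# Invented example values.
uri = build_webhdfs_uri(
  host: '10.0.3.12', port: 50070, base: 'webhdfs/v1',
  bucket: 'foo', file: '1.iso', username: 'hadoop',
  params: { 'op' => 'DELETE', 'recursive' => 'true' }
)
puts uri
```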
#get ⇒ Object
Returns the image file to clients, streamed in chunks.
# File 'lib/image/store/hdfs.rb', line 57
def get
  uri = generate_uri('op=OPEN')
  # This raises cant yield from root fiber
  #res = EventMachine::HttpRequest.new(uri).get
  #url = URI(res.response_header['LOCATION'])
  #url.hostname = host
  #http = EventMachine::HttpRequest.new(url).aget

  # ...
  # This works, should substitute (uri).get with (url).get in down
  #require "net/http"
  #req = Net::HTTP::Get.new(uri.request_uri)
  #res = Net::HTTP.new(uri.hostname, uri.port).request(req)
  #url = URI(res['location'])
  #url.hostname = host
  #STDERR.puts "URL #{url}"

  # ...
  # This solves it by manually defining the final location
  uri.port = 50075
  uri = uri.to_s + '&offset=0'

  http = EventMachine::HttpRequest.new(uri).aget
  finish = proc { yield nil }

  http.stream { |chunk| yield chunk }
  http.callback &finish
  http.errback &finish
end
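The calling convention of `#get` is that the block receives each chunk as it arrives and then a final `nil` once the transfer completes or fails. The sketch below imitates that convention with a `StringIO` in place of the EventMachine HTTP stream; `each_chunk` and the chunk size are hypothetical and serve only to show how a caller might consume the stream.

```ruby
require 'stringio'

# Hypothetical stand-in for the streamed HTTP body: yields fixed-size chunks
# from any IO-like object, then nil to signal completion, as #get does.
def each_chunk(io, chunk_size = 4)
  while (chunk = io.read(chunk_size))
    yield chunk
  end
  yield nil
end

received = []
each_chunk(StringIO.new('0123456789')) { |chunk| received << chunk }
```

A consumer can therefore treat `nil` as the end-of-stream marker, although in the real method it does not distinguish a successful transfer from a failed one, since both callbacks yield `nil`.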
#save(id, tmp_file, format) ⇒ String, Integer
Saves the image file to its final destination, based on the temporary file created by the server at data reception time.
# File 'lib/image/store/hdfs.rb', line 97
def save(id, tmp_file, format)
  @file = "#{id}.#{format}"
  uri = "hdfs://#{username}@#{host}:#{port}/#{base}/#{bucket}/#{file}"
  size = tmp_file.size

  path = generate_uri('op=CREATE&overwrite=true')
  http = EventMachine::HttpRequest.new(path).put
  location = URI(http.response_header['LOCATION'])
  location.hostname = host

  raise Duplicated, "The image file #{uri} already exists" if file_exists?(false)
  STDERR.puts "COPYING!!"

  EventMachine::HttpRequest.new(location).put :file => tmp_file.path

  [uri, size]
end
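WebHDFS creates files in two steps: a first PUT to the namenode returns a redirect whose Location header names a datanode, and a second PUT sends the data there. `#save` additionally overwrites the hostname in that header with the configured host, presumably because the advertised datanode name is not resolvable from the client. That rewriting step can be sketched on its own; `datanode_location` is a hypothetical helper and the header value is invented.

```ruby
require 'uri'

# Hypothetical helper: takes the Location header returned by the first
# (namenode) request and points it at a host the client can reach.
def datanode_location(location_header, reachable_host)
  location = URI(location_header)
  location.hostname = reachable_host
  location
end

# Invented redirect target, shaped like a WebHDFS datanode URL.
header = 'http://datanode-1.internal:50075/webhdfs/v1/foo/1.iso?op=CREATE&user.name=hadoop&overwrite=true'
target = datanode_location(header, '10.0.3.12')
puts target
```

Only the host changes; the port, path and query from the redirect are kept, which works when the substituted host really is the datanode that was named in the redirect.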