Class: WebHDFS::ClientV2
- Inherits:
-
Object
- Object
- WebHDFS::ClientV2
- Defined in:
- lib/webhdfs/client_v2.rb
Direct Known Subclasses
Constant Summary collapse
- OPT_TABLE =
This hash table holds command options.
{}.freeze
- REDIRECTED_OPERATIONS =
%w(APPEND CREATE OPEN GETFILECHECKSUM).freeze
Instance Attribute Summary collapse
-
#doas ⇒ Object
Returns the value of attribute doas.
-
#host ⇒ Object
Returns the value of attribute host.
-
#http_headers ⇒ Object
Returns the value of attribute http_headers.
-
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
-
#kerberos ⇒ Object
Returns the value of attribute kerberos.
-
#open_timeout ⇒ Object
default 30s (in ruby net/http).
-
#port ⇒ Object
Returns the value of attribute port.
-
#proxy ⇒ Object
Returns the value of attribute proxy.
-
#read_timeout ⇒ Object
default 60s (in ruby net/http).
-
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false).
-
#retry_known_errors ⇒ Object
default false (not to retry).
-
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false).
-
#ssl ⇒ Object
Returns the value of attribute ssl.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
-
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND”.
-
#checksum(path, options = {}) ⇒ Object
(also: #getfilechecksum)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”.
-
#chmod(path, mode, options = {}) ⇒ Object
(also: #setpermission)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION [&permission=<OCTAL>]”.
-
#chown(path, options = {}) ⇒ Object
(also: #setowner)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER [&owner=<USER>]”.
-
#content_summary(path, options = {}) ⇒ Object
(also: #getcontentsummary)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”.
-
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>] [&replication=<SHORT>] [&permission=<OCTAL>]”.
-
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE [&recursive=<true|false>]”.
-
#homedir(options = {}) ⇒ Object
(also: #gethomedirectory)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”.
-
#initialize(host = 'localhost', port = 50_070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}) ⇒ ClientV2
constructor
A new instance of ClientV2.
-
#list(path, options = {}) ⇒ Object
(also: #liststatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”.
-
#mkdir(path, options = {}) ⇒ Object
(also: #mkdirs)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS”.
- #operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
-
#read(path, options = {}) ⇒ Object
(also: #open)
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN [&offset=<LONG>][&buffersize=<INT>]”.
-
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”.
-
#replication(path, replnum, options = {}) ⇒ Object
(also: #setreplication)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION [&replication=<SHORT>]”.
- #request_options ⇒ Object
-
#stat(path, options = {}) ⇒ Object
(also: #getfilestatus)
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”.
-
#touch(path, options = {}) ⇒ Object
(also: #settimes)
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES [&modificationtime=<TIME>][&accesstime=<TIME>]”. modificationtime: radix-10 long integer; accesstime: radix-10 long integer.
Constructor Details
#initialize(host = 'localhost', port = 50_070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}) ⇒ ClientV2
Returns a new instance of ClientV2.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/webhdfs/client_v2.rb', line 38 def initialize(host = 'localhost', port = 50_070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}) @host = host @port = port @username = username @doas = doas @proxy = WebHDFS::Proxy.new(proxy_address, proxy_port) if proxy_address && proxy_port @retry_known_errors = false @retry_times = @retry_interval = 1 @httpfs_mode = false @ssl = nil @kerberos = nil @http_headers = http_headers end |
Instance Attribute Details
#doas ⇒ Object
Returns the value of attribute doas.
14 15 16 |
# File 'lib/webhdfs/client_v2.rb', line 14 def doas @doas end |
#host ⇒ Object
Returns the value of attribute host.
14 15 16 |
# File 'lib/webhdfs/client_v2.rb', line 14 def host @host end |
#http_headers ⇒ Object
Returns the value of attribute http_headers.
23 24 25 |
# File 'lib/webhdfs/client_v2.rb', line 23 def http_headers @http_headers end |
#httpfs_mode ⇒ Object
Returns the value of attribute httpfs_mode.
17 18 19 |
# File 'lib/webhdfs/client_v2.rb', line 17 def httpfs_mode @httpfs_mode end |
#kerberos ⇒ Object
Returns the value of attribute kerberos.
22 23 24 |
# File 'lib/webhdfs/client_v2.rb', line 22 def kerberos @kerberos end |
#open_timeout ⇒ Object
default 30s (in ruby net/http)
15 16 17 |
# File 'lib/webhdfs/client_v2.rb', line 15 def open_timeout @open_timeout end |
#port ⇒ Object
Returns the value of attribute port.
14 15 16 |
# File 'lib/webhdfs/client_v2.rb', line 14 def port @port end |
#proxy ⇒ Object
Returns the value of attribute proxy.
14 15 16 |
# File 'lib/webhdfs/client_v2.rb', line 14 def proxy @proxy end |
#read_timeout ⇒ Object
default 60s (in ruby net/http)
16 17 18 |
# File 'lib/webhdfs/client_v2.rb', line 16 def read_timeout @read_timeout end |
#retry_interval ⇒ Object
default 1 ([sec], ignored when retry_known_errors is false)
20 21 22 |
# File 'lib/webhdfs/client_v2.rb', line 20 def retry_interval @retry_interval end |
#retry_known_errors ⇒ Object
default false (not to retry)
18 19 20 |
# File 'lib/webhdfs/client_v2.rb', line 18 def retry_known_errors @retry_known_errors end |
#retry_times ⇒ Object
default 1 (ignored when retry_known_errors is false)
19 20 21 |
# File 'lib/webhdfs/client_v2.rb', line 19 def retry_times @retry_times end |
#ssl ⇒ Object
Returns the value of attribute ssl.
21 22 23 |
# File 'lib/webhdfs/client_v2.rb', line 21 def ssl @ssl end |
#username ⇒ Object
Returns the value of attribute username.
14 15 16 |
# File 'lib/webhdfs/client_v2.rb', line 14 def username @username end |
Instance Method Details
#append(path, body, options = {}) ⇒ Object
curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]”
85 86 87 88 89 90 |
# File 'lib/webhdfs/client_v2.rb', line 85 def append(path, body, = {}) = .merge('data' => 'true') if @httpfs_mode WebHDFS.(, OPT_TABLE['APPEND']) res = operate_requests('POST', path, 'APPEND', , body) res.code == '200' end |
#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”
155 156 157 158 159 |
# File 'lib/webhdfs/client_v2.rb', line 155 def checksum(path, = {}) WebHDFS.(, OPT_TABLE['GETFILECHECKSUM']) res = operate_requests('GET', path, 'GETFILECHECKSUM', ) WebHDFS.check_success_json(res, 'FileChecksum') end |
#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
[&permission=<OCTAL>]"
172 173 174 175 176 177 |
# File 'lib/webhdfs/client_v2.rb', line 172 def chmod(path, mode, = {}) WebHDFS.(, OPT_TABLE['SETPERMISSION']) res = operate_requests('PUT', path, 'SETPERMISSION', .merge('permission' => mode)) res.code == '200' end |
#chown(path, options = {}) ⇒ Object Also known as: setowner
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
[&owner=<USER>][&group=<GROUP>]"
182 183 184 185 186 187 188 189 190 |
# File 'lib/webhdfs/client_v2.rb', line 182 def chown(path, = {}) WebHDFS.(, OPT_TABLE['SETOWNER']) unless .key?('owner') || .key?('group') || .key?(:owner) || .key?(:group) raise ArgumentError, "'chown' needs at least one of owner or group" end res = operate_requests('PUT', path, 'SETOWNER', ) res.code == '200' end |
#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”
147 148 149 150 151 |
# File 'lib/webhdfs/client_v2.rb', line 147 def content_summary(path, = {}) WebHDFS.(, OPT_TABLE['GETCONTENTSUMMARY']) res = operate_requests('GET', path, 'GETCONTENTSUMMARY', ) WebHDFS.check_success_json(res, 'ContentSummary') end |
#create(path, body, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
[&overwrite=<true|false>][&blocksize=<LONG>]
[&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"
76 77 78 79 80 81 |
# File 'lib/webhdfs/client_v2.rb', line 76 def create(path, body, = {}) = .merge('data' => 'true') if @httpfs_mode WebHDFS.(, OPT_TABLE['CREATE']) res = operate_requests('PUT', path, 'CREATE', , body) res.code == '201' end |
#delete(path, options = {}) ⇒ Object
curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
124 125 126 127 128 |
# File 'lib/webhdfs/client_v2.rb', line 124 def delete(path, = {}) WebHDFS.(, OPT_TABLE['DELETE']) res = operate_requests('DELETE', path, 'DELETE', ) WebHDFS.check_success_json(res, 'boolean') end |
#homedir(options = {}) ⇒ Object Also known as: gethomedirectory
curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”
163 164 165 166 167 |
# File 'lib/webhdfs/client_v2.rb', line 163 def homedir( = {}) WebHDFS.(, OPT_TABLE['GETHOMEDIRECTORY']) res = operate_requests('GET', '/', 'GETHOMEDIRECTORY', ) WebHDFS.check_success_json(res, 'Path') end |
#list(path, options = {}) ⇒ Object Also known as: liststatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”
139 140 141 142 143 |
# File 'lib/webhdfs/client_v2.rb', line 139 def list(path, = {}) WebHDFS.(, OPT_TABLE['LISTSTATUS']) res = operate_requests('GET', path, 'LISTSTATUS', ) WebHDFS.check_success_json(res, 'FileStatuses')['FileStatus'] end |
#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS[&permission=<OCTAL>]”
104 105 106 107 108 |
# File 'lib/webhdfs/client_v2.rb', line 104 def mkdir(path, = {}) WebHDFS.(, OPT_TABLE['MKDIRS']) res = operate_requests('PUT', path, 'MKDIRS', ) WebHDFS.check_success_json(res, 'boolean') end |
#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/webhdfs/client_v2.rb', line 221 def operate_requests(method, path, op, params = {}, payload = nil) request = WebHDFS::Request.new(@host, @port, ) if !@httpfs_mode && REDIRECTED_OPERATIONS.include?(op) response = request.execute(path, method, nil, nil, op, params, nil) unless response.is_a?(Net::HTTPRedirection) && response['location'] msg = 'NameNode returns non-redirection (or without location' \ " header), code:#{res.code}, body:#{res.body}." raise WebHDFS::RequestFailedError, msg end uri = URI.parse(response['location']) rpath = if uri.query uri.path + '?' + uri.query else uri.path end request = WebHDFS::Request.new(uri.host, uri.port, ) request.execute(rpath, method, { 'Content-Type' => 'application/octet-stream' }, payload, nil, {}) elsif @httpfs_mode && !payload.nil? request.execute(path, method, { 'Content-Type' => 'application/octet-stream' }, payload, op, params) else request.execute(path, method, nil, payload, op, params) end end |
#read(path, options = {}) ⇒ Object Also known as: open
curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
94 95 96 97 98 |
# File 'lib/webhdfs/client_v2.rb', line 94 def read(path, = {}) WebHDFS.(, OPT_TABLE['OPEN']) res = operate_requests('GET', path, 'OPEN', ) res.body end |
#rename(path, dest, options = {}) ⇒ Object
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>”
114 115 116 117 118 119 120 |
# File 'lib/webhdfs/client_v2.rb', line 114 def rename(path, dest, = {}) WebHDFS.(, OPT_TABLE['RENAME']) dest = '/' + dest unless dest.start_with?('/') res = operate_requests('PUT', path, 'RENAME', .merge('destination' => dest)) WebHDFS.check_success_json(res, 'boolean') end |
#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
[&replication=<SHORT>]"
196 197 198 199 200 201 |
# File 'lib/webhdfs/client_v2.rb', line 196 def replication(path, replnum, = {}) WebHDFS.(, OPT_TABLE['SETREPLICATION']) res = operate_requests('PUT', path, 'SETREPLICATION', .merge('replication' => replnum.to_s)) WebHDFS.check_success_json(res, 'boolean') end |
#request_options ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/webhdfs/client_v2.rb', line 59 def { username: @username, doas: @doas, proxy: @proxy, ssl: @ssl, kerberos: @kerberos, open_timeout: @open_timeout, read_timeout: @read_timeout, retry_known_errors: @retry_known_errors, retry_times: @retry_times, retry_interval: @retry_interval } end |
#stat(path, options = {}) ⇒ Object Also known as: getfilestatus
curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”
131 132 133 134 135 |
# File 'lib/webhdfs/client_v2.rb', line 131 def stat(path, = {}) WebHDFS.(, OPT_TABLE['GETFILESTATUS']) res = operate_requests('GET', path, 'GETFILESTATUS', ) WebHDFS.check_success_json(res, 'FileStatus') end |
#touch(path, options = {}) ⇒ Object Also known as: settimes
curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
[&modificationtime=<TIME>][&accesstime=<TIME>]"
modificationtime: radix-10 long integer; accesstime: radix-10 long integer
208 209 210 211 212 213 214 215 216 217 |
# File 'lib/webhdfs/client_v2.rb', line 208 def touch(path, = {}) WebHDFS.(, OPT_TABLE['SETTIMES']) unless .key?('modificationtime') || .key?('accesstime') || .key?(:modificationtime) || .key?(:accesstime) raise ArgumentError, "'chown' needs at least one of " \ 'modificationtime or accesstime' end res = operate_requests('PUT', path, 'SETTIMES', ) res.code == '200' end |