Class: WebHDFS::ClientV2

Inherits:
Object
  • Object
show all
Defined in:
lib/webhdfs/client_v2.rb

Direct Known Subclasses

Client

Constant Summary collapse

OPT_TABLE =

This hash table holds command options.

{}.freeze
REDIRECTED_OPERATIONS =
%w(APPEND CREATE OPEN GETFILECHECKSUM).freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 50_070, username = nil, doas = nil, proxy_address = nil, proxy_port = nil, http_headers = {}) ⇒ ClientV2

Returns a new instance of ClientV2.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/webhdfs/client_v2.rb', line 38

# Creates a client bound to a WebHDFS endpoint.
#
# @param host [String] endpoint host name
# @param port [Integer] endpoint port
# @param username [String, nil] stored in @username
# @param doas [String, nil] stored in @doas
# @param proxy_address [String, nil] proxy host; used only together with
#   +proxy_port+
# @param proxy_port [Integer, nil] proxy port; used only together with
#   +proxy_address+
# @param http_headers [Hash] stored as-is in @http_headers
def initialize(host = 'localhost', port = 50_070, username = nil,
               doas = nil, proxy_address = nil, proxy_port = nil,
               http_headers = {})
  # Connection target and identity.
  @host = host
  @port = port
  @username = username
  @doas = doas

  # A proxy object is built only when both halves of its address are given.
  if proxy_address && proxy_port
    @proxy = WebHDFS::Proxy.new(proxy_address, proxy_port)
  end

  # Retrying starts disabled; count and interval both start at 1.
  @retry_known_errors = false
  @retry_interval = 1
  @retry_times = 1

  # HttpFs mode, SSL and Kerberos all start switched off / unset.
  @httpfs_mode = false
  @ssl = nil
  @kerberos = nil

  @http_headers = http_headers
end

Instance Attribute Details

#doasObject

Returns the value of attribute doas.



14
15
16
# File 'lib/webhdfs/client_v2.rb', line 14

# Returns the current value of @doas (initialised from the constructor's
# +doas+ argument).
def doas
  @doas
end

#hostObject

Returns the value of attribute host.



14
15
16
# File 'lib/webhdfs/client_v2.rb', line 14

# Returns the current value of @host (initialised from the constructor's
# +host+ argument, 'localhost' by default).
def host
  @host
end

#http_headersObject

Returns the value of attribute http_headers.



23
24
25
# File 'lib/webhdfs/client_v2.rb', line 23

# Returns the current value of @http_headers (initialised from the
# constructor's +http_headers+ argument, an empty Hash by default).
def http_headers
  @http_headers
end

#httpfs_modeObject

Returns the value of attribute httpfs_mode.



17
18
19
# File 'lib/webhdfs/client_v2.rb', line 17

# Returns the current value of @httpfs_mode; the constructor sets it to false.
def httpfs_mode
  @httpfs_mode
end

#kerberosObject

Returns the value of attribute kerberos.



22
23
24
# File 'lib/webhdfs/client_v2.rb', line 22

# Returns the current value of @kerberos; the constructor sets it to nil.
def kerberos
  @kerberos
end

#open_timeoutObject

default 30s (in ruby net/http)



15
16
17
# File 'lib/webhdfs/client_v2.rb', line 15

# Returns the current value of @open_timeout.
# Documented default: 30s (in ruby net/http). The constructor does not
# assign this variable, so it reads as nil until it is set.
def open_timeout
  @open_timeout
end

#portObject

Returns the value of attribute port.



14
15
16
# File 'lib/webhdfs/client_v2.rb', line 14

# Returns the current value of @port (initialised from the constructor's
# +port+ argument, 50_070 by default).
def port
  @port
end

#proxyObject

Returns the value of attribute proxy.



14
15
16
# File 'lib/webhdfs/client_v2.rb', line 14

# Returns the current value of @proxy. The constructor assigns a
# WebHDFS::Proxy only when both a proxy address and a proxy port are given.
def proxy
  @proxy
end

#read_timeoutObject

default 60s (in ruby net/http)



16
17
18
# File 'lib/webhdfs/client_v2.rb', line 16

# Returns the current value of @read_timeout.
# Documented default: 60s (in ruby net/http). The constructor does not
# assign this variable, so it reads as nil until it is set.
def read_timeout
  @read_timeout
end

#retry_intervalObject

default 1 ([sec], ignored when retry_known_errors is false)



20
21
22
# File 'lib/webhdfs/client_v2.rb', line 20

# Returns the current value of @retry_interval.
# Default 1 ([sec], ignored when retry_known_errors is false).
def retry_interval
  @retry_interval
end

#retry_known_errorsObject

default false (not to retry)



18
19
20
# File 'lib/webhdfs/client_v2.rb', line 18

# Returns the current value of @retry_known_errors.
# Default false (not to retry).
def retry_known_errors
  @retry_known_errors
end

#retry_timesObject

default 1 (ignored when retry_known_errors is false)



19
20
21
# File 'lib/webhdfs/client_v2.rb', line 19

# Returns the current value of @retry_times.
# Default 1 (ignored when retry_known_errors is false).
def retry_times
  @retry_times
end

#sslObject

Returns the value of attribute ssl.



21
22
23
# File 'lib/webhdfs/client_v2.rb', line 21

# Returns the current value of @ssl; the constructor sets it to nil.
def ssl
  @ssl
end

#usernameObject

Returns the value of attribute username.



14
15
16
# File 'lib/webhdfs/client_v2.rb', line 14

# Returns the current value of @username (initialised from the
# constructor's +username+ argument).
def username
  @username
end

Instance Method Details

#append(path, body, options = {}) ⇒ Object

curl -i -X POST “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=

APPEND[&buffersize=<INT>]"


85
86
87
88
89
90
# File 'lib/webhdfs/client_v2.rb', line 85

# Appends +body+ to the file at +path+ (op=APPEND, sent as POST).
#
# In HttpFs mode a copy of +options+ with 'data' => 'true' added is used;
# the caller's hash is left untouched. The options are handed to
# WebHDFS.check_options with the APPEND entry of OPT_TABLE before sending.
#
# @return [Boolean] true when the response code is '200'
def append(path, body, options = {})
  opts = @httpfs_mode ? options.merge('data' => 'true') : options
  WebHDFS.check_options(opts, OPT_TABLE['APPEND'])
  operate_requests('POST', path, 'APPEND', opts, body).code == '200'
end

#checksum(path, options = {}) ⇒ Object Also known as: getfilechecksum

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM”



155
156
157
158
159
# File 'lib/webhdfs/client_v2.rb', line 155

# Requests the checksum of the file at +path+ (op=GETFILECHECKSUM, GET).
#
# +options+ is handed to WebHDFS.check_options with the GETFILECHECKSUM
# entry of OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'FileChecksum'
def checksum(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['GETFILECHECKSUM'])
  response = operate_requests('GET', path, 'GETFILECHECKSUM', options)
  WebHDFS.check_success_json(response, 'FileChecksum')
end

#chmod(path, mode, options = {}) ⇒ Object Also known as: setpermission

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION

[&permission=<OCTAL>]"


172
173
174
175
176
177
# File 'lib/webhdfs/client_v2.rb', line 172

# Sets the permission of +path+ to +mode+ (op=SETPERMISSION, PUT).
#
# +options+ is handed to WebHDFS.check_options before the 'permission'
# entry is merged in, so the check sees only the caller's options.
#
# @return [Boolean] true when the response code is '200'
def chmod(path, mode, options = {})
  WebHDFS.check_options(options, OPT_TABLE['SETPERMISSION'])
  params = options.merge('permission' => mode)
  operate_requests('PUT', path, 'SETPERMISSION', params).code == '200'
end

#chown(path, options = {}) ⇒ Object Also known as: setowner

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER

[&owner=<USER>][&group=<GROUP>]"


182
183
184
185
186
187
188
189
190
# File 'lib/webhdfs/client_v2.rb', line 182

# Changes the owner and/or group of +path+ (op=SETOWNER, PUT).
#
# +options+ must contain an owner or a group, keyed either by String
# ('owner', 'group') or by Symbol (:owner, :group).
#
# @raise [ArgumentError] when neither an owner nor a group key is present
# @return [Boolean] true when the response code is '200'
def chown(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['SETOWNER'])
  accepted_keys = ['owner', 'group', :owner, :group]
  if accepted_keys.none? { |key| options.key?(key) }
    raise ArgumentError, "'chown' needs at least one of owner or group"
  end
  operate_requests('PUT', path, 'SETOWNER', options).code == '200'
end

#content_summary(path, options = {}) ⇒ Object Also known as: getcontentsummary

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY”



147
148
149
150
151
# File 'lib/webhdfs/client_v2.rb', line 147

# Requests the content summary of +path+ (op=GETCONTENTSUMMARY, GET).
#
# +options+ is handed to WebHDFS.check_options with the GETCONTENTSUMMARY
# entry of OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'ContentSummary'
def content_summary(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['GETCONTENTSUMMARY'])
  WebHDFS.check_success_json(
    operate_requests('GET', path, 'GETCONTENTSUMMARY', options),
    'ContentSummary'
  )
end

#create(path, body, options = {}) ⇒ Object

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE

[&overwrite=<true|false>][&blocksize=<LONG>]
[&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"


76
77
78
79
80
81
# File 'lib/webhdfs/client_v2.rb', line 76

# Creates the file at +path+ with +body+ as content (op=CREATE, PUT).
#
# In HttpFs mode a copy of +options+ with 'data' => 'true' added is used;
# the caller's hash is left untouched. The options are handed to
# WebHDFS.check_options with the CREATE entry of OPT_TABLE before sending.
#
# @return [Boolean] true when the response code is '201'
def create(path, body, options = {})
  opts = @httpfs_mode ? options.merge('data' => 'true') : options
  WebHDFS.check_options(opts, OPT_TABLE['CREATE'])
  operate_requests('PUT', path, 'CREATE', opts, body).code == '201'
end

#delete(path, options = {}) ⇒ Object

curl -i -X DELETE “http://<host>:<port>/webhdfs/v1/<path>?op=DELETE

[&recursive=<true|false>]"


124
125
126
127
128
# File 'lib/webhdfs/client_v2.rb', line 124

# Deletes +path+ (op=DELETE, sent with the DELETE verb).
#
# +options+ is handed to WebHDFS.check_options with the DELETE entry of
# OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'boolean'
def delete(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['DELETE'])
  reply = operate_requests('DELETE', path, 'DELETE', options)
  WebHDFS.check_success_json(reply, 'boolean')
end

#homedir(options = {}) ⇒ Object Also known as: gethomedirectory

curl -i “http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY”



163
164
165
166
167
# File 'lib/webhdfs/client_v2.rb', line 163

# Requests the home directory (op=GETHOMEDIRECTORY, GET on '/').
#
# +options+ is handed to WebHDFS.check_options with the GETHOMEDIRECTORY
# entry of OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'Path'
def homedir(options = {})
  WebHDFS.check_options(options, OPT_TABLE['GETHOMEDIRECTORY'])
  WebHDFS.check_success_json(
    operate_requests('GET', '/', 'GETHOMEDIRECTORY', options),
    'Path'
  )
end

#list(path, options = {}) ⇒ Object Also known as: liststatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS”



139
140
141
142
143
# File 'lib/webhdfs/client_v2.rb', line 139

# Lists the entries under +path+ (op=LISTSTATUS, GET).
#
# +options+ is handed to WebHDFS.check_options with the LISTSTATUS entry of
# OPT_TABLE, then sent along with the request.
#
# @return [Object] the 'FileStatus' member of what
#   WebHDFS.check_success_json returns for 'FileStatuses'
def list(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['LISTSTATUS'])
  response = operate_requests('GET', path, 'LISTSTATUS', options)
  statuses = WebHDFS.check_success_json(response, 'FileStatuses')
  statuses['FileStatus']
end

#mkdir(path, options = {}) ⇒ Object Also known as: mkdirs

curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=

MKDIRS[&permission=<OCTAL>]"


104
105
106
107
108
# File 'lib/webhdfs/client_v2.rb', line 104

# Creates the directory +path+ (op=MKDIRS, PUT).
#
# +options+ is handed to WebHDFS.check_options with the MKDIRS entry of
# OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'boolean'
def mkdir(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['MKDIRS'])
  reply = operate_requests('PUT', path, 'MKDIRS', options)
  WebHDFS.check_success_json(reply, 'boolean')
end

#operate_requests(method, path, op, params = {}, payload = nil) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/webhdfs/client_v2.rb', line 221

# Sends one WebHDFS operation and returns the final response object.
#
# Three paths, chosen from @httpfs_mode, +op+ and +payload+:
# * not in HttpFs mode and +op+ is one of REDIRECTED_OPERATIONS: a first
#   request goes to @host:@port without a payload and must answer with a
#   redirection carrying a 'location' header; the payload is then sent to
#   the host/port of that location as application/octet-stream.
# * HttpFs mode with a payload: a single request using the octet-stream
#   content type.
# * otherwise: a single request without extra headers.
#
# @param method [String] HTTP verb, e.g. 'GET' or 'PUT'
# @param path [String] target path
# @param op [String] WebHDFS operation name
# @param params [Hash] query parameters for the operation
# @param payload [Object, nil] request body, if any
# @return [Object] whatever WebHDFS::Request#execute returns
# @raise [WebHDFS::RequestFailedError] when the first request of a
#   redirected operation is not a redirection or lacks a location header
def operate_requests(method, path, op, params = {}, payload = nil)
  request = WebHDFS::Request.new(@host, @port, request_options)
  if !@httpfs_mode && REDIRECTED_OPERATIONS.include?(op)
    response = request.execute(path, method, nil, nil, op, params, nil)
    unless response.is_a?(Net::HTTPRedirection) && response['location']
      # Report the response actually received; referring to an undefined
      # local here would mask the failure with a NameError.
      msg = 'NameNode returns non-redirection (or without location' \
            " header), code:#{response.code}, body:#{response.body}."
      raise WebHDFS::RequestFailedError, msg
    end
    uri = URI.parse(response['location'])
    # The redirect target already carries the operation's query string, so
    # the follow-up request passes no op and no extra params.
    rpath = if uri.query
              uri.path + '?' + uri.query
            else
              uri.path
            end
    request = WebHDFS::Request.new(uri.host, uri.port, request_options)
    request.execute(rpath, method,
                    { 'Content-Type' => 'application/octet-stream' },
                    payload, nil, {})
  elsif @httpfs_mode && !payload.nil?
    request.execute(path, method,
                    { 'Content-Type' => 'application/octet-stream' },
                    payload, op, params)
  else
    request.execute(path, method, nil, payload, op, params)
  end
end

#read(path, options = {}) ⇒ Object Also known as: open

curl -i -L “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN

[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"


94
95
96
97
98
# File 'lib/webhdfs/client_v2.rb', line 94

# Reads the file at +path+ (op=OPEN, GET).
#
# +options+ is handed to WebHDFS.check_options with the OPEN entry of
# OPT_TABLE, then sent along with the request.
#
# @return [Object] the body of the response
def read(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['OPEN'])
  operate_requests('GET', path, 'OPEN', options).body
end

#rename(path, dest, options = {}) ⇒ Object

curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=

RENAME&destination=<PATH>"


114
115
116
117
118
119
120
# File 'lib/webhdfs/client_v2.rb', line 114

# Renames +path+ to +dest+ (op=RENAME, PUT).
#
# A +dest+ that does not start with '/' gets one prepended before it is
# sent as the 'destination' parameter. +options+ is handed to
# WebHDFS.check_options before 'destination' is merged in.
#
# @return [Object] result of WebHDFS.check_success_json for 'boolean'
def rename(path, dest, options = {})
  WebHDFS.check_options(options, OPT_TABLE['RENAME'])
  target = dest.start_with?('/') ? dest : '/' + dest
  params = options.merge('destination' => target)
  response = operate_requests('PUT', path, 'RENAME', params)
  WebHDFS.check_success_json(response, 'boolean')
end

#replication(path, replnum, options = {}) ⇒ Object Also known as: setreplication

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION

[&replication=<SHORT>]"


196
197
198
199
200
201
# File 'lib/webhdfs/client_v2.rb', line 196

# Sets the replication factor of +path+ (op=SETREPLICATION, PUT).
#
# +replnum+ is converted with #to_s and sent as the 'replication'
# parameter. +options+ is handed to WebHDFS.check_options before that
# parameter is merged in.
#
# @return [Object] result of WebHDFS.check_success_json for 'boolean'
def replication(path, replnum, options = {})
  WebHDFS.check_options(options, OPT_TABLE['SETREPLICATION'])
  params = options.merge('replication' => replnum.to_s)
  WebHDFS.check_success_json(
    operate_requests('PUT', path, 'SETREPLICATION', params),
    'boolean'
  )
end

#request_optionsObject



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/webhdfs/client_v2.rb', line 59

# Collects the client's connection settings into a fresh Hash, keyed by
# Symbol, for handing to WebHDFS::Request.new.
#
# Each key maps to the instance variable of the same name; variables that
# were never assigned yield nil.
# NOTE(review): @http_headers is not part of this hash — confirm whether
# WebHDFS::Request is expected to receive it.
#
# @return [Hash{Symbol => Object}]
def request_options
  setting_names = %i[username doas proxy ssl kerberos
                     open_timeout read_timeout
                     retry_known_errors retry_times retry_interval]
  setting_names.each_with_object({}) do |name, settings|
    settings[name] = instance_variable_get("@#{name}")
  end
end

#stat(path, options = {}) ⇒ Object Also known as: getfilestatus

curl -i “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS”



131
132
133
134
135
# File 'lib/webhdfs/client_v2.rb', line 131

# Requests the status of +path+ (op=GETFILESTATUS, GET).
#
# +options+ is handed to WebHDFS.check_options with the GETFILESTATUS entry
# of OPT_TABLE, then sent along with the request.
#
# @return [Object] result of WebHDFS.check_success_json for 'FileStatus'
def stat(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['GETFILESTATUS'])
  WebHDFS.check_success_json(
    operate_requests('GET', path, 'GETFILESTATUS', options),
    'FileStatus'
  )
end

#touch(path, options = {}) ⇒ Object Also known as: settimes

curl -i -X PUT “http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES

[&modificationtime=<TIME>][&accesstime=<TIME>]"

modificationtime: radix-10 long integer; accesstime: radix-10 long integer



208
209
210
211
212
213
214
215
216
217
# File 'lib/webhdfs/client_v2.rb', line 208

# Sets the modification and/or access time of +path+ (op=SETTIMES, PUT).
#
# +options+ must contain a modification time or an access time, keyed
# either by String ('modificationtime', 'accesstime') or by Symbol.
# It is handed to WebHDFS.check_options with the SETTIMES entry of
# OPT_TABLE before the key check.
#
# @raise [ArgumentError] when neither time key is present
# @return [Boolean] true when the response code is '200'
def touch(path, options = {})
  WebHDFS.check_options(options, OPT_TABLE['SETTIMES'])
  unless options.key?('modificationtime') || options.key?('accesstime') ||
         options.key?(:modificationtime) || options.key?(:accesstime)
    # The message names this method, not 'chown'.
    raise ArgumentError, "'touch' needs at least one of " \
                         'modificationtime or accesstime'
  end
  res = operate_requests('PUT', path, 'SETTIMES', options)
  res.code == '200'
end