Class: Ganapati::Client
- Inherits:
-
Object
- Object
- Ganapati::Client
- Defined in:
- lib/ganapati/client.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#append(path, &block) ⇒ Object
for appending.
- #chmod(path, mode) ⇒ Object
- #chown(path, owner, group) ⇒ Object
- #close ⇒ Object
-
#create(path, &block) ⇒ Object
for writing to a new file.
- #exists?(path) ⇒ Boolean
-
#get(remotepath, destpath) ⇒ Object
copy remote file to local.
-
#initialize(server, port, timeout = 60) ⇒ Client
constructor
A new instance of Client.
- #ls(path, details = false, recursive = false) ⇒ Object
- #mkdir(path) ⇒ Object
- #mv(source, dest) ⇒ Object
-
#open(path, &block) ⇒ Object
for reading.
-
#put(localpath, destpath) ⇒ Object
copy local file to remote.
-
#readchunks(path, chunksize = 1048576) ⇒ Object
Yields the contents of path one chunk at a time, each read with the given chunksize.
- #readlines(path, sep = "\n") {|buffer| ... } ⇒ Object
- #rm(path, recursive = false) ⇒ Object
- #set_replication(path, level) ⇒ Object
-
#shutdown(status = 0) ⇒ Object
shutdown the thrift server.
- #stat(path) ⇒ Object
Constructor Details
#initialize(server, port, timeout = 60) ⇒ Client
Returns a new instance of Client.
4 5 6 7 8 9 10 11 12 |
# File 'lib/ganapati/client.rb', line 4
# Opens a buffered Thrift transport to the server and builds the
# ThriftHadoopFileSystem client on top of it.
#
# @param server  host, passed straight to Thrift::Socket.new
# @param port    port, passed straight to Thrift::Socket.new
# @param timeout value sent via setInactivityTimeoutPeriod; pass nil to skip
#   that call entirely
# @raise re-raises whatever client setup raised, after closing the transport
def initialize(server, port, timeout = 60)
  socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(socket)
  @transport.open
  begin
    protocol = Thrift::BinaryProtocol.new(@transport)
    @client = ThriftHadoopFileSystem::Client.new(protocol)
    # some versions of ruby + thrift cause an error if the following is called
    @client.setInactivityTimeoutPeriod(timeout) unless timeout.nil?
  rescue StandardError
    # When the constructor raises, the caller never gets an instance to call
    # #close on, so release the already-open transport here.
    @transport.close
    raise
  end
end
Class Method Details
Instance Method Details
#append(path, &block) ⇒ Object
for appending
83 84 85 |
# File 'lib/ganapati/client.rb', line 83
# For appending: delegates to file_handle with the :append mode, forwarding
# the path and the caller's block unchanged.
def append(path, &block)
  file_handle(:append, path, &block)
end
#chmod(path, mode) ⇒ Object
114 115 116 |
# File 'lib/ganapati/client.rb', line 114
# Forwards a chmod request for path (run through pname first) with the given
# mode to the Thrift client and returns the client's result.
def chmod(path, mode)
  target = pname(path)
  @client.chmod(target, mode)
end
#chown(path, owner, group) ⇒ Object
118 119 120 |
# File 'lib/ganapati/client.rb', line 118
# Forwards a chown request for path (run through pname first) with the given
# owner and group to the Thrift client and returns the client's result.
def chown(path, owner, group)
  target = pname(path)
  @client.chown(target, owner, group)
end
#close ⇒ Object
14 15 16 |
# File 'lib/ganapati/client.rb', line 14
# Closes the transport that #initialize opened.
def close
  @transport.close()
end
#create(path, &block) ⇒ Object
for writing to a new file
73 74 75 |
# File 'lib/ganapati/client.rb', line 73
# For writing to a new file: delegates to file_handle with the :create mode,
# forwarding the path and the caller's block unchanged.
def create(path, &block)
  file_handle(:create, path, &block)
end
#exists?(path) ⇒ Boolean
99 100 101 |
# File 'lib/ganapati/client.rb', line 99
# Asks the Thrift client whether path (run through pname first) exists and
# returns the client's answer.
def exists?(path)
  target = pname(path)
  @client.exists(target)
end
#get(remotepath, destpath) ⇒ Object
copy remote file to local
36 37 38 39 40 41 42 |
# File 'lib/ganapati/client.rb', line 36
# Copies a remote file to the local filesystem.
#
# Streams the remote file through #readchunks and writes every chunk to
# destpath in order.
#
# @param remotepath path of the remote file, handed to #readchunks
# @param destpath   local path to write; created or truncated
def get(remotepath, destpath)
  # File.open rather than Kernel.open: Kernel.open treats a path starting with
  # "|" as a command to spawn. Binary mode keeps the remote bytes untouched
  # (no newline or encoding conversion).
  File.open(destpath, 'wb') do |dest|
    readchunks(remotepath) { |chunk| dest.write(chunk) }
  end
end
#ls(path, details = false, recursive = false) ⇒ Object
107 108 109 110 111 112 |
# File 'lib/ganapati/client.rb', line 107
# Lists the entries under path (run through pname first).
#
# @param path      directory to list
# @param details   when true, return the status objects from listStatus
#   themselves; otherwise return only each status' path
# @param recursive when true, also list every entry whose status reports
#   isdir, appending those results after the top-level entries
# @return [Array] flat list of paths or status objects
def ls(path, details = false, recursive = false)
  statuses = @client.listStatus(pname(path))
  entries = if details
              statuses
            else
              statuses.map { |status| status.path }
            end
  return entries unless recursive

  directories = statuses.select { |status| status.isdir }
  nested = directories.map { |dir| ls(dir.path, details, recursive) }
  entries + nested.flatten
end
#mkdir(path) ⇒ Object
95 96 97 |
# File 'lib/ganapati/client.rb', line 95
# Forwards to the Thrift client's mkdirs for path (run through pname first)
# and returns the client's result.
def mkdir(path)
  target = pname(path)
  @client.mkdirs(target)
end
#mv(source, dest) ⇒ Object
91 92 93 |
# File 'lib/ganapati/client.rb', line 91
# Forwards to the Thrift client's rename, moving source to dest (both run
# through pname first), and returns the client's result.
def mv(source, dest)
  from = pname(source)
  to = pname(dest)
  @client.rename(from, to)
end
#open(path, &block) ⇒ Object
for reading
78 79 80 |
# File 'lib/ganapati/client.rb', line 78
# For reading: delegates to file_handle with the :open mode, forwarding the
# path and the caller's block unchanged.
def open(path, &block)
  file_handle(:open, path, &block)
end
#put(localpath, destpath) ⇒ Object
copy local file to remote
24 25 26 27 28 29 30 31 32 33 |
# File 'lib/ganapati/client.rb', line 24
# Copies a local file to the remote filesystem.
#
# Creates destpath via #create and writes the local file's contents to the
# yielded handle in 1 MB reads.
#
# @param localpath local file to read
# @param destpath  remote path, handed to #create
def put(localpath, destpath)
  create(destpath) do |dest|
    # File.open rather than Kernel.open: Kernel.open treats a path starting
    # with "|" as a command to spawn. Binary mode keeps the local bytes
    # untouched (no newline or encoding conversion).
    File.open(localpath, 'rb') do |source|
      # read 1 MB at a time
      while (record = source.read(1048576))
        dest.write(record)
      end
    end
  end
end
#readchunks(path, chunksize = 1048576) ⇒ Object
Yields the contents of path one chunk at a time, each read with the given chunksize
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/ganapati/client.rb', line 45
# Yields the contents of path one chunk at a time.
#
# Opens path via #open, asks the handle for its length, then calls
# read(offset, chunksize) at successive offsets until the length is reached.
#
# @param path      remote path, handed to #open
# @param chunksize size passed to each read call; must be greater than zero
# @yield [chunk] whatever each read call returns
# @return [Enumerator] when called without a block
# @raise [ArgumentError] if chunksize is zero or negative (the offset would
#   never advance past the file length, looping forever)
def readchunks(path, chunksize = 1048576)
  raise ArgumentError, "chunksize must be greater than zero" unless chunksize > 0
  return enum_for(:readchunks, path, chunksize) unless block_given?

  open(path) do |source|
    total = source.length
    offset = 0
    while offset < total
      yield source.read(offset, chunksize)
      offset += chunksize
    end
  end
end
#readlines(path, sep = "\n") {|buffer| ... } ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/ganapati/client.rb', line 56
# Yields the contents of path split on sep, one piece at a time.
#
# Chunks from #readchunks are accumulated in a buffer; every time the buffer
# contains sep, the text before it is yielded (without the separator) and the
# text after it is kept. Whatever is left once the chunks run out is yielded
# last, if non-empty.
#
# @param path remote path, handed to #readchunks
# @param sep  separator to split on
# @yield [buffer] each separated piece
# @return [Enumerator] when called without a block
def readlines(path, sep = "\n")
  return enum_for(:readlines, path, sep) unless block_given?

  # dup so the buffer is mutable even when string literals are frozen
  buffer = "".dup
  readchunks(path) do |chunk|
    buffer << chunk
    # partition's middle element is empty when sep does not occur in buffer
    until (parts = buffer.partition(sep))[1].empty?
      yield parts.first
      buffer = parts.last
    end
  end
  yield buffer unless buffer.empty?
end
#rm(path, recursive = false) ⇒ Object
87 88 89 |
# File 'lib/ganapati/client.rb', line 87
# Forwards an rm request for path (run through pname first) to the Thrift
# client, passing the recursive flag along, and returns the client's result.
def rm(path, recursive = false)
  target = pname(path)
  @client.rm(target, recursive)
end
#set_replication(path, level) ⇒ Object
122 123 124 |
# File 'lib/ganapati/client.rb', line 122
# Forwards to the Thrift client's setReplication for path (run through pname
# first) with the given level, and returns the client's result.
def set_replication(path, level)
  target = pname(path)
  @client.setReplication(target, level)
end
#shutdown(status = 0) ⇒ Object
shutdown the thrift server
19 20 21 |
# File 'lib/ganapati/client.rb', line 19
# Shuts down the thrift server by forwarding status to the Thrift client's
# shutdown call.
def shutdown(status = 0)
  @client.shutdown(status)
end
#stat(path) ⇒ Object
103 104 105 |
# File 'lib/ganapati/client.rb', line 103
# Forwards a stat request for path (run through pname first) to the Thrift
# client and returns the client's result.
def stat(path)
  target = pname(path)
  @client.stat(target)
end