Class: Ganapati::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ganapati/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port, timeout = 60) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
9
10
11
# File 'lib/ganapati/client.rb', line 4

# Builds the Thrift connection used by every other method of this client.
#
# A socket to +server+:+port+ is wrapped in a buffered transport, the
# transport is opened, and a binary-protocol ThriftHadoopFileSystem client is
# created on top of it. +timeout+ is then handed to
# setInactivityTimeoutPeriod (units are not visible here — presumably
# seconds; confirm against the Thrift service definition).
#
# @param server value passed to Thrift::Socket.new as the first argument
# @param port value passed to Thrift::Socket.new as the second argument
# @param timeout value passed to setInactivityTimeoutPeriod (default 60)
def initialize(server, port, timeout = 60)
  sock = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(sock)
  @transport.open
  wire_protocol = Thrift::BinaryProtocol.new(@transport)
  @client = ThriftHadoopFileSystem::Client.new(wire_protocol)
  @client.setInactivityTimeoutPeriod(timeout)
end

Class Method Details

.run(server, port) ⇒ Object



107
108
109
110
111
112
# File 'lib/ganapati/client.rb', line 107

# Creates a client, yields it to the block, and closes it afterwards.
#
# The client is closed in an +ensure+ clause, so the transport is released
# even when the block raises; the exception then propagates to the caller.
#
# @param server forwarded to Client.new
# @param port forwarded to Client.new
# @yield [client] the freshly created Client
# @return [Object] whatever the block returns
def self.run(server, port)
  client = Client.new(server, port)
  begin
    yield client
  ensure
    client.close
  end
end

Instance Method Details

#append(path, &block) ⇒ Object

for appending



66
67
68
# File 'lib/ganapati/client.rb', line 66

# Opens +path+ in :append mode by delegating to file_handle; the given block
# is forwarded unchanged.
#
# @param path path handed to file_handle
# @return [Object] whatever file_handle returns
def append(path, &block)
  file_handle(:append, path, &block)
end

#chmod(path, mode) ⇒ Object



95
96
97
# File 'lib/ganapati/client.rb', line 95

# Forwards a chmod request for +path+ to the Thrift client.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to change
# @param mode mode value forwarded as-is
# @return [Object] whatever the Thrift client's chmod returns
def chmod(path, mode)
  target = pname(path)
  @client.chmod(target, mode)
end

#chown(path, owner, group) ⇒ Object



99
100
101
# File 'lib/ganapati/client.rb', line 99

# Forwards a chown request for +path+ to the Thrift client.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to change
# @param owner owner value forwarded as-is
# @param group group value forwarded as-is
# @return [Object] whatever the Thrift client's chown returns
def chown(path, owner, group)
  target = pname(path)
  @client.chown(target, owner, group)
end

#close ⇒ Object



13
14
15
# File 'lib/ganapati/client.rb', line 13

# Closes the transport stored in @transport.
#
# @return [Object] whatever the transport's close returns
def close
  @transport.close
end

#create(path, &block) ⇒ Object

for writing to a new file



56
57
58
# File 'lib/ganapati/client.rb', line 56

# Opens +path+ in :create mode (writing to a new file) by delegating to
# file_handle; the given block is forwarded unchanged.
#
# @param path path handed to file_handle
# @return [Object] whatever file_handle returns
def create(path, &block)
  file_handle(:create, path, &block)
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/ganapati/client.rb', line 82

# Asks the Thrift client whether +path+ exists.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to check
# @return [Boolean] the result of the Thrift client's exists call
def exists?(path)
  target = pname(path)
  @client.exists(target)
end

#get(remotepath, destpath) ⇒ Object

copy remote file to local



35
36
37
38
39
40
41
# File 'lib/ganapati/client.rb', line 35

# Copies a remote file to a local file.
#
# The remote file is streamed through readchunks and each chunk is written to
# +destpath+. The destination is opened with File.open in binary mode:
# unlike Kernel.open, File.open never interprets a path starting with "|" as
# a command to run, and binary mode keeps the bytes untouched on every
# platform.
#
# @param remotepath path of the remote file, handed to readchunks
# @param destpath local file to create or truncate
# @return [Object] whatever readchunks returns
def get(remotepath, destpath)
  File.open(destpath, 'wb') do |dest|
    readchunks(remotepath) { |chunk| dest.write(chunk) }
  end
end

#ls(path, details = false) ⇒ Object



90
91
92
93
# File 'lib/ganapati/client.rb', line 90

# Lists the entries under +path+ using the Thrift client's listStatus.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path directory to list
# @param details when truthy, return the status objects themselves;
#   otherwise return only each status object's +path+
# @return [Array] status objects or their paths
def ls(path, details = false)
  entries = @client.listStatus(pname(path))
  return entries if details

  entries.map(&:path)
end

#mkdir(path) ⇒ Object



78
79
80
# File 'lib/ganapati/client.rb', line 78

# Creates +path+ by calling the Thrift client's mkdirs.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path directory to create
# @return [Object] whatever the Thrift client's mkdirs returns
def mkdir(path)
  target = pname(path)
  @client.mkdirs(target)
end

#mv(source, dest) ⇒ Object



74
75
76
# File 'lib/ganapati/client.rb', line 74

# Renames +source+ to +dest+ through the Thrift client's rename call.
#
# Both paths are first passed through pname (defined elsewhere in this class;
# presumably converts them to the path type the Thrift API expects — confirm).
#
# @param source existing path
# @param dest new path
# @return [Object] whatever the Thrift client's rename returns
def mv(source, dest)
  from = pname(source)
  to = pname(dest)
  @client.rename(from, to)
end

#open(path, &block) ⇒ Object

for reading



61
62
63
# File 'lib/ganapati/client.rb', line 61

# Opens +path+ in :open mode (reading) by delegating to file_handle; the
# given block is forwarded unchanged.
#
# @param path path handed to file_handle
# @return [Object] whatever file_handle returns
def open(path, &block)
  file_handle(:open, path, &block)
end

#put(localpath, destpath) ⇒ Object

copy local file to remote



23
24
25
26
27
28
29
30
31
32
# File 'lib/ganapati/client.rb', line 23

# Copies a local file to a remote file.
#
# The remote file is created through create and the local file is streamed
# into it 1 MB at a time. The local file is opened with File.open in binary
# mode: unlike Kernel.open, File.open never interprets a path starting with
# "|" as a command to run, and binary mode keeps the bytes untouched on
# every platform.
#
# @param localpath local file to read
# @param destpath remote path handed to create
# @return [Object] whatever create returns
def put(localpath, destpath)
  create(destpath) do |dest|
    File.open(localpath, 'rb') do |source|
      # read returns nil at end of file, which ends the loop
      while (record = source.read(1_048_576))
        dest.write(record)
      end
    end
  end
end

#readchunks(path, chunksize = 1048576) ⇒ Object

yield the contents of path one chunk of chunksize at a time



44
45
46
47
48
49
50
51
52
53
# File 'lib/ganapati/client.rb', line 44

# Yields the contents of +path+ one chunk at a time.
#
# The file is opened through open, its length is read once, and
# read(offset, chunksize) is called for successive offsets until the offset
# reaches the length.
#
# @param path path handed to open
# @param chunksize size requested per read call (default 1 MB)
# @yield [chunk] the result of each read call
# @raise [ArgumentError] if chunksize is not positive; the offset would never
#   advance and the loop would not terminate
# @return [Object] whatever open returns
def readchunks(path, chunksize = 1_048_576)
  raise ArgumentError, "chunksize must be positive (got #{chunksize.inspect})" unless chunksize > 0

  open(path) do |source|
    total = source.length
    offset = 0
    while offset < total
      yield source.read(offset, chunksize)
      offset += chunksize
    end
  end
end

#rm(path, recursive = false) ⇒ Object



70
71
72
# File 'lib/ganapati/client.rb', line 70

# Removes +path+ through the Thrift client's rm call.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to remove
# @param recursive flag forwarded as-is to the Thrift client (default false)
# @return [Object] whatever the Thrift client's rm returns
def rm(path, recursive = false)
  target = pname(path)
  @client.rm(target, recursive)
end

#set_replication(path, level) ⇒ Object



103
104
105
# File 'lib/ganapati/client.rb', line 103

# Sets the replication level of +path+ through the Thrift client's
# setReplication call.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to update
# @param level replication value forwarded as-is
# @return [Object] whatever the Thrift client's setReplication returns
def set_replication(path, level)
  target = pname(path)
  @client.setReplication(target, level)
end

#shutdown(status = 0) ⇒ Object

shutdown the thrift server



18
19
20
# File 'lib/ganapati/client.rb', line 18

# Asks the Thrift server to shut down by calling the client's shutdown.
#
# @param status value forwarded as-is to the Thrift client (default 0)
# @return [Object] whatever the Thrift client's shutdown returns
def shutdown(status = 0)
  @client.shutdown(status)
end

#stat(path) ⇒ Object



86
87
88
# File 'lib/ganapati/client.rb', line 86

# Fetches the status of +path+ through the Thrift client's stat call.
#
# The path is first passed through pname (defined elsewhere in this class;
# presumably converts it to the path type the Thrift API expects — confirm).
#
# @param path path to inspect
# @return [Object] whatever the Thrift client's stat returns
def stat(path)
  target = pname(path)
  @client.stat(target)
end