Class: Ganapati::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ganapati/client.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port, timeout = 60) ⇒ Client

Returns a new instance of Client.



4
5
6
7
8
9
10
11
12
# File 'lib/ganapati/client.rb', line 4

# Opens a buffered Thrift transport to server:port and builds the
# ThriftHadoopFileSystem client on top of a binary protocol.
#
# server  - host handed to Thrift::Socket
# port    - port handed to Thrift::Socket
# timeout - value passed to the client's setInactivityTimeoutPeriod;
#           pass nil to skip that call entirely (default: 60)
#
# Any error raised while building the client or setting the timeout is
# re-raised after the already-open transport has been closed.
def initialize(server, port, timeout=60)
  socket = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(socket)
  @transport.open
  begin
    protocol = Thrift::BinaryProtocol.new(@transport)
    @client = ThriftHadoopFileSystem::Client.new(protocol)
    # some versions of ruby + thrift cause an error if the following is called
    @client.setInactivityTimeoutPeriod(timeout) unless timeout.nil?
  rescue StandardError
    # When the constructor raises, the caller never gets an instance and so
    # cannot call #close; release the open transport here before re-raising.
    @transport.close
    raise
  end
end

Class Method Details

.run(server, port) ⇒ Object



126
127
128
129
130
131
# File 'lib/ganapati/client.rb', line 126

# Builds a Client for server:port, yields it to the block, and returns the
# block's value.
#
# The client is closed in an ensure clause so the connection is released
# even when the block raises; the exception still propagates to the caller.
def self.run(server, port)
  c = Client.new(server, port)
  begin
    yield c
  ensure
    c.close
  end
end

Instance Method Details

#append(path, &block) ⇒ Object

for appending



83
84
85
# File 'lib/ganapati/client.rb', line 83

# Requests an :append handle for +path+ from file_handle and forwards the
# caller's block to it. Returns whatever file_handle returns.
def append(path, &block)
  file_handle(:append, path, &block)
end

#chmod(path, mode) ⇒ Object



114
115
116
# File 'lib/ganapati/client.rb', line 114

# Forwards a chmod request to the Thrift client. +path+ is passed through
# pname first; +mode+ is handed over unchanged.
def chmod(path, mode)
  @client.chmod(pname(path), mode)
end

#chown(path, owner, group) ⇒ Object



118
119
120
# File 'lib/ganapati/client.rb', line 118

# Forwards a chown request to the Thrift client. +path+ is passed through
# pname first; +owner+ and +group+ are handed over unchanged.
def chown(path, owner, group)
  @client.chown(pname(path), owner, group)
end

#close ⇒ Object



14
15
16
# File 'lib/ganapati/client.rb', line 14

# Closes the Thrift transport that the constructor opened.
# Returns whatever the transport's close returns.
def close
  @transport.close
end

#create(path, &block) ⇒ Object

for writing to a new file



73
74
75
# File 'lib/ganapati/client.rb', line 73

# Requests a :create handle for +path+ from file_handle (used for writing to
# a new file) and forwards the caller's block to it.
def create(path, &block)
  file_handle(:create, path, &block)
end

#exists?(path) ⇒ Boolean

Returns:

  • (Boolean)


99
100
101
# File 'lib/ganapati/client.rb', line 99

# Asks the Thrift client whether +path+ (after pname) exists and returns the
# client's answer.
def exists?(path)
  @client.exists(pname(path))
end

#get(remotepath, destpath) ⇒ Object

copy remote file to local



36
37
38
39
40
41
42
# File 'lib/ganapati/client.rb', line 36

# Copies the remote file +remotepath+ to the local file +destpath+, writing
# each chunk yielded by readchunks as it arrives.
#
# File.open is used instead of Kernel.open so that a destpath beginning with
# "|" is treated as a plain file name rather than a command to spawn. The
# file is opened in binary mode so chunks are written without newline or
# encoding translation.
def get(remotepath, destpath)
  File.open(destpath, 'wb') do |dest|
    readchunks(remotepath) { |chunk| dest.write(chunk) }
  end
end

#ls(path, details = false, recursive = false) ⇒ Object



107
108
109
110
111
112
# File 'lib/ganapati/client.rb', line 107

# Lists the entries the Thrift client reports for +path+ (after pname).
#
# details   - when true, return the status objects themselves; otherwise
#             return only each status's path
# recursive - when true, append the listing of every entry whose isdir is
#             true, descending with the same details/recursive flags
def ls(path, details=false, recursive=false)
  entries = @client.listStatus(pname(path))
  listing = details ? entries : entries.map(&:path)
  return listing unless recursive

  nested = entries.select(&:isdir).map { |dir| ls(dir.path, details, recursive) }
  listing + nested.flatten
end

#mkdir(path) ⇒ Object



95
96
97
# File 'lib/ganapati/client.rb', line 95

# Forwards +path+ (after pname) to the Thrift client's mkdirs call and
# returns its result.
def mkdir(path)
  @client.mkdirs(pname(path))
end

#mv(source, dest) ⇒ Object



91
92
93
# File 'lib/ganapati/client.rb', line 91

# Renames +source+ to +dest+ through the Thrift client's rename call.
# Both paths are passed through pname before being sent.
def mv(source, dest)
  @client.rename(pname(source), pname(dest))
end

#open(path, &block) ⇒ Object

for reading



78
79
80
# File 'lib/ganapati/client.rb', line 78

# Requests an :open handle for +path+ from file_handle (used for reading)
# and forwards the caller's block to it.
def open(path, &block)
  file_handle(:open, path, &block)
end

#put(localpath, destpath) ⇒ Object

copy local file to remote



24
25
26
27
28
29
30
31
32
33
# File 'lib/ganapati/client.rb', line 24

# Copies the local file +localpath+ to the remote file +destpath+.
#
# The local file is opened first, so a missing or unreadable source raises
# before anything is created remotely. File.open is used instead of
# Kernel.open so that a localpath beginning with "|" is treated as a file
# name rather than a command to spawn, and binary mode keeps the bytes that
# are read untranslated.
def put(localpath, destpath)
  File.open(localpath, 'rb') do |source|
    create(destpath) do |dest|
      # read 1 MB at a time
      while (record = source.read(1048576))
        dest.write(record)
      end
    end
  end
end

#readchunks(path, chunksize = 1048576) ⇒ Object

yield the contents of path one chunk at a time, each chunk read with a length of chunksize



45
46
47
48
49
50
51
52
53
54
# File 'lib/ganapati/client.rb', line 45

# Yields the contents of +path+ one chunk at a time.
#
# path      - remote path, opened for reading via #open
# chunksize - length requested from the source on each read; must be
#             greater than zero (default: 1048576)
#
# Reads start at offset 0 and advance by chunksize until the offset reaches
# the source's reported length. Returns nil.
#
# Raises ArgumentError when chunksize is not positive, because the offset
# would then never advance past the length and the loop would not terminate.
def readchunks(path, chunksize=1048576)
  unless chunksize > 0
    raise ArgumentError, "chunksize must be positive (got #{chunksize.inspect})"
  end

  open(path) do |source|
    last_offset = source.length - 1
    0.step(last_offset, chunksize) do |offset|
      yield source.read(offset, chunksize)
    end
    nil
  end
end

#readlines(path, sep = "\n") {|buffer| ... } ⇒ Object

Yields:

  • (buffer)


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/ganapati/client.rb', line 56

# Yields the contents of +path+ split on +sep+, one piece at a time, with the
# separator itself removed.
#
# Chunks from readchunks are accumulated in a buffer; every complete piece
# (text followed by sep) is yielded as soon as it is available, and whatever
# is left after the last chunk is yielded at the end if it is non-empty.
def readlines(path, sep="\n")
  pending = ""

  readchunks(path) do |chunk|
    pending << chunk

    line, found, rest = pending.partition(sep)
    # partition gives an empty middle element when sep is not in the string
    until found.empty?
      yield line
      pending = rest
      line, found, rest = pending.partition(sep)
    end
  end

  yield pending unless pending.empty?
end

#rm(path, recursive = false) ⇒ Object



87
88
89
# File 'lib/ganapati/client.rb', line 87

# Forwards a remove request for +path+ (after pname) to the Thrift client,
# passing the +recursive+ flag through unchanged (default: false).
def rm(path, recursive=false)
  @client.rm(pname(path), recursive)
end

#set_replication(path, level) ⇒ Object



122
123
124
# File 'lib/ganapati/client.rb', line 122

# Forwards +path+ (after pname) and +level+ to the Thrift client's
# setReplication call and returns its result.
def set_replication(path, level)
  @client.setReplication(pname(path), level)
end

#shutdown(status = 0) ⇒ Object

shutdown the thrift server



19
20
21
# File 'lib/ganapati/client.rb', line 19

# Asks the Thrift server to shut down by calling the client's shutdown with
# +status+ (default: 0).
def shutdown(status=0)
  @client.shutdown(status)
end

#stat(path) ⇒ Object



103
104
105
# File 'lib/ganapati/client.rb', line 103

# Returns whatever the Thrift client's stat call reports for +path+
# (after pname).
def stat(path)
  @client.stat(pname(path))
end