Module: Sip::Hive

Defined in:
lib/sip/hive.rb

Class Method Summary

Class Method Details

.hdfs_ls(location) ⇒ Object



Source lines 55–60:
# File 'lib/sip/hive.rb', line 55

# Recursively list an HDFS location.
#
# Runs `$HADOOP_HOME/bin/hadoop dfs -lsr hdfs://<location>` and returns the
# last whitespace-separated field of every listing line (the path column).
#
# @param location [String] location without the "hdfs://" prefix (the prefix
#   is added here)
# @return [Array<String>] listed paths, in the order hadoop printed them
# NOTE(review): a path containing whitespace is truncated to its last word by
#   the split below, and hadoop's exit status is not checked.
def self.hdfs_ls(location)
  hadoop = "#{ENV['HADOOP_HOME']}/bin/hadoop"
  # Argument-vector form runs hadoop without a shell, so the location reaches
  # hadoop verbatim even if it contains spaces or shell metacharacters.
  output = IO.popen([hadoop, 'dfs', '-lsr', "hdfs://#{location}"], &:read)
  # Skip blank lines and a "Found N items" summary header. The previous code
  # dropped the first line unconditionally, which loses a real entry whenever
  # no header is printed. NOTE(review): whether -lsr prints that header varies
  # by Hadoop version -- confirm against the deployed Hadoop.
  output.each_line
        .map(&:chomp)
        .reject { |line| line.empty? || line.start_with?('Found ') }
        .map { |line| line.split(' ').last }
end

.load_data_statements(tableconf, base_path) ⇒ Object



Source lines 43–53:
# File 'lib/sip/hive.rb', line 43

# Build the HiveQL LOAD DATA statements that move staged HDFS data into the
# Hive table named by tableconf['hive_table_name'].
#
# Unpartitioned tables get a single statement loading all of base_path.
# Partitioned tables (tableconf has a 'partition_by' key) get one statement
# per listed path under base_path that ends in ".part"; the name of that
# path's parent directory is used as the partition value.
#
# @param tableconf [Hash] table configuration; reads 'hive_table_name' and,
#   optionally, 'partition_by'
# @param base_path [String] HDFS directory holding the staged data
# @return [Array<String>] HiveQL statements, each terminated with ';'
def self.load_data_statements(tableconf, base_path)
  table = tableconf['hive_table_name']
  # Only the partitioned case needs a file listing. Previously the listing
  # (a hadoop subprocess) was always computed and then discarded here.
  return ["LOAD DATA INPATH '#{base_path}' INTO TABLE #{table};"] unless tableconf.key?('partition_by')

  partition_col = tableconf['partition_by']
  Hive.hdfs_ls(base_path)
      .select { |path| path.end_with?('.part') }
      .map { |path|
        partval = path.split('/')[-2]
        "LOAD DATA INPATH '#{path}' INTO TABLE #{table} PARTITION (#{partition_col}='#{partval}');"
      }
end

.run(sipper, query) ⇒ Object

Raises: HiveQueryException if the hive command cannot be started or exits with a non-zero status.



Source lines 21–25:
# File 'lib/sip/hive.rb', line 21

# Execute one HiveQL query string with the hive CLI in silent mode (-S -e).
#
# @param sipper [#log] receives a log line describing the command
# @param query [String] HiveQL to execute
# @return [nil] on success
# @raise [HiveQueryException] if hive cannot be started or exits non-zero
def self.run(sipper, query)
  hive = "#{ENV['HIVE_HOME']}/bin/hive"
  # Human-readable rendering of the command, used only for logging/errors.
  cmd = "#{hive} -S -e \"#{query}\""
  sipper.log "Running Hive command: #{cmd}"
  # Multi-argument system() bypasses the shell. The old `-e "#{query}"` shell
  # string broke on, or expanded, queries containing ", ` or $.
  raise HiveQueryException, "Could not execute hive command #{cmd}" unless system(hive, '-S', '-e', query)
end

.run_file(sipper, path) ⇒ Object

Raises: HiveQueryException if the hive command cannot be started or exits with a non-zero status.



Source lines 27–31:
# File 'lib/sip/hive.rb', line 27

# Execute a HiveQL script file with the hive CLI in silent mode (-S -f).
#
# @param sipper [#log] receives a log line describing the command
# @param path [String] local path of the .hql script to run
# @return [nil] on success
# @raise [HiveQueryException] if hive cannot be started or exits non-zero
def self.run_file(sipper, path)
  hive = "#{ENV['HIVE_HOME']}/bin/hive"
  # Human-readable rendering of the command, used only for logging/errors.
  cmd = "#{hive} -S -f #{path}"
  sipper.log "Running Hive cmd: #{cmd}"
  # Multi-argument system() bypasses the shell, so a script path containing
  # spaces or shell metacharacters is passed to hive as a single argument.
  raise HiveQueryException, "Could not execute hive command #{cmd}" unless system(hive, '-S', '-f', path.to_s)
end

.run_hsql_create_table(sipper, db, tableconf, method) ⇒ Object



Source lines 3–19:
# File 'lib/sip/hive.rb', line 3

# Create the Hive table for a source table, optionally partitioned.
#
# Column names and Hive types come from db.hive_columns(tableconf['tablename']).
# When tableconf has a 'partition_by' key, that column is moved out of the
# column list into a PARTITIONED BY clause.
#
# @param sipper [Object] passed through to Hive.run
# @param db [#hive_columns] source of (name, hive_type) pairs for the source
#   table -- presumably a Hash or an Array of pairs; confirm against callers
# @param tableconf [Hash] reads 'hive_table_name', 'tablename' and, optionally,
#   'partition_by'
# @param method [Symbol] :overwrite drops the Hive table before (re)creating
#   it; any other value only creates it if it does not exist
# @raise [NoSuchColumn] if 'partition_by' names a column the source table does
#   not have (raised before anything is dropped)
# @raise [HiveQueryException] propagated from Hive.run
def self.run_hsql_create_table(sipper, db, tableconf, method)
  table = tableconf['hive_table_name']
  partition_col = tableconf['partition_by']
  cols = db.hive_columns tableconf['tablename']

  partition = ""
  if tableconf.key?('partition_by')
    partition_pair = cols.find { |name, _type| name == partition_col }
    unless partition_pair
      raise NoSuchColumn, "Column to partition by '#{partition_col}' not found in table '#{tableconf['tablename']}'"
    end
    partition = "PARTITIONED BY (#{partition_col} #{partition_pair[1]})"
  end

  # Drop only after the configuration has been validated. Previously a bad
  # 'partition_by' destroyed the existing table and then raised.
  Hive.run sipper, "DROP TABLE #{table}" if method == :overwrite

  colstring = cols.reject { |name, _type| name == partition_col }
                  .map { |name, type| "#{name} #{type}" }
                  .join(", ")
  Hive.run sipper, "CREATE TABLE IF NOT EXISTS #{table} (#{colstring}) #{partition}"
end

.run_import(sipper, tableconf) ⇒ Object



Source lines 33–41:
# File 'lib/sip/hive.rb', line 33

# Load staged HDFS data into the Hive table described by tableconf.
#
# Builds the LOAD DATA statements for
# "<sipper.config['hdfs_tmpdir']>/<hive_table_name>" and writes them, one per
# line, to "<sipper.config['tmpdir']>/scripts/hive_import.hql". It then runs
# that script through Hive.run_file.
#
# @param sipper [Object] must respond to #config (Hash-like); also passed
#   through to Hive.run_file
# @param tableconf [Hash] table configuration (reads 'hive_table_name'; see
#   load_data_statements for the rest)
# @raise [HiveQueryException] propagated from Hive.run_file
# NOTE(review): the "scripts" directory is not created here; File.write raises
#   Errno::ENOENT if it does not already exist.
def self.run_import(sipper, tableconf)
  staging_dir = "#{sipper.config['hdfs_tmpdir']}/#{tableconf['hive_table_name']}"
  statements = Hive.load_data_statements tableconf, staging_dir
  script_path = File.join(sipper.config['tmpdir'], 'scripts', 'hive_import.hql')
  # File.write rather than Kernel#open: open() treats a path that starts with
  # "|" as a command to spawn. File.write also guarantees the file is closed.
  File.write(script_path, statements.join("\n"))
  Hive.run_file sipper, script_path
end