Module: Sip::Hive
- Defined in:
- lib/sip/hive.rb
Class Method Summary collapse
- .hdfs_ls(location) ⇒ Object
- .load_data_statements(tableconf, base_path) ⇒ Object
- .run(sipper, query) ⇒ Object
- .run_file(sipper, path) ⇒ Object
- .run_hsql_create_table(sipper, db, tableconf, method) ⇒ Object
- .run_import(sipper, tableconf) ⇒ Object
Class Method Details
.hdfs_ls(location) ⇒ Object
55 56 57 58 59 60 |
# File 'lib/sip/hive.rb', line 55
#
# Recursively lists an HDFS location by running `hadoop dfs -lsr` and returns
# the last whitespace-separated token of each output line after the first.
#
# The executable is "$HADOOP_HOME/bin/hadoop". It is started with an argument
# vector rather than a shell command string, so a location containing spaces
# or shell metacharacters reaches hadoop as one single argument.
#
# @param location [String] location appended to the "hdfs://" prefix
# @return [Array<String, nil>] last token of every output line except the
#   first one (nil for a blank line); empty when the command prints nothing
def self.hdfs_ls(location)
  hadoop = "#{ENV['HADOOP_HOME']}/bin/hadoop"
  result = IO.popen([hadoop, 'dfs', '-lsr', "hdfs://#{location}"]) { |io| io.read }
  lines = result.split("\n")
  # The first output line is always discarded.
  # NOTE(review): presumably a header line -- confirm that `-lsr` really
  # prints one on the Hadoop version in use, otherwise an entry is lost here.
  lines.shift
  lines.map { |line| line.split(' ').last }
end
.load_data_statements(tableconf, base_path) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sip/hive.rb', line 43
#
# Builds the Hive LOAD DATA statements for one table.
#
# Without a 'partition_by' key a single statement loading +base_path+ is
# returned and HDFS is not queried. With a 'partition_by' key, +base_path+ is
# listed through Hive.hdfs_ls and one statement is produced per listed path
# ending in ".part".
#
# @param tableconf [Hash] table settings; reads 'hive_table_name' and, when
#   present, 'partition_by'
# @param base_path [String] HDFS location holding the data to load
# @return [Array<String>] LOAD DATA statements, each terminated by ";"
def self.load_data_statements(tableconf, base_path)
  table = tableconf['hive_table_name']
  # An unpartitioned table loads the whole directory at once, so the listing
  # (an external hadoop call) would be wasted work.
  return ["LOAD DATA INPATH '#{base_path}' INTO TABLE #{table};"] unless tableconf.has_key? 'partition_by'

  column = tableconf['partition_by']
  part_files = Hive.hdfs_ls(base_path).select { |path| path.end_with?('.part') }
  part_files.map do |path|
    # The partition value is the second-to-last "/"-separated path component,
    # i.e. the name of the directory directly containing the file.
    partval = path.split('/')[-2]
    "LOAD DATA INPATH '#{path}' INTO TABLE #{table} PARTITION (#{column}='#{partval}');"
  end
end
.run(sipper, query) ⇒ Object
21 22 23 24 25 |
# File 'lib/sip/hive.rb', line 21
#
# Executes a single query with "$HIVE_HOME/bin/hive -S -e".
#
# The query is handed to hive as one argument without going through a shell,
# so double quotes, backticks and "$" inside the query arrive unchanged.
# Callers must therefore pass the query unescaped.
#
# @param sipper [#log] receives a line describing the command before it runs
# @param query [String] query text passed to `hive -e`
# @return [nil]
# @raise [HiveQueryException] when hive cannot be started or exits non-zero
def self.run(sipper, query)
  hive = "#{ENV['HIVE_HOME']}/bin/hive"
  # Shell-style rendering, used only for the log line and the error message.
  cmd = "#{hive} -S -e \"#{query}\""
  sipper.log "Running Hive command: #{cmd}"
  return if system(hive, '-S', '-e', query.to_s)

  raise HiveQueryException, "Could not execute hive command #{cmd}"
end
.run_file(sipper, path) ⇒ Object
27 28 29 30 31 |
# File 'lib/sip/hive.rb', line 27
#
# Executes a script file with "$HIVE_HOME/bin/hive -S -f".
#
# The path is handed to hive as one argument without going through a shell,
# so a path containing spaces is not split into several arguments.
#
# @param sipper [#log] receives a line describing the command before it runs
# @param path [String] script file passed to `hive -f`
# @return [nil]
# @raise [HiveQueryException] when hive cannot be started or exits non-zero
def self.run_file(sipper, path)
  hive = "#{ENV['HIVE_HOME']}/bin/hive"
  # Shell-style rendering, used only for the log line and the error message.
  cmd = "#{hive} -S -f #{path}"
  sipper.log "Running Hive cmd: #{cmd}"
  return if system(hive, '-S', '-f', path.to_s)

  raise HiveQueryException, "Could not execute hive command #{cmd}"
end
.run_hsql_create_table(sipper, db, tableconf, method) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/sip/hive.rb', line 3
#
# Issues the CREATE TABLE IF NOT EXISTS statement for a table through
# Hive.run, first issuing DROP TABLE when +method+ is :overwrite.
#
# Column names and types come from db.hive_columns, looked up by
# tableconf['tablename']. When tableconf has a 'partition_by' key, that column
# is left out of the column list and emitted in a PARTITIONED BY clause with
# its own type.
#
# @param sipper [Object] passed through to Hive.run
# @param db [#hive_columns] source of the (name, type) column pairs
# @param tableconf [Hash] reads 'hive_table_name', 'tablename' and, when
#   present, 'partition_by'
# @param method [Symbol] :overwrite drops the table first; any other value
#   skips the drop
# @return [Object] whatever the final Hive.run call returns
# @raise [NoSuchColumn] when the 'partition_by' column is not among the
#   columns reported by db.hive_columns (raised after any DROP TABLE has run)
def self.run_hsql_create_table(sipper, db, tableconf, method)
  table = tableconf['hive_table_name']
  Hive.run sipper, "DROP TABLE #{table}" if method == :overwrite

  cols = db.hive_columns tableconf['tablename']
  partition_col = tableconf.fetch('partition_by', nil)
  partition = ''
  if tableconf.has_key? 'partition_by'
    # One scan both validates the column and yields its type.
    match = cols.find { |name, _type| name == partition_col }
    unless match
      raise NoSuchColumn, "Column to partition by '#{partition_col}' not found in table '#{tableconf['tablename']}'"
    end
    partition = "PARTITIONED BY (#{partition_col} #{match[1]})"
  end

  # The partition column must not be repeated in the regular column list.
  regular = cols.reject { |name, _type| name == partition_col }
  colstring = regular.map { |name, type| "#{name} #{type}" }.join(', ')
  Hive.run sipper, "CREATE TABLE IF NOT EXISTS #{table} (#{colstring}) #{partition}"
end
.run_import(sipper, tableconf) ⇒ Object
33 34 35 36 37 38 39 40 41 |
# File 'lib/sip/hive.rb', line 33
#
# Writes the LOAD DATA statements for a table into a script file and runs
# that file through Hive.run_file.
#
# The HDFS source is "<hdfs_tmpdir>/<hive_table_name>" and the script is
# written to "<tmpdir>/scripts/hive_import.hql", replacing any earlier file
# of that name.
#
# @param sipper [#config] config is read for 'hdfs_tmpdir' and 'tmpdir'; the
#   object is also passed through to Hive.run_file
# @param tableconf [Hash] reads 'hive_table_name'; passed on to
#   Hive.load_data_statements
# @return [Object] whatever Hive.run_file returns
def self.run_import(sipper, tableconf)
  base_path = "#{sipper.config['hdfs_tmpdir']}/#{tableconf['hive_table_name']}"
  stmts = Hive.load_data_statements tableconf, base_path
  path = File.join(sipper.config['tmpdir'], 'scripts', 'hive_import.hql')
  # File.open rather than Kernel#open: the latter treats a path beginning
  # with "|" as a command to spawn instead of a file to write.
  File.open(path, 'w') { |f| f.write stmts.join("\n") }
  Hive.run_file sipper, path
end