Class: PatriotHadoop::Command::HiveCommand

Inherits:
Patriot::Command::Base
  • Object
show all
Includes:
Ext::Hive
Defined in:
lib/patriot_hadoop/command/hive.rb

Constant Summary

Constants included from Ext::Hive

Ext::Hive::HIVE_MAX_ERROR_MSG_SIZE

Instance Method Summary collapse

Methods included from Ext::Hive

#_compress_option, #_execute_hivequery_internal, #execute_hivequery, included

Instance Method Details

#_add_udfs(hive_ql, udfs) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/patriot_hadoop/command/hive.rb', line 70

# Prepends UDF registration statements to a Hive query.
#
# For every UDF spec an "add jar" statement is emitted, followed by one
# "create temporary function" statement per function listed in the spec.
#
# @param hive_ql [String] the query to be prefixed
# @param udfs [Hash, Array<Hash>, nil] a single UDF spec or a list of specs.
#   Each spec is read through the 'jar' key and the optional 'functions' key;
#   'functions' is a single Hash or an Array of Hashes read through the
#   'name' and 'class' keys.
# @return [Object] hive_ql itself when udfs is nil, otherwise a new String
#   made of the registration statements followed by hive_ql
def _add_udfs(hive_ql, udfs)
  return hive_ql if udfs.nil?

  # Array() is deliberately avoided: it would turn a Hash into key/value pairs.
  udfs = [udfs] unless udfs.is_a?(Array)
  statements = udfs.flat_map do |udf|
    functions = udf['functions']
    functions = [functions] unless functions.is_a?(Array)
    # A spec without 'functions' wraps to [nil]; dropping nil entries lets a
    # jar be added without registering any function instead of raising.
    creates = functions.compact.map do |f|
      "create temporary function #{f['name']} as \"#{f['class']}\";"
    end
    ["add jar #{udf['jar']};"] + creates
  end
  "#{statements.join}#{hive_ql}"
end

#_create_hivequery_tmpfile(hive_ql, tmpfile, opt = {}) ⇒ Object



57
58
59
60
61
# File 'lib/patriot_hadoop/command/hive.rb', line 57

# Writes the Hive query to a file, decorated according to the options.
#
# When opt has a :udf key the query is first passed through _add_udfs; when
# opt has a :props key the result of _set_hive_property_prefix is placed in
# front of it. The final text is written to tmpfile.
#
# @param hive_ql [String] the query text
# @param tmpfile [String] path of the file to write
# @param opt [Hash] optional :udf and :props entries
# @return [Integer] the value returned by File.write
def _create_hivequery_tmpfile(hive_ql, tmpfile, opt={})
  query = opt.has_key?(:udf) ? _add_udfs(hive_ql, opt[:udf]) : hive_ql
  if opt.has_key?(:props)
    property_prefix = _set_hive_property_prefix(opt[:props])
    query = "#{property_prefix}#{query}"
  end
  File.write(tmpfile, query)
end

#_create_output_filename(output_prefix, compression) ⇒ Object

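Builds the output file name from the prefix. Supported compression values: true or ‘gzip’ (appends .gz) and ‘bzip2’ (appends .bz2); any other value leaves the plain .tsv name.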



45
46
47
48
49
50
51
52
53
54
# File 'lib/patriot_hadoop/command/hive.rb', line 45

# Builds the name of the result file for a query.
#
# The name is output_prefix followed by '.tsv'. A compression of true or
# 'gzip' appends '.gz', 'bzip2' appends '.bz2', and any other value appends
# nothing.
#
# @param output_prefix [String] path prefix of the result file
# @param compression [true, String, Object] requested compression
# @return [String] the resulting file name
def _create_output_filename(output_prefix, compression)
  filename = output_prefix + '.tsv'
  extension = case compression
              when true, 'gzip' then '.gz'
              when 'bzip2' then '.bz2'
              end
  extension ? filename + extension : filename
end

#_set_hive_property_prefix(props = {}) ⇒ Object



64
65
66
67
# File 'lib/patriot_hadoop/command/hive.rb', line 64

# Renders Hive properties as a run of "set key=value;" statements.
#
# @param props [Hash, nil] property names mapped to their values
# @return [String] the concatenated statements, or "" when props is nil
def _set_hive_property_prefix(props={})
  if props.nil?
    ""
  else
    statements = props.each_with_object([]) do |(key, value), collected|
      collected << "set #{key}=#{value};"
    end
    statements.join
  end
end

#execute ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/patriot_hadoop/command/hive.rb', line 16

# Runs the configured Hive query and stores its result in a file.
#
# Steps, in order: collect the non-nil @udf / @props into an options hash,
# resolve the output prefix (falling back to /tmp/<job_id> when
# @output_prefix is nil), make sure its parent directory exists, write the
# query to "<prefix>.hql", derive the output file name from @compression and
# hand both files to execute_hivequery together with @exec_user.
# A zero-length output file is reported with a warning, and in that case the
# "end hive" message is not logged.
#
# @return [nil, Object] nil when the result is empty, otherwise the value
#   returned by the final logger call
def execute
  @logger.info "start hive"

  query_options = { :udf => @udf, :props => @props }.reject { |_key, value| value.nil? }

  prefix = @output_prefix
  prefix = File.join('/tmp', job_id) if prefix.nil?
  parent_dir = File.dirname(prefix)
  FileUtils.mkdir_p(parent_dir) unless Dir.exist?(parent_dir)

  query_file = prefix + '.hql'
  _create_hivequery_tmpfile(@hive_ql, query_file, query_options)
  result_file = _create_output_filename(prefix, @compression)

  execute_hivequery(query_file, result_file, @exec_user)

  if File.zero?(result_file)
    @logger.warn "#{@hive_ql} generated empty result"
    nil
  else
    @logger.info "end hive"
  end
end

#job_id ⇒ Object



9
10
11
12
13
# File 'lib/patriot_hadoop/command/hive.rb', line 9

# Builds the identifier of this job.
#
# The identifier is the string form of command_name, followed by an
# underscore and @name_suffix when @name_suffix is not nil.
#
# @return [String] the job identifier
def job_id
  base = "#{command_name}"
  suffix = @name_suffix
  suffix.nil? ? base : "#{base}_#{suffix}"
end