Class: Sip::Sipper

Inherits:
Object
  • Object
show all
Defined in:
lib/sip/sipper.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Sipper

Returns a new instance of Sipper.



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/sip/sipper.rb', line 5

# Builds a Sipper and loads the Hadoop slave host list from
# $HADOOP_HOME/conf/slaves (one entry per line) into @slaves.
#
# @param config [Object] configuration, stored in @config and handed to
#   Utils.sanity_check (presumably a Hash-like object — it is read with []
#   elsewhere in this class)
# @raise [HadoopException] if HADOOP_HOME is unset or empty, or if the slaves
#   file cannot be read
def initialize(config)
  @config = config

  hadoop_home = ENV['HADOOP_HOME']
  # File.join(nil, ...) would raise a bare TypeError; report the real cause.
  if hadoop_home.nil? || hadoop_home.empty?
    raise HadoopException, 'The HADOOP_HOME environment variable is not set.'
  end

  slavefile = File.join(hadoop_home, 'conf', 'slaves')
  log "Reading slaves from file #{slavefile}..."
  begin
    # File.read instead of Kernel#open: open treats a leading "|" as a command.
    @slaves = File.read(slavefile).split("\n")
  rescue StandardError
    raise HadoopException, "Could not read \"#{slavefile}\".  Is your HADOOP_HOME environment variable correct?"
  end
  Utils.sanity_check(@config)
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



3
4
5
# File 'lib/sip/sipper.rb', line 3

# Reader for the configuration held by this object.
#
# @return [Object] the current value of @config
def config
  @config
end

Instance Method Details

#create_script(slavename, index, dbconf, tableconf, db, max, method) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/sip/sipper.rb', line 68

# Writes the import script for one slave's share of an indexed table.
#
# The index range to import is divided evenly between all entries of @slaves
# and this call handles the slice belonging to +index+. In :append mode the
# range starts just past tableconf['incremental_index_value']; in any other
# mode it starts at 1.
#
# @param slavename [Object] slave entry passed through to Utils.write_script
# @param index [Integer] zero-based position of this slave within @slaves
# @param dbconf [Hash] database settings; 'dbname' is read here
# @param tableconf [Hash] table settings; 'tablename', 'incremental_index' and
#   (in :append mode) 'incremental_index_value' are read here
# @param db [Object] database interface; must respond to generate_command
# @param max [Integer] highest index value to import
# @param method [Symbol] :append, or anything else for a full import
# @return [Object] whatever Utils.write_script returns
def create_script(slavename, index, dbconf, tableconf, db, max, method)
  min = method == :append ? tableconf['incremental_index_value'] + 1 : 1
  # Split in every mode. Previously a non-append import gave each slave the
  # whole 1..max range, so every row was selected once per slave.
  first, last = get_even_split(min, max, index, @slaves.length)

  log "Importing #{first} <= #{tableconf['incremental_index']} <= #{last} from table #{dbconf['dbname']}.#{tableconf['tablename']}"
  select = db.generate_command(tableconf, first, last)
  transpart_opts = generate_transpart_options(tableconf, db)
  Utils.write_script(self, slavename, select, dbconf['dbname'], tableconf['tablename'], transpart_opts)
end

#create_script_without_index(dbconf, tableconf, db) ⇒ Object

This is the case where there is no primary key index, so the whole table needs to be imported.



40
41
42
43
44
45
46
47
# File 'lib/sip/sipper.rb', line 40

# Creates the single import script for a table that is imported in full.
#
# The Hive table is created in :overwrite mode, one script covering every row
# is written for the first entry of @slaves, and that script is appended to
# @scripts. The database interface is closed before returning.
#
# @param dbconf [Hash] database settings; 'dbname' is read here
# @param tableconf [Hash] table settings; 'tablename' is read here
# @param db [Object] database interface; must respond to generate_command
#   and close
# @return [Array] @scripts, with the new script appended
def create_script_without_index(dbconf, tableconf, db)
  Hive.run_hsql_create_table(self, db, tableconf, :overwrite)

  log "Importing all rows from table #{dbconf['dbname']}.#{tableconf['tablename']}"
  select = db.generate_command(tableconf)
  transpart_opts = generate_transpart_options(tableconf, db)
  @scripts << Utils.write_script(self, @slaves.first, select, dbconf['dbname'], tableconf['tablename'], transpart_opts)
ensure
  # The indexed import path closes db once its scripts are written; this path
  # never did, leaving the connection open. Close it here, also on error.
  db.close
end

#create_scripts(dbconf, tableconf) ⇒ Object

Returns the number of scripts created.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/sip/sipper.rb', line 22

# Builds the import scripts for one table and reports how many were created.
#
# Resets @scripts, opens a database interface for dbconf, fills in
# tableconf['columns'] from the database when it is not set, then delegates to
# the indexed or non-indexed script builder depending on whether
# tableconf['incremental_index'] is set.
#
# @param dbconf [Hash] database settings; 'type' selects the interface
# @param tableconf [Hash] table settings; 'columns' may be filled in here
# @return [Integer] number of scripts now held in @scripts
def create_scripts(dbconf, tableconf)
  @scripts = []
  db = DBBase.make_interface(dbconf['type'], dbconf, self)

  # Default to every column of the table, keeping the first element of each
  # entry returned by db.columns.
  if tableconf['columns'].nil?
    tableconf['columns'] = db.columns(tableconf['tablename']).map(&:first)
  end

  indexed = !tableconf['incremental_index'].nil?
  if indexed
    create_scripts_with_index(dbconf, tableconf, db)
  else
    create_script_without_index(dbconf, tableconf, db)
  end

  @scripts.size
end

#create_scripts_with_index(dbconf, tableconf, db) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/sip/sipper.rb', line 49

# Creates one import script per slave for a table that has an incremental
# index, appending each to @scripts.
#
# The import runs in :append mode only when tableconf['method'] is "append"
# and @config[:overwrite] is not set; otherwise it runs in :overwrite mode.
# An append whose stored index value already equals the column maximum is
# skipped. The database interface is closed on every path.
#
# @param dbconf [Hash] database settings, passed through to create_script
# @param tableconf [Hash] table settings; 'incremental_index_value' is updated
#   to the column maximum when 'method' is "append"
# @param db [Object] database interface; must respond to get_column_max and
#   close
# @return [Object, nil] the new index value when it was updated, else nil
def create_scripts_with_index(dbconf, tableconf, db)
  max = db.get_column_max(tableconf['tablename'], tableconf['incremental_index'])

  appending = tableconf['method'] == 'append'
  method = (appending && !@config[:overwrite]) ? :append : :overwrite

  begin
    if method == :append && max == tableconf['incremental_index_value']
      log "Ignoring #{dbconf['dbname']}.#{tableconf['tablename']} - already up to date"
    else
      Hive.run_hsql_create_table(self, db, tableconf, method)
      @slaves.each_with_index do |slavename, index|
        @scripts << create_script(slavename, index, dbconf, tableconf, db, max, method)
      end
    end
  ensure
    # Previously db was closed only after scripts were written, so it stayed
    # open when the table was already up to date or script creation raised.
    db.close
  end

  # update incremental index value if method in conf is append, regardless of whether or
  # not this is a forced overwrite
  tableconf['incremental_index_value'] = max if appending
end

#generate_transpart_options(tableconf, db) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/sip/sipper.rb', line 81

# Assembles the CmdOpts option set for one table.
#
# Options are set in this order:
#   c - comma-joined result of db.order_column_list for the table's columns
#   p - tableconf['partition_by'], only when that key is present
#   t - comma-joined "key:value" pairs from tableconf['transformations'],
#       only when that key is present and the collection is non-empty
#   H - tableconf['hive_table_name']
#   o - the 'partitions' directory under @config['tmpdir']
#   d - set as a flag when @config[:debug] is truthy
#
# @param tableconf [Hash] table settings
# @param db [Object] database interface; must respond to order_column_list
# @return [CmdOpts] the populated options
def generate_transpart_options(tableconf, db)
  CmdOpts.new.tap do |options|
    ordered_columns = db.order_column_list(tableconf['tablename'], tableconf['columns'])
    options['c'] = ordered_columns.join(',')

    if tableconf.key?('partition_by')
      options['p'] = tableconf['partition_by']
    end

    if tableconf.key?('transformations') && !tableconf['transformations'].empty?
      pairs = tableconf['transformations'].map { |name, transform| "#{name}:#{transform}" }
      options['t'] = pairs.join(',')
    end

    options['H'] = tableconf['hive_table_name']
    options['o'] = File.join(@config['tmpdir'], 'partitions')
    options.set('d') if @config[:debug]
  end
end

#get_even_split(min, max, index, count) ⇒ Object



94
95
96
97
98
99
# File 'lib/sip/sipper.rb', line 94

# Returns the inclusive sub-range of min..max assigned to slice +index+ when
# the range is divided into +count+ equally sized slices.
#
# The slice size is the range length divided by +count+, rounded up, and the
# upper bound is clamped to +max+. When there are more slices than values the
# trailing slices come back with first > last, i.e. empty.
#
# @param min [Integer] lowest value of the whole range
# @param max [Integer] highest value of the whole range
# @param index [Integer] zero-based slice number
# @param count [Integer] total number of slices
# @return [Array(Integer, Integer)] [first, last] for the requested slice
def get_even_split(min, max, index, count)
  span = max - min + 1
  # Exact ceiling division. Going through Float loses precision above 2**53,
  # which can make the slices too small to reach max.
  size = -((-span).div(count))
  first = (size * index) + min
  last = (size * (index + 1)) + min - 1
  [first, [max, last].min]
end

#log(msg) ⇒ Object



17
18
19
# File 'lib/sip/sipper.rb', line 17

# Prints a log line of the form "<hostname> <YYYY-MM-DD HH:MM:SS>: <msg>" to
# standard output. Does nothing unless @config[:debug] is truthy.
#
# @param msg [Object] text to log; interpolated into the line
# @return [nil]
def log(msg)
  return unless @config[:debug]

  host = Utils.hostname
  stamp = Time.now.strftime('%Y-%m-%d %H:%M:%S')
  puts "#{host} #{stamp}: #{msg}"
end

#run_hive_import(tableconf) ⇒ Object



114
115
116
# File 'lib/sip/sipper.rb', line 114

# Delegates the import of one table to Hive.run_import, passing this object
# along with the table settings.
#
# @param tableconf [Hash] table settings, forwarded unchanged
# @return [Object] whatever Hive.run_import returns
def run_hive_import(tableconf)
  Hive.run_import(self, tableconf)
end

#run_scripts ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/sip/sipper.rb', line 101

# Runs every script in @scripts with sh, each in its own forked child
# process, then waits for all children to finish.
#
# @return [Array<Process::Status>] exit statuses of the reaped children
# @raise [ImportScriptExecutionError] if any child did not exit with status 0
def run_scripts
  pids = {}
  @scripts.each do |script|
    log "Running #{script}..."
    # Argument-vector form of system: the path reaches sh as one argument, so
    # spaces or shell metacharacters in it are not re-parsed by a shell.
    # `== true` maps a nil result (sh could not be started) to a failure exit.
    pid = fork { Kernel.exit(system('sh', script) == true) }
    pids[pid] = script
    # NOTE(review): presumably staggers the start of the scripts — confirm.
    sleep 1
  end

  Process.waitall.map(&:last).each do |status|
    # success? is nil for a signalled child, so that counts as a failure too.
    raise ImportScriptExecutionError, "Error runing script '#{pids[status.pid]}'" unless status.success?
  end
end