Class: Rukawa::Builtins::Embulk

Inherits:
Shell show all
Defined in:
lib/rukawa/builtins/embulk.rb

Instance Attribute Summary

Attributes inherited from Job

#finished_at, #in_comings, #out_goings, #started_at, #state, #variables

Attributes inherited from AbstractJob

#parent_job_net

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Shell

#execute_command

Methods inherited from Base

[]

Methods inherited from Job

after_fail, after_run, around_run, before_run, #dataflow, #initialize, #jobs_as_from, #leaf?, #resource_count, #root?, set_dependency_type, set_resource_count, set_retryable, #set_state, #to_dot_def, wrapper_for, wrappers

Methods inherited from AbstractJob

add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?

Constructor Details

This class inherits a constructor from Rukawa::Job

Class Method Details

.handle_parameters(config:, embulk_bin: nil, embulk_bundle: nil, embulk_vm_options: nil, jvm_options: nil, stdout: nil, stderr: nil, env: nil, chdir: nil, preview: false) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rukawa/builtins/embulk.rb', line 15

# Stores the job parameters through the matching setters on +self+.
#
# +config+ and +preview+ are always assigned. Every other setting is
# assigned only when a truthy value is passed, so an omitted (nil) argument
# leaves whatever value is currently set untouched.
#
# @param config [Object] required; always assigned
# @param preview [Boolean] always assigned (defaults to false)
# @return [Object] the value of +preview+ (result of the final assignment)
def handle_parameters(config:, embulk_bin: nil, embulk_bundle: nil, embulk_vm_options: nil, jvm_options: nil, stdout: nil, stderr: nil, env: nil, chdir: nil, preview: false)
  # Optional settings, listed in the order they are applied.
  overrides = {
    embulk_bin: embulk_bin,
    embulk_bundle: embulk_bundle,
    embulk_vm_options: embulk_vm_options,
    jvm_options: jvm_options,
    stdout: stdout,
    stderr: stderr,
    env: env,
    chdir: chdir,
  }

  self.config = config
  overrides.each do |attribute, value|
    # Skip nil/false so existing values are not overwritten.
    send("#{attribute}=", value) if value
  end
  self.preview = preview
end

Instance Method Details

#run ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/rukawa/builtins/embulk.rb', line 29

# Builds the embulk command line, echoes it to +stdout+ and executes it
# through +execute_command+.
#
# When +preview+ is truthy the contents of the config file are printed first
# and +embulk preview+ is invoked; otherwise +embulk run+ is invoked with
# +embulk_vm_options+ and +jvm_options+ placed before the subcommand.
# If Bundler is loaded, the command is executed with Bundler's environment
# variables removed.
#
# @raise [RuntimeError] "embulk error" when the command is unsuccessful and
#   its log does not mention NoSampleException
def run
  process = -> do
    if preview
      stdout.puts "Config:\n#{File.read(config)}"
      cmds = [embulk_bin, "preview", *embulk_bundle, config].compact
    else
      cmds = [embulk_bin, *embulk_vm_options, *jvm_options, "run", *embulk_bundle, config].compact
    end

    # Echo the exact command before running it, and flush so the line is
    # emitted ahead of anything the command itself produces.
    stdout.puts cmds.join(" ")
    stdout.flush

    cmdenv = env || {}
    opts = chdir ? {chdir: chdir} : {}
    # NOTE(review): execute_command is inherited; it is expected to return a
    # status-like object responding to #success? plus the captured log.
    result, log = execute_command(cmds, cmdenv, opts)

    next if result.success?
    # A failure whose log mentions NoSampleException is deliberately ignored.
    # `=~` is kept (rather than String#match?) so a nil log does not raise.
    next if log =~ /NoSampleException/

    raise "embulk error"
  end

  if !defined?(Bundler)
    process.call
  elsif Bundler.respond_to?(:with_unbundled_env)
    # with_clean_env is deprecated since Bundler 2.1; with_unbundled_env is
    # its replacement.
    Bundler.with_unbundled_env(&process)
  else
    # Older Bundler releases only provide with_clean_env.
    Bundler.with_clean_env(&process)
  end
end