Class: NcsNavigator::Warehouse::Transformers::SubprocessTransformer

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb

Overview

A transformer that runs a separate executable to perform the transformation. The executable should report error information on standard out: as the transformation proceeds, it may write single-line messages, each of which is either an unstructured string or a single-line JSON object whose attributes are a subset of the attributes of NcsNavigator::Warehouse::TransformError. If the final line consists of an integer alone, it is used as the record count for the transformer. Example standard out:

Could not connect to frob
{ "record_id": "F111", "model_class": "Person", "message": "was invalid" }
39

This output results in a NcsNavigator::Warehouse::TransformStatus with a record count of 39 and two errors. The errors have the messages "Could not connect to frob" and "was invalid"; the latter also carries the identity of the specific troublesome record (record_id F111, model_class Person).
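The parsing rules above can be sketched on their own. This mirrors the logic in #transform but returns plain hashes instead of NcsNavigator::Warehouse::TransformError objects; `interpret_subprocess_output` is a hypothetical name, not part of the warehouse API.

```ruby
require 'json'

# Hypothetical, standalone sketch of how lines of subprocess stdout are
# interpreted. Returns [record_count, errors]; errors are plain hashes.
def interpret_subprocess_output(lines)
  lines = lines.dup
  record_count = nil

  # The last line is the record count only if it is exactly an integer's
  # canonical form: "39" qualifies, "039" and "39 records" do not.
  if !lines.empty? && lines.last.to_i.to_s == lines.last.strip
    record_count = lines.pop.to_i
  end

  errors = lines.map do |line|
    text = line.strip
    parsed =
      begin
        JSON.parse(text)
      rescue JSON::ParserError
        nil
      end
    # Anything that is not a JSON object becomes an unstructured message.
    # (This sketch also wraps JSON scalars such as "7"; #transform hands
    # JSON.parse's result to TransformError.new unchanged.)
    parsed.is_a?(Hash) ? parsed : { 'message' => text }
  end

  [record_count, errors]
end
```

Fed the three example lines above, this yields a record count of 39 and two error hashes, the second containing the record_id and model_class keys.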

If the subprocess is a Ruby script, it may be useful to pass the warehouse configuration filename as one of its arguments so that the script can use the Warehouse API to connect to the database, etc.
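On the subprocess side, a Ruby script only needs to print lines to standard out. The helper below is a hypothetical sketch (`report_results` is not part of the warehouse API): it prints each error on its own line, hashes as compact JSON keyed by TransformError attribute names, and the record count last. Because every non-final stdout line is recorded as an error, ordinary diagnostics are better sent to standard error, which #transform lets the child inherit.

```ruby
require 'json'

# Hypothetical helper for a subprocess transformer written in Ruby.
# `errors` may mix plain strings and hashes keyed by TransformError
# attribute names (e.g. 'record_id', 'model_class', 'message').
def report_results(out, errors, record_count)
  errors.each do |error|
    line =
      if error.is_a?(Hash)
        JSON.generate(error)         # compact JSON; embedded newlines are escaped
      else
        error.to_s.tr("\r\n", '  ') # keep each plain message on a single line
      end
    out.puts(line)
  end
  out.puts(Integer(record_count))    # the final line: the record count alone
  out.flush
end

# A real script might take the warehouse configuration filename as its first
# argument (an assumption about your own script's interface) and report to
# $stdout:
#   config_file = ARGV.fetch(0)
#   report_results($stdout, errors, count)
```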


Constructor Details

#initialize(config, exec_and_args, options = {}) ⇒ SubprocessTransformer

Returns a new instance of SubprocessTransformer.

Parameters:

  • config (Configuration)

    the warehouse configuration in use.

  • exec_and_args (Array<String>)

    the arguments for the subprocess to spawn, including the executable name (which should be the first item in the array)

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :environment (Hash<String, String>)

    values to include in the subprocess's environment. The subprocess inherits the environment of the parent ETL process (minus Bundler's settings, because it is started inside Bundler.with_clean_env), so this hash only needs to contain overrides.

  • :directory (String)

    the directory from which to run the subprocess if different from the current working directory.
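The :environment and :directory options follow the same model as the env hash and chdir: option of Ruby's built-in Process.spawn. The stdlib sketch below illustrates that model only; it is not the transformer's own code (which uses the ChildProcess gem), and `child_view` and the WAREHOUSE_MODE variable are hypothetical.

```ruby
require 'rbconfig'

# Hypothetical helper: runs a tiny Ruby child with `environment` overrides in
# `directory` and returns the values it sees for the named variables,
# followed by its working directory.
def child_view(environment, directory, *names)
  script = 'ARGV.each { |n| puts ENV[n].to_s }; puts Dir.pwd'
  reader, writer = IO.pipe
  # The child starts from a copy of the parent's ENV; `environment` only
  # adds or overrides entries, and chdir: sets its working directory.
  pid = Process.spawn(environment, RbConfig.ruby, '-e', script, *names,
                      chdir: directory, out: writer)
  writer.close # the parent keeps only the read end
  lines = reader.read.lines.map(&:chomp)
  reader.close
  Process.wait(pid)
  lines
end
```

With ENV['PARENT_ONLY'] set in the parent, calling `child_view({ 'WAREHOUSE_MODE' => 'test' }, dir, 'PARENT_ONLY', 'WAREHOUSE_MODE')` reports both the inherited and the overridden value, while the parent's own ENV is left unchanged.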



# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 68

def initialize(config, exec_and_args, options={})
  @configuration = config
  @exec_and_args = exec_and_args
  @directory = options[:directory] || '.'
  @environment = options[:environment] || {}
end

Instance Attribute Details

#configuration ⇒ Configuration (readonly)

Returns the warehouse configuration in use.

Returns:

  • (Configuration)

    the warehouse configuration in use.

# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 43

def configuration
  @configuration
end

#directory ⇒ String (readonly)

Returns the working directory for the subprocess.

Returns:

  • (String)

    the working directory for the subprocess.



# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 47

def directory
  @directory
end

#environment ⇒ Hash<String, String> (readonly)

Returns supplemental environment variable values for the subprocess.

Returns:

  • (Hash<String, String>)

    supplemental environment variable values for the subprocess.



# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 52

def environment
  @environment
end

#exec_and_args ⇒ Array<String> (readonly)

Returns the process parameters, including the executable name.

Returns:

  • (Array<String>)

    the process parameters, including the executable name.



# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 39

def exec_and_args
  @exec_and_args
end

Instance Method Details

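#name ⇒ String

Returns a display name for this transformer, formed by joining exec_and_args with single spaces.

Returns:

  • (String)

    the executable name and its arguments, separated by spaces.
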
# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 77

def name
  exec_and_args.join(' ')
end

#transform(transform_status)

This method returns an undefined value.

Spawns the child process and updates the provided NcsNavigator::Warehouse::TransformStatus with the results.

Parameters:

  • transform_status (NcsNavigator::Warehouse::TransformStatus)

    the status object to update with the record count and any errors reported by the subprocess.

# File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 88

def transform(transform_status)
  process = ChildProcess.build(*exec_and_args)
  process.environment.merge!(environment)

  # inherit stderr
  process.io.inherit!

  # capture stdout
  out_r, out_w = IO.pipe
  process.io.stdout = out_w

  log.info "Spawning subprocess transformer `#{exec_and_args.join(' ')}`"
  shell.say "Spawning subprocess transformer `#{exec_and_args.join(' ')}`"

  FileUtils.cd directory do
    Bundler.with_clean_env do
      process.start
    end
  end

  log.info "  PID is #{process.pid}"
  shell.say_line ": #{process.pid}"
  out_w.close

  out_lines = out_r.readlines
  if !out_lines.empty? && out_lines.last.to_i.to_s == out_lines.last.strip
    transform_status.record_count = out_lines.pop.to_i
  end
  out_lines.each do |error|
    attrs =
      begin
        JSON.parse(error.strip)
      rescue JSON::ParserError => e
        { 'message' => error.strip }
      end
    transform_status.transform_errors <<
      NcsNavigator::Warehouse::TransformError.new(attrs)
  end

  unless process.exited?
    process.stop
  end

  if process.exit_code != 0
    transform_status.add_error(
      "`#{exec_and_args.join(' ')}` process exited with code #{process.exit_code}")
  end

  if transform_status.transform_errors.empty?
    shell.say_line "Subprocess completed."
  else
    shell.say_line "Subprocess completed with errors."
  end
end
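The core of #transform is a spawn, capture, and wait sequence. As a stdlib-only sketch (Process.spawn and IO.pipe substituted for the ChildProcess gem; `capture_subprocess` is a hypothetical name), running exec_and_args and collecting its stdout and exit code might look like this:

```ruby
require 'rbconfig'

# Hypothetical stdlib-only sketch of the spawn-and-capture step of #transform.
# The child's stderr is inherited (Process.spawn's default); only stdout is
# captured through a pipe.
def capture_subprocess(exec_and_args, environment: {}, directory: '.')
  command, *args = exec_and_args
  reader, writer = IO.pipe
  # [command, command] makes spawn exec the program directly, never via a shell.
  pid = Process.spawn(environment, [command, command], *args,
                      chdir: directory, out: writer)
  writer.close                   # close the parent's write end so readlines sees EOF
  lines = reader.readlines       # blocks until the child closes its stdout
  reader.close
  _, status = Process.wait2(pid) # reap the child and collect its exit status
  { lines: lines, exit_code: status.exitstatus }
end
```

The returned lines can then be interpreted as the overview describes (a trailing integer as the record count, the remaining lines as errors), with a non-zero exit_code recorded as one more error. One deliberate difference: this sketch waits for the child with Process.wait2, whereas #transform calls process.stop if the child has not exited by the time its stdout reaches EOF.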