Class: NcsNavigator::Warehouse::Transformers::SubprocessTransformer
- Inherits: Object
  - Object
  - NcsNavigator::Warehouse::Transformers::SubprocessTransformer
- Extended by: Forwardable
- Defined in: lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb
Overview
A transformer that runs a separate executable to perform the transformation. The executable should report error information on standard out. As it proceeds with the transformation, it can write single-line messages. Each message must be either an unstructured string or a single-line JSON object whose attributes are a subset of the attributes on NcsNavigator::Warehouse::TransformError. If the final line can be interpreted as an integer alone, it will be used as the record count for the transformer. Example standard out:
Could not connect to frob
{ "record_id": "F111", "model_class": "Person", "message": "was invalid" }
39
This output will result in a NcsNavigator::Warehouse::TransformStatus with record count 39 and two errors. The errors will have the messages "Could not connect to frob" and "was invalid", and the latter will have the identity of the specific troublesome record.
If the subprocess is a Ruby script, it may be useful to pass the warehouse configuration filename as one of its arguments so that it can use the Warehouse API to connect to the database, etc.
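The output convention described above can be sketched from the subprocess side. The following is only an illustration of a script that emits that format; the helper names `error_line` and `report_lines` are hypothetical and are not part of the Warehouse API.

```ruby
require 'json'

# Hypothetical helper: formats one structured error as a single-line
# JSON object using attribute names from the example above.
def error_line(record_id, model_class, message)
  JSON.generate({
    'record_id'   => record_id,
    'model_class' => model_class,
    'message'     => message
  })
end

# Hypothetical helper: builds the full list of stdout lines.
# Unstructured messages come first, then structured errors, and the
# record count goes last, alone on its own line.
def report_lines(unstructured, structured, record_count)
  lines = unstructured.dup
  lines.concat(structured.map { |attrs| error_line(*attrs) })
  lines << record_count.to_s
  lines
end

# Flush each line as it is written rather than buffering.
$stdout.sync = true

report_lines(
  ['Could not connect to frob'],
  [['F111', 'Person', 'was invalid']],
  39
).each { |line| puts line }
```

Only the last line is eligible to be read as the record count, so the script writes the count once, after all messages.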
Instance Attribute Summary
- #configuration ⇒ Configuration (readonly): The warehouse configuration in use.
- #directory ⇒ String (readonly): The working directory for the subprocess.
- #environment ⇒ Hash<String, String> (readonly): Supplemental environment variable values for the subprocess.
- #exec_and_args ⇒ Array<String> (readonly): The process parameters, including the executable name.
Instance Method Summary
- #initialize(config, exec_and_args, options = {}) ⇒ SubprocessTransformer (constructor): A new instance of SubprocessTransformer.
- #name ⇒ String
- #transform(transform_status): Spawns the child process and updates the provided NcsNavigator::Warehouse::TransformStatus with the results.
Constructor Details
#initialize(config, exec_and_args, options = {}) ⇒ SubprocessTransformer
Returns a new instance of SubprocessTransformer.
    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 68

    def initialize(config, exec_and_args, options={})
      @configuration = config
      @exec_and_args = exec_and_args
      @directory = options[:directory] || '.'
      @environment = options[:environment] || {}
    end
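As a sketch of how the constructor options behave, the stand-in class below copies only the constructor, the readers, and `#name` as they appear on this page, so that it runs without the library loaded. The class name `SketchSubprocessTransformer`, the script name `import_people.rb`, the configuration filename, the directory, and the environment variable are all hypothetical.

```ruby
# Stand-in mirroring the documented constructor, readers, and #name.
# It does not spawn anything; it only shows how options are resolved.
class SketchSubprocessTransformer
  attr_reader :configuration, :exec_and_args, :directory, :environment

  def initialize(config, exec_and_args, options = {})
    @configuration = config
    @exec_and_args = exec_and_args
    @directory = options[:directory] || '.'      # defaults to current directory
    @environment = options[:environment] || {}   # defaults to no extra variables
  end

  def name
    exec_and_args.join(' ')
  end
end

config = Object.new # placeholder for a warehouse Configuration instance

# Hypothetical invocation: a Ruby script that receives the warehouse
# configuration filename as its last argument.
transformer = SketchSubprocessTransformer.new(
  config,
  ['ruby', 'import_people.rb', 'warehouse_config.rb'],
  :directory => '/opt/etl',
  :environment => { 'ETL_MODE' => 'full' }
)

puts transformer.name
puts transformer.directory
```

When `options` is omitted, the subprocess runs in the current directory with no supplemental environment variables.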
Instance Attribute Details
#configuration ⇒ Configuration (readonly)
Returns the warehouse configuration in use.

    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 43

    def configuration
      @configuration
    end
#directory ⇒ String (readonly)
Returns the working directory for the subprocess.
    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 47

    def directory
      @directory
    end
#environment ⇒ Hash<String, String> (readonly)
Returns supplemental environment variable values for the subprocess.
    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 52

    def environment
      @environment
    end
#exec_and_args ⇒ Array<String> (readonly)
Returns the process parameters, including the executable name.

    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 39

    def exec_and_args
      @exec_and_args
    end
Instance Method Details
#name ⇒ String
    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 77

    def name
      exec_and_args.join(' ')
    end
#transform(transform_status)
This method returns an undefined value.
Spawns the child process and updates the provided NcsNavigator::Warehouse::TransformStatus with the results.
    # File 'lib/ncs_navigator/warehouse/transformers/subprocess_transformer.rb', line 88

    def transform(transform_status)
      process = ChildProcess.build(*exec_and_args)
      process.environment.merge!(environment)
      # inherit stderr
      process.io.inherit!
      # capture stdout
      out_r, out_w = IO.pipe
      process.io.stdout = out_w

      log.info "Spawning subprocess transformer `#{exec_and_args.join(' ')}`"
      shell.say "Spawning subprocess transformer `#{exec_and_args.join(' ')}`"
      FileUtils.cd directory do
        Bundler.with_clean_env do
          process.start
        end
      end
      log.info " PID is #{process.pid}"
      shell.say_line ": #{process.pid}"

      out_w.close
      out_lines = out_r.readlines

      if !out_lines.empty? && out_lines.last.to_i.to_s == out_lines.last.strip
        transform_status.record_count = out_lines.pop.to_i
      end

      out_lines.each do |error|
        attrs =
          begin
            JSON.parse(error.strip)
          rescue JSON::ParserError => e
            { 'message' => error.strip }
          end
        transform_status.transform_errors <<
          NcsNavigator::Warehouse::TransformError.new(attrs)
      end

      unless process.exited?
        process.stop
      end

      if process.exit_code != 0
        transform_status.add_error(
          "`#{exec_and_args.join(' ')}` process exited with code #{process.exit_code}")
      end

      if transform_status.transform_errors.empty?
        shell.say_line "Subprocess completed."
      else
        shell.say_line "Subprocess completed with errors."
      end
    end
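The output-handling step in the listing above can be looked at in isolation. The sketch below is not the library's code: `parse_subprocess_output` is a hypothetical helper that returns plain values instead of updating a TransformStatus. It also makes one assumption explicit that the listing does not: because current versions of Ruby's JSON library accept bare scalars such as `39` or `"text"` as valid documents, the sketch treats any parsed value that is not a JSON object as an unstructured message.

```ruby
require 'json'

# Hypothetical helper (not part of the library): splits captured stdout
# lines into a record count and a list of error attribute hashes.
def parse_subprocess_output(lines)
  lines = lines.dup
  record_count = nil

  # A final line that is an integer alone is taken as the record count.
  if !lines.empty? && lines.last.to_i.to_s == lines.last.strip
    record_count = lines.pop.to_i
  end

  errors = lines.map do |line|
    text = line.strip
    parsed =
      begin
        JSON.parse(text)
      rescue JSON::ParserError
        nil
      end
    # Only JSON objects count as structured errors; plain text, bare
    # numbers, and arrays all become unstructured messages.
    parsed.is_a?(Hash) ? parsed : { 'message' => text }
  end

  [record_count, errors]
end

sample = [
  "Could not connect to frob\n",
  %({ "record_id": "F111", "model_class": "Person", "message": "was invalid" }\n),
  "39\n"
]

count, errors = parse_subprocess_output(sample)
puts count
errors.each { |attrs| puts attrs.inspect }
```

With the sample lines from the Overview, this yields a record count of 39 and two error hashes, the second of which carries `record_id` and `model_class`.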