Class: Masamune::Schema::Map
- Inherits:
-
Object
- Object
- Masamune::Schema::Map
show all
- Defined in:
- lib/masamune/schema/map.rb
Defined Under Namespace
Classes: Buffer, JSONEncoder
Constant Summary
collapse
- DEFAULT_ATTRIBUTES =
{
source: nil,
target: nil,
columns: nil,
store: nil,
function: ->(row, *_) { row },
distinct: false,
fail_fast: false,
debug: false
}.freeze
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Map
Returns a new instance of Map.
185
186
187
188
189
190
191
192
193
|
# File 'lib/masamune/schema/map.rb', line 185
def initialize(opts = {})
opts.symbolize_keys!
raise ArgumentError, 'required parameter source: missing' unless opts.key?(:source)
raise ArgumentError, 'required parameter target: missing' unless opts.key?(:target)
DEFAULT_ATTRIBUTES.merge(opts).each do |name, value|
public_send("#{name}=", value)
end
@skipped = 0
end
|
Instance Attribute Details
#source=(value) ⇒ Object
Sets the attribute source
195
196
197
|
# File 'lib/masamune/schema/map.rb', line 195
def source=(value)
@source = value
end
|
Class Method Details
.convert_file(file) ⇒ Object
254
255
256
257
258
259
260
261
|
# File 'lib/masamune/schema/map.rb', line 254
def convert_file(file)
if file.respond_to?(:path)
file.flush if file.respond_to?(:flush) && file.respond_to?(:open?) && file.open?
file.path
else
file
end
end
|
.convert_files(files) ⇒ Object
263
264
265
266
267
268
269
270
271
272
|
# File 'lib/masamune/schema/map.rb', line 263
def convert_files(files)
case files
when Set
files.map { |file| convert_file(file) }.to_a
when Array
files.map { |file| convert_file(file) }.to_a
else
[convert_file(files)]
end
end
|
Instance Method Details
#apply(input_files, output_file) ⇒ Object
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
# File 'lib/masamune/schema/map.rb', line 213
def apply(input_files, output_file)
input_buffer = Buffer.new(self, source)
output_buffer = Buffer.new(self, intermediate)
self.class.convert_files(input_files).each do |input_file|
open_stream(input_file, 'r') do |input_stream|
input_buffer.bind(input_stream)
open_stream(output_file, 'a+') do |output_stream|
output_buffer.bind(output_stream)
apply_buffer(input_buffer, output_buffer)
end
end
end
intermediate
end
|
209
210
211
|
# File 'lib/masamune/schema/map.rb', line 209
def intermediate
target.stage_table(columns: columns || intermediate_columns, inherit: false)
end
|
202
203
204
205
206
207
|
# File 'lib/masamune/schema/map.rb', line 202
def intermediate_columns
output = function.call(default_row(source.columns), 0)
example = Array.wrap(output).first
raise ArgumentError, "function for map between '#{source.name}' and '#{target.name}' does not return output for default input" unless example
example.keys
end
|
#open_stream(file, mode) ⇒ Object
228
229
230
231
232
233
234
235
236
237
238
|
# File 'lib/masamune/schema/map.rb', line 228
def open_stream(file, mode)
case file
when IO, StringIO
file.flush
yield file
when String, Tempfile
File.open(file, mode) do |io|
yield io
end
end
end
|
#skip_or_raise(buffer, row, message) ⇒ Object
240
241
242
243
244
245
246
247
248
249
250
251
|
# File 'lib/masamune/schema/map.rb', line 240
def skip_or_raise(buffer, row, message)
message = 'failed to process' if message.nil? || message.blank?
trace = { message: message, source: source.name, target: target.name, file: buffer.try(:to_s), line: buffer.try(:line), row: row.try(:to_hash) }
if fail_fast
@store.logger.error(message)
@store.logger.debug(trace)
raise message
else
@store.logger.warn(message)
@store.logger.debug(trace)
end
end
|
#target=(target) ⇒ Object
FIXME: avoid implict conversions
198
199
200
|
# File 'lib/masamune/schema/map.rb', line 198
def target=(target)
@target = target.type == :four ? target.ledger_table : target
end
|