Class: Spotlight::Etl::Executor
- Inherits: Object
- Includes: ActiveSupport::Benchmarkable
- Defined in: app/services/spotlight/etl/executor.rb
Overview
ETL pipeline executor
Instance Attribute Summary
- #cache ⇒ Object (readonly)
  Returns the value of attribute cache.
- #context ⇒ Object (readonly)
  Returns the value of attribute context.
- #logger ⇒ Object (readonly)
  Returns the value of attribute logger.
- #pipeline ⇒ Object (readonly)
  Returns the value of attribute pipeline.
- #source ⇒ Object (readonly)
  Returns the value of attribute source.
Instance Method Summary
- #call(data: {}) {|optionally..| ... } ⇒ Object
  Execute the ETL pipeline.
- #estimated_size ⇒ Number
  Estimate the number of documents that will be produced by the pipeline.
- #initialize(pipeline, context, cache: nil) ⇒ Executor (constructor)
  A new instance of Executor.
- #on_error(exception, data) ⇒ Object
  Propagate exceptions up to the context’s error handler.
- #transform_data_for_debugging(data, verbose: $VERBOSE, truncate: 100) ⇒ String
  A simplified and truncated version of the data hash for debugging.
- #with_logger { ... } ⇒ Object
  Tagged logger for benchmarks and data flow logging.
Constructor Details
#initialize(pipeline, context, cache: nil) ⇒ Executor
Returns a new instance of Executor.
# File 'app/services/spotlight/etl/executor.rb', line 16
def initialize(pipeline, context, cache: nil)
  @pipeline = pipeline
  @context = context
  @provided_cache = cache.present?
  @cache = cache || {}
  @step_cache = {}
end
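The constructor normalizes the optional cache. Below is a minimal plain-Ruby sketch of that behavior. It approximates ActiveSupport's `present?`, under which nil and an empty hash both count as blank. `CacheDemo` and its `provided_cache?` reader are hypothetical. The page documents no public reader for `@provided_cache`.

```ruby
# Hypothetical stand-in that mirrors Executor#initialize's cache handling.
class CacheDemo
  attr_reader :cache

  def initialize(pipeline, context, cache: nil)
    @pipeline = pipeline
    @context = context
    # Approximates ActiveSupport's `cache.present?`: nil and empty hashes are blank.
    @provided_cache = !(cache.nil? || (cache.respond_to?(:empty?) && cache.empty?))
    @cache = cache || {}
    @step_cache = {}
  end

  # Hypothetical reader, added only so the flag can be inspected in this sketch.
  def provided_cache?
    @provided_cache
  end
end

fresh = CacheDemo.new(:pipeline, :context)
shared = { 'lookup' => 1 }
reused = CacheDemo.new(:pipeline, :context, cache: shared)
```

Because `cache || {}` keeps the caller's object rather than copying it, any writes made to a passed-in cache would be visible to the caller afterwards.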
Instance Attribute Details
#cache ⇒ Object (readonly)
Returns the value of attribute cache.
# File 'app/services/spotlight/etl/executor.rb', line 9
def cache
  @cache
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'app/services/spotlight/etl/executor.rb', line 9
def context
  @context
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'app/services/spotlight/etl/executor.rb', line 9
def logger
  @logger
end
#pipeline ⇒ Object (readonly)
Returns the value of attribute pipeline.
# File 'app/services/spotlight/etl/executor.rb', line 9
def pipeline
  @pipeline
end
#source ⇒ Object (readonly)
Returns the value of attribute source.
# File 'app/services/spotlight/etl/executor.rb', line 9
def source
  @source
end
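All five readers point at line 9 of the source file. That suggests, though this page does not confirm it, that a single `attr_reader` call defines them. The sketch below shows that pattern using a hypothetical `ReaderDemo` class.

In this sketch `source` and `logger` stay nil because nothing assigns them. In the real class they appear to be set while a source is processed and inside #with_logger.

```ruby
# Hypothetical class illustrating the read-only attribute pattern.
class ReaderDemo
  # One attr_reader call defines all five getters and no setters.
  attr_reader :pipeline, :context, :source, :cache, :logger

  def initialize(pipeline, context, cache: nil)
    @pipeline = pipeline
    @context = context
    @cache = cache || {}
  end
end

demo = ReaderDemo.new(:pipeline, :context)
```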
Instance Method Details
#call(data: {}) {|optionally..| ... } ⇒ Object
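Execute the ETL pipeline. Each source produced by the extract step is transformed and then loaded, and the optional block is passed through to the load step. A step can throw :skip to drop the current source. Any StandardError raised while processing a source is passed to #on_error, which delegates to the context. Unless the context's handler re-raises it, processing continues with the next source. After the loop, after_call runs.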
# File 'app/services/spotlight/etl/executor.rb', line 31
def call(data: {}, &block)
  extract.with_index do |source, index|
    with_source(source, index) do
      catch :skip do
        load(transform(data), &block)
      end
    rescue StandardError => e
      on_error(e, data)
    end
  end

  after_call
end
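The control flow of #call can be modeled in plain Ruby. This is a simplified, hypothetical model, not the Spotlight implementation:

- an array of items stands in for the extract step;
- a single lambda stands in for the transform steps and may `throw :skip`;
- errors are collected by a local `on_error` instead of being sent to a context;
- `after_call` is omitted.

`MiniExecutor` is a hypothetical name.

```ruby
# Simplified, hypothetical model of Executor#call's control flow.
class MiniExecutor
  attr_reader :errors

  def initialize(items, transform)
    @items = items          # stands in for the extract step
    @transform = transform  # may `throw :skip` to drop an item
    @errors = []
  end

  def call(data: {}, &block)
    @items.each.with_index do |item, index|
      begin
        catch(:skip) do
          # Load step: hand the transformed record to the caller's block.
          block&.call(@transform.call(item, index, data))
        end
      rescue StandardError => e
        # The rescue is per item, so one failure does not stop the loop.
        on_error(e, data)
      end
    end
  end

  def on_error(exception, data)
    @errors << [exception.message, data]
  end
end

transform = lambda do |item, index, data|
  throw :skip if item.nil?
  raise ArgumentError, "bad item at #{index}" if item == :bad

  data.merge(id: item, position: index)
end

loaded = []
executor = MiniExecutor.new([1, nil, :bad, 4], transform)
executor.call(data: { source: 'demo' }) { |record| loaded << record }
```

Because `catch :skip` wraps both transform and load, a step that throws `:skip` silently drops the item. An exception, by contrast, is routed to `on_error`, and the loop moves on to the next item.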
#estimated_size ⇒ Number
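Estimate the number of documents that will be produced by the pipeline. The estimate is the sum of the counts returned by each compiled source step when called with the context, and it is memoized after the first call.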
# File 'app/services/spotlight/etl/executor.rb', line 49
def estimated_size
  @estimated_size ||= compile_steps(sources).sum { |source| source.call(context).count }
end
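Here is a minimal sketch of the estimate. It assumes each compiled source step can be called with the context and returns something that responds to `count`. `compile_steps` is not shown on this page, so the sketch takes already-callable sources. `EstimateDemo` and the lambdas are hypothetical.

```ruby
# Hypothetical stand-in for Executor#estimated_size.
class EstimateDemo
  def initialize(sources, context)
    @sources = sources
    @context = context
  end

  def estimated_size
    # ||= memoizes the sum, so the sources run only on the first call.
    @estimated_size ||= @sources.sum { |source| source.call(@context).count }
  end
end

calls = 0
sources = [
  ->(ctx) { calls += 1; Array.new(ctx[:per_source], :doc) },
  ->(_ctx) { calls += 1; %i[a b c] }
]
demo = EstimateDemo.new(sources, { per_source: 2 })
```

Memoization matters because calling a source may be expensive, for example if it reads a remote resource. The estimate is therefore computed once and reused.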
#on_error(exception, data) ⇒ Object
Propagate exceptions up to the context’s error handler.
# File 'app/services/spotlight/etl/executor.rb', line 81
def on_error(exception, data)
  context.on_error(self, exception, data)
end
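#on_error does no handling itself. It forwards the executor, the exception, and the data to the context. Below is a sketch of a context that collects errors. It assumes only the three-argument `on_error(executor, exception, data)` call shown above. `CollectingContext` and `ForwardingExecutor` are hypothetical.

```ruby
# Hypothetical context that records errors instead of raising them.
class CollectingContext
  attr_reader :errors

  def initialize
    @errors = []
  end

  # Same arity as the call made by Executor#on_error.
  def on_error(executor, exception, data)
    @errors << { executor: executor, error: exception, data: data }
  end
end

# Hypothetical executor that reproduces only the forwarding behavior.
class ForwardingExecutor
  attr_reader :context

  def initialize(context)
    @context = context
  end

  def on_error(exception, data)
    context.on_error(self, exception, data)
  end
end

context = CollectingContext.new
executor = ForwardingExecutor.new(context)
executor.on_error(RuntimeError.new('boom'), { id: 'abc' })
```

Passing `self` presumably lets the context's handler inspect executor state, such as the current source, when deciding whether to log, re-raise, or ignore an error.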
#transform_data_for_debugging(data, verbose: $VERBOSE, truncate: 100) ⇒ String
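Returns a simplified and truncated version of the data hash for debugging. Output depends on the input:

- Non-hash data is inspected and truncated.
- When verbose is false, only the record's unique key (or its first five entries) is shown.
- Otherwise, the hash is serialized as JSON and truncated to at most truncate characters (100 by default).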
# File 'app/services/spotlight/etl/executor.rb', line 72
def transform_data_for_debugging(data, verbose: $VERBOSE, truncate: 100)
  return data.inspect.truncate(truncate) unless data.is_a?(Hash)
  return "id #{context.unique_key(data) || data&.first(5)&.inspect}" unless verbose

  JSON.fast_generate(data).truncate(truncate)
end
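The three branches can be reproduced without Rails. This sketch makes a few substitutions:

- It approximates ActiveSupport's default `String#truncate`, where the result, including the trailing `...`, is at most `truncate` characters.
- It replaces `context.unique_key(data)` with a hypothetical lookup of `data[:id]`.
- It uses `JSON.generate` in place of `JSON.fast_generate`. Both produce the same text for plain hashes like this one.

`truncate_string` and `debug_summary` are hypothetical names.

```ruby
require 'json'

# Approximates ActiveSupport's String#truncate with the default "..." omission.
def truncate_string(str, length, omission: '...')
  return str.dup if str.length <= length

  "#{str[0, length - omission.length]}#{omission}"
end

# Hypothetical standalone version of Executor#transform_data_for_debugging.
# data[:id] stands in for context.unique_key(data), which is not shown here.
def debug_summary(data, verbose: $VERBOSE, truncate: 100)
  return truncate_string(data.inspect, truncate) unless data.is_a?(Hash)
  return "id #{data[:id] || data&.first(5)&.inspect}" unless verbose

  truncate_string(JSON.generate(data), truncate)
end

record = { id: 'doc-1', title: 'A' * 200 }
```

With verbose off (the usual `$VERBOSE` default), log lines stay short and show only an identifier. Verbose mode trades length for the full serialized payload, capped by `truncate`.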
#with_logger { ... } ⇒ Object
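Tagged logger for benchmarks and data flow logging. The method yields the tagged logger and also stores it in @logger, which the #logger reader returns. NOTE: the nested tagged blocks are an unusual construction, used to stay compatible with Rails 5.2.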
# File 'app/services/spotlight/etl/executor.rb', line 58
def with_logger
  logger = (context&.logger || Rails.logger)

  logger.tagged(pipeline.class) do
    logger.tagged("#<#{source.class} id=#{source&.id if source.respond_to?(:id)}>") do
      @logger = logger

      yield logger
    end
  end
end
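#with_logger nests two tagged blocks: first the pipeline's class, then a description of the current source. Rails provides tagging through ActiveSupport::TaggedLogging. To keep the example self-contained, the sketch below substitutes a tiny hypothetical `TinyTaggedLogger` built on Ruby's standard `Logger`. It therefore illustrates only the nesting pattern, not the Rails class. `FakePipeline`, `FakeSource`, and the standalone `with_logger` helper are also hypothetical.

```ruby
require 'logger'
require 'stringio'

# Hypothetical minimal tagged logger; stands in for ActiveSupport::TaggedLogging.
class TinyTaggedLogger
  def initialize(io)
    @tags = []
    @logger = Logger.new(io)
    # Prefix each message with the current tag stack, e.g. "[A] [B] message".
    @logger.formatter = proc do |_severity, _time, _progname, msg|
      (@tags.map { |t| "[#{t}]" } + [msg]).join(' ') + "\n"
    end
  end

  # Push a tag for the duration of the block, then pop it even on error.
  def tagged(tag)
    @tags.push(tag)
    yield self
  ensure
    @tags.pop
  end

  def info(msg)
    @logger.info(msg)
  end
end

FakePipeline = Class.new
FakeSource = Struct.new(:id)

# Mirrors the nesting in Executor#with_logger, with the collaborators passed in.
def with_logger(logger, pipeline, source)
  logger.tagged(pipeline.class) do
    logger.tagged("#<#{source.class} id=#{source&.id if source.respond_to?(:id)}>") do
      yield logger
    end
  end
end

io = StringIO.new
logger = TinyTaggedLogger.new(io)
with_logger(logger, FakePipeline.new, FakeSource.new(42)) { |l| l.info('loaded') }
```

Every message logged inside the block carries both tags, so interleaved output from different pipelines and sources can still be attributed.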