Class: Spotlight::Etl::Executor

Inherits:
  • Object
Includes:
ActiveSupport::Benchmarkable
Defined in:
app/services/spotlight/etl/executor.rb

Overview

ETL pipeline executor

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(pipeline, context, cache: nil) ⇒ Executor

Returns a new instance of Executor.

Parameters:



# File 'app/services/spotlight/etl/executor.rb', lines 16–23

# Build an executor for a pipeline run.
#
# @param pipeline the pipeline definition whose steps are executed
#   (presumably a Spotlight::Etl::Pipeline -- confirm against callers)
# @param context the execution context; this class reads its #logger,
#   #on_error and #unique_key
# @param cache [Hash, nil] optional caller-owned cache shared with pipeline
#   steps; when nil, a fresh Hash is created for this executor
def initialize(pipeline, context, cache: nil)
  @pipeline = pipeline
  @context = context

  @cache = cache || {}
  # Record whether the cache in use is the caller's own object. Identity is
  # compared instead of calling #present?, so an *empty* Hash handed in to be
  # shared between executors is still recognised as caller-provided.
  @provided_cache = @cache.equal?(cache)
  @step_cache = {}
end

Instance Attribute Details

#cache ⇒ Object (readonly)

Returns the value of attribute cache.



# File 'app/services/spotlight/etl/executor.rb', lines 9–11

# @return [Hash] the cache shared with pipeline steps (caller-provided, or a
#   fresh Hash created at construction)
def cache; @cache; end

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'app/services/spotlight/etl/executor.rb', lines 9–11

# @return the execution context this executor was constructed with
def context; @context; end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'app/services/spotlight/etl/executor.rb', lines 9–11

# @return the tagged logger last captured by #with_logger; nil until that
#   method has run (no other assignment is visible here)
def logger; @logger; end

#pipeline ⇒ Object (readonly)

Returns the value of attribute pipeline.



# File 'app/services/spotlight/etl/executor.rb', lines 9–11

# @return the pipeline definition this executor runs
def pipeline; @pipeline; end

#source ⇒ Object (readonly)

Returns the value of attribute source.



# File 'app/services/spotlight/etl/executor.rb', lines 9–11

# @return the source currently being processed; nil until it is assigned
#   (presumably by #with_source, which is not visible here -- confirm)
def source; @source; end

Instance Method Details

#call(data: {}) {|optionally..| ... } ⇒ Object

Execute the ETL pipeline

Parameters:

  • data (Hash) (defaults to: {})

    the initial data structure to pass through to the transform steps

Yields:

  • (optionally..)

    each transformed document after it is transformed but before it is sent to the loaders



# File 'app/services/spotlight/etl/executor.rb', lines 31–43

# Execute the ETL pipeline.
#
# Every extracted source is processed on its own: its data is transformed and
# then handed to the loaders. Throwing :skip during transform or load
# abandons only the current source. Any StandardError raised while processing
# a source is passed to #on_error together with the initial +data+, and
# iteration then moves on to the next source (unless #on_error re-raises).
#
# @param data [Hash] the initial data structure passed to the transform steps
# @yield (optionally) each transformed document after it is transformed but
#   before it is sent to the loaders
# @return the value returned by #after_call
def call(data: {}, &block)
  extract.with_index do |item, position|
    with_source(item, position) do
      catch(:skip) { load(transform(data), &block) }
    rescue StandardError => error
      # The transformed result is not available here, so the handler receives
      # the initial data instead.
      on_error(error, data)
    end
  end

  after_call
end

#estimated_size ⇒ Number

Estimate the number of documents that will be produced by the pipeline

Returns:

  • (Number)


# File 'app/services/spotlight/etl/executor.rb', lines 49–51

# Estimate how many documents the pipeline will produce.
#
# The pipeline's source steps (#sources) are compiled, and each one is called
# with the context. The counts of the results are summed. The value is
# memoized after the first call.
#
# @return [Number]
def estimated_size
  @estimated_size ||= begin
    source_steps = compile_steps(sources)
    source_steps.sum { |step| step.call(context).count }
  end
end

#on_error(exception, data) ⇒ Object

Propagate exceptions up to the context’s error handler.



# File 'app/services/spotlight/etl/executor.rb', lines 81–83

# Propagate an exception up to the context's error handler.
#
# @param exception [Exception] the error raised while processing a source
# @param data the data associated with the failure
# @return whatever the context's #on_error returns
def on_error(exception, data)
  handler = context
  handler.on_error(self, exception, data)
end

#transform_data_for_debugging(data, verbose: $VERBOSE, truncate: 100) ⇒ String

Returns a simplified + truncated version of the data hash for debugging.

Parameters:

  • data (Hash)

Returns:

  • (String)

    a simplified + truncated version of the data hash for debugging



# File 'app/services/spotlight/etl/executor.rb', lines 72–77

# Build a short, human-readable summary of +data+ for debug logging.
#
# - Non-Hash values are rendered with #inspect and truncated.
# - When +verbose+ is falsy, only an identifier is returned: the context's
#   #unique_key for the data or, if that is nil, the first five key/value
#   pairs. This form is not truncated.
# - Otherwise the Hash is serialised as JSON and truncated. If JSON cannot
#   encode it (e.g. NaN/Infinity floats), the helper falls back to a truncated
#   #inspect, so producing debug output never aborts the item being processed.
#
# String#truncate is assumed to come from ActiveSupport (loaded by the host
# application).
#
# @param data [Object] the value to summarise; Hashes get special handling
# @param verbose [Boolean, nil] whether to emit the full JSON payload
#   (defaults to Ruby's $VERBOSE flag)
# @param truncate [Integer] maximum length of the truncated renderings
# @return [String] a simplified + truncated version of the data for debugging
def transform_data_for_debugging(data, verbose: $VERBOSE, truncate: 100)
  return data.inspect.truncate(truncate) unless data.is_a?(Hash)
  # data is known to be a Hash here, so no safe navigation is needed.
  return "id #{context.unique_key(data) || data.first(5).inspect}" unless verbose

  begin
    JSON.fast_generate(data).truncate(truncate)
  rescue JSON::GeneratorError
    data.inspect.truncate(truncate)
  end
end

#with_logger { ... } ⇒ Object

Tagged logger for benchmarks and data flow logging. NOTE: the nested `tagged` blocks are an unusual structure, kept to support Rails 5.2.

Yields:

  • Logger



# File 'app/services/spotlight/etl/executor.rb', lines 58–66

# Run the given block with a tagged logger.
#
# The context's logger is used when present; otherwise the helper falls back to
# Rails.logger. Two tags are nested around the block:
#   1. the pipeline's class
#   2. a "#<SourceClass id=...>" label for the current source, where the id is
#      left empty if the source does not respond to #id
# The logger is also stored in @logger, which the #logger reader returns.
# NOTE: the nested `tagged` blocks (rather than a single call) are kept for
# Rails 5.2 compatibility.
#
# @yield [logger] the tagged logger
def with_logger
  tagged_logger = context&.logger || Rails.logger

  tagged_logger.tagged(pipeline.class) do
    source_class = source.class
    source_id = source.id if source.respond_to?(:id)

    tagged_logger.tagged("#<#{source_class} id=#{source_id}>") do
      @logger = tagged_logger
      yield tagged_logger
    end
  end
end