Class: Chronicle::ETL::JobDefinition

Inherits:
Object
  • Object
show all
Defined in:
lib/chronicle/etl/job_definition.rb

Constant Summary collapse

# Baseline job definition: a non-incremental job that extracts with 'stdin',
# runs the single 'null' transformer and loads with 'json', each with empty
# options. Only the outer hash is frozen; the nested hashes and the
# transformers array are not.
SKELETON_DEFINITION = {
  incremental: false,
  extractor: { name: 'stdin', options: {} },
  transformers: [{ name: 'null', options: {} }],
  loader: { name: 'json', options: {} }
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JobDefinition

Returns a new instance of JobDefinition.



29
30
31
# File 'lib/chronicle/etl/job_definition.rb', line 29

# Starts the job definition from a private deep copy of SKELETON_DEFINITION.
#
# Only the outer skeleton hash is frozen, so handing out the constant itself
# would let any write to a nested hash or to the transformers array (for
# example through the public #definition accessor) leak into the shared
# constant and into every other JobDefinition built from it.
def initialize
  # A Marshal round-trip yields an unfrozen, fully independent copy without
  # depending on any non-stdlib deep-copy helper.
  @definition = Marshal.load(Marshal.dump(SKELETON_DEFINITION))
end

Instance Attribute Details

#definition ⇒ Object

Returns the value of attribute definition.



27
28
29
# File 'lib/chronicle/etl/job_definition.rb', line 27

# Returns the job definition hash stored in @definition. #initialize seeds it
# from SKELETON_DEFINITION and #add_config replaces it with a merged hash.
def definition
  @definition
end

#errors ⇒ Object (readonly)

Returns the value of attribute errors.



26
27
28
# File 'lib/chronicle/etl/job_definition.rb', line 26

# Returns the errors hash built by the most recent #validate call, which
# stores rescued plugin errors under the :plugins key.
# NOTE(review): #initialize does not assign @errors, so this looks to be nil
# until #validate (or #valid? / #plugins_missing?) has run — confirm.
def errors
  @errors
end

Instance Method Details

#add_config(config = {}) ⇒ Object

Add config hash to this definition



65
66
67
68
# File 'lib/chronicle/etl/job_definition.rb', line 65

# Merges +config+ into the current definition with Hash#deep_merge and stores
# the result as the new definition, then calls load_credentials.
#
# NOTE(review): deep_merge and load_credentials are not defined in the visible
# source; deep_merge is presumably the ActiveSupport-style extension — confirm.
#
# @param config [Hash] values to merge over the existing definition
# @return [Object] whatever load_credentials returns
def add_config(config = {})
  merged = @definition.deep_merge(config)
  @definition = merged
  load_credentials
end

#apply_default_secrets ⇒ Object

For each connector in this job, mix secrets into its options



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/chronicle/etl/job_definition.rb', line 71

# Mixes stored secrets into the options of the extractor and loader phases.
#
# The secrets namespace is the phase's `secrets` option when one is given.
# Otherwise it is inferred from the part of the connector name before the
# first ':'; phases whose connector is registered as built-in are skipped in
# that case. Options already present in the definition win over secrets.
#
# @return [Array<Symbol>] the phases iterated over (the result of #each)
def apply_default_secrets
  # FIXME: handle transformer secrets
  %i[extractor loader].each do |phase|
    connector = @definition[phase]
    options = connector[:options]
    namespace = options[:secrets]

    unless namespace
      # Built-in connectors get no inferred secrets lookup.
      next if __send__(:"#{phase}_klass").connector_registration.built_in?

      # The plugin name (connector name up to the first ':') doubles as the
      # secrets namespace.
      namespace = connector[:name].split(':').first
    end

    # Reverse merge: secrets are the base, so values that came from the job
    # file or CLI options are preserved.
    connector[:options] = Chronicle::ETL::Secrets.read(namespace).merge(options)
  end
end

#dry_run? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/chronicle/etl/job_definition.rb', line 100

# Whether the definition's :dry_run entry is set to a truthy value.
#
# SKELETON_DEFINITION has no :dry_run key, so the raw lookup is nil unless a
# config supplied one; the value is coerced so the predicate always answers
# with true or false.
#
# @return [Boolean]
def dry_run?
  !!@definition[:dry_run]
end

#extractor_klass ⇒ Object



104
105
106
# File 'lib/chronicle/etl/job_definition.rb', line 104

# Looks up the extractor connector by passing :extractor and the extractor
# name from the definition to find_connector_klass.
#
# @return [Object] whatever find_connector_klass returns (presumably the
#   connector class — it is defined outside the visible source)
def extractor_klass
  connector_name = @definition[:extractor][:name]
  find_connector_klass(:extractor, connector_name)
end

#extractor_options ⇒ Object



118
119
120
# File 'lib/chronicle/etl/job_definition.rb', line 118

# Returns the options stored for the extractor in the job definition (the
# stored object itself, not a copy).
def extractor_options
  extractor = @definition[:extractor]
  extractor[:options]
end

#incremental? ⇒ Boolean

Is this job continuing from a previous run?

Returns:

  • (Boolean)


96
97
98
# File 'lib/chronicle/etl/job_definition.rb', line 96

# Is this job continuing from a previous run?
#
# Reads the definition's :incremental entry and coerces it, so the predicate
# always answers with true or false even when a merged config removed the
# key or set it to nil.
#
# @return [Boolean]
def incremental?
  !!@definition[:incremental]
end

#loader_klass ⇒ Object



114
115
116
# File 'lib/chronicle/etl/job_definition.rb', line 114

# Looks up the loader connector by passing :loader and the loader name from
# the definition to find_connector_klass.
#
# @return [Object] whatever find_connector_klass returns (presumably the
#   connector class — it is defined outside the visible source)
def loader_klass
  connector_name = @definition[:loader][:name]
  find_connector_klass(:loader, connector_name)
end

#loader_options ⇒ Object



128
129
130
# File 'lib/chronicle/etl/job_definition.rb', line 128

# Returns the options stored for the loader in the job definition (the stored
# object itself, not a copy).
def loader_options
  loader = @definition[:loader]
  loader[:options]
end

#plugins_missing? ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
52
53
54
55
56
# File 'lib/chronicle/etl/job_definition.rb', line 49

# Re-runs validation and reports whether any recorded plugin error is exactly
# a Chronicle::ETL::PluginNotInstalledError (subclasses do not count, since
# the check uses instance_of?).
#
# @return [Boolean]
def plugins_missing?
  validate

  plugin_errors = @errors[:plugins] || []
  plugin_errors.any? { |error| error.instance_of?(Chronicle::ETL::PluginNotInstalledError) }
end

#transformer_klasses ⇒ Object



108
109
110
111
112
# File 'lib/chronicle/etl/job_definition.rb', line 108

# Looks up every transformer in the definition, in order, by passing
# :transformer and its name to find_connector_klass.
#
# @return [Array] one find_connector_klass result per configured transformer
def transformer_klasses
  transformers = @definition[:transformers]
  transformers.map { |entry| find_connector_klass(:transformer, entry[:name]) }
end

#transformer_options ⇒ Object



122
123
124
125
126
# File 'lib/chronicle/etl/job_definition.rb', line 122

# Collects the options of every transformer in the definition, in order.
#
# @return [Array] one options entry per configured transformer
def transformer_options
  transformers = @definition[:transformers]
  transformers.map { |entry| entry[:options] }
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
36
# File 'lib/chronicle/etl/job_definition.rb', line 33

# Re-runs validation and reports whether it recorded no errors.
#
# @return [Boolean] true when the errors hash is empty after #validate
def valid?
  validate
  errors.empty?
end

#validate ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/chronicle/etl/job_definition.rb', line 38

# Resets @errors and tries to resolve the extractor, transformer and loader
# connectors, recording every Chronicle::ETL::PluginError under
# @errors[:plugins].
#
# Each phase is checked on its own so that a failure in one phase does not
# hide failures in the phases after it. Otherwise a missing loader plugin
# would go unreported whenever the extractor lookup failed first, which
# #plugins_missing? relies on. Exceptions other than PluginError propagate
# to the caller.
#
# @return [Hash] the collected errors; empty when every lookup succeeded
def validate
  @errors = {}

  %i[extractor_klass transformer_klasses loader_klass].each do |lookup|
    begin
      __send__(lookup)
    rescue Chronicle::ETL::PluginError => e
      (@errors[:plugins] ||= []) << e
    end
  end

  @errors
end

#validate! ⇒ Object



58
59
60
61
62
# File 'lib/chronicle/etl/job_definition.rb', line 58

# Validates the definition and raises when it is not valid.
#
# @return [true] when #valid? passes
# @raise [Chronicle::ETL::JobDefinitionError] built with this job definition
#   and the message 'Job definition is invalid' when #valid? fails
def validate!
  return true if valid?

  raise(Chronicle::ETL::JobDefinitionError.new(self), 'Job definition is invalid')
end