Class: Chronicle::ETL::JobDefinition
- Inherits:
-
Object
- Object
- Chronicle::ETL::JobDefinition
- Defined in:
- lib/chronicle/etl/job_definition.rb
Constant Summary collapse
- SKELETON_DEFINITION =
{ incremental: false, extractor: { name: 'stdin', options: {} }, transformers: [ { name: 'null', options: {} } ], loader: { name: 'json', options: {} } }.freeze
Instance Attribute Summary collapse
-
#definition ⇒ Object
Returns the value of attribute definition.
-
#errors ⇒ Object
readonly
Returns the value of attribute errors.
Instance Method Summary collapse
-
#add_config(config = {}) ⇒ Object
Add config hash to this definition.
-
#apply_default_secrets ⇒ Object
For each connector in this job, mix secrets into the options.
- #dry_run? ⇒ Boolean
- #extractor_klass ⇒ Object
- #extractor_options ⇒ Object
-
#incremental? ⇒ Boolean
Is this job continuing from a previous run?
-
#initialize ⇒ JobDefinition
constructor
A new instance of JobDefinition.
- #loader_klass ⇒ Object
- #loader_options ⇒ Object
- #plugins_missing? ⇒ Boolean
- #transformer_klasses ⇒ Object
- #transformer_options ⇒ Object
- #valid? ⇒ Boolean
- #validate ⇒ Object
- #validate! ⇒ Object
Constructor Details
#initialize ⇒ JobDefinition
Returns a new instance of JobDefinition.
29 30 31 |
# File 'lib/chronicle/etl/job_definition.rb', line 29
# Builds a job definition seeded with the skeleton defaults.
#
# SKELETON_DEFINITION is only shallowly frozen, so its nested hashes and
# arrays are still mutable. A deep copy keeps in-place edits to this
# instance's definition from leaking into the shared constant.
#
# @return [JobDefinition] a new instance of JobDefinition
def initialize
  @definition = Marshal.load(Marshal.dump(SKELETON_DEFINITION))
end
Instance Attribute Details
#definition ⇒ Object
Returns the value of attribute definition.
27 28 29 |
# File 'lib/chronicle/etl/job_definition.rb', line 27
# Returns the value of attribute definition.
#
# @return [Object] the current contents of @definition
def definition
  @definition
end
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
26 27 28 |
# File 'lib/chronicle/etl/job_definition.rb', line 26
# Returns the value of attribute errors (read-only).
#
# @return [Object] the current contents of @errors; nil until something
#   assigns it (#validate resets it to a Hash)
def errors
  @errors
end
Instance Method Details
#add_config(config = {}) ⇒ Object
Add config hash to this definition
65 66 67 68 |
# File 'lib/chronicle/etl/job_definition.rb', line 65
# Add config hash to this definition.
#
# The config is deep-merged over the current definition, and the merged
# result replaces @definition. Credentials are then (re)loaded through
# load_credentials, which is defined outside this listing.
#
# @param config [Hash] settings to layer on top of the current definition
# @return [Object] whatever load_credentials returns
def add_config(config = {})
  @definition = @definition.deep_merge(config)
  load_credentials
end
#apply_default_secrets ⇒ Object
For each connector in this job, mix secrets into the options
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/chronicle/etl/job_definition.rb', line 71
# For each connector in this job, mix secrets into the options.
#
# Only the extractor and loader phases are handled here.
#
# @return [Array<Symbol>] the list of phases that were visited
def apply_default_secrets
  # FIXME: handle transformer secrets
  %i[extractor loader].each do |phase|
    connector = @definition[phase]
    options = connector[:options]

    # If the options have a `secrets` key, we look up those secrets and
    # mix them in. If not, use the connector's plugin name and look up
    # secrets with the same namespace
    namespace = options[:secrets]
    unless namespace
      # We don't want to do this lookup for built-in connectors
      next if __send__(:"#{phase}_klass").connector_registration.built_in?

      # Infer plugin name from connector name and use it for the secrets
      # namespace
      namespace = connector[:name].split(':').first
    end

    # Reverse merge secrets into connector's options (we want to preserve
    # options that came from job file or CLI options)
    secrets = Chronicle::ETL::Secrets.read(namespace)

    # Build fresh hashes instead of writing into the nested one: the nested
    # hash may still be the very object held by a shared default definition,
    # and an in-place write would leak secrets into it.
    @definition = @definition.merge(phase => connector.merge(options: secrets.merge(options)))
  end
end
#dry_run? ⇒ Boolean
100 101 102 |
# File 'lib/chronicle/etl/job_definition.rb', line 100
# Reports the definition's :dry_run setting.
#
# @return [Boolean, nil] the raw value stored under :dry_run (nil when the
#   key is absent)
def dry_run?
  @definition[:dry_run]
end
#extractor_klass ⇒ Object
104 105 106 |
# File 'lib/chronicle/etl/job_definition.rb', line 104
# Resolves the extractor named in the definition by delegating to
# find_connector_klass (defined outside this listing).
#
# @return [Object] whatever find_connector_klass returns for the
#   :extractor phase and the configured extractor name
def extractor_klass
  find_connector_klass(:extractor, @definition[:extractor][:name])
end
#extractor_options ⇒ Object
118 119 120 |
# File 'lib/chronicle/etl/job_definition.rb', line 118
# Returns the options configured for the extractor.
#
# @return [Object] the value stored under definition[:extractor][:options]
def extractor_options
  @definition[:extractor][:options]
end
#incremental? ⇒ Boolean
Is this job continuing from a previous run?
96 97 98 |
# File 'lib/chronicle/etl/job_definition.rb', line 96
# Is this job continuing from a previous run?
#
# @return [Boolean, nil] the raw value stored under :incremental (nil when
#   the key is absent)
def incremental?
  @definition[:incremental]
end
#loader_klass ⇒ Object
114 115 116 |
# File 'lib/chronicle/etl/job_definition.rb', line 114
# Resolves the loader named in the definition by delegating to
# find_connector_klass (defined outside this listing).
#
# @return [Object] whatever find_connector_klass returns for the :loader
#   phase and the configured loader name
def loader_klass
  find_connector_klass(:loader, @definition[:loader][:name])
end
#loader_options ⇒ Object
128 129 130 |
# File 'lib/chronicle/etl/job_definition.rb', line 128
# Returns the options configured for the loader.
#
# @return [Object] the value stored under definition[:loader][:options]
def loader_options
  @definition[:loader][:options]
end
#plugins_missing? ⇒ Boolean
49 50 51 52 53 54 55 56 |
# File 'lib/chronicle/etl/job_definition.rb', line 49
# Re-runs validation and reports whether any recorded plugin error is
# exactly a Chronicle::ETL::PluginNotInstalledError.
#
# @return [Boolean] true when at least one such error was recorded
def plugins_missing?
  validate

  # A missing :plugins entry is treated like an empty list, so one scan
  # covers both the "no errors" and the "some errors" cases.
  plugin_errors = @errors[:plugins] || []
  plugin_errors.any? { |error| error.instance_of?(Chronicle::ETL::PluginNotInstalledError) }
end
#transformer_klasses ⇒ Object
108 109 110 111 112 |
# File 'lib/chronicle/etl/job_definition.rb', line 108
# Resolves every transformer listed in the definition, in order, by
# delegating to find_connector_klass (defined outside this listing).
#
# @return [Array<Object>] one find_connector_klass result per transformer
def transformer_klasses
  @definition[:transformers].map do |transformer|
    find_connector_klass(:transformer, transformer[:name])
  end
end
#transformer_options ⇒ Object
122 123 124 125 126 |
# File 'lib/chronicle/etl/job_definition.rb', line 122
# Returns the options of every transformer in the definition, in order.
#
# @return [Array<Object>] the :options value of each transformer entry
def transformer_options
  @definition[:transformers].map do |transformer|
    transformer[:options]
  end
end
#valid? ⇒ Boolean
33 34 35 36 |
# File 'lib/chronicle/etl/job_definition.rb', line 33
# Re-runs validation and reports whether it recorded no errors.
#
# @return [Boolean] true when @errors is empty after validating
def valid?
  validate
  @errors.empty?
end
#validate ⇒ Object
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/chronicle/etl/job_definition.rb', line 38
# Resets @errors and tries to resolve the extractor, transformer and loader
# classes, recording every Chronicle::ETL::PluginError under
# @errors[:plugins].
#
# Each phase is checked on its own, so a failure in one phase does not hide
# failures in the later ones. Exceptions that are not PluginErrors propagate
# to the caller.
#
# @return [Hash] the errors collected by this run (empty when all lookups
#   succeed)
def validate
  @errors = {}

  %i[extractor_klass transformer_klasses loader_klass].each do |lookup|
    begin
      __send__(lookup)
    rescue Chronicle::ETL::PluginError => e
      (@errors[:plugins] ||= []) << e
    end
  end

  @errors
end
#validate! ⇒ Object
58 59 60 61 62 |
# File 'lib/chronicle/etl/job_definition.rb', line 58
# Validates the definition and raises when it is invalid.
#
# @raise [Chronicle::ETL::JobDefinitionError] when #valid? is false; the
#   error is constructed with this job definition
# @return [true] when the definition is valid
def validate!
  raise(Chronicle::ETL::JobDefinitionError.new(self), 'Job definition is invalid') unless valid?

  true
end