Module: Chronicle::ETL::Registry::Connectors

Defined in:
lib/chronicle/etl/registry/connectors.rb

Overview

A singleton module that acts as a registry of the connector classes available for ETL jobs.

Constant Summary collapse

PHASES =
%i[extractor transformer loader].freeze

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.connectors ⇒ Object

Returns the value of attribute connectors.



14
15
16
# File 'lib/chronicle/etl/registry/connectors.rb', line 14

# Reader for the registry's collection of connectors.
#
# @return [Object] the value held in @connectors. Presumably an Array of
#   connector registrations, since the lookups call +find+ on it and
#   +register+ appends with +<<+ — confirm where @connectors is initialised.
def connectors
  @connectors
end

Class Method Details

.ancestor_for_phase(phase) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/chronicle/etl/registry/connectors.rb', line 25

# Maps an ETL phase to the base class that connectors of that phase
# descend from.
#
# @param phase [Symbol, String, nil] phase name (:extractor, :transformer or
#   :loader). Anything responding to +to_sym+ is normalised first, so
#   "loader" and :loader are treated alike.
# @return [Class, nil] the matching base class, or nil when the phase is not
#   recognised
def self.ancestor_for_phase(phase)
  # The registry lookups accept the phase as a String or a Symbol; normalise
  # here as well so a String phase does not silently fall through to nil.
  phase = phase.to_sym if phase.respond_to?(:to_sym)

  case phase
  when :extractor then Chronicle::ETL::Extractor
  when :transformer then Chronicle::ETL::Transformer
  when :loader then Chronicle::ETL::Loader
  end
end

.find_by_phase_and_identifier(phase, identifier) ⇒ Object

Find a connector, loading the relevant plugin first if necessary



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/chronicle/etl/registry/connectors.rb', line 59

# Finds the connector for +phase+ addressed by +identifier+, loading a local
# script or activating a plugin when no already-registered connector matches.
#
# Resolution order:
# 1. a registered connector whose phase and identifier match exactly;
# 2. a local file, when the identifier contains a dot;
# 3. a plugin connector addressed as "plugin[:type[:strategy]]",
#    e.g. "lastfm:listens:api".
#
# @param phase [Symbol, String] phase the connector must belong to
# @param identifier [Symbol, String] connector identifier or file path
# @return [Object] the matching entry from the registry (or the registration
#   built for a local file)
# @raise [Chronicle::ETL::PluginNotInstalledError] when the plugin named by
#   the identifier is not installed
# @raise [ConnectorNotAvailableError] when no connector matches
def self.find_by_phase_and_identifier(phase, identifier)
  connector = find_by_phase_and_identifier_built_in(phase, identifier)
  return connector if connector

  # determine if we need to try to load a local file. if it has a dot in the identifier, we treat it as a file
  return find_by_phase_and_identifier_local(phase, identifier) if identifier.to_s.include?('.')

  # Example identifier: lastfm:listens:api
  # to_s lets a Symbol identifier through, as the lookups above already allow.
  plugin, type, strategy = identifier.to_s.split(':').map { |part| part.tr('-', '_').to_sym }

  # The plugin registry is queried with the dashed form of the name.
  plugin_identifier = plugin.to_s.tr('_', '-')

  unless Chronicle::ETL::Registry::Plugins.installed?(plugin_identifier)
    raise Chronicle::ETL::PluginNotInstalledError, plugin_identifier
  end

  Chronicle::ETL::Registry::Plugins.activate(plugin_identifier)

  # Take the first connector of the requested phase that matches every part of
  # the identifier that was given. The phase check matters: a plugin may ship
  # connectors for several phases under the same plugin/type/strategy.
  connector = connectors.find do |c|
    c.phase == phase.to_sym &&
      c.plugin == plugin &&
      (type.nil? || c.type == type) &&
      (strategy.nil? || c.strategy == strategy)
  end

  connector || raise(ConnectorNotAvailableError, "Connector '#{identifier}' not found")
end

.find_by_phase_and_identifier_built_in(phase, identifier) ⇒ Object

Find connector from amongst those currently loaded



54
55
56
# File 'lib/chronicle/etl/registry/connectors.rb', line 54

# Looks up a connector among those already present in the registry.
#
# @param phase [Symbol, String] compared against each entry's +phase+
# @param identifier [Symbol, String] compared against each entry's +identifier+
# @return [Object, nil] the first entry matching both, or nil when none does
def self.find_by_phase_and_identifier_built_in(phase, identifier)
  connectors.find do |registration|
    next false unless registration.phase == phase.to_sym

    registration.identifier == identifier.to_sym
  end
end

.find_by_phase_and_identifier_local(phase, identifier) ⇒ Object

Load a connector from a Ruby file on the local file system



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/chronicle/etl/registry/connectors.rb', line 88

# Loads a connector class from a Ruby script on the local file system.
#
# The script is evaluated at the top level, then every class name found in
# its text is resolved; the first class descending from the base class for
# +phase+ is wrapped in a ConnectorRegistration, which is attached to the
# class and returned.
#
# NOTE(review): this evaluates arbitrary Ruby read from +identifier+. Only
# point it at scripts you trust.
#
# @param phase [Symbol, String] phase the connector must belong to
# @param identifier [String] path to the script
# @return [Chronicle::ETL::Registry::ConnectorRegistration]
# @raise [ConnectorNotAvailableError] when the file does not exist or
#   defines no connector class for the phase
def self.find_by_phase_and_identifier_local(phase, identifier)
  path = identifier.to_s

  # File.read raises rather than returning nil, so test for the file up front
  # to report a missing script as an unavailable connector.
  raise ConnectorNotAvailableError, "Connector '#{identifier}' not found" unless File.file?(path)

  script = File.read(path)

  # load the file by evaluating the contents
  # (because __FILE__/__LINE__ are passed, backtraces from the script are
  # attributed to this file rather than to the script itself)
  eval(script, TOPLEVEL_BINDING, __FILE__, __LINE__) # rubocop:disable Security/Eval

  expected_ancestor = ancestor_for_phase(phase)

  # Collect every class name mentioned in the script, including namespaced
  # ones such as Foo::Bar.
  class_names = script.scan(/class\s+([A-Z]\w*(?:::[A-Z]\w*)*)/).flatten

  class_names.each do |class_name|
    klass = begin
      Object.const_get(class_name)
    rescue NameError, TypeError
      # Not resolvable from the top level (nested definition, or a name that
      # only appears in a comment or string): skip it.
      next
    end

    next unless klass.is_a?(Module) && klass.ancestors.include?(expected_ancestor)

    registration = ::Chronicle::ETL::Registry::ConnectorRegistration.new(klass)
    klass.connector_registration = registration
    return registration
  end

  raise ConnectorNotAvailableError, "Connector '#{identifier}' not found"
end

.find_converter_for_source(source:, type: nil, strategy: nil, target: nil) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/chronicle/etl/registry/connectors.rb', line 36

# Finds a transformer-phase connector able to convert data from +source+.
#
# Each optional filter is skipped when nil. An entry whose own strategy is
# nil also satisfies any requested strategy.
#
# @param source [Object] compared against each entry's +source+
# @param type [Object, nil] compared against each entry's +type+
# @param strategy [Object, nil] compared against each entry's +strategy+
# @param target [Object, nil] compared against each entry's +to_schema+
# @return [Object, nil] the first matching entry, or nil when none matches
def self.find_converter_for_source(source:, type: nil, strategy: nil, target: nil)
  # FIXME: this assumes the extractor plugin has already been loaded, which
  # may not hold when the schema converter runs off a JSON dump of
  # extraction data.
  # plugin = source_klass.connector_registration.source
  # type = source_klass.connector_registration.type
  # strategy = source_klass.connector_registration.strategy

  connectors.find do |candidate|
    next false unless candidate.phase == :transformer
    next false unless candidate.source == source
    next false unless type.nil? || candidate.type == type
    next false unless strategy.nil? || candidate.strategy == strategy || candidate.strategy.nil?

    target.nil? || candidate.to_schema == target
  end
end

.register(connector) ⇒ Object



17
18
19
# File 'lib/chronicle/etl/registry/connectors.rb', line 17

# Adds a connector to the registry.
#
# @param connector [Object] the entry to append; presumably a connector
#   registration, as the lookups read +phase+, +identifier+, +plugin+ etc.
#   from registry entries — TODO confirm against callers
# @return [Object] whatever +connectors << connector+ returns
def self.register(connector)
  # Appended as-is; no duplicate check is performed here.
  connectors << connector
end