Module: Etl::Integrations::Core::Utils

Included in:
BaseConnector, Destination::Tally, Protocol::ProtocolModel
Defined in:
lib/etl/integrations/core/utils.rb

Instance Method Summary collapse

Instance Method Details

#build_catalog(catalog_json) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/etl/integrations/core/utils.rb', line 76

def build_catalog(catalog_json)
  streams = catalog_json["streams"].map { |stream_json| build_stream(stream_json) }
  Etl::Integrations::Protocol::Catalog.new(
    streams: streams,
    request_rate_limit: catalog_json["request_rate_limit"] || 60,
    request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: catalog_json["request_rate_concurrency"] || 10,
    schema_mode: catalog_json["schema_mode"] || "schema"
  )
end

#build_stream(stream_json) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/etl/integrations/core/utils.rb', line 87

def build_stream(stream_json)
  Etl::Integrations::Protocol::Stream.new(
    name: stream_json["name"],
    url: stream_json["url"],
    action: stream_json["action"],
    request_method: stream_json["method"],
    batch_support: stream_json["batch_support"] || false,
    batch_size: stream_json["batch_size"] || 1,
    json_schema: stream_json["json_schema"],
    request_rate_limit: stream_json["request_rate_limit"].to_i,
    request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
    supported_sync_modes: stream_json["supported_sync_modes"]
  )
end

#convert_to_json_schema(column_definitions) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/etl/integrations/core/utils.rb', line 18

def convert_to_json_schema(column_definitions)
  json_schema = {
    "type" => "object",
    "properties" => {}
  }

  column_definitions.each do |column|
    column_name = column[:column_name]
    type = column[:type]
    optional = column[:optional]
    json_type = map_type_to_json_schema(type)
    json_schema["properties"][column_name] = {
      "type" => json_type
    }
    json_schema["properties"][column_name]["type"] = [json_type, "null"] if optional
  end

  json_schema
end

#create_log_message(context, type, exception) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/etl/integrations/core/utils.rb', line 51

def create_log_message(context, type, exception)
  Integrations::Protocol::LogMessage.new(
    name: context,
    level: type,
    message: exception.message
  ).to_multiwoven_message
end

#extract_data(record_object, properties) ⇒ Object



67
68
69
70
# File 'lib/etl/integrations/core/utils.rb', line 67

def extract_data(record_object, properties)
  data_attributes = record_object.with_indifferent_access
  data_attributes.select { |key, _| properties.key?(key.to_sym) }
end

#handle_exception(context, type, exception) ⇒ Object



59
60
61
62
63
64
65
# File 'lib/etl/integrations/core/utils.rb', line 59

def handle_exception(context, type, exception)
  logger.error(
    "#{context}: #{exception.message}"
  )

  create_log_message(context, type, exception)
end

#keys_to_symbols(hash) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/etl/integrations/core/utils.rb', line 6

def keys_to_symbols(hash)
  if hash.is_a?(Hash)
    hash.each_with_object({}) do |(key, value), result|
      result[key.to_sym] = keys_to_symbols(value)
    end
  elsif hash.is_a?(Array)
    hash.map { |item| keys_to_symbols(item) }
  else
    hash
  end
end

#loggerObject



47
48
49
# File 'lib/etl/integrations/core/utils.rb', line 47

def logger
  Integrations::Service.logger
end

#map_type_to_json_schema(type) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/etl/integrations/core/utils.rb', line 38

def map_type_to_json_schema(type)
  case type
  when "NUMBER"
    "integer"
  else
    "string" # Default type
  end
end

#success?(response) ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/etl/integrations/core/utils.rb', line 72

def success?(response)
  response && %w[200 201].include?(response.code.to_s)
end