Module: Etl::Integrations::Core::Utils
- Included in:
- BaseConnector, Destination::Tally, Protocol::ProtocolModel
- Defined in:
- lib/etl/integrations/core/utils.rb
Instance Method Summary
- #build_catalog(catalog_json) ⇒ Object
- #build_stream(stream_json) ⇒ Object
- #convert_to_json_schema(column_definitions) ⇒ Object
- #create_log_message(context, type, exception) ⇒ Object
- #extract_data(record_object, properties) ⇒ Object
- #handle_exception(context, type, exception) ⇒ Object
- #keys_to_symbols(hash) ⇒ Object
- #logger ⇒ Object
- #map_type_to_json_schema(type) ⇒ Object
- #success?(response) ⇒ Boolean
Instance Method Details
#build_catalog(catalog_json) ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/etl/integrations/core/utils.rb', line 76

# Builds a protocol Catalog from a parsed catalog definition.
#
# Each entry under "streams" is converted with #build_stream. Catalog-level
# settings that are absent (nil or false) fall back to defaults: a rate limit
# of 60 per "minute", a concurrency of 10 and the "schema" schema mode.
#
# @param catalog_json [Hash] catalog definition indexed by string keys;
#   "streams" must respond to #map
# @return [Etl::Integrations::Protocol::Catalog]
def build_catalog(catalog_json)
  built_streams = catalog_json["streams"].map do |definition|
    build_stream(definition)
  end

  rate_limit = catalog_json["request_rate_limit"] || 60
  rate_limit_unit = catalog_json["request_rate_limit_unit"] || "minute"
  rate_concurrency = catalog_json["request_rate_concurrency"] || 10
  mode = catalog_json["schema_mode"] || "schema"

  Etl::Integrations::Protocol::Catalog.new(
    streams: built_streams,
    request_rate_limit: rate_limit,
    request_rate_limit_unit: rate_limit_unit,
    request_rate_concurrency: rate_concurrency,
    schema_mode: mode
  )
end
#build_stream(stream_json) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/etl/integrations/core/utils.rb', line 87

# Builds a protocol Stream from a parsed stream definition.
#
# The "method" key is passed on as +request_method+. A missing (nil or false)
# "batch_support" becomes false, "batch_size" becomes 1 and
# "request_rate_limit_unit" becomes "minute". The rate limit and concurrency
# are coerced with #to_i, so nil turns into 0.
#
# @param stream_json [Hash] stream definition indexed by string keys
# @return [Etl::Integrations::Protocol::Stream]
def build_stream(stream_json)
  attributes = {
    name: stream_json["name"],
    url: stream_json["url"],
    action: stream_json["action"],
    request_method: stream_json["method"],
    batch_support: stream_json["batch_support"] || false,
    batch_size: stream_json["batch_size"] || 1,
    json_schema: stream_json["json_schema"],
    request_rate_limit: stream_json["request_rate_limit"].to_i,
    request_rate_limit_unit: stream_json["request_rate_limit_unit"] || "minute",
    request_rate_concurrency: stream_json["request_rate_concurrency"].to_i,
    supported_sync_modes: stream_json["supported_sync_modes"]
  }

  Etl::Integrations::Protocol::Stream.new(**attributes)
end
#convert_to_json_schema(column_definitions) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/etl/integrations/core/utils.rb', line 18

# Converts a list of column definitions into a JSON-schema style hash.
#
# Each column contributes one entry under "properties", keyed by its
# :column_name. The entry's "type" comes from #map_type_to_json_schema;
# columns flagged :optional get the two-element form [type, "null"].
#
# @param column_definitions [Enumerable] columns exposing :column_name, :type and :optional
# @return [Hash] { "type" => "object", "properties" => { ... } }
def convert_to_json_schema(column_definitions)
  properties = column_definitions.each_with_object({}) do |column, collected|
    json_type = map_type_to_json_schema(column[:type])
    collected[column[:column_name]] = {
      "type" => column[:optional] ? [json_type, "null"] : json_type
    }
  end

  { "type" => "object", "properties" => properties }
end
#create_log_message(context, type, exception) ⇒ Object
51 52 53 54 55 56 57 |
# Builds an Integrations::Protocol::LogMessage with name: context, level: type
# and a message taken from the exception, then chains one further call on it.
#
# NOTE(review): this listing is garbled. The identifier after `def` is missing
# (create_log_message, per the method heading), as is the call after
# `exception.` (presumably `message` — confirm) and the call chained after `).`
# (cannot be determined from this page). Restore all three from
# lib/etl/integrations/core/utils.rb before relying on this snippet.
# File 'lib/etl/integrations/core/utils.rb', line 51 def (context, type, exception) Integrations::Protocol::LogMessage.new( name: context, level: type, message: exception. ). end |
#extract_data(record_object, properties) ⇒ Object
67 68 69 70 |
# File 'lib/etl/integrations/core/utils.rb', line 67

# Keeps only the attributes of a record that are declared in +properties+.
#
# The record is first converted with #with_indifferent_access (presumably
# ActiveSupport's — confirm); an attribute is kept when its key, converted
# to a symbol, is a key of +properties+.
#
# @param record_object [#with_indifferent_access] record attributes
# @param properties [#key?] declared properties, looked up by symbol
# @return [Object] the filtered result of #select on the converted record
def extract_data(record_object, properties)
  record_object
    .with_indifferent_access
    .select { |attribute, _value| properties.key?(attribute.to_sym) }
end
#handle_exception(context, type, exception) ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/etl/integrations/core/utils.rb', line 59

# Logs an exception at error level and returns the matching log message.
#
# The logged line has the form "<context>: <exception message>".
#
# NOTE(review): `exception.message` and the `create_log_message` call were
# restored from a garbled listing (the identifiers were missing) — confirm
# against lib/etl/integrations/core/utils.rb.
#
# @param context [Object] label interpolated in front of the logged message
# @param type [Object] passed through unchanged to #create_log_message
# @param exception [Exception] error whose message is logged
# @return [Object] whatever #create_log_message returns
def handle_exception(context, type, exception)
  logger.error("#{context}: #{exception.message}")
  create_log_message(context, type, exception)
end
#keys_to_symbols(hash) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/etl/integrations/core/utils.rb', line 6

# Recursively converts hash keys to symbols.
#
# Hashes are rebuilt with every key passed through #to_sym and every value
# processed recursively; arrays are mapped element by element; any other
# value is returned unchanged.
#
# @param hash [Hash, Array, Object] value to convert
# @return [Hash, Array, Object] converted copy, or the argument itself for non-collections
def keys_to_symbols(hash)
  case hash
  when Hash
    hash.to_h { |key, value| [key.to_sym, keys_to_symbols(value)] }
  when Array
    hash.map { |element| keys_to_symbols(element) }
  else
    hash
  end
end
#logger ⇒ Object
47 48 49 |
# File 'lib/etl/integrations/core/utils.rb', line 47

# Logger used by the helpers in this module.
#
# @return [Object] whatever Integrations::Service.logger returns
def logger
  Integrations::Service.logger
end
#map_type_to_json_schema(type) ⇒ Object
38 39 40 41 42 43 44 45 |
# File 'lib/etl/integrations/core/utils.rb', line 38

# Maps a column type name to a JSON-schema type name.
#
# Only the exact string "NUMBER" maps to "integer"; every other value,
# including nil and differently-cased names, maps to "string".
#
# @param type [String, nil] column type name
# @return [String] "integer" or "string"
def map_type_to_json_schema(type)
  return "integer" if type == "NUMBER"

  "string" # Default type
end
#success?(response) ⇒ Boolean
72 73 74 |
# File 'lib/etl/integrations/core/utils.rb', line 72

# Tells whether a response carries a 200 or 201 status code.
#
# The code is compared as a string, so integer and string codes are both
# accepted. A missing response (nil or false) yields +false+ rather than
# leaking the falsy argument, so the predicate always returns a Boolean.
#
# @param response [#code, nil] response object exposing its status via #code
# @return [Boolean] true only for codes 200 and 201
def success?(response)
  return false unless response

  %w[200 201].include?(response.code.to_s)
end