Class: Embulk::Output::Bigquery::Helper
- Inherits:
-
Object
- Object
- Embulk::Output::Bigquery::Helper
- Defined in:
- lib/embulk/output/bigquery/helper.rb
Constant Summary collapse
- PARTITION_DECORATOR_REGEXP =
/\$.+\z/
Class Method Summary collapse
- .bq_type_from_embulk_type(embulk_type) ⇒ Object
- .chomp_partition_decorator(table_name) ⇒ Object
-
.column_options_map(column_options) ⇒ Hash
ToDo: recursively map fields?
- .create_load_job_id(task, path, fields) ⇒ Object
- .deep_symbolize_keys(obj) ⇒ Object
- .field_partitioning?(task) ⇒ Boolean
- .fields_from_embulk_schema(task, schema) ⇒ Object
- .has_partition_decorator?(table_name) ⇒ Boolean
Class Method Details
.bq_type_from_embulk_type(embulk_type) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/embulk/output/bigquery/helper.rb', line 22
#
# Maps an Embulk column type to the BigQuery column type used by default.
#
# @param embulk_type [Symbol] :boolean, :long, :double, :string, :timestamp or :json
# @return [String] the BigQuery type name
# @raise [ArgumentError] if embulk_type is none of the symbols listed above
def self.bq_type_from_embulk_type(embulk_type)
  case embulk_type
  when :boolean   then 'BOOLEAN'
  when :long      then 'INTEGER'
  when :double    then 'FLOAT'
  when :string    then 'STRING'
  when :timestamp then 'TIMESTAMP'
  when :json      then 'STRING' # NOTE: Default is not RECORD since it requires `fields`
  else
    raise ArgumentError, "embulk type #{embulk_type} is not supported"
  end
end
.chomp_partition_decorator(table_name) ⇒ Object
18 19 20 |
# File 'lib/embulk/output/bigquery/helper.rb', line 18
#
# Removes the part of a table name matched by PARTITION_DECORATOR_REGEXP
# (a `$` followed by at least one character, up to the end of the string).
#
# @param table_name [String] table name, with or without such a suffix
# @return [String] a new string without the matched suffix; the content is
#   unchanged when the pattern does not match
def self.chomp_partition_decorator(table_name)
  table_name.sub(PARTITION_DECORATOR_REGEXP, '')
end
.column_options_map(column_options) ⇒ Hash
ToDo: recursively map fields?
36 37 38 39 40 |
# File 'lib/embulk/output/bigquery/helper.rb', line 36
#
# Indexes a list of column option hashes by their 'name' entry.
# ToDo: recursively map fields?
#
# @param column_options [Array<Hash>, nil] column option hashes, each read
#   through its 'name' key; nil is treated as "no options"
# @return [Hash] column name => the column option hash it came from
def self.column_options_map(column_options)
  (column_options || {}).map do |column_option|
    [column_option['name'], column_option]
  end.to_h
end
.create_load_job_id(task, path, fields) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/embulk/output/bigquery/helper.rb', line 71
#
# Builds a deterministic load job id from the content of the file at `path`
# and the task settings listed below, so identical inputs yield the same id.
#
# @param task [Hash] read via the keys 'dataset', 'location', 'table',
#   'source_format', 'max_bad_records', 'field_delimiter', 'encoding',
#   'ignore_unknown_values' and 'allow_quoted_newlines'
# @param path [String] path of the file whose MD5 digest is mixed into the id
# @param fields [Object] schema fields; contributes through its `to_s` form
# @return [String] "embulk_load_job_" followed by a 32-character hex MD5
def self.create_load_job_id(task, path, fields)
  elements = [
    Digest::MD5.file(path).hexdigest,
    task['dataset'],
    task['location'],
    task['table'],
    fields,
    task['source_format'],
    task['max_bad_records'],
    task['field_delimiter'],
    task['encoding'],
    task['ignore_unknown_values'],
    task['allow_quoted_newlines'],
  ]

  # NOTE(review): elements are concatenated without a separator, so different
  # element lists can produce the same string (e.g. "a" + "bc" vs "ab" + "c").
  # Left as is because changing it would change the ids this method produces.
  str = elements.map(&:to_s).join('')
  md5 = Digest::MD5.hexdigest(str)
  "embulk_load_job_#{md5}"
end
.deep_symbolize_keys(obj) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/embulk/output/bigquery/helper.rb', line 58
#
# Returns a copy of `obj` in which every Hash key that responds to `to_sym`
# is converted to a Symbol, recursing into nested Hashes and Arrays.
#
# @param obj [Object] a Hash, an Array, or any other value
# @return [Object] a new Hash/Array with converted keys; any other value is
#   returned as is
def self.deep_symbolize_keys(obj)
  if obj.is_a?(Hash)
    obj.each_with_object({}) do |(key, value), options|
      # Keys without #to_sym (e.g. Integer, nil) are kept unchanged.
      symbolized_key = key.respond_to?(:to_sym) ? key.to_sym : key
      options[symbolized_key] = deep_symbolize_keys(value)
    end
  elsif obj.is_a?(Array)
    obj.map { |value| deep_symbolize_keys(value) }
  else
    obj
  end
end
.field_partitioning?(task) ⇒ Boolean
10 11 12 |
# File 'lib/embulk/output/bigquery/helper.rb', line 10
#
# Tells whether the task's 'time_partitioning' setting contains a 'field' key.
#
# @param task [Hash] task configuration, read via its 'time_partitioning' key
# @return [Boolean] true when 'time_partitioning' is set and has a 'field'
#   key (whatever its value); false otherwise
def self.field_partitioning?(task)
  (task['time_partitioning'] || {}).key?('field')
end
.fields_from_embulk_schema(task, schema) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/embulk/output/bigquery/helper.rb', line 42
#
# Builds one field definition Hash per schema column, combining the column's
# Embulk type with the per-column overrides in task['column_options'].
#
# @param task [Hash] task configuration, read via its 'column_options' key
# @param schema [Enumerable] columns, each read via `[:name]` and `[:type]`
# @return [Array<Hash>] one Hash per column with :name and :type, plus :mode,
#   :fields and :description when the matching column option sets them
def self.fields_from_embulk_schema(task, schema)
  column_options_map = self.column_options_map(task['column_options'])
  schema.map do |column|
    column_name   = column[:name]
    embulk_type   = column[:type]
    column_option = column_options_map[column_name] || {}
    {}.tap do |field|
      field[:name] = column_name
      # An explicit 'type' option wins; the Embulk type is only mapped when
      # no option is given.
      field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
      field[:mode] = column_option['mode'] if column_option['mode']
      field[:fields] = deep_symbolize_keys(column_option['fields']) if column_option['fields']
      field[:description] = column_option['description'] if column_option['description']
    end
  end
end
.has_partition_decorator?(table_name) ⇒ Boolean
14 15 16 |
# File 'lib/embulk/output/bigquery/helper.rb', line 14
#
# Tells whether a table name matches PARTITION_DECORATOR_REGEXP, i.e. contains
# a `$` followed by at least one character up to the end of the string.
#
# @param table_name [String] table name to inspect
# @return [Boolean] true when the pattern matches, false otherwise
def self.has_partition_decorator?(table_name)
  # `=~` yields an index or nil; the double negation turns that into a strict boolean.
  !!(table_name =~ PARTITION_DECORATOR_REGEXP)
end