Class: Embulk::Output::Bigquery::Helper

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/bigquery/helper.rb

Constant Summary collapse

PARTITION_DECORATOR_REGEXP =
/\$.+\z/

Class Method Summary collapse

Class Method Details

.bq_type_from_embulk_type(embulk_type) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/embulk/output/bigquery/helper.rb', line 22

# Translates an Embulk column type into the name of the BigQuery column type
# used for it.
#
# @param embulk_type [Symbol] one of :boolean, :long, :double, :string,
#   :timestamp or :json
# @return [String] the BigQuery type name
# @raise [ArgumentError] if the given Embulk type has no mapping
def self.bq_type_from_embulk_type(embulk_type)
  bq_types = {
    boolean: 'BOOLEAN',
    long: 'INTEGER',
    double: 'FLOAT',
    string: 'STRING',
    timestamp: 'TIMESTAMP',
    json: 'STRING', # NOTE: Default is not RECORD since it requires `fields`
  }

  bq_types.fetch(embulk_type) do
    raise ArgumentError, "embulk type #{embulk_type} is not supported"
  end
end

.chomp_partition_decorator(table_name) ⇒ Object



18
19
20
# File 'lib/embulk/output/bigquery/helper.rb', line 18

# Returns a copy of the table name with the portion matched by
# PARTITION_DECORATOR_REGEXP (a `$` followed by at least one character up to
# the end of the string) removed. Names without such a suffix come back
# unchanged.
#
# @param table_name [String] table name, possibly ending in a `$...` suffix
# @return [String] the table name without the suffix
def self.chomp_partition_decorator(table_name)
  table_name.sub(PARTITION_DECORATOR_REGEXP) { '' }
end

.column_options_map(column_options) ⇒ Hash

TODO: Should nested `fields` entries be mapped recursively as well?

Returns:

  • (Hash)

    A mapping of column name => column_option.



36
37
38
39
40
# File 'lib/embulk/output/bigquery/helper.rb', line 36

# Indexes the column option entries by their 'name' value.
#
# @param column_options [Array<Hash>, nil] column option entries, each
#   carrying a 'name' key; nil is treated as "no options"
# @return [Hash] name => column_option; when a name occurs more than once
#   the last entry wins
def self.column_options_map(column_options)
  (column_options || {}).each_with_object({}) do |column_option, options_by_name|
    options_by_name[column_option['name']] = column_option
  end
end

.create_load_job_id(task, path, fields) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/embulk/output/bigquery/helper.rb', line 71

# Builds a load job id from the content of the file at +path+, the given
# +fields+ and a fixed set of task settings. Identical inputs always produce
# the same id.
#
# The id is "embulk_load_job_" followed by the MD5 hex digest of the
# concatenation (no separator) of the string form of: the file's MD5 hex
# digest, the dataset, location and table settings, +fields+, and the
# remaining load settings listed below.
#
# @param task [Hash] task configuration, read through string keys
# @param path [String] path of the file to be loaded; its content is digested
# @param fields [Object] schema fields; contributes via its `to_s`
# @return [String] the job id
def self.create_load_job_id(task, path, fields)
  destination_keys = %w[dataset location table]
  load_option_keys = %w[
    source_format
    max_bad_records
    field_delimiter
    encoding
    ignore_unknown_values
    allow_quoted_newlines
  ]

  file_digest = Digest::MD5.file(path).hexdigest
  destination = destination_keys.map { |key| task[key] }
  load_options = load_option_keys.map { |key| task[key] }

  # The order of the parts is significant: it determines the resulting id.
  parts = [file_digest, *destination, fields, *load_options]
  signature = parts.map(&:to_s).join('')

  "embulk_load_job_#{Digest::MD5.hexdigest(signature)}"
end

.deep_symbolize_keys(obj) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/embulk/output/bigquery/helper.rb', line 58

# Recursively converts Hash keys to Symbols.
#
# Hashes are rebuilt with converted keys and recursively converted values,
# Arrays are mapped element by element, and any other object is returned
# as-is. A key that cannot be converted (its `to_sym` raises or returns a
# falsy value) is kept unchanged.
#
# @param obj [Object] the structure to convert
# @return [Object] a converted copy for Hash/Array input, otherwise +obj+
def self.deep_symbolize_keys(obj)
  unless obj.is_a?(Hash)
    return obj.is_a?(Array) ? obj.map { |element| deep_symbolize_keys(element) } : obj
  end

  obj.each_with_object({}) do |(key, value), symbolized|
    converted_key =
      begin
        key.to_sym
      rescue StandardError
        # Keys without a usable `to_sym` (e.g. Integer, nil) stay as they are.
        key
      end
    symbolized[converted_key || key] = deep_symbolize_keys(value)
  end
end

.field_partitioning?(task) ⇒ Boolean

Returns:

  • (Boolean)


10
11
12
# File 'lib/embulk/output/bigquery/helper.rb', line 10

# Tells whether the task's 'time_partitioning' setting contains a 'field'
# key. A missing (nil/false) 'time_partitioning' setting counts as "no".
#
# @param task [Hash] task configuration, read through string keys
# @return [Boolean]
def self.field_partitioning?(task)
  time_partitioning = task['time_partitioning']
  return false unless time_partitioning

  time_partitioning.key?('field')
end

.fields_from_embulk_schema(task, schema) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/embulk/output/bigquery/helper.rb', line 42

# Builds one field definition per schema column, combining the column's
# name and type with the matching entry of the task's 'column_options'.
#
# Each resulting Hash always has :name and :type (upper-cased; taken from
# the column option's 'type' when present, otherwise derived from the Embulk
# type). :mode, :fields (with symbolized keys) and :description are added
# only when the column option provides a truthy value for them.
#
# @param task [Hash] task configuration; 'column_options' is read from it
# @param schema [Enumerable] columns answering `[:name]` and `[:type]`
# @return [Array<Hash>] field definitions in schema order
def self.fields_from_embulk_schema(task, schema)
  options_by_name = self.column_options_map(task['column_options'])

  schema.map do |column|
    name          = column[:name]
    embulk_type   = column[:type]
    option        = options_by_name[name] || {}
    # An explicit 'type' option takes precedence, so the Embulk type is only
    # translated when no override is configured.
    type          = option['type'] || bq_type_from_embulk_type(embulk_type)

    field = { name: name, type: type.upcase }
    field[:mode] = option['mode'] if option['mode']
    field[:fields] = deep_symbolize_keys(option['fields']) if option['fields']
    field[:description] = option['description'] if option['description']
    field
  end
end

.has_partition_decorator?(table_name) ⇒ Boolean

Returns:

  • (Boolean)


14
15
16
# File 'lib/embulk/output/bigquery/helper.rb', line 14

# Tells whether the table name matches PARTITION_DECORATOR_REGEXP, i.e.
# contains a `$` that is followed by at least one more character up to the
# end of the string.
#
# @param table_name [String] table name to inspect
# @return [Boolean]
def self.has_partition_decorator?(table_name)
  decorator_offset = table_name =~ PARTITION_DECORATOR_REGEXP
  decorator_offset ? true : false
end