Class: Embulk::Output::Bigquery::ValueConverterFactory

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/output/bigquery/value_converter_factory.rb

Defined Under Namespace

Classes: NotSupportedType, TypeCastError

Constant Summary collapse

DEFAULT_TIMESTAMP_FORMAT =
"%Y-%m-%d %H:%M:%S.%6N"
DEFAULT_TIMEZONE =

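Default timezone, used when neither the column option nor the task supplies one. (The note "BigQuery timestamp format" describes DEFAULT_TIMESTAMP_FORMAT above.)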

"UTC"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(embulk_type, type = nil, timestamp_format: nil, timezone: nil, strict: nil, default_timestamp_format: DEFAULT_TIMESTAMP_FORMAT, default_timezone: DEFAULT_TIMEZONE) ⇒ ValueConverterFactory

Returns a new instance of ValueConverterFactory.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 45

# Captures the per-column settings that the converter builders read later.
#
# @param embulk_type [Symbol] Embulk column type (dispatched on by #create_converter)
# @param type [String, nil] target column type; when nil it is looked up with
#   Helper.bq_type_from_embulk_type. The stored value is always upcased.
# @param timestamp_format [String, nil] per-column timestamp format
# @param timezone [String, nil] per-column timezone; falls back to default_timezone
# @param strict [Boolean, nil] nil is treated as true
# @param default_timestamp_format [String] format used when timestamp_format is nil
# @param default_timezone [String] timezone used when timezone is nil
def initialize(
  embulk_type, type = nil,
  timestamp_format: nil, timezone: nil, strict: nil,
  default_timestamp_format: DEFAULT_TIMESTAMP_FORMAT,
  default_timezone: DEFAULT_TIMEZONE
)
  bq_type            = type || Helper.bq_type_from_embulk_type(embulk_type)
  effective_timezone = timezone || default_timezone

  @embulk_type              = embulk_type
  @type                     = bq_type.upcase
  @timestamp_format         = timestamp_format
  @default_timestamp_format = default_timestamp_format
  @timezone                 = effective_timezone
  @zone_offset              = TimeWithZone.zone_offset(effective_timezone)
  # Only nil falls back to strict mode; an explicit false is kept as is.
  @strict                   = strict.nil? || strict
end

Instance Attribute Details

#embulk_typeObject (readonly)

Returns the value of attribute embulk_type.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @embulk_type, the Embulk column type handed to the constructor.
# #create_converter dispatches on this value.
#
# @return [Object] the stored Embulk column type
def embulk_type
  @embulk_type
end

#strictObject (readonly)

Returns the value of attribute strict.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @strict. The constructor stores true when it is given nil and
# otherwise keeps the supplied value.
#
# @return [Object] the stored strict flag
def strict
  @strict
end

#timestamp_formatObject (readonly)

Returns the value of attribute timestamp_format.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @timestamp_format, the per-column timestamp format handed to the
# constructor (nil when none was given).
#
# @return [String, nil] the stored timestamp format
def timestamp_format
  @timestamp_format
end

#timezoneObject (readonly)

Returns the value of attribute timezone.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @timezone: the timezone handed to the constructor, or its
# default_timezone argument when no timezone was given.
#
# @return [Object] the stored timezone
def timezone
  @timezone
end

#typeObject (readonly)

Returns the value of attribute type.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @type, the target column type. The constructor upcases it, and
# derives it from the Embulk type via Helper.bq_type_from_embulk_type when no
# explicit type is given.
#
# @return [String] the stored, upcased column type
def type
  @type
end

#zone_offsetObject (readonly)

Returns the value of attribute zone_offset.



43
44
45
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 43

# Reader for @zone_offset, which the constructor computes as
# TimeWithZone.zone_offset(@timezone).
#
# @return [Object] the stored zone offset
def zone_offset
  @zone_offset
end

Class Method Details

.create_converters(task, schema) ⇒ Array

Returns an array, indexed by column_index, whose values are the corresponding converters (Proc).

Parameters:

  • task (Hash)
  • schema (Schema)

    embulk defined column types

Options Hash (task):

  • default_timestamp_format (String)
  • default_timezone (String)
  • column_options (Hash)

    user defined column types

Returns:

  • (Array)

    an array, indexed by column_index, whose values are the corresponding converters (Proc)



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 24

# Builds one converter per schema column, in schema order.
#
# Per-column settings come from task['column_options'] (looked up by column
# name); the task-level defaults fall back to DEFAULT_TIMESTAMP_FORMAT and
# DEFAULT_TIMEZONE when the task does not set them.
#
# @param task [Hash] reads 'column_options', 'default_timestamp_format'
#   and 'default_timezone'
# @param schema [#map] columns responding to [:name] and [:type]
# @return [Array] the result of #create_converter for each column
def self.create_converters(task, schema)
  options_by_column = Helper.column_options_map(task['column_options'])
  fallback_format   = task['default_timestamp_format'] || DEFAULT_TIMESTAMP_FORMAT
  fallback_timezone = task['default_timezone'] || DEFAULT_TIMEZONE

  schema.map do |column|
    name      = column[:name]
    kind      = column[:type]
    overrides = options_by_column[name] || {}

    factory = new(
      kind, overrides['type'],
      timestamp_format: overrides['timestamp_format'],
      timezone: overrides['timezone'],
      strict: overrides['strict'],
      default_timestamp_format: fallback_format,
      default_timezone: fallback_timezone
    )
    factory.create_converter
  end
end

Instance Method Details

#boolean_converterObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 90

# Builds the converter for an Embulk boolean column.
#
# BOOLEAN passes the value through untouched; STRING stringifies it and
# keeps nil as nil.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def boolean_converter
  case type
  when 'BOOLEAN'
    proc { |val| val }
  when 'STRING'
    proc { |val| val.to_s unless val.nil? }
  else
    raise NotSupportedType, "cannot take column type #{type} for boolean column"
  end
end

#create_converterObject



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 60

# Picks the converter builder that matches the Embulk column type and
# returns whatever that builder returns.
#
# @return [Proc] converter produced by the matching *_converter method
# @raise [NotSupportedType] when the Embulk type is not one of the symbols
#   listed in the table below
def create_converter
  builders = {
    boolean:   :boolean_converter,
    long:      :long_converter,
    double:    :double_converter,
    string:    :string_converter,
    timestamp: :timestamp_converter,
    json:      :json_converter
  }
  kind = embulk_type
  builder = builders.fetch(kind) do
    raise NotSupportedType, "embulk type #{kind} is not supported"
  end
  send(builder)
end

#double_converterObject



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 139

# Builds the converter for an Embulk double column.
#
# * INTEGER   - truncates with to_i; values that cannot be converted are
#               reported through with_typecast_error
# * FLOAT     - passes the value through
# * STRING    - stringifies the value
# * TIMESTAMP - passes the value through
#
# nil is always converted to nil.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def double_converter
  case type
  when 'INTEGER'
    Proc.new {|val|
      next nil if val.nil?
      # Float#to_i raises FloatDomainError for NaN and +/-Infinity. Going
      # through with_typecast_error reports it like every other failed cast
      # instead of leaking the raw error.
      with_typecast_error(val) do |number|
        number.to_i
      end
    }
  when 'FLOAT'
    Proc.new {|val|
      val
    }
  when 'STRING'
    Proc.new {|val|
      next nil if val.nil?
      val.to_s
    }
  when 'TIMESTAMP'
    Proc.new {|val|
      next nil if val.nil?
      val # BigQuery supports UNIX timestamp
    }
  else
    raise NotSupportedType, "cannot take column type #{type} for double column"
  end
end

#json_converterObject

ToDo: recursive conversion



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 293

# Builds the converter for an Embulk json column.
#
# STRING serialises the value with to_json (nil stays nil); RECORD and JSON
# hand the value over unchanged.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def json_converter
  case type
  when 'STRING'
    proc { |val| val.to_json unless val.nil? }
  when 'RECORD', 'JSON'
    proc { |val| val }
  else
    raise NotSupportedType, "cannot take column type #{type} for json column"
  end
end

#long_converterObject



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 106

# Builds the converter for an Embulk long column.
#
# * BOOLEAN            - 1 becomes true, 0 becomes false, anything else
#                        goes to raise_typecast_error
# * INTEGER, TIMESTAMP - the value is passed through
# * FLOAT / STRING     - to_f / to_s
#
# nil is always converted to nil.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def long_converter
  case type
  when 'BOOLEAN'
    proc do |val|
      if val.nil?
        nil
      elsif val == 1
        true
      elsif val == 0
        false
      else
        raise_typecast_error(val)
      end
    end
  when 'INTEGER', 'TIMESTAMP'
    # For TIMESTAMP: BigQuery supports UNIX timestamp
    proc { |val| val }
  when 'FLOAT'
    proc { |val| val.to_f unless val.nil? }
  when 'STRING'
    proc { |val| val.to_s unless val.nil? }
  else
    raise NotSupportedType, "cannot take column type #{type} for long column"
  end
end

#raise_typecast_error(val) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 80

# Reports a value that could not be cast to the target column type.
#
# @param val [Object] the offending value (interpolated into the message)
# @return [nil] when @strict is falsy; the message is sent to
#   Embulk.logger.trace instead of being raised
# @raise [TypeCastError] when @strict is truthy
def raise_typecast_error(val)
  message = "cannot cast #{@embulk_type} `#{val}` to #{@type}"
  raise TypeCastError, message if @strict

  Embulk.logger.trace { message }
  nil
end

#string_converterObject



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 165

# Builds the converter for an Embulk string column.
#
# * BOOLEAN   - 'true' / 'false' only; anything else goes to raise_typecast_error
# * INTEGER   - Integer(); FLOAT - Float()
# * STRING    - passes the value through
# * TIMESTAMP - with @timestamp_format: strptime, then TimeWithZone.set_zone_offset
#               with zone_offset, then formatted with the UTC offset appended;
#               without it the string is passed through
# * DATE/TIME - Time.parse, then TimeWithZone.set_zone_offset, then formatted
# * DATETIME  - with @timestamp_format: strptime and reformat; otherwise passed through
# * RECORD    - JSON.parse
#
# nil is always converted to nil. Parsing failures are reported through
# with_typecast_error.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def string_converter
  case type
  when 'BOOLEAN'
    proc do |val|
      if val.nil?
        nil
      elsif val == 'true'.freeze
        true
      elsif val == 'false'.freeze
        false
      else
        raise_typecast_error(val)
      end
    end
  when 'INTEGER'
    proc do |val|
      with_typecast_error(val) { |text| Integer(text) } unless val.nil?
    end
  when 'FLOAT'
    proc do |val|
      with_typecast_error(val) { |text| Float(text) } unless val.nil?
    end
  when 'STRING'
    proc { |val| val }
  when 'TIMESTAMP'
    if @timestamp_format
      proc do |val|
        unless val.nil?
          with_typecast_error(val) do |text|
            parsed = Time.strptime(text, @timestamp_format)
            TimeWithZone.set_zone_offset(parsed, zone_offset).strftime("%Y-%m-%d %H:%M:%S.%6N %:z")
          end
        end
      end
    else
      # Users must care of BQ timestamp format
      proc { |val| val }
    end
  when 'DATE'
    proc do |val|
      unless val.nil?
        with_typecast_error(val) do |text|
          parsed = Time.parse(text)
          TimeWithZone.set_zone_offset(parsed, zone_offset).strftime("%Y-%m-%d")
        end
      end
    end
  when 'DATETIME'
    if @timestamp_format
      proc do |val|
        unless val.nil?
          with_typecast_error(val) do |text|
            Time.strptime(text, @timestamp_format).strftime("%Y-%m-%d %H:%M:%S.%6N")
          end
        end
      end
    else
      # Users must care of BQ timestamp format
      proc { |val| val }
    end
  when 'TIME'
    # TimeWithZone doesn't affect any change to the time value
    proc do |val|
      unless val.nil?
        with_typecast_error(val) do |text|
          parsed = Time.parse(text)
          TimeWithZone.set_zone_offset(parsed, zone_offset).strftime("%H:%M:%S.%6N")
        end
      end
    end
  when 'RECORD'
    proc do |val|
      with_typecast_error(val) { |text| JSON.parse(text) } unless val.nil?
    end
  else
    raise NotSupportedType, "cannot take column type #{type} for string column"
  end
end

#timestamp_converterObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 247

# Builds the converter for an Embulk timestamp column.
#
# * INTEGER / FLOAT - to_i / to_f of the time value
# * STRING          - shifted to zone_offset and formatted with @timestamp_format,
#                     or @default_timestamp_format when that is nil (the format
#                     is chosen once, when the converter is built)
# * TIMESTAMP       - formatted in the value's own offset, with the offset appended
# * DATE / DATETIME / TIME - shifted to zone_offset and formatted
#
# nil is always converted to nil. The converters never modify the Time
# object they are given.
#
# @return [Proc] one-argument converter
# @raise [NotSupportedType] for any other target column type
def timestamp_converter
  case type
  when 'INTEGER'
    Proc.new {|val|
      next nil if val.nil?
      val.to_i
    }
  when 'FLOAT'
    Proc.new {|val|
      next nil if val.nil?
      val.to_f
    }
  when 'STRING'
    _timestamp_format = @timestamp_format || @default_timestamp_format
    Proc.new {|val|
      next nil if val.nil?
      with_typecast_error(val) do |time|
        # getlocal returns a shifted copy; localtime would rewrite the
        # caller's Time in place (and raise on a frozen one).
        time.getlocal(zone_offset).strftime(_timestamp_format)
      end
    }
  when 'TIMESTAMP'
    Proc.new {|val|
      next nil if val.nil?
      val.strftime("%Y-%m-%d %H:%M:%S.%6N %:z")
    }
  when 'DATE'
    Proc.new {|val|
      next nil if val.nil?
      val.getlocal(zone_offset).strftime("%Y-%m-%d")
    }
  when 'DATETIME'
    Proc.new {|val|
      next nil if val.nil?
      val.getlocal(zone_offset).strftime("%Y-%m-%d %H:%M:%S.%6N")
    }
  when 'TIME'
    Proc.new {|val|
      next nil if val.nil?
      val.getlocal(zone_offset).strftime("%H:%M:%S.%6N")
    }
  else
    raise NotSupportedType, "cannot take column type #{type} for timestamp column"
  end
end

#with_typecast_error(val) ⇒ Object



72
73
74
75
76
77
78
# File 'lib/embulk/output/bigquery/value_converter_factory.rb', line 72

# Yields val to the block and returns the block's result. Any StandardError
# raised by the block is handed to raise_typecast_error together with val.
#
# @param val [Object] the value being converted
# @yieldparam val [Object] the same value
# @return [Object] the block's result, or whatever raise_typecast_error returns
def with_typecast_error(val)
  yield(val)
rescue StandardError
  raise_typecast_error(val)
end