Class: Fluent::Plugin::BigQueryOutput

Inherits:
Output
  • Object
Defined in:
lib/fluent/plugin/out_bigquery.rb

Defined Under Namespace

Modules: InsertImplementation, LoadImplementation

Constant Summary

REGEXP_MAX_NUM = 10

Instance Method Summary

Constructor Details

#initialize ⇒ BigQueryOutput

Table types: developers.google.com/bigquery/docs/tables

type - The following data types are supported; see Data Formats for details on each data type:

  • STRING
  • INTEGER
  • FLOAT
  • BOOLEAN
  • RECORD - A JSON object, used when importing nested records. This type is only available when using JSON source files.

mode - Whether a field can be null. The following values are supported:

  • NULLABLE - The cell can be null.
  • REQUIRED - The cell cannot be null.
  • REPEATED - Zero or more repeated simple or nested subfields. This mode is only supported when using JSON source files.
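
For illustration, a minimal sketch of a schema in the shape accepted by load_schema (for example, the parsed contents of a schema_path JSON file); all field names here are hypothetical:

# Hypothetical schema, e.g. the result of MultiJson.load(File.read(@schema_path))
schema = [
  { "name" => "status",  "type" => "INTEGER", "mode" => "NULLABLE" },
  { "name" => "vhost",   "type" => "STRING",  "mode" => "REQUIRED" },
  { "name" => "request", "type" => "RECORD",  "mode" => "NULLABLE",
    "fields" => [
      { "name" => "path",   "type" => "STRING" },
      { "name" => "params", "type" => "STRING", "mode" => "REPEATED" }
    ] }
]

table_schema = Fluent::BigQuery::RecordSchema.new('record')
table_schema.load_schema(schema)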



# File 'lib/fluent/plugin/out_bigquery.rb', line 181

def initialize
  super
  require 'multi_json'
  require 'google/apis/bigquery_v2'
  require 'googleauth'

  # MEMO: signet-0.6.1 depends on Faraday.default_connection
  Faraday.default_connection.options.timeout = 60
end

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 191

def configure(conf)
  if conf["method"] == "load"
    configure_for_load(conf)
  else
    configure_for_insert(conf)
  end
  super

  case @method
  when :insert
    extend(InsertImplementation)
  when :load
    raise Fluent::ConfigError, "'template_suffix' is for only `insert` mode, instead use 'fetch_schema_table' and formatted table name" if @template_suffix
    extend(LoadImplementation)
  end

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  @writers = {}

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables : [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @table_schema.load_schema(@schema)
  end
  if @schema_path
    @table_schema.load_schema(MultiJson.load(File.read(@schema_path)))
  end

  warn "[DEPRECATION] `replace_record_key` param is deprecated. Please use filter_record_transformer or fluent-plugin-record-reformer" if @replace_record_key

  @regexps = {}
  (1..REGEXP_MAX_NUM).each do |i|
    next unless conf["replace_record_key_regexp#{i}"]
    regexp, replacement = conf["replace_record_key_regexp#{i}"].split(/ /, 2)
    raise ConfigError, "replace_record_key_regexp#{i} does not contain 2 parameters" unless replacement
    raise ConfigError, "replace_record_key_regexp#{i} contains a duplicated key, #{regexp}" if @regexps[regexp]
    @regexps[regexp] = replacement
  end

  if @insert_id_field
    insert_id_keys = @insert_id_field.split('.')
    @get_insert_id = ->(record) {
      insert_id_keys.inject(record) {|h, k| h[k] }
    }
  else
    @get_insert_id = nil
  end

  placeholder_params = "project=#{@project}/dataset=#{@dataset}/table=#{@tablelist.join(",")}/fetch_schema_table=#{@fetch_schema_table}/template_suffix=#{@template_suffix}"
  placeholder_validate!(:bigquery, placeholder_params)
end
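
When insert_id_field is set, configure builds @get_insert_id as the inject lambda above, resolving a dotted key path inside each record; the resolved value is used as the per-row insert ID. A minimal sketch of that lookup, with a hypothetical insert_id_field of "user.id" and a sample record:

# Hypothetical config: insert_id_field user.id
insert_id_keys = "user.id".split('.')
get_insert_id  = ->(record) { insert_id_keys.inject(record) { |h, k| h[k] } }

record = { "user" => { "id" => "abc-123", "name" => "alice" } }
get_insert_id.call(record) # => "abc-123"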

#configure_for_insert(conf) ⇒ Object

Default formatter and buffer settings for `insert` mode.

Raises:

  • (ConfigError)


# File 'lib/fluent/plugin/out_bigquery.rb', line 23

def configure_for_insert(conf)
  raise ConfigError unless conf["method"].nil? || conf["method"] == "insert"

  formatter_config = conf.elements("format")[0]
  if formatter_config && formatter_config['@type'] != "json"
    log.warn "`insert` mode supports only json formatter."
    formatter_config['@type'] = nil
  end
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', type: 'json', conf: formatter_config)

  buffer_config = conf.elements("buffer")[0]
  if buffer_config
    buffer_config["@type"]                       = "memory"      unless buffer_config["@type"]
    buffer_config["flush_mode"]                  = :interval     unless buffer_config["flush_mode"]
    buffer_config["flush_interval"]              = 0.25          unless buffer_config["flush_interval"]
    buffer_config["flush_thread_interval"]       = 0.05          unless buffer_config["flush_thread_interval"]
    buffer_config["flush_thread_burst_interval"] = 0.05          unless buffer_config["flush_thread_burst_interval"]
    buffer_config["chunk_limit_size"]            = 1 * 1024 ** 2 unless buffer_config["chunk_limit_size"] # 1MB
    buffer_config["total_limit_size"]            = 1 * 1024 ** 3 unless buffer_config["total_limit_size"] # 1GB
    buffer_config["chunk_records_limit"]         = 500           unless buffer_config["chunk_records_limit"]
  end
end

#configure_for_load(conf) ⇒ Object

Default formatter and buffer settings for `load` mode.

Raises:

  • (ConfigError)


# File 'lib/fluent/plugin/out_bigquery.rb', line 47

def configure_for_load(conf)
  raise ConfigError unless conf["method"] == "load"

  formatter_config = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_bigquery_for_load', conf: formatter_config, default_type: 'json')

  buffer_config = conf.elements("buffer")[0]
  return unless buffer_config
  buffer_config["@type"]                       = "file"         unless buffer_config["@type"]
  buffer_config["flush_mode"]                  = :interval      unless buffer_config["flush_mode"]
  buffer_config["chunk_limit_size"]            = 1 * 1024 ** 3  unless buffer_config["chunk_limit_size"] # 1GB
  buffer_config["total_limit_size"]            = 32 * 1024 ** 3 unless buffer_config["total_limit_size"] # 32GB
end

#fetch_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 343

def fetch_schema(metadata)
  table_id = nil
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)

  if Fluent::Engine.now - @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas["#{project}.#{dataset}.#{table_id}"] = table_schema
    else
      if @fetched_schemas["#{project}.#{dataset}.#{table_id}"].empty?
        raise "failed to fetch schema from bigquery"
      else
        log.warn "#{table_id} uses previous schema"
      end
    end

    @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] = Fluent::Engine.now
  end

  @fetched_schemas["#{project}.#{dataset}.#{table_id}"]
end
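
Because @last_fetch_schema_time defaults every key to 0 (see #start), the first call for a given project.dataset.table always triggers a fetch; afterwards the cached schema is reused until schema_cache_expire seconds have elapsed. A minimal sketch of that check, with hypothetical values:

last_fetch_schema_time = Hash.new(0)      # as initialized in #start
schema_cache_expire    = 600              # hypothetical expiry, in seconds

key = "my-project.my_dataset.access_log"  # hypothetical table
if Fluent::Engine.now - last_fetch_schema_time[key] > schema_cache_expire
  # on the first call the stored time is 0, so the condition holds and the schema is fetched
end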

#fetch_schema_target_table(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 370

def fetch_schema_target_table(metadata)
  extract_placeholders(@fetch_schema_table || @tablelist[0], metadata)
end

#format(tag, time, record) ⇒ Object

Formatter



# File 'lib/fluent/plugin/out_bigquery.rb', line 162

config_section :format do
  config_set_default :@type, 'json'
end

#get_schema(project, dataset, metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 374

def get_schema(project, dataset, metadata)
  if @fetch_schema
    @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
  else
    @table_schema
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_bigquery.rb', line 273

def multi_workers_ready?
  true
end

#replace_record_key(record) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 296

def replace_record_key(record)
  new_record = {}
  record.each do |key, _|
    new_key = key
    @regexps.each do |regexp, replacement|
      new_key = new_key.gsub(/#{regexp}/, replacement)
    end
    new_key = new_key.gsub(/\W/, '')
    new_record.store(new_key, record[key])
  end
  new_record
end
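
For illustration, with a hypothetical replace_record_key_regexp1 of "- _" (replace "-" with "_"), the loop above rewrites keys like this:

regexps = { "-" => "_" }               # parsed from replace_record_key_regexp1 "- _"

record     = { "user-name" => "alice", "user id" => 1 }
new_record = {}
record.each do |key, _|
  new_key = key
  regexps.each { |regexp, replacement| new_key = new_key.gsub(/#{regexp}/, replacement) }
  new_key = new_key.gsub(/\W/, '')     # any remaining non-word characters are stripped
  new_record.store(new_key, record[key])
end
new_record # => { "user_name" => "alice", "userid" => 1 }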

#request_timeout_sec ⇒ Object

Timeout

  • request_timeout_sec - BigQuery API response timeout
  • request_open_timeout_sec - BigQuery API connection and request timeout


# File 'lib/fluent/plugin/out_bigquery.rb', line 154

config_param :request_timeout_sec, :time, default: nil

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 264

def start
  super

  @tables_queue = @tablelist.shuffle
  @tables_mutex = Mutex.new
  @fetched_schemas = {}
  @last_fetch_schema_time = Hash.new(0)
end

#time_partitioning_type ⇒ Object

Partitioning



# File 'lib/fluent/plugin/out_bigquery.rb', line 158

config_param :time_partitioning_type, :enum, list: [:day], default: nil

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 334

def write(chunk)
  table_id_format = @tables_mutex.synchronize do
    t = @tables_queue.shift
    @tables_queue.push t
    t
  end
  _write(chunk, table_id_format)
end
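
Each flush therefore rotates round-robin through the configured tables: the next table ID format is shifted off the shared queue and pushed back on under the mutex, and #start shuffles the initial order. A minimal sketch with hypothetical table name formats:

tables_queue = ["access_log_%Y%m%d", "audit_log_%Y%m%d"].shuffle
tables_mutex = Mutex.new

2.times do
  table_id_format = tables_mutex.synchronize do
    t = tables_queue.shift
    tables_queue.push(t)
    t
  end
  # successive chunks go to alternating table ID formats
end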

#writer ⇒ Object



# File 'lib/fluent/plugin/out_bigquery.rb', line 277

def writer
  @writers["thread-#{Thread.current.object_id}"] ||= Fluent::BigQuery::Writer.new(@log, @auth_method, {
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_expiration: @time_partitioning_expiration,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  })
end