Class: Fluent::Plugin::BigQueryBaseOutput

Inherits: Output
Defined in: lib/fluent/plugin/out_bigquery_base.rb

Overview

This is an abstract base class; concrete output behavior is implemented by its subclasses.

Direct Known Subclasses

BigQueryInsertOutput, BigQueryLoadOutput


Instance Method Details

#clustering_fields ⇒ Object

Clustering



77
# File 'lib/fluent/plugin/out_bigquery_base.rb', line 77

config_param :clustering_fields, :array, default: nil
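For orientation, a hedged fluent.conf sketch using this parameter (the field names are invented for illustration and would have to be top-level columns of the target table):

clustering_fields ["timestamp", "user_id"]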

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 84

def configure(conf)
  super

  case @auth_method
  when :private_key
    unless @email && @private_key_path
      raise Fluent::ConfigError, "'email' and 'private_key_path' must be specified if auth_method == 'private_key'"
    end
  when :compute_engine
    # Do nothing
  when :json_key
    unless @json_key
      raise Fluent::ConfigError, "'json_key' must be specified if auth_method == 'json_key'"
    end
  when :application_default
    # Do nothing
  else
    raise Fluent::ConfigError, "unrecognized 'auth_method': #{@auth_method}"
  end

  unless @table.nil? ^ @tables.nil?
    raise Fluent::ConfigError, "'table' or 'tables' must be specified, and both are invalid"
  end

  @tablelist = @tables ? @tables : [@table]

  @table_schema = Fluent::BigQuery::RecordSchema.new('record')
  if @schema
    @table_schema.load_schema(@schema)
  end

  formatter_config = conf.elements("format")[0]
  @formatter = formatter_create(usage: 'out_bigquery_for_insert', default_type: 'json', conf: formatter_config)
end
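To make the validation above concrete, here is a hedged fluent.conf sketch that passes every check: json_key is present because auth_method is json_key, and exactly one of table/tables is set, matching the XOR check. The project, dataset, table, and key path are placeholders; bigquery_insert is the plugin type registered by the BigQueryInsertOutput subclass.

<match bigquery.**>
  @type bigquery_insert          # a concrete subclass of this base class
  auth_method json_key
  json_key /path/to/keyfile.json # required when auth_method is 'json_key'
  project my-project             # placeholder
  dataset my_dataset             # placeholder
  table my_table                 # exactly one of 'table' or 'tables'
</match>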

#fetch_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 187

def fetch_schema(metadata)
  table_id = nil
  project = extract_placeholders(@project, metadata)
  dataset = extract_placeholders(@dataset, metadata)
  table_id = fetch_schema_target_table(metadata)

  if Fluent::Engine.now - @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] > @schema_cache_expire
    schema = writer.fetch_schema(project, dataset, table_id)

    if schema
      table_schema = Fluent::BigQuery::RecordSchema.new("record")
      table_schema.load_schema(schema)
      @fetched_schemas["#{project}.#{dataset}.#{table_id}"] = table_schema
    else
      if @fetched_schemas["#{project}.#{dataset}.#{table_id}"].nil?
        raise "failed to fetch schema from bigquery"
      else
        log.warn "#{table_id} uses previous schema"
      end
    end

    @last_fetch_schema_time["#{project}.#{dataset}.#{table_id}"] = Fluent::Engine.now
  end

  @fetched_schemas["#{project}.#{dataset}.#{table_id}"]
end
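The fetched schema is cached under its "project.dataset.table" key and re-fetched only after schema_cache_expire seconds have passed, so repeated chunks do not hit the BigQuery API. A hedged configuration sketch (the value shown is an assumption, not the documented default):

fetch_schema true
schema_cache_expire 600   # seconds before a cached schema is re-fetched; assumed value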

#fetch_schema_target_table(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 214

def fetch_schema_target_table(metadata)
  extract_placeholders(@fetch_schema_table || @tablelist[0], metadata)
end

#format(tag, time, record) ⇒ Object

Formatter



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 80

config_section :format do
  config_set_default :@type, 'json'
end
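Since the formatter defaults to json, an explicit <format> block is only needed to override it. A minimal sketch, equivalent to the default:

<format>
  @type json
</format>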

#get_schema(project, dataset, metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 233

def get_schema(project, dataset, metadata)
  if @fetch_schema
    @fetched_schemas["#{project}.#{dataset}.#{fetch_schema_target_table(metadata)}"] || fetch_schema(metadata)
  elsif @schema_path
    @read_schemas[read_schema_target_path(metadata)] || read_schema(metadata)
  else
    @table_schema
  end
end
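In other words, the schema source is resolved in a fixed order: fetch_schema wins over schema_path, which wins over the inline schema loaded in #configure. A hedged sketch of the three alternatives (paths and field definitions are placeholders):

fetch_schema true                                 # 1) fetch from the target table
# schema_path /path/to/schema.json                # 2) read from a local JSON file
# schema [{"name": "time", "type": "TIMESTAMP"}]  # 3) inline schema from config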

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 129

def multi_workers_ready?
  true
end
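Because this returns true, the output can run under Fluentd's multi-process mode. A sketch of the corresponding system directive (the worker count is arbitrary):

<system>
  workers 4
</system>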

#read_schema(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 218

def read_schema(metadata)
  schema_path = read_schema_target_path(metadata)

  unless @read_schemas[schema_path]
    table_schema = Fluent::BigQuery::RecordSchema.new("record")
    table_schema.load_schema(MultiJson.load(File.read(schema_path)))
    @read_schemas[schema_path] = table_schema
  end
  @read_schemas[schema_path]
end
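The file at schema_path is parsed with MultiJson and loaded into a RecordSchema, so it should contain a BigQuery-style field list. A hedged example of such a file (field names invented for illustration):

[
  {"name": "time",    "type": "TIMESTAMP"},
  {"name": "status",  "type": "INTEGER"},
  {"name": "message", "type": "STRING"}
]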

#read_schema_target_path(metadata) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 229

def read_schema_target_path(metadata)
  extract_placeholders(@schema_path, metadata)
end

#request_timeout_sec ⇒ Object

Timeout

request_timeout_sec: BigQuery API response timeout.
request_open_timeout_sec: BigQuery API connection and request timeout.


# File 'lib/fluent/plugin/out_bigquery_base.rb', line 68

config_param :request_timeout_sec, :time, default: nil
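A hedged sketch of both timeout parameters in fluent.conf (the values are assumptions, not documented defaults):

request_timeout_sec 120        # BigQuery API response timeout
request_open_timeout_sec 60    # BigQuery API connection and request timeout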

#start ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 119

def start
  super

  @tables_queue = @tablelist.shuffle
  @tables_mutex = Mutex.new
  @fetched_schemas = {}
  @last_fetch_schema_time = Hash.new(0)
  @read_schemas = {}
end
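Note that @tables_queue is a shuffled copy of the configured table list, which suggests that subclasses rotate chunk writes across several tables when more than one is configured. A hedged sketch (table names invented):

tables accesslog1,accesslog2,accesslog3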

#time_partitioning_type ⇒ Object

Partitioning



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 72

config_param :time_partitioning_type, :enum, list: [:day, :hour], default: nil
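A hedged fluent.conf sketch for partitioning (the column name is a placeholder; time_partitioning_field is assumed from the writer options below):

time_partitioning_type day          # day or hour, per the enum above
time_partitioning_field created_at  # placeholder column name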

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 184

def write(chunk)
  # Abstract: concrete implementations are provided by
  # BigQueryInsertOutput and BigQueryLoadOutput.
end

#writer ⇒ Object



# File 'lib/fluent/plugin/out_bigquery_base.rb', line 133

def writer
  @writer ||= Fluent::BigQuery::Writer.new(@log, @auth_method,
    private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
    email: @email,
    json_key: @json_key,
    location: @location,
    source_format: @source_format,
    skip_invalid_rows: @skip_invalid_rows,
    ignore_unknown_values: @ignore_unknown_values,
    max_bad_records: @max_bad_records,
    allow_retry_insert_errors: @allow_retry_insert_errors,
    prevent_duplicate_load: @prevent_duplicate_load,
    auto_create_table: @auto_create_table,
    time_partitioning_type: @time_partitioning_type,
    time_partitioning_field: @time_partitioning_field,
    time_partitioning_expiration: @time_partitioning_expiration,
    require_partition_filter: @require_partition_filter,
    clustering_fields: @clustering_fields,
    timeout_sec: @request_timeout_sec,
    open_timeout_sec: @request_open_timeout_sec,
  )
end
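The writer is built lazily and memoized, and most of its keyword arguments map one-to-one to config parameters of this plugin. A hedged sketch of a few of the pass-through options (values are illustrative, not defaults):

skip_invalid_rows true
ignore_unknown_values true
max_bad_records 0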