Class: LogStash::Outputs::GoogleBigQuery

Inherits:
Base
  • Object
Defined in:
lib/logstash/outputs/google_bigquery.rb

Overview

BigQuery charges for storage and for queries, based on how much data a query reads. Keep both costs in mind when choosing the date pattern used to create new tables, and when composing queries against BigQuery. For more on BigQuery pricing, see: developers.google.com/bigquery/pricing

USAGE: This is an example of logstash config:

output {

google_bigquery {
  project_id => "folkloric-guru-278"                        (required)
  dataset => "logs"                                         (required)
  csv_schema => "path:STRING,status:INTEGER,score:FLOAT"    (required)
  key_path => "/path/to/privatekey.p12"                     (required)
  key_password => "notasecret"                              (optional)
  service_account => "[email protected]"   (required)
  temp_directory => "/tmp/logstash-bq"                      (optional)
  temp_file_prefix => "logstash_bq"                         (optional)
  date_pattern => "%Y-%m-%dT%H:00"                          (optional)
  flush_interval_secs => 2                                  (optional)
  uploader_interval_secs => 60                              (optional)
  deleter_interval_secs => 60                               (optional)
}

}
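The date_pattern option controls how often a new table is started: a coarser pattern means fewer, larger tables, which in turn affects how much data a query must scan. A minimal sketch of this trade-off (the "logstash_" prefix, the strftime application, and the sanitization rule are assumptions for illustration, not taken from the plugin):

```ruby
require 'time'

# Hypothetical table-naming helper: apply the configured date_pattern to the
# event time, then sanitize the result into a BigQuery-safe table name.
def table_name(date_pattern, time)
  "logstash_" + time.strftime(date_pattern).gsub(/[^\w]/, '_')
end

t = Time.utc(2024, 5, 1, 14, 30)
hourly = table_name("%Y-%m-%dT%H:00", t)  # a new table every hour
daily  = table_name("%Y-%m-%d", t)        # a new table every day
```

With the hourly pattern a day's worth of logs is spread over 24 tables, so a query scoped to one table reads far less data than with the daily pattern.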

Improvements TODO list:

  • Refactor common code between Google BQ and GCS plugins.

  • Turn Google API code into a Plugin Mixin (like AwsConfig).

  • There’s no recover method, so if logstash/plugin crashes, files may not be uploaded to BQ.

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Instance Method Details

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/google_bigquery.rb', line 179

def receive(event)
  return unless output?(event)

  @logger.debug("BQ: receive method called", :event => event)

  # Message must be written as json
  message = event.to_json
  # Remove "@" from property names
  message = message.gsub(/\"@(\w+)\"/, '"\1"')

  new_base_path = get_base_path()

  # Time to roll file based on the date pattern? Or are we due to upload it to BQ?
  if (@current_base_path != new_base_path || Time.now - @last_file_time >= @uploader_interval_secs)
    @logger.debug("BQ: log file will be closed and uploaded",
                  :filename => File.basename(@temp_file.to_path),
                  :size => @temp_file.size.to_s,
                  :uploader_interval_secs => @uploader_interval_secs.to_s)
    # Close alone does not guarantee that the data is physically written to
    # disk, so flush it first.
    @temp_file.fsync()
    @temp_file.close()
    initialize_next_log()
  end

  @temp_file.write(message)
  @temp_file.write("\n")

  sync_log_file()

  @logger.debug("BQ: event appended to log file",
                :filename => File.basename(@temp_file.to_path))
end
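The property-name substitution in receive can be checked in isolation; it strips the leading "@" that Logstash puts on built-in fields so the names match the BigQuery schema (the sample event below is illustrative):

```ruby
# Remove "@" from JSON property names, exactly as receive does.
message = '{"@timestamp":"2014-01-01T00:00:00Z","@message":"hello","path":"/var/log"}'
cleaned = message.gsub(/\"@(\w+)\"/, '"\1"')
# cleaned => '{"timestamp":"2014-01-01T00:00:00Z","message":"hello","path":"/var/log"}'
```

Note the pattern only matches quoted property names of word characters, so "@" characters inside string values are left alone.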

#register ⇒ Object



# File 'lib/logstash/outputs/google_bigquery.rb', line 139

def register
  require 'csv'
  require "fileutils"
  require "thread"

  @logger.debug("BQ: register plugin")

  @fields = Array.new

  CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field|
    temp = field.strip.split(":")

    # Check that the field in the schema follows the format (<name>:<value>)
    if temp.length != 2
      raise "BigQuery schema must follow the format <field-name>:<field-value>"
    end

    @fields << { "name" => temp[0], "type" => temp[1] }
  end

  # Check that we have at least one field in the schema
  if @fields.length == 0
    raise "BigQuery schema must contain at least one field"
  end

  @json_schema = { "fields" => @fields }

  @upload_queue = Queue.new
  @delete_queue = Queue.new
  @last_flush_cycle = Time.now
  initialize_temp_directory()
  initialize_current_log()
  initialize_google_client()
  initialize_uploader()
  initialize_deleter()
end
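The schema-parsing loop in register can be exercised on its own; given the csv_schema value from the usage example above, it produces the fields array that becomes the BigQuery JSON schema:

```ruby
require 'csv'

# Parse a csv_schema string into the fields array register builds,
# validating the <field-name>:<field-value> format along the way.
csv_schema = "path:STRING,status:INTEGER,score:FLOAT"
fields = CSV.parse(csv_schema.gsub('\"', '""')).flatten.map do |field|
  name, type = field.strip.split(":")
  raise "BigQuery schema must follow the format <field-name>:<field-value>" if type.nil?
  { "name" => name, "type" => type }
end
json_schema = { "fields" => fields }
```

A malformed entry such as "path" (no ":TYPE" suffix) raises immediately, so bad schemas fail at register time rather than at upload time.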

#teardown ⇒ Object



# File 'lib/logstash/outputs/google_bigquery.rb', line 214

def teardown
  @logger.debug("BQ: teardown method called")

  @temp_file.flush()
  @temp_file.close()
end