Class: LogStash::Outputs::GoogleBigQuery
- Defined in:
- lib/logstash/outputs/google_bigquery.rb
Overview
BigQuery charges for storage and for queries, depending on how much data it reads to perform a query. These are important aspects to consider when choosing the date pattern that will be used to create new tables, and also when composing the queries you run against BQ. For more info on BigQuery pricing, see developers.google.com/bigquery/pricing
USAGE: This is an example of a Logstash config:
output {
  google_bigquery {
    project_id => "folkloric-guru-278"                          (required)
    dataset => "logs"                                           (required)
    csv_schema => "path:STRING,status:INTEGER,score:FLOAT"      (required)
    key_path => "/path/to/privatekey.p12"                       (required)
    key_password => "notasecret"                                (optional)
    service_account => "[email protected]"                      (required)
    temp_directory => "/tmp/logstash-bq"                        (optional)
    temp_file_prefix => "logstash_bq"                           (optional)
    date_pattern => "%Y-%m-%dT%H:00"                            (optional)
    flush_interval_secs => 2                                    (optional)
    uploader_interval_secs => 60                                (optional)
    deleter_interval_secs => 60                                 (optional)
  }
}
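Each distinct expansion of date_pattern corresponds to a separate file roll and BigQuery table, which is why the pattern affects both storage layout and query cost, as noted above. The snippet below is a minimal standalone illustration using Ruby's Time#strftime; it is not taken from the plugin itself.

# Illustrative only: each distinct expansion of the pattern is one roll/table.
date_pattern = "%Y-%m-%dT%H:00"

now = Time.utc(2014, 1, 15, 9, 30)
puts now.strftime(date_pattern)           # => "2014-01-15T09:00"
puts (now + 3600).strftime(date_pattern)  # => "2014-01-15T10:00" (next hourly roll)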
Improvements TODO list:
- Refactor common code between Google BQ and GCS plugins.
- Turn Google API code into a Plugin Mixin (like AwsConfig).
- There’s no recover method, so if logstash/plugin crashes, files may not be uploaded to BQ.
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
Methods inherited from Base
#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Outputs::Base
Instance Method Details
#receive(event) ⇒ Object
# File 'lib/logstash/outputs/google_bigquery.rb', line 179

def receive(event)
  return unless output?(event)

  @logger.debug("BQ: receive method called", :event => event)

  # Message must be written as json
  message = event.to_json
  # Remove "@" from property names
  message = message.gsub(/\"@(\w+)\"/, '"\1"')

  new_base_path = get_base_path()

  # Time to roll file based on the date pattern? Or are we due to upload it to BQ?
  if (@current_base_path != new_base_path ||
      Time.now - @last_file_time >= @uploader_interval_secs)
    @logger.debug("BQ: log file will be closed and uploaded",
                  :filename => File.basename(@temp_file.to_path),
                  :size => @temp_file.size.to_s,
                  :uploader_interval_secs => @uploader_interval_secs.to_s)
    # Close alone does not guarantee that data is physically written to disk,
    # so flushing it before.
    @temp_file.fsync()
    @temp_file.close()
    initialize_next_log()
  end

  @temp_file.write(message)
  @temp_file.write("\n")

  sync_log_file()

  @logger.debug("BQ: event appended to log file",
                :filename => File.basename(@temp_file.to_path))
end
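The gsub in #receive rewrites Logstash's "@"-prefixed field names (such as @timestamp and @version) so they become plain BigQuery column names. A standalone illustration of that substitution, using a hand-written JSON string rather than a real event:

# Illustrative only: same regex as #receive, applied to a literal JSON string.
json = '{"@timestamp":"2014-01-15T09:30:00Z","@version":"1","path":"/var/log/app.log"}'
puts json.gsub(/\"@(\w+)\"/, '"\1"')
# => {"timestamp":"2014-01-15T09:30:00Z","version":"1","path":"/var/log/app.log"}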
#register ⇒ Object
# File 'lib/logstash/outputs/google_bigquery.rb', line 139

def register
  require 'csv'
  require "fileutils"
  require "thread"

  @logger.debug("BQ: register plugin")

  @fields = Array.new

  CSV.parse(@csv_schema.gsub('\"', '""')).flatten.each do |field|
    temp = field.strip.split(":")

    # Check that the field in the schema follows the format (<name>:<value>)
    if temp.length != 2
      raise "BigQuery schema must follow the format <field-name>:<field-value>"
    end

    @fields << { "name" => temp[0], "type" => temp[1] }
  end

  # Check that we have at least one field in the schema
  if @fields.length == 0
    raise "BigQuery schema must contain at least one field"
  end

  @json_schema = { "fields" => @fields }

  @upload_queue = Queue.new
  @delete_queue = Queue.new
  @last_flush_cycle = Time.now
  initialize_temp_directory()
  initialize_current_log()
  initialize_google_client()
  initialize_uploader()
  initialize_deleter()
end
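For reference, this is roughly what the schema parsing in #register produces for the csv_schema value from the usage example above. It is a standalone sketch, not plugin code:

require 'csv'

# Illustrative only: mirrors the csv_schema -> json_schema parsing in #register.
csv_schema = "path:STRING,status:INTEGER,score:FLOAT"

fields = CSV.parse(csv_schema).flatten.map do |field|
  name, type = field.strip.split(":")
  { "name" => name, "type" => type }
end

json_schema = { "fields" => fields }
# => {"fields"=>[{"name"=>"path",   "type"=>"STRING"},
#                {"name"=>"status", "type"=>"INTEGER"},
#                {"name"=>"score",  "type"=>"FLOAT"}]}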
#teardown ⇒ Object
# File 'lib/logstash/outputs/google_bigquery.rb', line 214

def teardown
  @logger.debug("BQ: teardown method called")

  @temp_file.flush()
  @temp_file.close()
end