Class: LogStash::Inputs::Jdbc

Inherits:
Base
  • Object
Includes:
PluginMixins::Jdbc
Defined in:
lib/logstash/inputs/jdbc.rb

Overview

This plugin was created as a way to ingest data from any database with a JDBC interface into Logstash. You can schedule ingestion periodically using a cron syntax (see the `schedule` setting) or run the query once to load data into Logstash. Each row in the result set becomes a single event. Columns in the result set are converted into fields in the event.

Drivers

This plugin does not come packaged with JDBC driver libraries. The desired JDBC driver library must be explicitly passed in to the plugin using the `jdbc_driver_library` configuration option.

Scheduling

Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler]. The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).

Examples:

|==========================================================
| `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March.
| `0 * * * *` | will execute on the 0th minute of every hour every day.
| `0 6 * * * America/Chicago` | will execute at 6:00am America/Chicago time (UTC-6, or UTC-5 during daylight saving time) every day.
|==========================================================

Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
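To make the cron examples above concrete, here is a minimal sketch of how a five-field cron line can be checked against a point in time. It is an illustration only and is not rufus-scheduler's parser: the helper names `field_matches?` and `cron_matches?` are hypothetical, only `*`, single integers, and `a-b` ranges are handled, and the timezone extension is ignored.

```ruby
# Illustrative cron-line matcher; NOT the rufus-scheduler implementation.
# Fields, in order: minute, hour, day of month, month, day of week (0 = Sunday).

# Returns true when one cron field accepts the given integer value.
def field_matches?(spec, value)
  return true if spec == "*"
  if spec.include?("-")
    low, high = spec.split("-").map { |s| Integer(s, 10) }
    (low..high).cover?(value)
  else
    Integer(spec, 10) == value
  end
end

# Returns true when every field of the cron line accepts the given time.
def cron_matches?(cronline, time)
  minute, hour, mday, month, wday = cronline.split
  field_matches?(minute, time.min) &&
    field_matches?(hour, time.hour) &&
    field_matches?(mday, time.day) &&
    field_matches?(month, time.month) &&
    field_matches?(wday, time.wday)
end

# 5:42am on 10 February falls inside "every minute of 5am, January through March".
puts cron_matches?("* 5 * 1-3 *", Time.utc(2024, 2, 10, 5, 42))
# The same time of day in April is outside the 1-3 month range.
puts cron_matches?("* 5 * 1-3 *", Time.utc(2024, 4, 10, 5, 42))
# Minute 0 of any hour satisfies "0 * * * *".
puts cron_matches?("0 * * * *", Time.utc(2024, 4, 10, 13, 0))
```

A real scheduler computes the next matching time rather than testing individual timestamps, so treat this only as a reading aid for the syntax.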

State

The plugin persists the `sql_last_value` parameter in a metadata file stored at the configured `last_run_metadata_path`. After each query execution, this file is updated with the current value of `sql_last_value`. The next time the pipeline starts up, `sql_last_value` is restored by reading it from the file. If `clean_run` is set to true, the stored value is ignored and `sql_last_value` is set to Jan 1, 1970, or to 0 if `use_column_value` is true, as if no query had ever been executed.
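The persistence cycle described here can be sketched in plain Ruby. This is a simplified illustration under stated assumptions, not the plugin's code: `initial_value`, `load_last_value`, and `save_last_value` are hypothetical helpers, the file name `last_run.yml` is made up, and the save step is assumed to be a plain YAML dump.

```ruby
require "yaml"
require "tmpdir"

# Starting point when nothing has been stored: 0 for a tracking column,
# otherwise the Unix epoch (Jan 1, 1970).
def initial_value(use_column_value)
  use_column_value ? 0 : Time.at(0).utc
end

# Hypothetical loader: honors clean_run by discarding any stored state.
def load_last_value(path, clean_run:, use_column_value:)
  if clean_run
    File.delete(path) if File.exist?(path)
    return initial_value(use_column_value)
  end
  return initial_value(use_column_value) unless File.exist?(path)
  # Time must be permitted explicitly so timestamp values can be read back.
  YAML.safe_load(File.read(path), permitted_classes: [Time])
end

# Hypothetical saver: assumed to mirror what happens after each query.
def save_last_value(path, value)
  File.write(path, YAML.dump(value))
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "last_run.yml") # illustrative file name
  first   = load_last_value(path, clean_run: false, use_column_value: true)
  save_last_value(path, 42)             # pretend a query advanced the value
  resumed = load_last_value(path, clean_run: false, use_column_value: true)
  reset   = load_last_value(path, clean_run: true, use_column_value: true)
  puts "first=#{first} resumed=#{resumed} reset=#{reset}"
end
```

The `register` method listed further down shows the plugin's own load step, which deletes the file on a clean run and otherwise reads it with `YAML.load`.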

Dealing With Large Result-sets

Many JDBC drivers use the `fetch_size` parameter to limit how many results are pre-fetched at a time from the cursor into the client's cache before retrieving more results from the result-set. This is configured in this plugin using the `jdbc_fetch_size` configuration option. No fetch size is set by default in this plugin, so the specific driver's default size will be used.

Usage:

Here is an example of setting up the plugin to fetch data from a MySQL database. First, we place the appropriate JDBC driver library in our current path (it can be placed anywhere on your filesystem). In this example, we connect to the ‘mydb’ database as the user ‘mysql’ and wish to input all rows in the ‘songs’ table that match a specific artist. The following example demonstrates a possible Logstash configuration for this. The `schedule` option in this example instructs the plugin to execute this input statement on the minute, every minute.

[source,ruby]

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}


Configuring SQL statement

An SQL statement is required for this input. It can be passed in as a string via the `statement` option, or read from a file via `statement_filepath`. The file option is typically used when the SQL statement is large or cumbersome to supply in the config, and it supports only one SQL statement. The plugin accepts exactly one of the two options: it cannot read a statement from a file and from the `statement` configuration parameter at the same time.
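The "exactly one of the two options" rule can be illustrated with a small standalone sketch. It mirrors the exclusive-or check in the `register` listing further down, but `resolve_statement` and the local `ConfigurationError` class are hypothetical stand-ins so that the example runs without Logstash.

```ruby
require "tempfile"

# Stand-in for LogStash::ConfigurationError so the sketch is self-contained.
class ConfigurationError < StandardError; end

# Hypothetical helper: returns the SQL text, enforcing that exactly one
# of the two sources is given.
def resolve_statement(statement: nil, statement_filepath: nil)
  # XOR on the two nil checks is true only when exactly one option is set.
  unless statement.nil? ^ statement_filepath.nil?
    raise ConfigurationError,
          "Must set either :statement or :statement_filepath. Only one may be set at a time."
  end
  statement_filepath ? File.read(statement_filepath) : statement
end

# Inline statement.
puts resolve_statement(statement: "SELECT * FROM songs")

# Statement read from a file.
Tempfile.create("query.sql") do |file|
  file.write("SELECT id FROM my_table")
  file.flush
  puts resolve_statement(statement_filepath: file.path)
end

# Neither option set: the configuration is rejected.
begin
  resolve_statement
rescue ConfigurationError => e
  puts "rejected: #{e.message}"
end
```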

Predefined Parameters

Some parameters are built-in and can be used from within your queries. Here is the list:

|==========================================================
|sql_last_value | The value used to calculate which rows to query. Before any query is run, this is set to Thursday, 1 January 1970, or 0 if `use_column_value` is true and `tracking_column` is set. It is updated accordingly after subsequent queries are run.
|==========================================================

Example:

[source,ruby]

input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    # ... other configuration bits
  }
}
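The effect of tracking a column through `:sql_last_value` can be simulated without a database. The sketch below is a rough model only: an in-memory array stands in for `my_table`, `fetch_new_rows` and `advance` are hypothetical helpers, and it assumes the stored value moves to the `id` of the last row returned.

```ruby
# In-memory stand-in for my_table; a real run would query the database.
TABLE = [
  { id: 1, mycolumn1: "a" },
  { id: 2, mycolumn1: "b" },
  { id: 3, mycolumn1: "c" }
].freeze

# Models "WHERE id > :sql_last_value", with rows returned in id order.
def fetch_new_rows(rows, sql_last_value)
  rows.select { |row| row[:id] > sql_last_value }.sort_by { |row| row[:id] }
end

# Assumed update rule: keep the old value when nothing new was fetched,
# otherwise move to the id of the last row seen.
def advance(sql_last_value, fetched)
  fetched.empty? ? sql_last_value : fetched.last[:id]
end

sql_last_value = 0 # starting value when use_column_value is true

first_batch = fetch_new_rows(TABLE, sql_last_value)
sql_last_value = advance(sql_last_value, first_batch)
puts "first run fetched #{first_batch.size} rows, sql_last_value=#{sql_last_value}"

second_batch = fetch_new_rows(TABLE, sql_last_value)
sql_last_value = advance(sql_last_value, second_batch)
puts "second run fetched #{second_batch.size} rows, sql_last_value=#{sql_last_value}"
```

Each run only sees rows added since the previous one, which is why the tracking column should increase monotonically.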


Instance Method Summary

Instance Method Details

#register ⇒ Object



# File 'lib/logstash/inputs/jdbc.rb', line 188

def register
  @logger = self.logger
  require "rufus/scheduler"
  prepare_jdbc_connection

  # Raise an error if @use_column_value is true, but no @tracking_column is set
  if @use_column_value
    if @tracking_column.nil?
      raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
    end
  end

  @enable_encoding = !@charset.nil? || !@columns_charset.empty?

  # load sql_last_value from file if exists
  if @clean_run && File.exist?(@last_run_metadata_path)
    File.delete(@last_run_metadata_path)
  elsif File.exist?(@last_run_metadata_path)
    @sql_last_value = YAML.load(File.read(@last_run_metadata_path))
  end

  unless @statement.nil? ^ @statement_filepath.nil?
    raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
  end

  @statement = File.read(@statement_filepath) if @statement_filepath

  if (@jdbc_password_filepath and @jdbc_password)
    raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
  end

  @jdbc_password = File.read(@jdbc_password_filepath).strip if @jdbc_password_filepath

  if enable_encoding?
    @converters = {}
    @columns_charset.each do |column_name, encoding|
      @converters[encoding] = LogStash::Util::Charset.new(encoding)
    end
    @converters[@charset] = LogStash::Util::Charset.new(@charset) if @charset
  end
end

#run(queue) ⇒ Object

# File 'lib/logstash/inputs/jdbc.rb', line 230

def run(queue)
  if @schedule
    @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
    @scheduler.cron @schedule do
      execute_query(queue)
      update_state_file
    end

    @scheduler.join
  else
    execute_query(queue)
    update_state_file
  end
end

#stop ⇒ Object

# File 'lib/logstash/inputs/jdbc.rb', line 245

def stop
  @scheduler.stop if @scheduler

  close_jdbc_connection
end