SQL input plugin for Fluentd event collector

Overview

This sql input plugin reads records from a RDBMS periodically. Thus you can copy tables to other storages through Fluentd.

How does it work?

This plugin runs following SQL periodically:

SELECT * FROM table WHERE update_column > last_update_column_value ORDER BY update_column ASC LIMIT 500

What you need to configure is update_column. The column should be an incremental column (such as AUTO_ INCREMENT primary key) so that this plugin reads newly INSERTed rows. Alternatively, you can use a column incremented every time when you update the row (such as last_updated_at column) so that this plugin reads the UPDATEd rows as well. If you omit to set update_column parameter, it uses primary key.

It stores last selected rows to a file (named state_file) to not forget the last row when Fluentd restarts.

Configuration

<source>
  type sql

  host rdb_host
  database rdb_database
  adapter mysql2_or_postgresql_etc
  username myusername
  password mypassword

  tag_prefix my.rdb  # optional, but recommended

  select_interval 60s  # optional
  select_limit 500     # optional

  state_file /var/run/fluentd/sql_state

  <table>
    table table1
    tag table1  # optional
    update_column update_col1
    time_column time_col2  # optional
  </table>

  <table>
    table table2
    tag table2  # optional
    update_column updated_at
    time_column updated_at  # optional
  </table>

  # detects all tables instead of <table> sections
  #all_tables
</source>
  • host RDBMS host
  • port RDBMS port
  • database RDBMS database name
  • adapter RDBMS driver name (mysql2 for MySQL, postgresql for PostgreSQL, etc.)
  • user RDBMS login user name
  • password RDBMS login password
  • tag_prefix prefix of tags of events. actual tag will be this_tag_prefix.tables_tag (optional)
  • select_interval interval to run SQLs (optional)
  • select_limit LIMIT of number of rows for each SQL (optional)
  • state_file path to a file to store last rows
  • all_tables reads all tables instead of configuring each tables in <table> sections

<table> sections:

  • tag tag name of events (optional; default value is table name)
  • table RDBM table name
  • update_column: see above description
  • time_column (optional): if this option is set, this plugin uses this column's value as the the event's time. Otherwise it uses current time.

Limitation

You should make sure target tables have index (and/or partitions) on the update_column. Otherwise SELECT causes full table scan and serious performance problem.

You can't replicate DELETEd rows.