Class: LogStash::Inputs::Sqlite

Inherits:
Base show all
Defined in:
lib/logstash/inputs/sqlite.rb

Overview

Read rows from an sqlite database.

This is most useful in cases where you are logging directly to a table. Any tables being watched must have an ‘id’ column that is monotonically increasing.

All tables are read by default except:

  • ones matching ‘sqlite_%’ - these are internal/administrative tables for sqlite

  • ‘since_table’ - this is used by this plugin to track state.

## Example

% sqlite /tmp/example.db
sqlite> CREATE TABLE weblogs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ip STRING,
    request STRING,
    response INTEGER);
sqlite> INSERT INTO weblogs (ip, request, response) 
    VALUES ("1.2.3.4", "/index.html", 200);

Then with this logstash config:

input {
  sqlite {
    path => "/tmp/example.db"
    type => weblogs
  }
}
output {
  stdout {
    debug => true
  }
}

Sample output:

{
  "@source"      => "sqlite://sadness/tmp/x.db",
  "@tags"        => [],
  "@fields"      => {
    "ip"       => "1.2.3.4",
    "request"  => "/index.html",
    "response" => 200
  },
  "@timestamp"   => "2013-05-29T06:16:30.850Z",
  "@source_host" => "sadness",
  "@source_path" => "/tmp/x.db",
  "@message"     => "",
  "@type"        => "foo"
}

Constant Summary collapse

SINCE_TABLE =
:since_table

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes inherited from Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#initialize, #tag

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Inputs::Base

Instance Method Details

#get_all_tables(db) ⇒ Object



114
115
116
# File 'lib/logstash/inputs/sqlite.rb', line 114

# List the names of the tables that should be tailed.
#
# db - database handle; db[sql] must yield rows that expose a :name key
#
# The query skips the since table and anything matching 'sqlite_%'; names
# listed in @exclude_tables are then dropped from the result.
#
# Returns an Array of table names.
def get_all_tables(db)
  query = "SELECT * FROM sqlite_master WHERE type = 'table' AND tbl_name != '#{SINCE_TABLE}' AND tbl_name NOT LIKE 'sqlite_%'"
  table_names = db[query].map { |entry| entry[:name] }
  table_names.reject { |table_name| @exclude_tables.include?(table_name) }
end

#get_n_rows_from_table(db, table, offset, limit) ⇒ Object



119
120
121
122
# File 'lib/logstash/inputs/sqlite.rb', line 119

# Fetch the next batch of rows from +table+ whose id is greater than +offset+.
#
# db     - database handle; db[sql] must return an enumerable of rows
# table  - name of the table to read
# offset - id of the last row already processed; only larger ids are selected
# limit  - maximum number of rows to select
#
# Returns an Array of rows, ordered by ascending id.
def get_n_rows_from_table(db, table, offset, limit)
  # Sort on the id column itself (unquoted). A single-quoted 'id' in ORDER BY
  # is a string literal, i.e. a constant sort key, which leaves the row order
  # unspecified even though the caller records the last row's id as its
  # new position.
  db["SELECT * FROM #{table} WHERE (id > #{offset}) ORDER BY id LIMIT #{limit}"].to_a
end

#get_placeholder(db, table) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/logstash/inputs/sqlite.rb', line 87

# Look up the saved read position for +table+ in the since table.
#
# db    - database handle; db[SINCE_TABLE] is the state-tracking dataset
# table - name of the table whose position is wanted
#
# Returns the stored :place value. When the table has no row yet, a
# placeholder row is created via init_placeholder and 0 is returned.
def get_placeholder(db, table)
  # Fetch the row explicitly with #first. Indexing a Sequel dataset with a
  # column symbol (dataset[:place]) is interpreted as a filter condition, so
  # a stored place of 0 would be reported as "no row" and a duplicate
  # placeholder row would be inserted on every start.
  saved = db[SINCE_TABLE].where(:table => table.to_s).first
  if saved.nil?
    init_placeholder(db, table)
    return 0
  end

  @logger.debug("placeholder already exists, it is #{saved[:place]}")
  saved[:place]
end

#init_placeholder(db, table) ⇒ Object



100
101
102
103
104
# File 'lib/logstash/inputs/sqlite.rb', line 100

# Insert a starting placeholder row (place 0) for +table+ into the since table.
#
# db    - database handle; db[SINCE_TABLE] is the state-tracking dataset
# table - name of the table being registered
#
# Returns whatever the dataset's insert call returns.
def init_placeholder(db, table)
  @logger.debug("init placeholder for #{table}")
  db[SINCE_TABLE].insert(:table => table, :place => 0)
end

#init_placeholder_table(db) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/logstash/inputs/sqlite.rb', line 75

# Create the since table (columns :table and :place) used to remember how
# far each watched table has been read.
#
# db - database handle responding to create_table
#
# Creation is best-effort: a StandardError raised by create_table is logged
# at debug level and not re-raised (presumably the usual cause is that the
# table already exists from an earlier run).
def init_placeholder_table(db)
  db.create_table SINCE_TABLE do
    String :table
    Int    :place
  end
rescue => e
  # Stay best-effort, but attach the real error so a failure other than
  # "table already exists" can still be diagnosed from the log.
  @logger.debug("since tables already exists", :exception => e)
end

#register ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/logstash/inputs/sqlite.rb', line 125

# Connect to the sqlite database at @path and load the saved read position
# of every table that will be tailed.
#
# Sets @host, @db, @tables and @table_data, where @table_data maps each
# table name to { :name => <table>, :place => <last saved position> }.
def register
  # Dependencies are required here rather than at file load time.
  # "socket" is required explicitly because Socket.gethostname is used below.
  require "socket"
  require "sequel"
  require "jdbc/sqlite3"
  @host = Socket.gethostname
  @logger.info("Registering sqlite input", :database => @path)
  @db = Sequel.connect("jdbc:sqlite:#{@path}")
  @tables = get_all_tables(@db)
  # The since table is shared by all watched tables, so create it once up
  # front instead of once per table.
  init_placeholder_table(@db)
  @table_data = {}
  @tables.each do |table|
    last_place = get_placeholder(@db, table)
    @table_data[table] = { :name => table, :place => last_place }
  end
end

#run(queue) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/logstash/inputs/sqlite.rb', line 141

# Tail every registered table forever, turning each new row into an event.
#
# queue - object responding to <<; receives one LogStash::Event per row
#
# Each pass reads up to @batch rows per table, starting after the table's
# saved position. Every row becomes an event whose fields are the row's
# columns (except :id), and the id of the last row is stored as the table's
# new position. When a pass finds no rows at all the method sleeps, doubling
# the delay on each idle pass up to sleep_max; the delay resets to sleep_min
# as soon as rows are seen again.
def run(queue)
  sleep_min = 0.01
  sleep_max = 5
  sleeptime = sleep_min

  @logger.debug("Tailing sqlite db", :path => @path)
  loop do
    count = 0
    @table_data.each do |k, table|
      table_name = table[:name]
      offset = table[:place]
      @logger.debug("offset is #{offset}", :k => k, :table => table_name)
      rows = get_n_rows_from_table(@db, table_name, offset, @batch)
      # Nothing new for this table: its stored placeholder is still
      # correct, so skip the database write.
      next if rows.empty?

      count += rows.count
      rows.each do |row|
        # NOTE(review): "db" carries the database handle object itself, not
        # the path - confirm this is what consumers of the event expect.
        event = LogStash::Event.new("host" => @host, "db" => @db)
        decorate(event)
        # store each column as a field in the event.
        row.each do |column, element|
          next if column == :id
          event[column.to_s] = element
        end
        queue << event
        @table_data[k][:place] = row[:id]
      end
      # Store the last-seen row in the database
      update_placeholder(@db, table_name, @table_data[k][:place])
    end

    if count == 0
      # Nothing found in this iteration. Compute the back-off first so the
      # logged time is the time actually slept.
      sleeptime = [sleeptime * 2, sleep_max].min
      @logger.debug("No new rows. Sleeping.", :time => sleeptime)
      sleep(sleeptime)
    else
      sleeptime = sleep_min
    end
  end # loop
end

#update_placeholder(db, table, place) ⇒ Object



107
108
109
110
111
# File 'lib/logstash/inputs/sqlite.rb', line 107

# Persist +place+ as the saved read position of +table+ in the since table.
#
# db    - database handle; db[SINCE_TABLE] is the state-tracking dataset
# table - name of the table whose position is being saved
# place - value to store in the :place column
#
# Returns whatever the dataset's update call returns.
def update_placeholder(db, table, place)
  @logger.debug("set placeholder to #{place}")
  db[SINCE_TABLE].where(:table => table).update(:place => place)
end