Class: Fluent::Plugin::PgStatStatementsInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_pg_stat_statements.rb

Overview

PgStatStatementsInput periodically polls postgres, querying pg_stat_statements for queryid-to-query mappings. The queries are then normalized for security purposes, fingerprinted, and emitted as records with the following format:

'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => 3239318621761098074

Instance Method Summary collapse

Instance Method Details

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 79

# Builds the fluentd record emitted for a single query row.
#
# The record always carries 'queryid' and 'query_length'. When the row has a
# query text, its normalized form is stored under 'query' and, if
# @fingerprint_key is set, the fingerprint of the normalized text is stored
# under that key. If PgQuery raises a ParseError, the record collected so far
# is returned with 'query_unparseable' set to true.
#
# @param row [#[]] row exposing 'query' and 'queryid'
# @return [Hash] the record for this row
def record_for_row(row)
  sql = row['query']

  # The length is recorded to help judge whether an unparseable query was
  # truncated.
  record = {
    'queryid' => row['queryid'],
    'query_length' => sql&.length
  }

  if sql
    record['query'] = PgQuery.normalize(sql)
    record[@fingerprint_key] = PgQuery.parse(record['query']).fingerprint if @fingerprint_key
  end

  record
rescue PgQuery::ParseError
  record.merge!('query_unparseable' => true)
end

#shutdown ⇒ Object



54
55
56
57
58
59
60
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 54

# Signals the polling thread to stop and waits for it to finish.
#
# Safe to call when no polling thread exists (for example, before #start):
# the stop flag is still set and the method returns without joining.
#
# @return [Thread, nil] the joined thread, or nil when there was none
def shutdown
  @stop_flag = true

  thread = @thread
  return unless thread

  # Interrupt the thread from a helper thread, then wait for it to finish.
  # Thread#run raises ThreadError when the target has already exited; in that
  # case there is nothing left to wake, so the error is ignored.
  Thread.new do
    begin
      thread.run
    rescue ThreadError
      nil
    end
  end
  thread.join
end

#start ⇒ Object



49
50
51
52
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 49

# Clears the stop flag and launches the background thread that runs
# #thread_main.
#
# @return [Thread] the newly created thread (also stored in @thread)
def start
  @stop_flag = false
  @thread = Thread.new { thread_main }
end

#thread_main ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 62

# Body of the polling thread: sleeps for @interval, then emits statements over
# a connection, repeating until @stop_flag is set.
#
# The stop flag is checked again right after sleeping, so no poll runs once a
# stop has been requested during the sleep. Any StandardError raised while
# polling is logged and the loop carries on with the next iteration.
#
# @return [nil]
def thread_main
  until @stop_flag
    sleep @interval
    return if @stop_flag

    begin
      with_connection { |connection| emit_statements_to_stream(connection) }
    rescue StandardError => error
      log.error 'unexpected error', error: error.message, error_class: error.class
      log.error_backtrace error.backtrace
    end
  end
end