Class: Fluent::Plugin::PgStatStatementsInput

Inherits:
PollingPostgresInputPlugin show all
Defined in:
lib/fluent/plugin/in_pg_stat_statements.rb

Overview

PgStatStatementsInput will periodically poll postgres, querying pg_stat_statements for queryid to query mappings. These are then normalized for security purposes fingerprinted and emitted as records with the following format:

'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'

Constant Summary collapse

POSTGRES_SERVER_VERSION_QUERY =
"SELECT current_setting('server_version_num')"
PG12_STAT_STATEMENTS_QUERY =
<<-SQL
  SELECT queryid,
         query,
         calls,
         rows,
         total_time
    FROM public.pg_stat_statements
SQL
PG13_STAT_STATEMENTS_QUERY =
<<-SQL
  SELECT queryid,
         query,
         calls,
         rows,
         (total_plan_time + total_exec_time) total_time
    FROM public.pg_stat_statements
SQL

Instance Method Summary collapse

Methods inherited from PollingPostgresInputPlugin

#shutdown, #start, #thread_main

Constructor Details

#initializePgStatStatementsInput

Returns a new instance of PgStatStatementsInput.



52
53
54
55
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 52

def initialize
  super
  @postgres_server_version_num = nil
end

Instance Method Details

#emit_statements_to_stream(conn) ⇒ Object

Query the database and emit statements to fluentd router



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 86

def emit_statements_to_stream(conn)
  me = Fluent::MultiEventStream.new

  now = Fluent::Engine.now

  query = query_for_postgres_version(conn)

  conn.exec(query).each do |row|
    record = record_for_row(row)
    me.add(now, record)
  end

  @router.emit_stream(@tag, me)
end

#postgres_server_version_num(conn) ⇒ Object

Returns the PG_VERSION_NUM value from the database will memoize the result



103
104
105
106
107
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 103

def postgres_server_version_num(conn)
  return @postgres_server_version_num if @postgres_server_version_num

  @postgres_server_version_num = conn.exec(POSTGRES_SERVER_VERSION_QUERY).getvalue(0,0).to_i
end

#query_for_postgres_version(conn) ⇒ Object

pg_stat_statements columns changed in pg13, so we use different queries depending on the version www.postgresql.org/docs/12/pgstatstatements.html www.postgresql.org/docs/13/pgstatstatements.html



112
113
114
115
116
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 112

def query_for_postgres_version(conn)
  return PG13_STAT_STATEMENTS_QUERY if postgres_server_version_num(conn) >= 13_00_00

  PG12_STAT_STATEMENTS_QUERY
end

#record_for_row(row) ⇒ Object

Returns a fluentd record for a query row



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 58

def record_for_row(row)
  query = row['query']

  # We record the query_length as it will help in understanding whether unparseable
  # queries are truncated.
  record = {
    'queryid' => row['queryid'].to_s,
    'query_length' => query&.length,
    'calls' => row['calls']&.to_i,
    'total_time_ms' => row['total_time']&.to_f,
    'rows' => row['rows']&.to_i
  }

  return record unless query

  normalized = PgQuery.normalize(query)
  record['query'] = normalized

  record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key

  record
rescue PgQuery::ParseError
  record['query_unparseable'] = true

  record
end