Class: Fluent::Plugin::PgStatStatementsInput
- Inherits: PollingPostgresInputPlugin
  - Object
  - Input
  - PollingPostgresInputPlugin
  - Fluent::Plugin::PgStatStatementsInput
- Defined in: lib/fluent/plugin/in_pg_stat_statements.rb
Overview
PgStatStatementsInput periodically polls Postgres, querying pg_stat_statements for queryid-to-query mappings. The queries are then normalized for security purposes, fingerprinted, and emitted as records with the following format:
'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => '3239318621761098074'
Constant Summary
- POSTGRES_SERVER_VERSION_QUERY =
"SELECT current_setting('server_version_num')"
- PG12_STAT_STATEMENTS_QUERY =
    <<-SQL
      SELECT queryid, query, calls, rows, total_time
      FROM public.pg_stat_statements
    SQL
- PG13_STAT_STATEMENTS_QUERY =
    <<-SQL
      SELECT queryid, query, calls, rows, (total_plan_time + total_exec_time) total_time
      FROM public.pg_stat_statements
    SQL
Instance Method Summary
-
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router.
-
#initialize ⇒ PgStatStatementsInput
constructor
A new instance of PgStatStatementsInput.
-
#postgres_server_version_num(conn) ⇒ Object
Returns the PG_VERSION_NUM value from the database and memoizes the result.
-
#query_for_postgres_version(conn) ⇒ Object
The pg_stat_statements columns changed in PostgreSQL 13, so a different query is used depending on the server version. See www.postgresql.org/docs/12/pgstatstatements.html and www.postgresql.org/docs/13/pgstatstatements.html.
-
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row.
Methods inherited from PollingPostgresInputPlugin
#shutdown, #start, #thread_main
Constructor Details
#initialize ⇒ PgStatStatementsInput
Returns a new instance of PgStatStatementsInput.
    # File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 52

    def initialize
      super
      @postgres_server_version_num = nil
    end
Instance Method Details
#emit_statements_to_stream(conn) ⇒ Object
Query the database and emit statements to fluentd router
    # File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 86

    def emit_statements_to_stream(conn)
      me = Fluent::MultiEventStream.new

      now = Fluent::Engine.now

      query = query_for_postgres_version(conn)

      conn.exec(query).each do |row|
        record = record_for_row(row)
        me.add(now, record)
      end

      @router.emit_stream(@tag, me)
    end
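The method above collects every row into a single event stream with one shared timestamp and hands that stream to the router once per poll. The following is a minimal sketch of that batching pattern only. `FakeEventStream`, `FakeRouter`, and `emit_rows` are hypothetical stand-ins written for this illustration; they are not part of fluentd or of this plugin, and they model only the `add` and `emit_stream` calls used above.

```ruby
# Hypothetical stand-in for Fluent::MultiEventStream: stores [time, record] pairs.
class FakeEventStream
  attr_reader :events

  def initialize
    @events = []
  end

  def add(time, record)
    @events << [time, record]
  end
end

# Hypothetical stand-in for the fluentd router: remembers each emitted batch.
class FakeRouter
  attr_reader :batches

  def initialize
    @batches = []
  end

  def emit_stream(tag, stream)
    @batches << [tag, stream]
  end
end

# Builds one stream for all rows, stamps every event with the same time,
# and emits the stream exactly once. The block converts a row to a record.
def emit_rows(rows, router, tag, now)
  stream = FakeEventStream.new
  rows.each { |row| stream.add(now, yield(row)) }
  router.emit_stream(tag, stream)
end
```

Emitting one stream per poll, rather than one event per row, means all statements from the same poll carry an identical timestamp.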
#postgres_server_version_num(conn) ⇒ Object
Returns the PG_VERSION_NUM value from the database and memoizes the result.

    # File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 103

    def postgres_server_version_num(conn)
      return @postgres_server_version_num if @postgres_server_version_num

      @postgres_server_version_num = conn.exec(POSTGRES_SERVER_VERSION_QUERY).getvalue(0, 0).to_i
    end
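To illustrate the memoization, here is a small sketch that counts how many times the version query reaches the database. `FakeConn`, `FakeResult`, and `VersionProbe` are hypothetical names invented for this example; they model only the `exec` and `getvalue` calls used by the method above and do not require a PostgreSQL connection.

```ruby
# Hypothetical stand-in for a query result: returns one fixed cell value.
FakeResult = Struct.new(:value) do
  def getvalue(_row, _col)
    value
  end
end

# Hypothetical stand-in for a connection: counts how often exec is called.
class FakeConn
  attr_reader :exec_count

  def initialize(version_string)
    @version_string = version_string
    @exec_count = 0
  end

  def exec(_sql)
    @exec_count += 1
    FakeResult.new(@version_string)
  end
end

# Mirrors the memoization shown above using the ||= idiom.
class VersionProbe
  VERSION_QUERY = "SELECT current_setting('server_version_num')"

  def initialize
    @version_num = nil
  end

  def version_num(conn)
    # Only the first call runs the query; later calls reuse the cached integer.
    @version_num ||= conn.exec(VERSION_QUERY).getvalue(0, 0).to_i
  end
end
```

Calling `version_num` twice on the same `VersionProbe` runs the query once. Because the cached value lives on the instance and is never reset in the code shown, a version change on the server would presumably only be picked up by a new plugin instance.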
#query_for_postgres_version(conn) ⇒ Object
The pg_stat_statements columns changed in PostgreSQL 13, so a different query is used depending on the server version. See www.postgresql.org/docs/12/pgstatstatements.html and www.postgresql.org/docs/13/pgstatstatements.html.

    # File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 112

    def query_for_postgres_version(conn)
      return PG13_STAT_STATEMENTS_QUERY if postgres_server_version_num(conn) >= 13_00_00

      PG12_STAT_STATEMENTS_QUERY
    end
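The version check compares against `13_00_00` because, for PostgreSQL 10 and later, `server_version_num` is the major version times 10000 plus the minor version (13.4 reports 130004). The sketch below shows the same selection on plain integers. `query_for`, `major_version`, and the shortened constant names are assumptions made for this example; the SQL text is copied from the constants documented above.

```ruby
PG12_QUERY = <<~SQL
  SELECT queryid, query, calls, rows, total_time
  FROM public.pg_stat_statements
SQL

PG13_QUERY = <<~SQL
  SELECT queryid, query, calls, rows, (total_plan_time + total_exec_time) total_time
  FROM public.pg_stat_statements
SQL

# Valid for PostgreSQL 10 and later, where the number is major * 10000 + minor.
def major_version(version_num)
  version_num / 10_000
end

# Picks the query whose columns exist on the given server version.
def query_for(version_num)
  version_num >= 13_00_00 ? PG13_QUERY : PG12_QUERY
end
```

Both queries alias their timing column to `total_time`, so the code that consumes the rows does not need to know which server version produced them.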
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row
    # File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 58

    def record_for_row(row)
      query = row['query']

      # We record the query_length as it will help in understanding whether unparseable
      # queries are truncated.
      record = {
        'queryid' => row['queryid'].to_s,
        'query_length' => query&.length,
        'calls' => row['calls']&.to_i,
        'total_time_ms' => row['total_time']&.to_f,
        'rows' => row['rows']&.to_i
      }

      return record unless query

      normalized = PgQuery.normalize(query)
      record['query'] = normalized

      record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key

      record
    rescue PgQuery::ParseError
      record['query_unparseable'] = true

      record
    end
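The three outcomes of this method (no query text, a normalized query, and an unparseable query) can be exercised without the pg_query gem by swapping in a toy normalizer. In the sketch below, `build_record`, `FakeParseError`, and `TOY_NORMALIZER` are hypothetical names for illustration only. The toy normalizer merely replaces integer literals with numbered placeholders and is far simpler than real query normalization; fingerprinting is omitted.

```ruby
# Hypothetical error class standing in for PgQuery::ParseError.
class FakeParseError < StandardError; end

# Toy normalizer (assumption): replaces integer literals with $1, $2, ...
# and rejects blank statements. It does not parse SQL.
TOY_NORMALIZER = lambda do |sql|
  raise FakeParseError, 'empty statement' if sql.strip.empty?

  n = 0
  sql.gsub(/\b\d+\b/) { "$#{n += 1}" }
end

# Same shape as the documented method, with the normalizer passed in.
def build_record(row, normalizer)
  query = row['query']
  record = {
    'queryid' => row['queryid'].to_s,
    'query_length' => query&.length, # length of the raw, un-normalized text
    'calls' => row['calls']&.to_i,
    'total_time_ms' => row['total_time']&.to_f,
    'rows' => row['rows']&.to_i
  }
  return record unless query

  record['query'] = normalizer.call(query)
  record
rescue FakeParseError
  # The raw query text is never added to the record on failure.
  record['query_unparseable'] = true
  record
end
```

Note that when normalization fails, the record carries `query_unparseable` and the original `query_length` but not the raw query text, which is consistent with the security motivation stated in the overview.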