Class: Fluent::Plugin::PgStatStatementsInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::PgStatStatementsInput
- Defined in:
- lib/fluent/plugin/in_pg_stat_statements.rb
Overview
PgStatStatementsInput will periodically poll postgres, querying pg_stat_statements for queryid to query mappings. These are then normalized for security purposes fingerprinted and emitted as records with the following format:
'fingerprint' => '8a6e9896bd9048a2',
'query' => 'SELECT * FROM table ORDER BY queryid LIMIT $1',
'query_length' => 58,
'queryid' => 3239318621761098074
Instance Method Summary collapse
-
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row.
- #shutdown ⇒ Object
- #start ⇒ Object
- #thread_main ⇒ Object
Instance Method Details
#record_for_row(row) ⇒ Object
Returns a fluentd record for a query row
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 79 def record_for_row(row) query = row['query'] # We record the query_length as it will help in understanding whether unparseable # queries are truncated. record = { 'queryid' => row['queryid'], 'query_length' => query&.length } return record unless query normalized = PgQuery.normalize(query) record['query'] = normalized record[@fingerprint_key] = PgQuery.parse(normalized).fingerprint if @fingerprint_key record rescue PgQuery::ParseError record['query_unparseable'] = true record end |
#shutdown ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 54 def shutdown @stop_flag = true # Interrupt thread and wait for it to finish Thread.new { @thread.run } if @thread @thread.join end |
#start ⇒ Object
49 50 51 52 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 49 def start @stop_flag = false @thread = Thread.new(&method(:thread_main)) end |
#thread_main ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/fluent/plugin/in_pg_stat_statements.rb', line 62 def thread_main until @stop_flag sleep @interval break if @stop_flag begin with_connection do |conn| emit_statements_to_stream(conn) end rescue StandardError => e log.error 'unexpected error', error: e., error_class: e.class log.error_backtrace e.backtrace end end end |