Class: Fluent::MySQLSlowQueryExInput
- Inherits: Plugin::TailInput
- Inheritance chain: Object → Plugin::TailInput → Fluent::MySQLSlowQueryExInput
- Defined in:
- lib/fluent/plugin/in_mysqlslowquery_ex.rb
Instance Method Summary
- #apply_dbname_to_record(parsed_query) ⇒ Object
- #configure(conf) ⇒ Object
- #get_last_dbname ⇒ Object
- #initialize ⇒ MySQLSlowQueryExInput (constructor): a new instance of MySQLSlowQueryExInput.
- #parser ⇒ Object
- #prepare_lines_to_parse(lines, slow_queries = []) ⇒ Object
- #receive_lines(lines, tail_watcher) ⇒ Object
- #save_last_dbname ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ MySQLSlowQueryExInput
Returns a new instance of MySQLSlowQueryExInput.
9 10 11 12 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 9
# Builds the plugin instance via the parent constructor, then loads the
# 'mysql-slowquery-parser' library.
#
# The require sits inside the constructor, so the library is loaded when an
# instance is created rather than when this file is loaded.
def initialize
  super
  require 'mysql-slowquery-parser'
end
Instance Method Details
#apply_dbname_to_record(parsed_query) ⇒ Object
103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 103
# Normalizes the database name of a parsed slow-query record.
#
# The name is taken from the first available of: the record's :db entry, its
# :schema entry, the name last remembered for the current @path, and
# @dbname_if_missing_dbname_in_log. The chosen name is remembered for @path
# and stored under :database; the :db and :schema entries are removed.
#
# @param parsed_query [Hash] record to normalize; it is modified in place
# @return [Hash] the same record object
def apply_dbname_to_record(parsed_query)
  path_key = @path.to_sym
  resolved_name = parsed_query[:db] ||
                  parsed_query[:schema] ||
                  @last_dbname_of[path_key] ||
                  @dbname_if_missing_dbname_in_log

  @last_dbname_of[path_key] = resolved_name
  parsed_query[:database] = resolved_name
  [:db, :schema].each { |source_key| parsed_query.delete(source_key) }
  parsed_query
end
#configure(conf) ⇒ Object
14 15 16 17 18 19 20 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 14
# Configures the plugin and rejects a configuration in which the position
# file and the last-database-name file are the same path.
#
# The 'format' entry of +conf+ is overwritten with 'none' before the parent
# configuration runs.
#
# @param conf [#[], #[]=] plugin configuration; its 'format' entry is replaced
# @return [nil]
# @raise [Fluent::ConfigError] when 'pos_file' equals the configured
#   last_dbname_file
def configure(conf)
  conf['format'] = 'none'
  super

  # NOTE(review): @last_dbname_file is presumably populated by the parent
  # configure from a config parameter - confirm against the class body.
  # An unset last_dbname_file cannot collide with anything, so nil == nil
  # must not be treated as a conflict.
  return unless @last_dbname_file && conf['pos_file'] == @last_dbname_file

  raise Fluent::ConfigError,
        "pos_file and last_dbname_file must be different files (both are '#{@last_dbname_file}')"
end
#get_last_dbname ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 39
# Reads the persisted database-name map from @last_dbname_file_handle.
#
# The handle is rewound and its whole content parsed as JSON with symbolized
# keys. Content that is not valid JSON, or that is valid JSON but not an
# object (for example +null+ or an array), yields an empty Hash so callers
# can always index into and merge the result.
#
# @return [Hash, nil] the stored map, {} when nothing usable is stored, or
#   nil when no file handle is open
def get_last_dbname
  return unless @last_dbname_file_handle

  @last_dbname_file_handle.pos = 0
  stored = JSON.parse(@last_dbname_file_handle.read.chomp, symbolize_names: true)
  stored.is_a?(Hash) ? stored : {}
rescue JSON::ParserError
  {}
end
#parser ⇒ Object
60 61 62 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 60
# Returns the object used to parse slow-query log entries.
#
# @return [Object] the MySQLSlowQueryParser constant
def parser
  MySQLSlowQueryParser
end
#prepare_lines_to_parse(lines, slow_queries = []) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 88
# Groups raw log lines into units, one unit per completed statement.
#
# Lines are consumed from the front of +lines+ and collected into
# @query_unit. A unit is complete when a line ends with ';' (optionally
# followed by a newline) and does not start with 'use ' or 'SET timestamp='.
# Lines left over after the last complete unit stay in @query_unit and are
# continued by the next call.
#
# This is a loop rather than one recursive call per completed unit, so the
# stack depth does not grow with the number of statements in +lines+.
#
# @param lines [Array<String>] raw lines; emptied by this method
# @param slow_queries [Array<Array<String>>] accumulator that receives the
#   completed units
# @return [Array<Array<String>>] +slow_queries+
def prepare_lines_to_parse(lines, slow_queries = [])
  @query_unit ||= []

  until lines.empty?
    line = lines.shift
    @query_unit << line

    next unless line.end_with?(';', ";\n") && !line.start_with?('use ', 'SET timestamp=')

    slow_queries << @query_unit
    @query_unit = []
  end

  slow_queries
end
#receive_lines(lines, tail_watcher) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 64
# Turns a batch of tailed lines into events and emits them under @tag.
#
# Lines are grouped by prepare_lines_to_parse; each group is parsed with
# parser.parse_slow_log, passed through apply_dbname_to_record and added to
# a single event stream. After each record is added, save_last_dbname is
# called. A group that fails to parse is logged as a warning and skipped.
#
# @param lines [Array<String>] raw log lines
# @param tail_watcher [Object] not used by this method
# @return [Object, nil] the result of router.emit_stream, or nil when no
#   event was produced or emitting raised
def receive_lines(lines, tail_watcher)
  es = Fluent::MultiEventStream.new

  prepare_lines_to_parse(lines).each do |query_unit|
    begin
      parsed_query_unit = parser.parse_slow_log(query_unit)
    rescue => e
      log.warn "in_mysqlslowquery_ex: parse error: #{e.message}, (#{query_unit})"
      next
    end

    es.add(Fluent::EventTime.now.to_i, apply_dbname_to_record(parsed_query_unit))
    save_last_dbname
  end

  return if es.empty?

  begin
    router.emit_stream(@tag, es)
  rescue
    # Deliberately ignored; per the original note, the engine shows logs and
    # backtraces.
  end
end
#save_last_dbname ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 50
# Writes the remembered database names to @last_dbname_file_handle.
#
# Nothing happens when no handle is open, or when the content read back by
# get_last_dbname already equals @last_dbname_of. Otherwise the handle is
# rewound and truncated, then rewritten with the stored content merged with
# @last_dbname_of (the in-memory values win) as JSON.
#
# @return [Integer, nil] the result of the write, or nil when nothing was
#   written
def save_last_dbname
  handle = @last_dbname_file_handle
  return unless handle

  persisted = get_last_dbname
  return if persisted == @last_dbname_of

  merged = persisted.merge(@last_dbname_of)
  handle.pos = 0
  handle.truncate(0)
  handle.write(JSON.generate(merged))
end
#shutdown ⇒ Object
33 34 35 36 37 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 33
# Stops the plugin: saves the remembered database names, closes the
# last-database-name file handle if one is open, then runs the parent
# shutdown.
#
# @return [Object] whatever the parent shutdown returns
def shutdown
  save_last_dbname

  handle = @last_dbname_file_handle
  handle.close if handle

  super
end
#start ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 22
# Starts the plugin and initializes the remembered database names.
#
# When @last_dbname_file is set, the file is opened read/write (created if
# missing, using @file_perm), put into sync mode, and its content is loaded
# through get_last_dbname. Otherwise the remembered names start out empty.
# The parent start runs last.
#
# @return [Object] whatever the parent start returns
def start
  if @last_dbname_file
    @last_dbname_file_handle = File.open(@last_dbname_file, File::RDWR | File::CREAT, @file_perm)
    @last_dbname_file_handle.sync = true
    @last_dbname_of = get_last_dbname
  else
    @last_dbname_of = {}
  end

  super
end