Class: Fluent::MySQLSlowQueryExInput

Inherits:
Plugin::TailInput
  • Object
show all
Defined in:
lib/fluent/plugin/in_mysqlslowquery_ex.rb

Instance Method Summary

Constructor Details

#initialize ⇒ MySQLSlowQueryExInput

Returns a new instance of MySQLSlowQueryExInput.



9
10
11
12
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 9

# Builds the input plugin instance.
#
# Runs the parent initializer first, then loads the
# 'mysql-slowquery-parser' library at construction time rather than at
# file load time.
def initialize
  super
  # Deliberate lazy load: the parser library is only required once an
  # instance is actually created.
  require 'mysql-slowquery-parser'
end

Instance Method Details

#apply_dbname_to_record(parsed_query) ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 103

# Normalizes the database name of a parsed slow-query record.
#
# The name is resolved in this order: the record's :db, the record's
# :schema, the name last remembered for the current @path, and finally
# @dbname_if_missing_dbname_in_log. The resolved name is remembered for
# @path and stored under :database; :db and :schema are removed.
#
# @param parsed_query [Hash] record with symbol keys; mutated in place
# @return [Hash] the same object that was passed in
def apply_dbname_to_record(parsed_query)
  path_key = @path.to_sym
  from_record = parsed_query[:db] || parsed_query[:schema]
  resolved = from_record || @last_dbname_of[path_key] || @dbname_if_missing_dbname_in_log

  # Remember the name so later records lacking db/schema can reuse it.
  @last_dbname_of[path_key] = resolved
  parsed_query[:database] = resolved

  # Only the unified :database key is kept on the record.
  [:db, :schema].each { |source_key| parsed_query.delete(source_key) }
  parsed_query
end

#configure(conf) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 14

# Configures the plugin.
#
# Forces conf['format'] to 'none' before delegating to the parent
# configure, then rejects a configuration in which the position file and
# the last-dbname file are the same path.
#
# @param conf configuration object, read and written with string keys
# @raise [Fluent::ConfigError] if @last_dbname_file is set and equals
#   conf['pos_file']
def configure(conf)
  conf['format'] = 'none'
  super
  # NOTE(review): @last_dbname_file is presumably populated by the parent
  # configure from a config parameter declared outside this view - confirm.
  # The check is skipped when no last-dbname file is configured; otherwise
  # an unset pos_file (nil) would compare equal to an unset
  # @last_dbname_file and reject a valid configuration.
  if @last_dbname_file && conf['pos_file'] == @last_dbname_file
    raise Fluent::ConfigError,
          "in_mysqlslowquery_ex: 'pos_file' and 'last_dbname_file' must not point to the same file"
  end
end

#get_last_dbname ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 39

# Reads the persisted "last database name" mapping from the open
# last-dbname file handle.
#
# The handle is read from its beginning and the content is parsed as JSON
# with symbolized keys.
#
# @return [nil] when no last-dbname file handle is open
# @return [Hash] an empty hash when the content is not parseable JSON
# @return [Object] otherwise, whatever JSON.parse produces
def get_last_dbname
  handle = @last_dbname_file_handle
  return unless handle

  # Always read from the start; the handle is shared with the writer.
  handle.seek(0)
  serialized = handle.read.chomp
  JSON.parse(serialized, symbolize_names: true)
rescue JSON::ParserError
  # Unparseable content (e.g. a freshly created empty file) means no
  # remembered names.
  {}
end

#parser ⇒ Object



60
61
62
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 60

# Returns the parser used for slow-query units.
#
# receive_lines calls parse_slow_log on the returned object.
#
# @return the MySQLSlowQueryParser constant
def parser
  MySQLSlowQueryParser
end

#prepare_lines_to_parse(lines, slow_queries = []) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 88

# Groups raw log lines into complete slow-query units.
#
# Lines are consumed from the front of +lines+ (the array is emptied) and
# accumulated in @query_unit. A unit is complete when a line ends with
# ';' (optionally followed by a newline) and does not start with 'use '
# or 'SET timestamp='. An unfinished trailing unit stays in @query_unit,
# so it is continued by the next call.
#
# @param lines [Array<String>] pending log lines; mutated (drained)
# @param slow_queries [Array] accumulator that completed units are appended to
# @return [Array] +slow_queries+, holding one array of lines per completed unit
def prepare_lines_to_parse(lines, slow_queries = [])
  @query_unit ||= []

  until lines.empty?
    current = lines.shift
    @query_unit << current

    ends_statement = current.end_with?(';', ";\n")
    # 'use ...;' and 'SET timestamp=...;' also end with ';' but belong to
    # the unit preamble, not to the end of the query itself.
    preamble = current.start_with?('use ', 'SET timestamp=')
    next unless ends_statement && !preamble

    slow_queries << @query_unit
    @query_unit = []
  end

  slow_queries
end

#receive_lines(lines, tail_watcher) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 64

# Turns newly received log lines into events and emits them.
#
# Lines are grouped into slow-query units, each unit is parsed, the
# database name is normalized, and the record is added to a
# Fluent::MultiEventStream stamped with the current time as an integer.
# Units that fail to parse are logged with a warning and skipped. The
# last-dbname state is saved after every record that is added.
#
# @param lines [Array<String>] raw log lines
# @param tail_watcher not used by this method
def receive_lines(lines, tail_watcher)
  stream = Fluent::MultiEventStream.new

  prepare_lines_to_parse(lines).each do |unit|
    record =
      begin
        parser.parse_slow_log(unit)
      rescue => e
        log.warn "in_mysqlslowquery_ex: parse error: #{e.message}, (#{unit})"
        next
      end

    stream.add(Fluent::EventTime.now.to_i, apply_dbname_to_record(record))
    save_last_dbname
  end

  return if stream.empty?

  begin
    router.emit_stream(@tag, stream)
  rescue
    # ignore errors. Engine shows logs and backtraces.
  end
end

#save_last_dbname ⇒ Object



50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 50

# Persists the in-memory last-dbname mapping to the last-dbname file.
#
# Does nothing when no file handle is open, or when the file already
# holds exactly the in-memory mapping. Otherwise the file is truncated
# and rewritten with the stored mapping merged with @last_dbname_of
# (in-memory entries win), serialized as JSON.
def save_last_dbname
  handle = @last_dbname_file_handle
  return unless handle

  stored = get_last_dbname
  return if stored == @last_dbname_of

  # Rewrite from the beginning so no stale bytes remain after the JSON.
  handle.pos = 0
  handle.truncate(0)
  merged = stored.merge(@last_dbname_of)
  handle.write(JSON.generate(merged))
end

#shutdown ⇒ Object



33
34
35
36
37
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 33

# Stops the plugin.
#
# Saves the last-dbname mapping one final time, closes the last-dbname
# file handle if one is open, then delegates to the parent shutdown.
def shutdown
  save_last_dbname

  handle = @last_dbname_file_handle
  if handle
    handle.close
  end

  super
end

#start ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/in_mysqlslowquery_ex.rb', line 22

# Starts the plugin.
#
# When @last_dbname_file is set, the file is opened read/write (created
# if missing, using @file_perm as the creation mode) with synchronous
# writes, and the persisted mapping is loaded into @last_dbname_of.
# Otherwise @last_dbname_of starts out as an empty hash. Finally the
# parent start is invoked.
def start
  if @last_dbname_file
    open_flags = File::RDWR | File::CREAT
    @last_dbname_file_handle = File.open(@last_dbname_file, open_flags, @file_perm)
    # Unbuffered writes: every saved mapping reaches the file immediately.
    @last_dbname_file_handle.sync = true
    @last_dbname_of = get_last_dbname
  else
    @last_dbname_of = {}
  end

  super
end