Class: Fluent::RdsMysqlSlowLog

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_rds_mysql_slow_log.rb

Defined Under Namespace

Classes: Server, TimerWatcher

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



26
27
28
29
30
31
32
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 26

# Plugin configuration entry point: delegates to the superclass and then runs
# the three configure_* helpers in order.
#
# @param conf configuration object; forwarded unchanged to the superclass
#   via the bare `super` call
# @return [Object] whatever configure_encoding returns
def configure(conf)
  # NOTE(review): presumably the superclass populates @servers, @tag_prefix,
  # @database_timezone, @encoding and @from_encoding from conf -- confirm
  # against the config_param declarations, which are outside this listing.
  super

  configure_servers
  configure_timezone
  configure_encoding
end

#configure_encoding ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 51

# Validates the encoding settings and replaces @encoding / @from_encoding with
# the values returned by parse_encoding_param.
#
# @return [Object, nil] the converted @from_encoding, or nil when it is unset
# @raise [ConfigError] when @from_encoding is set but @encoding is not
def configure_encoding
  unless @encoding
    # Neither option given: nothing to validate or convert.
    return unless @from_encoding

    raise ConfigError, "'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  @encoding = parse_encoding_param(@encoding)
  @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
end

#configure_servers ⇒ Object



34
35
36
37
38
39
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 34

# Rewrites @servers in place, turning every configured entry into a
# [tag, Server] pair. When @tag_prefix is set, each tag becomes
# "<prefix>.<tag>".
#
# @return [Array] the mutated @servers array
def configure_servers
  @servers.map! do |entry|
    tag = entry.tag
    tag = "#{@tag_prefix}.#{tag}" if @tag_prefix

    connection = Server.new(entry.host, entry.port, entry.username, entry.password)
    [tag, connection]
  end
end

#configure_timezone ⇒ Object



41
42
43
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 41

# Replaces @database_timezone with the value returned by
# parse_timezone_param. Does nothing when no timezone is set.
#
# @return [Object, nil] the converted timezone, or nil when none is set
def configure_timezone
  return unless @database_timezone

  @database_timezone = parse_timezone_param(@database_timezone)
end

#emit_slow_log(tag, server) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 88

# Fetches slow-log rows from one server and emits them as a single event
# stream under +tag+.
#
# On the yielded connection it first calls the mysql.rds_rotate_slow_log
# procedure, then reads every row of slow_log_backup, passes each row through
# #process and adds the resulting (time, record) pair to a MultiEventStream.
# Any StandardError is logged, with backtrace, and not re-raised.
#
# @param tag tag handed to router.emit_stream
# @param server object whose #connect yields a client that responds to #query
def emit_slow_log(tag, server)
  server.connect do |client|
    client.query('CALL mysql.rds_rotate_slow_log')

    stream = MultiEventStream.new
    client.query('SELECT * FROM slow_log_backup').each { |row| stream.add(*process(row)) }

    router.emit_stream(tag, stream)
  end
rescue StandardError => error
  log.error error.to_s
  log.error_backtrace
end

#encode(str, dst, src) ⇒ Object



136
137
138
139
140
141
142
143
144
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 136

# Re-encodes +str+ in place.
#
# With a source encoding the bytes are transcoded from +src+ to +dst+
# (String#encode!). Without one, the string is only relabelled as +dst+
# (String#force_encoding).
#
# @param str [String, nil] string to mutate; nil is passed through
# @param dst target encoding
# @param src source encoding, or nil/false to relabel only
# @return [String, nil] the same string object, or nil when +str+ is nil
def encode(str, dst, src)
  return if str.nil?

  src ? str.encode!(dst, src) : str.force_encoding(dst)
end

#extract_time(record) ⇒ Object



172
173
174
175
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 172

# Returns the event time for +record+ as an Integer.
#
# The 'start_time' entry is used when present, otherwise Engine.now. Unless
# @keep_time_key is set, 'start_time' is removed from the record.
#
# @param record [Hash] record to read (and possibly mutate)
# @return [Integer] result of #to_i on the chosen time value
def extract_time(record)
  started_at =
    if @keep_time_key
      record['start_time']
    else
      record.delete('start_time')
    end

  started_at ||= Engine.now
  started_at.to_i
end

#on_timer ⇒ Object



82
83
84
85
86
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 82

# Timer callback: calls #emit_slow_log once for every [tag, server] pair in
# @servers, in order.
#
# @return [Array] @servers
def on_timer
  # The element index was never used, so a plain each is sufficient.
  @servers.each { |tag, server| emit_slow_log(tag, server) }
end

#parse_encoding_param(encoding) ⇒ Object



60
61
62
63
64
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 60

# Resolves an encoding name with Encoding.find.
#
# @param encoding name handed to Encoding.find
# @return [Encoding] the matching encoding
# @raise [ConfigError] carrying the ArgumentError message when the lookup fails
def parse_encoding_param(encoding)
  begin
    found = Encoding.find(encoding)
  rescue ArgumentError => lookup_error
    raise ConfigError, lookup_error.message
  end

  found
end

#parse_timezone_param(timezone) ⇒ Object



45
46
47
48
49
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 45

# Resolves a timezone identifier with TZInfo::Timezone.get.
#
# @param timezone identifier handed to TZInfo::Timezone.get
# @return [Object] whatever TZInfo::Timezone.get returns
# @raise [ConfigError] carrying the TZInfo error message when the identifier
#   is not recognised
def parse_timezone_param(timezone)
  TZInfo::Timezone.get(timezone)
rescue TZInfo::InvalidTimezoneIdentifier => e
  # The exception class is fully qualified on purpose: a bare
  # InvalidTimezoneIdentifier only resolves if TZInfo is mixed into the
  # enclosing scope. Otherwise a bad identifier would surface as NameError
  # rather than ConfigError.
  raise ConfigError, e.message
end

#process(record) ⇒ Object



104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 104

# Normalizes one slow-log row in place and pairs it with its event time.
#
# The helpers run in a fixed order: timestamp, string, time, integer. The
# event time is extracted last, after process_timestamp has had the chance to
# rewrite 'start_time'.
#
# @param record [Hash] row to normalize (mutated)
# @return [Array] two-element array [event_time, record]
def process(record)
  process_timestamp(record)
  process_string(record)
  process_time(record)
  process_integer(record)

  event_time = extract_time(record)
  [event_time, record]
end

#process_integer(record) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 158

# Converts the counter-style columns of +record+ to Integer with #to_i.
# Entries that are nil, false or absent are left untouched.
#
# @param record [Hash] row to mutate
# @return [Array<String>] the list of field names that was iterated
def process_integer(record)
  integer_fields = %w[rows_sent rows_examined last_insert_id insert_id server_id thread_id rows_affected]

  integer_fields.each do |field|
    value = record[field]
    record[field] = value.to_i if value
  end
end

#process_string(record) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 122

# Normalizes the text columns user_host, db and sql_text in place.
#
# When @null_empty_string is set, empty or missing values become nil. When
# @encoding is set, user_host and db are transcoded from UTF-8 to @encoding.
# sql_text is transcoded from @from_encoding, or only relabelled when
# @from_encoding is unset (see #encode).
#
# @param record [Hash] row to mutate
# @return [String, nil] result of the last #encode call, or nil when
#   @encoding is unset
def process_string(record)
  if @null_empty_string
    %w[user_host db sql_text].each do |field|
      current = record[field] || ''
      record[field] = nil if current.empty?
    end
  end

  return unless @encoding

  encode(record['user_host'], @encoding, Encoding::UTF_8)
  encode(record['db'], @encoding, Encoding::UTF_8)
  encode(record['sql_text'], @encoding, @from_encoding)
end

#process_time(record) ⇒ Object



146
147
148
149
150
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 146

# Replaces the query_time and lock_time entries of +record+ with the values
# returned by #time_to_microseconds. Nil, false or absent entries are skipped.
#
# @param record [Hash] row to mutate
# @return [Array<String>] the list of field names that was iterated
def process_time(record)
  %w[query_time lock_time].each do |field|
    raw = record[field]
    next unless raw

    record[field] = time_to_microseconds(raw)
  end
end

#process_timestamp(record) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 112

# Replaces record['start_time'] with the result of #timestamp_to_time.
# Leaves the record untouched when the entry is nil, false or absent.
#
# @param record [Hash] row to mutate
# @return [Object, nil] the converted value, or the original falsy entry
def process_timestamp(record)
  raw = record['start_time']
  return raw unless raw

  record['start_time'] = timestamp_to_time(raw)
end

#run ⇒ Object



75
76
77
78
79
80
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 75

# Runs @loop. Any StandardError raised out of it is logged together with its
# backtrace and not re-raised.
#
# @return [Object] the value of @loop.run, or of log.error_backtrace after a
#   failure
def run
  @loop.run
rescue StandardError => error
  log.error error.to_s
  log.error_backtrace
end

#shutdown ⇒ Object



177
178
179
180
181
182
183
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 177

# Stops the plugin: after the superclass shutdown, detaches every watcher
# from @loop, stops the loop and waits for @thread to finish.
def shutdown
  super

  @loop.watchers.each { |watcher| watcher.detach }
  @loop.stop
  # Block until the thread stored in @thread has exited.
  @thread.join
end

#start ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 66

# Starts the plugin: after the superclass start, creates a Coolio loop,
# attaches a TimerWatcher that invokes #on_timer, and runs the loop via #run
# on a new thread.
#
# @return [Thread] the thread stored in @thread
def start
  super

  @loop = Coolio::Loop.new

  # NOTE(review): the literal `true` is presumably the watcher's "repeat"
  # flag -- confirm against TimerWatcher, which is defined outside this listing.
  timer_callback = method(:on_timer)
  @timer = TimerWatcher.new(@emit_interval, true, log, &timer_callback)
  @loop.attach(@timer)

  @thread = Thread.new { run }
end

#time_to_microseconds(time) ⇒ Object



152
153
154
155
156
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 152

# Converts a MySQL-style TIME string ("[-]HH:MM:SS[.ffffff]") into a signed
# number of microseconds.
#
# Differences from a naive field-by-field scan:
# * the sign is read from the leading character, so "-00:00:01" stays
#   negative even though its hour field is zero;
# * a fractional part shorter than six digits is scaled correctly
#   (".5" is 500_000 microseconds, not 5);
# * parsing uses a plain Regexp, so it does not depend on String#scanf.
#
# Characters after the recognised prefix are ignored.
#
# @param time [String] time value to parse
# @return [Integer] duration in microseconds
# @raise [ArgumentError] when +time+ does not start with a recognisable value
def time_to_microseconds(time)
  match = /\A\s*([+-])?(\d+):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/.match(time)
  raise ArgumentError, "invalid time value: #{time.inspect}" unless match

  sign = match[1] == '-' ? -1 : 1
  hours = match[2].to_i
  minutes = match[3].to_i
  seconds = match[4].to_i
  # Right-pad so that the captured digits are always read as microseconds.
  usec = match[5].to_s.ljust(6, '0').to_i

  sign * (hours * 3_600_000_000 + minutes * 60_000_000 + seconds * 1_000_000 + usec)
end

#timestamp_to_time(timestamp) ⇒ Object



116
117
118
119
120
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 116

# Parses a "YYYY-MM-DD HH:MM:SS[.ffffff]" timestamp into a Time.
#
# The fields are first interpreted as UTC. When @database_timezone is set,
# that value is then passed to its #local_to_utc and the result is returned
# instead.
#
# A fractional part shorter than six digits is scaled to microseconds
# (".5" is 500_000). Parsing uses a plain Regexp, so it does not depend on
# String#scanf. Characters after the recognised prefix are ignored.
#
# @param timestamp [String] timestamp to parse
# @return [Time, Object] the UTC Time, or the result of
#   @database_timezone.local_to_utc
# @raise [ArgumentError] when +timestamp+ is not in the expected format
def timestamp_to_time(timestamp)
  pattern = /\A\s*(\d{1,4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/
  match = pattern.match(timestamp)
  raise ArgumentError, "invalid timestamp: #{timestamp.inspect}" unless match

  year, month, day, hour, min, sec = match.captures.first(6).map(&:to_i)
  # Right-pad so that the captured digits are always read as microseconds.
  usec = match[7].to_s.ljust(6, '0').to_i

  t = Time.utc(year, month, day, hour, min, sec, usec)
  @database_timezone ? @database_timezone.local_to_utc(t) : t
end