Class: Fluent::RdsMysqlSlowLog
- Inherits:
-
Input
- Object
- Input
- Fluent::RdsMysqlSlowLog
show all
- Defined in:
- lib/fluent/plugin/in_rds_mysql_slow_log.rb
Defined Under Namespace
Classes: Server, TimerWatcher
Instance Method Summary
collapse
Instance Method Details
26
27
28
29
30
31
32
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 26
# Applies the plugin configuration.
#
# Hands +conf+ to the parent implementation first, then runs the three
# local setup steps (servers, timezone, encoding) in that order.
#
# @param conf the configuration object, passed through to the parent class
def configure(conf)
  super(conf)

  configure_servers
  configure_timezone
  configure_encoding
end
|
51
52
53
54
55
56
57
58
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 51
# Validates the encoding options and replaces them with Encoding objects.
#
# 'from_encoding' is only meaningful together with 'encoding'; supplying it
# alone is rejected. When set, each option is resolved through
# parse_encoding_param.
#
# @raise [ConfigError] when @from_encoding is set but @encoding is not
def configure_encoding
  if @from_encoding && !@encoding
    raise ConfigError, "'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  # Past the check above, a missing @encoding implies a missing
  # @from_encoding, so there is nothing left to resolve.
  return unless @encoding

  @encoding = parse_encoding_param(@encoding)
  @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
end
|
34
35
36
37
38
39
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 34
# Rewrites @servers in place into [tag, Server] pairs.
#
# Each configured entry supplies tag, host, port, username and password.
# When @tag_prefix is set, the tag becomes "<prefix>.<tag>".
def configure_servers
  @servers.map! do |entry|
    full_tag =
      if @tag_prefix
        "#{@tag_prefix}.#{entry.tag}"
      else
        entry.tag
      end
    connection = Server.new(entry.host, entry.port, entry.username, entry.password)
    [full_tag, connection]
  end
end
|
41
42
43
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 41
# Resolves @database_timezone through parse_timezone_param when it is set;
# leaves it untouched otherwise.
def configure_timezone
  return unless @database_timezone

  @database_timezone = parse_timezone_param(@database_timezone)
end
|
#emit_slow_log(tag, server) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 88
# Rotates the slow log on one server and emits the rotated rows.
#
# Within a single connection it calls mysql.rds_rotate_slow_log, reads every
# row of slow_log_backup, converts each row with #process and emits the
# resulting stream under +tag+. Any StandardError is logged, not re-raised.
#
# @param tag the tag the event stream is emitted under
# @param server an object whose #connect yields a query-capable client
def emit_slow_log(tag, server)
  server.connect do |client|
    client.query('CALL mysql.rds_rotate_slow_log')

    stream = MultiEventStream.new
    rows = client.query('SELECT * FROM slow_log_backup')
    rows.each { |row| stream.add(*process(row)) }

    router.emit_stream(tag, stream)
  end
rescue StandardError => e
  log.error e.to_s
  log.error_backtrace
end
|
#encode(str, dst, src) ⇒ Object
136
137
138
139
140
141
142
143
144
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 136
# Re-encodes +str+ in place.
#
# @param str [String, nil] the string to modify; nil is passed through
# @param dst the target encoding
# @param src the source encoding; when given the bytes are transcoded from
#   +src+ to +dst+, when nil the string is only relabelled as +dst+
# @return [String, nil] the same string object, or nil when +str+ is nil
def encode(str, dst, src)
  return nil if str.nil?
  return str.encode!(dst, src) if src

  str.force_encoding(dst)
end
|
172
173
174
175
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 172
# Returns an integer time value for +record+ (the result of #to_i).
# NOTE(review): the method name is missing from this listing ("def (record)"),
# so the block is not valid Ruby as shown — restore the name from
# lib/fluent/plugin/in_rds_mysql_slow_log.rb (line 172) before reusing it.
def (record)
# With @keep_time_key set, 'start_time' is read and stays in the record;
# otherwise it is removed from the record while being read.
time = @keep_time_key ? record['start_time'] : record.delete('start_time')
# Falls back to Engine.now when the record has no 'start_time' value.
(time || Engine.now).to_i
end
|
#on_timer ⇒ Object
82
83
84
85
86
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 82
# Timer callback: emits the slow log of every configured server in turn.
#
# @servers holds [tag, server] pairs (built by configure_servers); each pair
# is handed to emit_slow_log. The former `with_index` wrapper was dropped
# because the index was never used.
def on_timer
  @servers.each do |tag, server|
    emit_slow_log(tag, server)
  end
end
|
#parse_encoding_param(encoding) ⇒ Object
60
61
62
63
64
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 60
# Looks up an Encoding by name for use as a configuration value.
#
# @param encoding the encoding name to resolve
# @return [Encoding] the matching encoding
# @raise [ConfigError] when Encoding.find rejects the name; the original
#   error message is reused
def parse_encoding_param(encoding)
  begin
    Encoding.find(encoding)
  rescue ArgumentError => error
    raise ConfigError, error.message
  end
end
|
#parse_timezone_param(timezone) ⇒ Object
45
46
47
48
49
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 45
# Resolves a timezone identifier into a TZInfo timezone object.
#
# @param timezone the timezone identifier to look up
# @return the object returned by TZInfo::Timezone.get
# @raise [ConfigError] when TZInfo rejects the identifier; the original
#   error message is reused
def parse_timezone_param(timezone)
  TZInfo::Timezone.get(timezone)
rescue TZInfo::InvalidTimezoneIdentifier => e
  # The exception class is fully qualified so the rescue works whether or not
  # TZInfo is mixed into the enclosing scope; a bare constant that fails to
  # resolve would surface as NameError instead of ConfigError.
  raise ConfigError, e.message
end
|
#process(record) ⇒ Object
104
105
106
107
108
109
110
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 104
# Normalises one slow-log row in place and returns a two-element array.
# The timestamp, string, time and integer conversions run in that order,
# each mutating +record+.
def process(record)
process_timestamp(record)
process_string(record)
process_time(record)
process_integer(record)
# NOTE(review): as listed, this evaluates to [record, record]. The first
# element looks like a call whose method name was lost in this listing —
# presumably the helper that derives the event time, since emit_slow_log
# splats this result into es.add. Confirm against the source file.
[(record), record]
end
|
#process_integer(record) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 158
# Converts the numeric slow-log columns of +record+ to Integer, in place.
#
# Only fields that hold a truthy value are converted; missing or nil fields
# are left as they are (no keys are added).
#
# @param record [Hash] a slow-log row keyed by column name
def process_integer(record)
  integer_fields = %w[rows_sent rows_examined last_insert_id insert_id server_id thread_id rows_affected]

  integer_fields.each do |name|
    raw = record[name]
    record[name] = raw.to_i if raw
  end
end
|
#process_string(record) ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 122
# Cleans up the textual columns of +record+ in place.
#
# 1. With @null_empty_string set, 'user_host', 'db' and 'sql_text' are set to
#    nil whenever they are missing, nil or empty.
# 2. With @encoding set, 'user_host' and 'db' are transcoded from UTF-8 to
#    @encoding, and 'sql_text' is passed to #encode with @from_encoding as
#    its source (which may be nil).
#
# @param record [Hash] a slow-log row keyed by column name
def process_string(record)
  if @null_empty_string
    %w[user_host db sql_text].each do |name|
      current = record[name]
      record[name] = nil if !current || current.empty?
    end
  end

  return unless @encoding

  %w[user_host db].each { |name| encode(record[name], @encoding, Encoding::UTF_8) }
  encode(record['sql_text'], @encoding, @from_encoding)
end
|
#process_time(record) ⇒ Object
146
147
148
149
150
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 146
# Replaces the 'query_time' and 'lock_time' values of +record+ with the
# result of time_to_microseconds, in place. Fields without a truthy value
# are left unchanged.
#
# @param record [Hash] a slow-log row keyed by column name
def process_time(record)
  %w[query_time lock_time].each do |name|
    raw = record[name]
    next unless raw

    record[name] = time_to_microseconds(raw)
  end
end
|
#process_timestamp(record) ⇒ Object
112
113
114
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 112
# Replaces record['start_time'] with the result of timestamp_to_time, in
# place. A missing or nil 'start_time' is left untouched.
#
# @param record [Hash] a slow-log row keyed by column name
def process_timestamp(record)
  raw = record['start_time']
  return raw unless raw

  record['start_time'] = timestamp_to_time(raw)
end
|
#run ⇒ Object
75
76
77
78
79
80
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 75
# Thread body: drives the event loop held in @loop.
#
# Any StandardError raised by the loop is logged (message, then backtrace)
# rather than propagated.
def run
  @loop.run
rescue StandardError => e
  log.error e.to_s
  log.error_backtrace
end
|
#shutdown ⇒ Object
177
178
179
180
181
182
183
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 177
# Stops the plugin.
#
# After the parent's shutdown, every watcher attached to @loop is detached,
# the loop is stopped, and the method waits for @thread to finish.
def shutdown
  super

  @loop.watchers.each { |watcher| watcher.detach }
  @loop.stop
  @thread.join
end
|
#start ⇒ Object
66
67
68
69
70
71
72
73
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 66
# Starts the plugin.
#
# After the parent's start, creates a Coolio loop, attaches a TimerWatcher
# built from @emit_interval that invokes #on_timer, and runs the loop on a
# new thread via #run.
def start
  super

  @loop = Coolio::Loop.new
  timer_callback = method(:on_timer)
  @timer = TimerWatcher.new(@emit_interval, true, log, &timer_callback)
  @loop.attach(@timer)

  @thread = Thread.new(&method(:run))
end
|
#time_to_microseconds(time) ⇒ Object
152
153
154
155
156
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 152
# Converts a "[-]H:MM:SS[.ffffff]" duration string into microseconds.
#
# Differences from the previous scanf-based version:
# * the sign is read from the text itself, so "-00:00:01" stays negative
#   (an hour value of zero cannot carry a sign);
# * a fraction shorter than six digits is scaled correctly (".5" is
#   500_000 microseconds, not 5);
# * no dependency on String#scanf, which is not part of the standard
#   library from Ruby 2.7 on.
#
# Text after the matched portion is ignored, as before.
#
# @param time [String] the duration, e.g. "00:00:01.234567"
# @return [Integer] the signed duration in microseconds
# @raise [ArgumentError] when +time+ does not start with a duration
def time_to_microseconds(time)
  match = /\A\s*([+-]?)(\d{1,4}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/.match(time)
  raise ArgumentError, "invalid time value: #{time.inspect}" unless match

  sign = match[1] == '-' ? -1 : 1
  hours, minutes, seconds = match.captures[1, 3].map(&:to_i)
  # Right-pad the fraction to six digits so it always denotes microseconds;
  # a missing fraction becomes "000000".
  micros = match[5].to_s.ljust(6, '0').to_i

  sign * (hours * 3_600_000_000 + minutes * 60_000_000 + seconds * 1_000_000 + micros)
end
|
#timestamp_to_time(timestamp) ⇒ Object
116
117
118
119
120
|
# File 'lib/fluent/plugin/in_rds_mysql_slow_log.rb', line 116
# Parses a "YYYY-MM-DD HH:MM:SS[.ffffff]" string into a Time.
#
# The fields are first read as a UTC wall-clock time. When
# @database_timezone is set, that value is handed to its #local_to_utc and
# the result is returned; otherwise the UTC time is returned as is.
#
# Differences from the previous scanf-based version:
# * a fraction shorter than six digits is scaled correctly (".5" is
#   500_000 microseconds, not 5);
# * malformed input raises a descriptive ArgumentError;
# * no dependency on String#scanf, which is not part of the standard
#   library from Ruby 2.7 on.
#
# @param timestamp [String] e.g. "2015-03-04 05:06:07.000089"
# @return [Time] the parsed time (after the optional timezone conversion)
# @raise [ArgumentError] when +timestamp+ does not start with a timestamp,
#   or when Time.utc rejects the field values
def timestamp_to_time(timestamp)
  match = /\A\s*(\d{1,4})-(\d{1,2})-(\d{1,2})\s+(\d{1,2}):(\d{1,2}):(\d{1,2})(?:\.(\d{1,6}))?/.match(timestamp)
  raise ArgumentError, "invalid timestamp value: #{timestamp.inspect}" unless match

  *date_time, fraction = match.captures
  # Right-pad the fraction to six digits so it always denotes microseconds;
  # a missing fraction becomes "000000".
  usec = fraction.to_s.ljust(6, '0').to_i

  t = Time.utc(*date_time.map(&:to_i), usec)
  @database_timezone ? @database_timezone.local_to_utc(t) : t
end
|