Class: Fluent::MysqlBinlogFlydataInput
Constant Summary
Fluent::MysqlBinlogFlydataInputPreference::CUSTOM_CONFIG_PARAMS
Instance Method Summary
collapse
included
Constructor Details
Returns a new instance of MysqlBinlogFlydataInput.
43
44
45
46
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 43
# Builds the plugin instance via the parent constructor, then installs the
# custom signal handler defined by #install_custom_signal_handler.
def initialize
super
# Registers the USR1 trap (see #install_custom_signal_handler).
install_custom_signal_handler
end
|
Instance Method Details
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 52
# Applies the configuration and prepares everything needed to process binlog
# events.
#
# Steps, in order: run the parent #configure, require that the binlog position
# file already exists, load the custom configuration, log the connection
# settings and the mysql client/server versions, expand the comma-separated
# table lists, and build the context and record dispatcher.
#
# Tables listed in @tables_append_only are added to @tables (if missing) and
# get their :delete events omitted.
#
# @param conf configuration object, passed through to the parent #configure
# @raise [RuntimeError] when the position file does not exist
def configure(conf)
  super
  @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file)
  unless @binlog_position_file.exists?
    raise "No position file(#{@position_file}). Initial synchronization is required before starting."
  end
  load_custom_conf
  $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
  $log.info "mysql client version: #{`mysql -V`}"

  # NOTE(review): host/port/username/password are interpolated into a shell
  # command line without escaping, so values containing shell metacharacters
  # will break (or alter) the command. Consider escaping them.
  server_version = `echo 'select version();' | mysql -h #{@host} --port #{@port} -u #{@username} -p#{@password} 2>/dev/null`
  # The first output line is dropped so only what follows it is logged. When
  # the command prints nothing (stderr is discarded, so a failed connection
  # yields an empty string) there is no newline to split on; keep the raw
  # output instead of failing with NoMethodError on nil.
  header_end = server_version.index("\n")
  server_version = server_version[(header_end + 1)..-1] if header_end
  $log.info "mysql server version: #{server_version}"

  @tables = @tables.split(/,\s*/)
  @omit_events = {}
  @tables_append_only.split(/,\s*/).each do |table|
    @tables << table unless @tables.include?(table)
    @omit_events[table] = [:delete]
  end

  sync_fm = Flydata::FileUtil::SyncFileManager.new(nil)
  @context = Mysql::Context.new(
    database: @database, tables: @tables,
    tag: @tag, sync_fm: sync_fm, omit_events: @omit_events
  )
  @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context)
end
|
#event_listener(event) ⇒ Object
129
130
131
132
133
134
135
136
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 129
# Hands a single binlog event to the record dispatcher.
#
# If dispatching raises, the failure is logged together with the value read
# from the binlog position file. StandardError (and its subclasses) is
# swallowed after logging; any other exception is re-raised.
#
# @param event the binlog event; its #event_type is used in the error log
def event_listener(event)
  @record_dispatcher.dispatch(event)
rescue Exception => error
  current_position = @binlog_position_file.read
  $log.error "error occured while processing #{event.event_type} event at #{current_position}\n#{error.message}\n#{error.backtrace.join("\n")}"
  # Only non-StandardError exceptions propagate to the caller.
  raise unless error.is_a?(StandardError)
end
|
#install_custom_signal_handler ⇒ Object
Hack: the only thing added here is `Fluent::Engine.shutdown_source`. This belongs in fluentd's Supervisor#install_main_process_signal_handlers.
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 162
# Installs a USR1 trap that shuts down the sources and force-flushes buffered
# events.
#
# The work happens on a separate thread started from inside the trap block;
# any exception raised there is logged as a warning rather than propagated.
def install_custom_signal_handler
  trap(:USR1) do
    $log.debug "fluentd main process get SIGUSR1"
    $log.info "force flushing buffered events"
    flusher = Thread.new do
      begin
        Fluent::Engine.shutdown_source
        Fluent::Engine.flush!
        $log.debug "flushing thread: flushed"
      rescue Exception => error
        $log.warn "flushing thread error: #{error}"
      end
    end
    flusher.run
  end
end
|
#run ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 100
# Starts the Kodama client and wires up the binlog event listeners.
#
# The client is configured from @position_file, @retry_limit, @retry_wait and
# @log_level, and #event_listener is registered through the client's
# "on_<event_type>" method for every entry in @listen_events.
#
# Every exception is logged and then re-raised: signal exceptions at debug
# level, StandardError and all remaining exceptions at error level with a
# backtrace.
def run
  start_kodama(mysql_url) do |client|
    client.binlog_position_file = @position_file
    client.connection_retry_limit = @retry_limit
    client.connection_retry_wait = @retry_wait
    client.log_level = @log_level.to_sym
    @listen_events.each do |event_type|
      $log.trace { "registered binlog event listener '#{event_type}'" }
      client.send("on_#{event_type}", &method(:event_listener))
    end
  end
rescue SignalException => signal
  $log.debug "signal exception. exception: #{signal.class}, error: #{signal}"
  raise
rescue StandardError => error
  $log.error "unexpected error. exception: #{error.class}, error: #{error}\n#{error.backtrace.join("\n")}"
  raise
rescue Exception => fatal
  $log.error "unexpected fatal error. exception: #{fatal.class}, error: #{fatal}\n#{fatal.backtrace.join("\n")}"
  raise
end
|
#shutdown ⇒ Object
138
139
140
141
142
143
144
145
146
147
148
149
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 138
# Stops the Kodama client thread, if one is running.
#
# Does nothing unless @thread is set and alive. Otherwise it asks the Kodama
# client to stop and waits via #wait_till_safe_to_stop; the thread is killed
# only when that wait succeeds, and an error is logged when it does not.
def shutdown
  return unless @thread && @thread.alive?

  $log.info "Requesting stop Kodama"
  @kodama_client.stop_request

  stoppable = wait_till_safe_to_stop
  return $log.error("Unable to stop Kodama") unless stoppable

  $log.info "Killing Kodama client"
  Thread.kill(@thread)
end
|
#start ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 80
# Starts the plugin via the parent #start, then makes sure the table positions
# directory exists.
#
# Every exception is logged and re-raised. A Binlog::Error whose message
# contains "basic_string::_M_replace_aux" gets a dedicated hint about blocked
# hosts; other Binlog::Error instances and all remaining exceptions are logged
# with class, message and backtrace.
def start
  super
  positions_path = @context.sync_fm.table_positions_dir_path
  # File.exist? rather than File.exists?: the latter was deprecated and then
  # removed in Ruby 3.2, where it raises NoMethodError.
  Dir.mkdir positions_path unless File.exist? positions_path
rescue Binlog::Error => e
  if (/basic_string::_M_replace_aux/ === e.to_s)
    $log.error <<EOS
a mysql-replication-listener error. This could have been caused by one of the following reasons.
- Failed on connect: Your host is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
EOS
  else
    $log.error "unexpected mysql-replication-listener error. exception: #{e.class.to_s}, error: #{e.to_s}\n#{e.backtrace.join("\n")}"
  end
  raise
rescue Exception => e
  $log.error "unexpected fatal error. exception: #{e.class.to_s}, error: #{e.to_s}\n#{e.backtrace.join("\n")}"
  raise
end
|
#start_kodama(options, &block) ⇒ Object
122
123
124
125
126
127
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 122
# Creates the Kodama client, lets the caller configure it, and starts it.
#
# The client is built from the URL that Kodama::Client.mysql_url derives from
# +options+, stored in @kodama_client, and given $log as its logger before the
# block runs.
#
# @param options passed to Kodama::Client.mysql_url
# @param block called with the new client before it is started
# @return whatever the client's #start returns
def start_kodama(options, &block)
  url = Kodama::Client.mysql_url(options)
  client = Kodama::Client.new(url)
  @kodama_client = client
  client.logger = $log
  block.call(client)
  client.start
end
|
#wait_till_safe_to_stop ⇒ Object
151
152
153
154
155
156
157
158
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 151
# Polls the Kodama client until it reports that stopping is safe.
#
# The client is checked immediately and then again after each of up to five
# 3-second sleeps. The check after the final sleep matters: without it the
# last wait would be wasted and a client that became safe during it would
# still be reported as not stoppable.
#
# @return [Boolean] true as soon as the client is safe to stop, false if it
#   never is within the retries
def wait_till_safe_to_stop
  retry_count = 5
  (retry_count + 1).times do |attempt|
    # No sleep before the very first check.
    sleep 3 unless attempt.zero?
    return true if @kodama_client.safe_to_stop?
  end
  false
end
|