35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/flydata/source_mysql/parser/dump_parser.rb', line 35
def dump(file_path = nil, &block)
unless file_path || block
raise ArgumentError.new("file_path or block must be given.")
end
dump_cmd = generate_dump_cmd(@conf, file_path)
table_locker = create_table_locker(@conf["database"], @conf["tables"])
table_locker.resume
begin
rd_io, wr_io = IO.pipe("utf-8", "utf-8")
wr_io.sync = true
wr_io.set_encoding("utf-8", "utf-8")
rd_io.extend(DumpStreamIO)
binlog_pos = nil
Open3.popen3 dump_cmd do |cmd_in, cmd_out, cmd_err, wait_thr|
cmd_in.close_write
cmd_out.set_encoding("utf-8", "utf-8")
if file_path
loop do
if File.size? file_path
break
end
sleep 1
end
else
cmd_out.wait_readable end
binfile, pos = table_locker.resume
binlog_pos = {binfile: binfile, pos: pos}
threads = []
threads << Thread.new do
begin
filter_dump_stream(cmd_out, wr_io)
ensure
wr_io.close rescue nil
end
end
errors = ""
threads << Thread.new do
cmd_err.each_line do |line|
errors << line unless /^Warning:/ === line
end
end
if block
block.call(rd_io, binlog_pos)
end
threads.each(&:join)
unless wait_thr.value == 0
errors = "Failed to run mysqldump command." if errors.empty?
end
raise errors unless errors.empty?
end
binlog_pos
rescue
FileUtils.rm(file_path) if file_path && File.exists?(file_path)
raise
ensure
table_locker.resume if table_locker.alive?
rd_io.close rescue nil
wr_io.close rescue nil
end
end
|