Class: Flydata::SyncFileManager
- Inherits:
-
Object
- Object
- Flydata::SyncFileManager
- Defined in:
- lib/flydata/sync_file_manager.rb
Constant Summary collapse
- DUMP_DIR =
FLYDATA_DUMP_DIR
- BACKUP_DIR =
FLYDATA_BACKUP_DIR
- TABLE_POSITIONS_DIR =
FLYDATA_TABLE_POSITIONS_DIR
- INITIAL_SYNC_SEQ =
1
- SOURCE_TABLE_EXT =
"mysql_table"
Class Method Summary collapse
-
.clear_sync_client_resources ⇒ Object
Rename instead of deleting files.
Instance Method Summary collapse
- #backup_dir ⇒ Object
- #backup_dump_dir ⇒ Object
- #close ⇒ Object
- #delete_dump_file ⇒ Object
- #delete_dump_files ⇒ Object
- #delete_master_position_files ⇒ Object
- #delete_sync_info ⇒ Object
- #delete_table_ddl_files(*tables) ⇒ Object
- #delete_table_position_files(*tables) ⇒ Object
- #delete_table_rev_files(*tables) ⇒ Object
- #delete_table_source_pos(table_name) ⇒ Object
- #dump_file_path ⇒ Object
-
#dump_pos_path ⇒ Object
dump pos file for resume.
- #get_new_table_list(tables, file_type) ⇒ Object
- #get_next_table_position(table_name) ⇒ Object
- #get_table_position(table_name) ⇒ Object
- #get_table_source_pos(table_name) ⇒ Object
- #get_table_source_pos_init(table_name) ⇒ Object
-
#get_table_source_raw_pos(table_name) ⇒ Object
returns String.
-
#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block.
- #increment_table_position(seq) ⇒ Object
- #increment_table_rev(table_name, base_rev) ⇒ Object
-
#initialize(data_entry, source = nil) ⇒ SyncFileManager
constructor
A new instance of SyncFileManager.
- #install_table_source_pos_files(tables) ⇒ Object
- #load_dump_pos ⇒ Object
- #load_generated_ddl(tables) ⇒ Object
- #load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object
- #load_source_pos(file_path = source_pos_path) ⇒ Object
- #load_stats ⇒ Object
- #load_sync_info ⇒ Object
- #lock_pid_file ⇒ Object
- #open_table_position_file(table_name) ⇒ Object
-
#reset_table_position_files(tables, options = {}) ⇒ Object
table files.
- #save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object
- #save_generated_ddl(tables, contents = "1") ⇒ Object
- #save_record_count_stat(table, record_count) ⇒ Object
-
#save_sent_source_pos(source_pos) ⇒ Object
sent source pos file (binlog.pos).
-
#save_source_pos(source_pos) ⇒ Object
master binlog.pos file.
- #save_source_table_marshal_dump(source_table) ⇒ Object
- #save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object
- #save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object
- #save_sync_info(initial_sync, tables, auto_create_status = nil) ⇒ Object
- #save_table_position(table_name, seq) ⇒ Object
-
#save_table_positions(table_names, seq) ⇒ Object
This doesn’t cache the File object.
- #save_table_source_pos(tables, source_pos, options = {}) ⇒ Object
- #sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object
- #source ⇒ Object
- #source_pos_path ⇒ Object
-
#source_table_marshal_dump_path ⇒ Object
SourceTable marshal file.
-
#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_ca file path.
-
#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_cipher file path.
- #stats_path ⇒ Object
-
#sync_info_file ⇒ Object
“sync.info” file includes initial sync information for resuming - initial_sync: True if initial sync is full initial sync - tables: target tables for initial sync - auto_create_status: START|SENT_DDL|CREATED_TABLES.
- #table_ddl_file_paths(*tables) ⇒ Object
- #table_position_file_path(table_name) ⇒ Object
- #table_position_file_paths(*tables) ⇒ Object
- #table_positions_dir_path ⇒ Object
- #table_rev(table_name) ⇒ Object
- #table_rev_file_path(table_name) ⇒ Object
- #table_rev_file_paths(*tables) ⇒ Object
- #table_source_pos_file_path(table_name) ⇒ Object
- #table_source_pos_init_paths(*tables) ⇒ Object
- #table_source_pos_paths(*tables) ⇒ Object
- #tables_from_positions_dir ⇒ Object
Constructor Details
#initialize(data_entry, source = nil) ⇒ SyncFileManager
Returns a new instance of SyncFileManager.
16 17 18 19 20 |
# File 'lib/flydata/sync_file_manager.rb', line 16 def initialize(data_entry, source = nil) @data_entry = data_entry @source = source #for Source dependent objects @table_position_files = {} # File objects keyed by table name end |
Class Method Details
.clear_sync_client_resources ⇒ Object
Rename instead of deleting files
515 516 517 518 519 520 521 522 523 524 525 |
# File 'lib/flydata/sync_file_manager.rb', line 515 def self.clear_sync_client_resources timestamp = Time.now.utc.strftime("%Y%m%d_%H%M%S") [FLYDATA_CLIENT_BUFFER_DIR, FLYDATA_TABLE_POSITIONS_DIR, DUMP_DIR, ].each do |dir_path| if Dir.exists?(dir_path) FileUtils.mv(dir_path, "#{dir_path}_#{timestamp}") end end end |
Instance Method Details
#backup_dir ⇒ Object
495 496 497 |
# File 'lib/flydata/sync_file_manager.rb', line 495 def backup_dir BACKUP_DIR end |
#backup_dump_dir ⇒ Object
485 486 487 488 489 490 491 492 493 |
# File 'lib/flydata/sync_file_manager.rb', line 485 def backup_dump_dir backup_dir = BACKUP_DIR.dup FileUtils.mkdir_p(backup_dir) unless Dir.exists?(backup_dir) dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S")) FileUtils.mkdir(dest_dir) ['info', 'pos', 'stats', SOURCE_TABLE_EXT].each do |ext| FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir) end end |
#close ⇒ Object
26 27 28 29 |
# File 'lib/flydata/sync_file_manager.rb', line 26 def close @table_position_files.values.each {|f| f.close } @table_position_files = {} end |
#delete_dump_file ⇒ Object
481 482 483 |
# File 'lib/flydata/sync_file_manager.rb', line 481 def delete_dump_file FileUtils.rm(dump_file_path) if File.exists?(dump_file_path) end |
#delete_dump_files ⇒ Object
398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/flydata/sync_file_manager.rb', line 398 def delete_dump_files files_to_delete = [ dump_file_path, dump_pos_path, source_table_marshal_dump_path, sync_info_file, stats_path ] files_to_delete.flatten.each do |file_to_delete| FileUtils.rm(file_to_delete) if File.exists?(file_to_delete) end end |
#delete_master_position_files ⇒ Object
411 412 413 414 415 416 417 418 419 420 |
# File 'lib/flydata/sync_file_manager.rb', line 411 def delete_master_position_files files_to_delete = [ source_pos_path, sent_source_pos_path, lock_pid_file, ] files_to_delete.flatten.each do |file_to_delete| FileUtils.rm(file_to_delete) if File.exists?(file_to_delete) end end |
#delete_sync_info ⇒ Object
296 297 298 |
# File 'lib/flydata/sync_file_manager.rb', line 296 def delete_sync_info FileUtils.rm(sync_info_file) end |
#delete_table_ddl_files(*tables) ⇒ Object
380 381 382 383 384 385 |
# File 'lib/flydata/sync_file_manager.rb', line 380 def delete_table_ddl_files(*tables) files_to_delete = table_ddl_file_paths(*tables) files_to_delete.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_position_files(*tables) ⇒ Object
362 363 364 365 366 367 368 369 370 371 |
# File 'lib/flydata/sync_file_manager.rb', line 362 def delete_table_position_files(*tables) files_to_delete = [ table_position_file_paths(*tables), table_source_pos_paths(*tables), table_source_pos_init_paths(*tables), ] files_to_delete.flatten.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_rev_files(*tables) ⇒ Object
373 374 375 376 377 378 |
# File 'lib/flydata/sync_file_manager.rb', line 373 def delete_table_rev_files(*tables) files_to_delete = table_rev_file_paths(*tables) files_to_delete.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_source_pos(table_name) ⇒ Object
423 424 425 426 427 428 429 430 |
# File 'lib/flydata/sync_file_manager.rb', line 423 def delete_table_source_pos(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos") if File.exists?(file) FileUtils.rm(file, :force => true) else puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?" end end |
#dump_file_path ⇒ Object
31 32 33 |
# File 'lib/flydata/sync_file_manager.rb', line 31 def dump_file_path File.join(dump_dir, @data_entry['name']) + ".dump" end |
#dump_pos_path ⇒ Object
dump pos file for resume
36 37 38 |
# File 'lib/flydata/sync_file_manager.rb', line 36 def dump_pos_path dump_file_path + ".pos" end |
#get_new_table_list(tables, file_type) ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/flydata/sync_file_manager.rb', line 79 def get_new_table_list(tables, file_type) new_tables = [] tables.each do |table| new_tables << table unless File.exists?(File.join(table_positions_dir_path, "#{table}.#{file_type}")) end new_tables end |
#get_next_table_position(table_name) ⇒ Object
278 279 280 281 |
# File 'lib/flydata/sync_file_manager.rb', line 278 def get_next_table_position(table_name) seq = get_table_position(table_name) increment_table_position(seq) end |
#get_table_position(table_name) ⇒ Object
271 272 273 274 275 276 |
# File 'lib/flydata/sync_file_manager.rb', line 271 def get_table_position(table_name) f = open_table_position_file(table_name) seq = f.read f.rewind seq end |
#get_table_source_pos(table_name) ⇒ Object
448 449 450 451 452 453 |
# File 'lib/flydata/sync_file_manager.rb', line 448 def get_table_source_pos(table_name) source_pos_str = get_table_source_raw_pos(table_name) return nil unless source_pos_str source.source_pos.create_source_pos( source_pos_str ) end |
#get_table_source_pos_init(table_name) ⇒ Object
324 325 326 327 328 329 |
# File 'lib/flydata/sync_file_manager.rb', line 324 def get_table_source_pos_init(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init") return nil unless File.exists?(file) source.source_pos.create_source_pos( File.open(file, 'r').readline ) end |
#get_table_source_raw_pos(table_name) ⇒ Object
returns String. interface for fluentd
455 456 457 458 459 460 |
# File 'lib/flydata/sync_file_manager.rb', line 455 def get_table_source_raw_pos(table_name) #returns String. interface for fluentd file = table_source_pos_file_path(table_name) return nil unless File.exists?(file) File.open(file, 'r').readline end |
#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.
217 218 219 220 221 222 223 224 225 226 |
# File 'lib/flydata/sync_file_manager.rb', line 217 def increment_and_save_table_position(table_name) seq = get_table_position(table_name) seq = increment_table_position(seq) # logical transaction starts yield(seq) save_table_position(table_name, seq) # logical transaction ends end |
#increment_table_position(seq) ⇒ Object
207 208 209 210 211 |
# File 'lib/flydata/sync_file_manager.rb', line 207 def increment_table_position(seq) seq = seq.to_i + 1 seq = "#{seq}.sync" if seq == 1 seq end |
#increment_table_rev(table_name, base_rev) ⇒ Object
353 354 355 356 357 358 359 360 |
# File 'lib/flydata/sync_file_manager.rb', line 353 def increment_table_rev(table_name, base_rev) file = table_rev_file_path(table_name) new_rev = base_rev + 1 File.open(file, "w") do |f| f.write(new_rev) end new_rev end |
#install_table_source_pos_files(tables) ⇒ Object
466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'lib/flydata/sync_file_manager.rb', line 466 def install_table_source_pos_files(tables) FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exists?(table_positions_dir_path) tables.each do |table_name| file_name = table_name + ".binlog.pos" src_file = File.join(dump_dir, file_name) if ! File.exists?(src_file) raise "#{src_file} does not exist. Error!!" end FileUtils.mv(src_file, table_positions_dir_path) # save the position at initial sync. this is used for repair if # necessary. FileUtils.cp(File.join(table_positions_dir_path, file_name), File.join(table_positions_dir_path, file_name + ".init")) end end |
#load_dump_pos ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/flydata/sync_file_manager.rb', line 47 def load_dump_pos path = dump_pos_path return {} unless File.exists?(path) content = File.open(path, 'r').readline source_table = load_source_table_marshal_dump dump_pos_content_to_hash(content).merge( { source_table: source_table} ) end |
#load_generated_ddl(tables) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/flydata/sync_file_manager.rb', line 56 def load_generated_ddl(tables) tables = [ tables ] unless tables.kind_of?(Array) paths = table_ddl_file_paths(*tables) paths.collect{|path| begin File.open(path) {|f| f.read } rescue Errno::ENOENT nil end } end |
#load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/flydata/sync_file_manager.rb', line 132 def load_sent_source_pos(file_path = sent_source_pos_path) return nil unless File.exists?(file_path) source_pos_str = IO.read(file_path).strip begin source_pos = source.source_pos.create_source_pos(source_pos_str) rescue RuntimeError return nil end source_pos end |
#load_source_pos(file_path = source_pos_path) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/flydata/sync_file_manager.rb', line 108 def load_source_pos(file_path = source_pos_path) return nil unless File.exists?(file_path) source_pos_str = IO.read(file_path).strip begin context = source.source_pos source_pos = context.create_source_pos(source_pos_str) rescue RuntimeError return nil end source_pos end |
#load_stats ⇒ Object
509 510 511 512 |
# File 'lib/flydata/sync_file_manager.rb', line 509 def load_stats return nil unless File.exists?(stats_path) Hash[*File.read(stats_path).split(/\t/)] end |
#load_sync_info ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/flydata/sync_file_manager.rb', line 311 def load_sync_info return nil unless File.exists?(sync_info_file) line = File.open(sync_info_file, 'r').readline begin JSON.parse(line, symbolize_names: true) rescue # For compatibility items = line.split("\t") { initial_sync: (items[0] == 'true'), tables: items[1].split(" ") } end end |
#lock_pid_file ⇒ Object
283 284 285 |
# File 'lib/flydata/sync_file_manager.rb', line 283 def lock_pid_file FLYDATA_LOCK end |
#open_table_position_file(table_name) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/flydata/sync_file_manager.rb', line 232 def open_table_position_file(table_name) file = table_position_file_path(table_name) retry_count = 0 begin @table_position_files[table_name] ||= (f = File.open(file, File::RDWR); f.sync = true; f) rescue Errno::ENOENT raise if retry_count > 0 # Already retried. Must be a different file causing the error # File not exist. Create one with initial value of '0' File.open(file, "w") {|f| f.write('0') } retry_count += 1 retry end @table_position_files[table_name] end |
#reset_table_position_files(tables, options = {}) ⇒ Object
table files
176 177 178 179 180 181 |
# File 'lib/flydata/sync_file_manager.rb', line 176 def reset_table_position_files(tables, options = {}) tables.each do |table_name| file = File.join(table_positions_dir_path, table_name + ".pos") File.open(file, "w") {|f| f.write("0") } end end |
#save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/flydata/sync_file_manager.rb', line 40 def save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) raise "Cannot create dump pos file because source position is unavailable." unless source_pos File.open(dump_pos_path, 'w') do |f| f.write(dump_pos_content(status, table_name, last_pos, source_pos, state, substate)) end end |
#save_generated_ddl(tables, contents = "1") ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/flydata/sync_file_manager.rb', line 68 def save_generated_ddl(tables, contents = "1") tables = [ tables ] unless tables.kind_of?(Array) #Create positions if dir does not exist unless File.directory?(table_positions_dir_path) FileUtils.mkdir_p(table_positions_dir_path) end tables.each do |tab| File.open(File.join(table_positions_dir_path, "#{tab}.generated_ddl"), 'w') {|f| f.write(contents) } end end |
#save_record_count_stat(table, record_count) ⇒ Object
503 504 505 506 507 |
# File 'lib/flydata/sync_file_manager.rb', line 503 def save_record_count_stat(table, record_count) stats = load_stats || Hash.new stats[table] = stats[table] ? stats[table].to_i + record_count : record_count save_stats(stats) end |
#save_sent_source_pos(source_pos) ⇒ Object
sent source pos file (binlog.pos)
126 127 128 129 130 |
# File 'lib/flydata/sync_file_manager.rb', line 126 def save_sent_source_pos(source_pos) File.open(sent_source_pos_path, 'w') do |f| f.write(source_pos.to_s) end end |
#save_source_pos(source_pos) ⇒ Object
master binlog.pos file
101 102 103 104 105 106 |
# File 'lib/flydata/sync_file_manager.rb', line 101 def save_source_pos(source_pos) path = source_pos_path File.open(path, 'w') do |f| f.write(source_pos.to_s) end end |
#save_source_table_marshal_dump(source_table) ⇒ Object
94 95 96 97 98 |
# File 'lib/flydata/sync_file_manager.rb', line 94 def save_source_table_marshal_dump(source_table) File.open(source_table_marshal_dump_path, 'w') do |f| f.write Marshal.dump(source_table) end end |
#save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object
156 157 158 159 160 |
# File 'lib/flydata/sync_file_manager.rb', line 156 def save_ssl_ca(ssl_ca_content, path = ssl_ca_path) File.open(path, 'w') do |f| f.write(ssl_ca_content) end end |
#save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object
169 170 171 172 173 |
# File 'lib/flydata/sync_file_manager.rb', line 169 def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) File.open(path, 'w') do |f| f.write(ssl_cipher_content) end end |
#save_sync_info(initial_sync, tables, auto_create_status = nil) ⇒ Object
300 301 302 303 304 305 306 307 308 309 |
# File 'lib/flydata/sync_file_manager.rb', line 300 def save_sync_info(initial_sync, tables, auto_create_status = nil) File.open(sync_info_file, "w") do |f| content = { initial_sync: initial_sync, tables: tables, auto_create_status: auto_create_status, }.to_json f.write(content) end end |
#save_table_position(table_name, seq) ⇒ Object
247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/flydata/sync_file_manager.rb', line 247 def save_table_position(table_name, seq) f = open_table_position_file(table_name) prev_seq_len = f.size seq_to_write = seq.to_s new_seq_len = seq_to_write.size if new_seq_len < prev_seq_len seq_to_write += " " * (prev_seq_len - new_seq_len) end f.write(seq_to_write) f.truncate(new_seq_len) if new_seq_len < prev_seq_len f.rewind end |
#save_table_positions(table_names, seq) ⇒ Object
This doen’t cache the File object
263 264 265 266 267 268 269 |
# File 'lib/flydata/sync_file_manager.rb', line 263 def save_table_positions(table_names, seq) table_names = Array(table_names) table_names.each do |table_name| file_path = table_position_file_path(table_name) File.write(file_path, seq.to_s) end end |
#save_table_source_pos(tables, source_pos, options = {}) ⇒ Object
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/flydata/sync_file_manager.rb', line 432 def save_table_source_pos(tables, source_pos, options = {}) dest_dir = case options[:destination] when :positions; table_positions_dir_path when :dump; dump_dir else dump_dir end tables = [ tables ] unless tables.kind_of?(Array) tables.each do |table_name| file = File.join(dest_dir, table_name + ".binlog.pos") File.open(file, "w") do |f| f.write(source_pos.to_s) end end end |
#sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object
144 145 146 147 |
# File 'lib/flydata/sync_file_manager.rb', line 144 def sent_source_pos_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) "#{master_source_pos_path[0..-5]}.sent.pos" end |
#source ⇒ Object
22 23 24 |
# File 'lib/flydata/sync_file_manager.rb', line 22 def source @source end |
#source_pos_path ⇒ Object
121 122 123 |
# File 'lib/flydata/sync_file_manager.rb', line 121 def source_pos_path File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos") end |
#source_table_marshal_dump_path ⇒ Object
SourceTable marshal file
90 91 92 |
# File 'lib/flydata/sync_file_manager.rb', line 90 def source_table_marshal_dump_path dump_file_path + ".#{SOURCE_TABLE_EXT}" end |
#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_ca file path
150 151 152 153 154 |
# File 'lib/flydata/sync_file_manager.rb', line 150 def ssl_ca_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) # <data-entry-name>.ssl_ca.pem "#{master_source_pos_path[0..-12]}.ssl_ca.pem" end |
#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_cipher file path
163 164 165 166 167 |
# File 'lib/flydata/sync_file_manager.rb', line 163 def ssl_cipher_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) # <data-entry-name>.ssl_cipher "#{master_source_pos_path[0..-12]}.ssl_cipher" end |
#stats_path ⇒ Object
499 500 501 |
# File 'lib/flydata/sync_file_manager.rb', line 499 def stats_path File.join(dump_dir, @data_entry['name']) + ".stats" end |
#sync_info_file ⇒ Object
“sync.info” file includes initial sync information for resuming
-
initial_sync: True if initial sync is full initial sync
-
tables: target tables for initial sync
-
auto_create_status: START|SENT_DDL|CREATED_TABLES
292 293 294 |
# File 'lib/flydata/sync_file_manager.rb', line 292 def sync_info_file File.join(dump_dir, "sync.info") end |
#table_ddl_file_paths(*tables) ⇒ Object
192 193 194 195 |
# File 'lib/flydata/sync_file_manager.rb', line 192 def table_ddl_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl')) : tables.map{|table| File.join(table_positions_dir_path, table + '.generated_ddl')} end |
#table_position_file_path(table_name) ⇒ Object
228 229 230 |
# File 'lib/flydata/sync_file_manager.rb', line 228 def table_position_file_path(table_name) File.join(table_positions_dir_path, table_name + ".pos") end |
#table_position_file_paths(*tables) ⇒ Object
187 188 189 190 |
# File 'lib/flydata/sync_file_manager.rb', line 187 def table_position_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.pos')} end |
#table_positions_dir_path ⇒ Object
183 184 185 |
# File 'lib/flydata/sync_file_manager.rb', line 183 def table_positions_dir_path TABLE_POSITIONS_DIR end |
#table_rev(table_name) ⇒ Object
340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/flydata/sync_file_manager.rb', line 340 def table_rev(table_name) file = table_rev_file_path(table_name) return 1 unless File.exists?(file) #default revision is 1 File.open(file, "r+") do |f| seq = f.read if seq.empty? return 1 else return seq.to_i end end end |
#table_rev_file_path(table_name) ⇒ Object
331 332 333 |
# File 'lib/flydata/sync_file_manager.rb', line 331 def table_rev_file_path(table_name) File.join(table_positions_dir_path, table_name + ".rev") end |
#table_rev_file_paths(*tables) ⇒ Object
335 336 337 338 |
# File 'lib/flydata/sync_file_manager.rb', line 335 def table_rev_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev")) : tables.map{|table| table_rev_file_path(table)} end |
#table_source_pos_file_path(table_name) ⇒ Object
462 463 464 |
# File 'lib/flydata/sync_file_manager.rb', line 462 def table_source_pos_file_path(table_name) File.join(table_positions_dir_path, table_name + ".binlog.pos") end |
#table_source_pos_init_paths(*tables) ⇒ Object
202 203 204 205 |
# File 'lib/flydata/sync_file_manager.rb', line 202 def table_source_pos_init_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos.init')} end |
#table_source_pos_paths(*tables) ⇒ Object
197 198 199 200 |
# File 'lib/flydata/sync_file_manager.rb', line 197 def table_source_pos_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos')} end |
#tables_from_positions_dir ⇒ Object
387 388 389 390 391 392 393 394 395 396 |
# File 'lib/flydata/sync_file_manager.rb', line 387 def tables_from_positions_dir all_table_control_files = Dir.glob(File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}')) tables = Set.new all_table_control_files.each do |control_file| file_name = File.basename(control_file) file_name = file_name.slice(0...(file_name.index('.'))) tables << file_name end tables.to_a end |