Class: Forklift::Patterns::Mysql

Inherits:
Object
  • Object
show all
Defined in:
lib/forklift/patterns/mysql_patterns.rb

Class Method Summary collapse

Class Method Details

.can_incremental_pipe?(source, from_table, destination, to_table, matcher = source.default_matcher) ⇒ Boolean

Returns:

  • (Boolean)
[View source]

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/forklift/patterns/mysql_patterns.rb', line 76

# Can `from_table` be incrementally piped into `to_table`?
# True only when both tables exist, both contain the `matcher` column,
# and the two tables have exactly the same set of columns.
#
# @param source       [Forklift connection] database holding `from_table`
# @param destination  [Forklift connection] database holding `to_table`
# @param matcher      [String] timestamp-ish column used to find new rows
# @return [Boolean]
def self.can_incremental_pipe?(source, from_table, destination, to_table, matcher=source.default_matcher)
  # Guard: both tables must exist before we can inspect their columns.
  return false unless source.tables.include?(from_table)
  return false unless destination.tables.include?(to_table)

  source_cols      = source.columns(from_table, source.current_database)
  destination_cols = destination.columns(to_table, destination.current_database)

  # The matcher column must be present on both sides.
  return false unless source_cols.include?(matcher) && destination_cols.include?(matcher)

  # Column sets must match exactly (mutual inclusion).
  source_cols.all? { |col| destination_cols.include?(col) } &&
    destination_cols.all? { |col| source_cols.include?(col) }
end

.incremental_pipe(source, from_table, destination, to_table, matcher = source.default_matcher, primary_key = 'id') ⇒ Object

[View source]

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/forklift/patterns/mysql_patterns.rb', line 21

# Incrementally copy `from_table` into `to_table`: only rows whose `matcher`
# column is newer than the newest value already present in `to_table` are
# moved, and rows that were updated since the last run are refreshed.
#
# NOTE(review): queries targeting `to_db` are issued through both `source.q`
# and `destination.q` — this appears to assume source and destination live on
# the same MySQL server; confirm before using across distinct servers.
#
# @param source      [Forklift connection] database holding `from_table`
# @param destination [Forklift connection] database holding `to_table`
# @param matcher     [String] timestamp-ish column used to find new rows
# @param primary_key [String] column used to delete/replace stale rows
def self.incremental_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
  start = Time.new.to_i
  from_db = source.current_database
  to_db = destination.current_database
  source.forklift.logger.log("mysql incremental_pipe: `#{from_db}`.`#{from_table}` => `#{to_db}`.`#{to_table}`")
  # Ensure the destination table exists with the same schema as the source.
  source.q("create table if not exists `#{to_db}`.`#{to_table}` like `#{from_db}`.`#{from_table}`")

  # Count the number of rows in to_table
  # NOTE(review): counted via `source`, but `new_count` below is measured via
  # `destination` — consistent only under the same-server assumption above.
  original_count = source.count(to_table, to_db)

  # Find the latest/max/newest timestamp from the final table
  # in order to determine the last copied row.
  latest_timestamp = source.max_timestamp(to_table, matcher, to_db)

  # If to_table has existing rows, ensure none of them are "stale."
  # A stale row in to_table means a previously copied row was
  # updated in from_table, so let's delete it from the to_table
  # so we can get a fresh copy of that row.
  if original_count > 0
    # Get the ids of rows in from_table that are newer than the newest row in to_table.
    # Some of these rows could either be a) stale or b) new.
    source.read("select `#{primary_key}` from `#{from_db}`.`#{from_table}` where `#{matcher}` > \"#{latest_timestamp}\" order by `#{matcher}`") do |stale_rows|
      if stale_rows.length > 0
        # Delete these ids from to_table.
        # If the ids are stale, then they'll be deleted. If they're new, they won't exist, and nothing will happen.
        stale_ids = stale_rows.map { |row| row[primary_key.to_sym] }.join(',')
        source.q("delete from `#{to_db}`.`#{to_table}` where `#{primary_key}` in (#{stale_ids})")
        source.forklift.logger.log("  ^ deleted up to #{stale_rows.length} stale rows from `#{to_db}`.`#{to_table}`")
      end
    end
  end

  # Do the insert into to_table
  destination.q("insert into `#{to_db}`.`#{to_table}` select * from `#{from_db}`.`#{from_table}` where `#{matcher}` > \"#{latest_timestamp}\" order by `#{matcher}`")
  delta = Time.new.to_i - start
  new_count = destination.count(to_table, to_db) - original_count
  source.forklift.logger.log("  ^ created #{new_count} new rows in #{delta}s")
end

.mysql_optimistic_import(source, destination, matcher = source.default_matcher) ⇒ Object

Use when you are copying data to and from MySQL. An implementation of “pipe” for remote databases.

[View source]

94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/forklift/patterns/mysql_patterns.rb', line 94

# Import every table from `source` into `destination`, preferring an
# incremental copy (rows newer than the destination's max `matcher` value)
# and falling back to a drop-and-reload when the layouts don't allow it.
#
# @param source      [Forklift connection] database to read from
# @param destination [Forklift connection] database to write into
# @param matcher     [String] timestamp-ish column used to find new rows
def self.mysql_optimistic_import(source, destination, matcher=source.default_matcher)
  source.tables.each do |table|
    incremental = source.columns(table).include?(matcher) &&
                  destination.tables.include?(table) &&
                  destination.columns(table).include?(matcher)

    if incremental
      # Only fetch rows newer than what the destination already has.
      since = destination.max_timestamp(table)
      source.read_since(table, since) { |data| destination.write(data, table) }
    else
      # Layouts don't line up: rebuild the destination table from scratch.
      # destination.truncate table
      destination.drop! table if destination.tables.include?(table)
      source.read("select * from #{table}") { |data| destination.write(data, table) }
    end
  end
end

.optimistic_pipe(source, from_table, destination, to_table, matcher = source.default_matcher, primary_key = 'id') ⇒ Object

[View source]

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/forklift/patterns/mysql_patterns.rb', line 60

# Pipe `from_table` into `to_table`, using the fast incremental strategy
# when `can_incremental_pipe?` says the layouts allow it, and falling back
# to a full `pipe` (drop + rebuild) otherwise — or if the incremental
# attempt raises.
#
# Fixes: rescued `Exception` (which also swallows SignalException/SystemExit/
# NoMemoryError) narrowed to `StandardError`; removed unused locals
# `from_db`/`to_db` that were computed but never read.
#
# @param source      [Forklift connection] database holding `from_table`
# @param destination [Forklift connection] database holding `to_table`
# @param matcher     [String] timestamp-ish column used to find new rows
# @param primary_key [String] column used by incremental_pipe to de-dupe
def self.optimistic_pipe(source, from_table, destination, to_table, matcher=source.default_matcher, primary_key='id')
  if self.can_incremental_pipe?(source, from_table, destination, to_table, matcher)
    begin
      incremental_pipe(source, from_table, destination, to_table, matcher, primary_key)
    rescue StandardError => e
      # Incremental copy failed mid-way; the full pipe rebuilds to_table atomically.
      source.forklift.logger.log("! incremental_pipe failure on #{from_table} => #{to_table}: #{e} ")
      source.forklift.logger.log("! falling back to pipe...")
      pipe(source, from_table, destination, to_table)
    end
  else
    pipe(source, from_table, destination, to_table)
  end
end

.pipe(source, from_table, destination, to_table, tmp_table = "_forklift_tmp") ⇒ Object

[View source]

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/forklift/patterns/mysql_patterns.rb', line 5

# Fully copy `from_table` into `to_table` via a temporary table, then swap
# it into place with an atomic `rename table` so readers never see a
# half-built destination table.
#
# @param source      [Forklift connection] database holding `from_table`
# @param destination [Forklift connection] database holding `to_table`
# @param tmp_table   [String] scratch table name used for the staging copy
def self.pipe(source, from_table, destination, to_table, tmp_table="_forklift_tmp")
  started_at = Time.new.to_i
  from_db = source.current_database
  to_db = destination.current_database
  source.forklift.logger.log("mysql pipe: `#{from_db}`.`#{from_table}` => `#{to_db}`.`#{to_table}`")

  # Stage the copy in tmp_table, then atomically swap it in.
  [
    "drop table if exists `#{to_db}`.`#{tmp_table}`",
    "create table `#{to_db}`.`#{tmp_table}` like `#{from_db}`.`#{from_table}`",
    "insert into `#{to_db}`.`#{tmp_table}` select * from `#{from_db}`.`#{from_table}`",
    "drop table if exists `#{to_db}`.`#{to_table}`",
    "rename table `#{to_db}`.`#{tmp_table}` to `#{to_db}`.`#{to_table}`",
  ].each { |sql| source.q(sql) }

  elapsed = Time.new.to_i - started_at
  source.forklift.logger.log("  ^ moved #{destination.count(to_table, to_db)} rows in #{elapsed}s")
end

.write_high_water_mark(db, time, matcher = db.default_matcher) ⇒ Object

The high water method will stub a row in all tables with a ‘default_matcher` column, pretending to have a record from `time`. This enables partial forklift runs which will only extract data “later than X”. TODO: assumes all columns have a default NULL setting

[View source]

110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/forklift/patterns/mysql_patterns.rb', line 110

# Stub one row into every table that has a `matcher` column, with the
# matcher set to `time`, so that subsequent "later than X" extractions skip
# older data.
#
# Fixes: the `timestamp` type regex was misspelled `/timetsamp/`, so
# timestamp-typed columns (other than the matcher itself) fell through to
# the `"NULL"` branch instead of receiving the stub time. Also replaces the
# manual `while` index loop with `each_with_index`.
#
# @param db      [Forklift connection] database to stub rows into
# @param time    [#to_s] time-like object; `to_s(:db)` formats it for MySQL
# @param matcher [String] column that receives `time` itself
def self.write_high_water_mark(db, time, matcher=db.default_matcher)
  db.tables.each do |table|
    # `true` asks for column types alongside names.
    columns, types = db.columns(table, db.current_database, true)
    next unless columns.include?(matcher)

    row = {}
    columns.each_with_index do |column, i|
      row[column] =
        if column == matcher
          time.to_s(:db)
        elsif types[i] =~ /text/
          "~~stub~~"
        elsif types[i] =~ /varchar/
          # NOTE(review): symbol (not string) preserved from the original —
          # presumably the writer quotes symbols differently; confirm.
          "~~stub~~".to_sym
        elsif types[i] =~ /float/ || types[i] =~ /int/ || types[i] =~ /decimal/
          0
        elsif types[i] =~ /datetime/ || types[i] =~ /timestamp/
          time.to_s(:db)
        elsif types[i] =~ /date/
          time.to_s(:db).split(" ").first
        else
          "NULL"
        end
    end
    db.write([row], table)
  end
end