Class: RR::ReplicationInitializer

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyrep/replication_initializer.rb

Overview

Ensures all preconditions are met to start with replication

Constant Summary collapse

DIFF_DUMP_SIZE =

Size of the replication log column diff_dump

2000
DESCRIPTION_SIZE =

Size of the event log column ‘description’

255
LONG_DESCRIPTION_SIZE =

Size of the event log column ‘long_description’

1000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session) ⇒ ReplicationInitializer

Creates a new ReplicationInitializer for the given Session



14
15
16
# File 'lib/rubyrep/replication_initializer.rb', line 14

# Creates a new initializer bound to the given session.
# * +session+: the Session to work with; stored through the +session=+ writer
def initialize(session)
  self.session = session
end

Instance Attribute Details

#sessionObject

The active Session



11
12
13
# File 'lib/rubyrep/replication_initializer.rb', line 11

# Reader for the active Session (returns the +@session+ instance variable).
def session
  @session
end

Instance Method Details

#call_after_infrastructure_setup_handlerObject

Calls the potentially provided :after_infrastructure_setup handler after infrastructure tables are created.



273
274
275
276
# File 'lib/rubyrep/replication_initializer.rb', line 273

# Invokes the handler stored under the :after_infrastructure_setup option of
# the session configuration, passing it the current session.
# Does nothing (and returns nil) if no handler is configured.
def call_after_infrastructure_setup_handler
  callback = session.configuration.options[:after_infrastructure_setup]
  return unless callback
  callback.call(session)
end

#change_log_exists?(database) ⇒ Boolean

Returns true if the change log exists in the specified database.

  • database: either :left or :right

Returns:

  • (Boolean)


104
105
106
# File 'lib/rubyrep/replication_initializer.rb', line 104

# Tells whether the "<rep_prefix>_pending_changes" table is listed among the
# tables of the specified database.
# * +database+: either :left or :right
def change_log_exists?(database)
  change_log_table = "#{options[:rep_prefix]}_pending_changes"
  session.send(database).tables.include?(change_log_table)
end

#clear_sequence_setup(database, table) ⇒ Object

Restores the original sequence settings for the named table. (Actually it sets the sequence increment to 1. If before, it had a different value, then the restoration will not be correct.)

  • database: either :left or :right

  • table: name of the table



93
94
95
96
97
98
99
100
# File 'lib/rubyrep/replication_initializer.rb', line 93

# Asks the connection of the given database to clear the sequence setup of
# the named table. Only acts if the table's :adjust_sequences option is set;
# otherwise returns nil without touching the database.
# * +database+: either :left or :right
# * +table+: name of the table
def clear_sequence_setup(database, table)
  settings = options(table)
  return unless settings[:adjust_sequences]

  session.send(database).clear_sequence_setup(settings[:rep_prefix], table)
end

#create_change_log(database) ⇒ Object

Creates the change log table in the specified database

  • database: either :left or :right



171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/rubyrep/replication_initializer.rb', line 171

# Builds the "<rep_prefix>_pending_changes" table in the specified database.
# All DDL statements run inside silence_ddl_notices.
# * +database+: either :left or :right
def create_change_log(database)
  silence_ddl_notices(database) do
    db = session.send(database)
    change_log = "#{options[:rep_prefix]}_pending_changes"

    db.create_table(change_log)
    [
      [:change_table,   :string],
      [:change_key,     :string],
      [:change_new_key, :string],
      [:change_type,    :string],
      [:change_time,    :timestamp]
    ].each do |column, type|
      db.add_column(change_log, column, type)
    end

    # Swap the 'id' column that exists after create_table for a big primary key.
    db.remove_column(change_log, 'id')
    db.add_big_primary_key(change_log, 'id')
  end
end

#create_event_logObject

Creates the replication log table.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/rubyrep/replication_initializer.rb', line 150

# Builds the "<rep_prefix>_logged_events" table in the left database.
# All DDL statements run inside silence_ddl_notices(:left).
def create_event_log
  silence_ddl_notices(:left) do
    log_db = session.left
    events = "#{options[:rep_prefix]}_logged_events"

    log_db.create_table(events)

    # Plain string columns without an explicit size limit.
    [
      :activity, :change_table, :diff_type,
      :change_key, :left_change_type, :right_change_type
    ].each do |column|
      log_db.add_column(events, column, :string)
    end

    # Columns whose size is governed by the class constants.
    log_db.add_column(events, :description, :string, :limit => DESCRIPTION_SIZE)
    log_db.add_column(events, :long_description, :string, :limit => LONG_DESCRIPTION_SIZE)
    log_db.add_column(events, :event_time, :timestamp)
    log_db.add_column(events, :diff_dump, :string, :limit => DIFF_DUMP_SIZE)

    # Swap the 'id' column that exists after create_table for a big primary key.
    log_db.remove_column(events, 'id')
    log_db.add_big_primary_key(events, 'id')
  end
end

#create_trigger(database, table) ⇒ Object

Creates a trigger logging all table changes

  • database: either :left or :right

  • table: name of the table



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/rubyrep/replication_initializer.rb', line 31

# Asks the connection of the given database to create the replication trigger
# for the named table. Trigger, log table and activity table names are all
# derived from the table's :rep_prefix option.
# * +database+: either :left or :right
# * +table+: name of the table
def create_trigger(database, table)
  table_options = options(table)
  prefix = table_options[:rep_prefix]
  connection = session.send(database)

  trigger_spec = {
    :trigger_name => "#{prefix}_#{table}",
    :table => table,
    :keys => connection.primary_key_names(table),
    :log_table => "#{prefix}_pending_changes",
    :activity_table => "#{prefix}_running_flags",
    :key_sep => table_options[:key_sep],
    :exclude_rr_activity => false
  }

  connection.create_replication_trigger(trigger_spec)
end

#drop_activity_markersObject

Checks in both databases, if the activity_marker tables exist. If yes, drops them.



236
237
238
239
240
241
242
243
# File 'lib/rubyrep/replication_initializer.rb', line 236

# Drops the "<rep_prefix>_running_flags" table from the left and the right
# database, wherever it is present.
def drop_activity_markers
  marker_table = "#{options[:rep_prefix]}_running_flags"
  [:left, :right].each do |database|
    connection = session.send(database)
    next unless connection.tables.include?(marker_table)

    connection.drop_table(marker_table)
  end
end

#drop_change_log(database) ⇒ Object

Drops the change log table in the specified database

  • database: either :left or :right



115
116
117
# File 'lib/rubyrep/replication_initializer.rb', line 115

# Drops the "<rep_prefix>_pending_changes" table of the specified database.
# * +database+: either :left or :right
def drop_change_log(database)
  change_log_table = "#{options[:rep_prefix]}_pending_changes"
  session.send(database).drop_table(change_log_table)
end

#drop_change_logsObject

Checks in both databases, if the change_log tables exist. If yes, drops them.



229
230
231
232
233
# File 'lib/rubyrep/replication_initializer.rb', line 229

# Visits the left and then the right database and drops the change log
# wherever change_log_exists? reports one.
def drop_change_logs
  [:left, :right].each do |database|
    next unless change_log_exists?(database)

    drop_change_log(database)
  end
end

#drop_event_logObject

Drops the replication log table.



120
121
122
# File 'lib/rubyrep/replication_initializer.rb', line 120

# Drops the "<rep_prefix>_logged_events" table from the left database.
def drop_event_log
  event_log_table = "#{options[:rep_prefix]}_logged_events"
  session.left.drop_table(event_log_table)
end

#drop_infrastructureObject

Removes all rubyrep infrastructure tables from both databases.



246
247
248
249
250
# File 'lib/rubyrep/replication_initializer.rb', line 246

# Tears down the rubyrep infrastructure tables: the event log (only if it
# exists), then the change logs, then the activity marker tables.
def drop_infrastructure
  if event_log_exists?
    drop_event_log
  end

  drop_change_logs
  drop_activity_markers
end

#drop_trigger(database, table) ⇒ Object

Drops the replication trigger of the named table.

  • database: either :left or :right

  • table: name of the table



58
59
60
61
# File 'lib/rubyrep/replication_initializer.rb', line 58

# Asks the connection of the given database to drop the replication trigger
# named "<rep_prefix>_<table>" from the named table.
# * +database+: either :left or :right
# * +table+: name of the table
def drop_trigger(database, table)
  prefix = options(table)[:rep_prefix]
  session.send(database).drop_replication_trigger("#{prefix}_#{table}", table)
end

#ensure_activity_markersObject

Checks in both databases, if the activity marker tables exist and if not, creates them.



193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/rubyrep/replication_initializer.rb', line 193

# Makes sure the "<rep_prefix>_running_flags" table is present in the left
# and the right database, creating it (with DDL notices silenced) where missing.
def ensure_activity_markers
  marker_table = "#{options[:rep_prefix]}_running_flags"
  [:left, :right].each do |database|
    db = session.send(database)
    next if db.tables.include?(marker_table)

    silence_ddl_notices(database) do
      db.create_table(marker_table)
      db.add_column(marker_table, :active, :integer)
      # The marker table is left without the 'id' column.
      db.remove_column(marker_table, 'id')
    end
  end
end

#ensure_change_logsObject

Checks in both databases, if the change log tables exist and creates them if necessary



214
215
216
217
218
# File 'lib/rubyrep/replication_initializer.rb', line 214

# Visits the left and then the right database and creates the change log
# wherever change_log_exists? reports none.
def ensure_change_logs
  [:left, :right].each do |database|
    next if change_log_exists?(database)

    create_change_log(database)
  end
end

#ensure_event_logObject

Checks if the event log table already exists and creates it if necessary



208
209
210
# File 'lib/rubyrep/replication_initializer.rb', line 208

# Creates the event log table unless event_log_exists? reports it is
# already there (in which case nil is returned).
def ensure_event_log
  return if event_log_exists?

  create_event_log
end

#ensure_infrastructureObject

Checks in both databases, if the infrastructure tables (change log, event log) exist and creates them if necessary.



222
223
224
225
226
# File 'lib/rubyrep/replication_initializer.rb', line 222

# Sets up the rubyrep infrastructure tables by running, in this order, the
# checks for the activity marker tables, the change logs and the event log.
# Each step creates its table(s) only where missing.
def ensure_infrastructure
  ensure_activity_markers
  ensure_change_logs
  ensure_event_log
end

#ensure_sequence_setup(table_pair, increment, left_offset, right_offset) ⇒ Object

Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset in both left and right database. The sequence is always updated in both databases.

  • table_pair: a hash of names of corresponding :left and :right tables

  • increment: increment of the sequence

  • left_offset: offset of table in left database

  • right_offset: offset of table in right database

E. g. an increment of 2 and offset of 1 will lead to generation of odd numbers.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rubyrep/replication_initializer.rb', line 73

# Updates the sequences of a table pair in the left and the right database
# so that they use the given increment and the database-specific offset.
# Only acts if the :adjust_sequences option of the left table is set.
# * +table_pair+: a hash of names of corresponding :left and :right tables
# * +increment+: increment of the sequence
# * +left_offset+: offset of table in left database
# * +right_offset+: offset of table in right database
def ensure_sequence_setup(table_pair, increment, left_offset, right_offset)
  settings = options(table_pair[:left])
  return unless settings[:adjust_sequences]

  prefix = settings[:rep_prefix]
  # Current sequence values of both sides are read up front and handed to
  # each update_sequences call.
  left_values = session.left.sequence_values(prefix, table_pair[:left])
  right_values = session.right.sequence_values(prefix, table_pair[:right])
  offsets = {:left => left_offset, :right => right_offset}

  [:left, :right].each do |database|
    session.send(database).update_sequences(
      prefix, table_pair[database], increment, offsets[database],
      left_values, right_values, settings[:sequence_adjustment_buffer]
    )
  end
end

#event_log_exists?Boolean

Returns true if the replication log exists.

Returns:

  • (Boolean)


109
110
111
# File 'lib/rubyrep/replication_initializer.rb', line 109

# Tells whether the "<rep_prefix>_logged_events" table is listed among the
# tables of the left database.
def event_log_exists?
  event_log_table = "#{options[:rep_prefix]}_logged_events"
  session.left.tables.include?(event_log_table)
end

#exclude_rubyrep_tablesObject

Adds to the current session’s configuration an exclusion of rubyrep tables.



187
188
189
# File 'lib/rubyrep/replication_initializer.rb', line 187

# Delegates to the +exclude_rubyrep_tables+ method of the current session's
# configuration.
def exclude_rubyrep_tables
  configuration = session.configuration
  configuration.exclude_rubyrep_tables
end

#options(table = nil) ⇒ Object

Returns the options for the given table. If table is nil, returns general options.



20
21
22
23
24
25
26
# File 'lib/rubyrep/replication_initializer.rb', line 20

# Looks up options in the session configuration.
# * +table+: name of a table; if given, the options for that table are
#   returned, otherwise the general options
def options(table = nil)
  configuration = session.configuration
  table ? configuration.options_for_table(table) : configuration.options
end

#prepare_replicationObject

Prepares the database / tables for replication.



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/rubyrep/replication_initializer.rb', line 279

# Gets both databases ready for replication:
# 1. excludes the rubyrep tables and ensures the infrastructure tables exist,
# 2. calls the after-infrastructure-setup handler,
# 3. removes triggers from tables that are no longer configured,
# 4. ensures sequence setup and triggers for every configured table pair,
# 5. runs an initial sync for pairs that just received a trigger and have
#    the :initial_sync option set.
# Progress messages are printed to stdout.
def prepare_replication
  exclude_rubyrep_tables

  puts "Verifying RubyRep tables"
  ensure_infrastructure

  call_after_infrastructure_setup_handler

  puts "Checking for and removing rubyrep triggers from unconfigured tables"
  restore_unconfigured_tables

  puts "Verifying rubyrep triggers of configured tables"
  pairs_needing_sync = []
  session.sort_table_pairs(session.configured_table_pairs).each do |table_pair|
    pair_options = options(table_pair[:left])
    ensure_sequence_setup(
      table_pair,
      pair_options[:sequence_increment],
      pair_options[:left_sequence_offset],
      pair_options[:right_sequence_offset]
    )

    # Both sides are always visited (left first) so that a missing trigger on
    # either side gets created; each entry records whether one was created.
    created = [:left, :right].map do |database|
      next false if trigger_exists?(database, table_pair[database])

      create_trigger(database, table_pair[database])
      true
    end

    if created.any? && pair_options[:initial_sync]
      pairs_needing_sync << table_pair
    end
  end

  sync_specs = pairs_needing_sync.map do |pair|
    "#{pair[:left]}, #{pair[:right]}"
  end

  unless sync_specs.empty?
    puts "Executing initial table syncs"
    runner = SyncRunner.new
    runner.session = session
    runner.options = {:table_specs => sync_specs}
    runner.execute
  end

  puts "Starting replication"
end

#restore_unconfigured_tables(configured_table_pairs = session.configured_table_pairs) ⇒ Object

Checks for tables that have triggers but are not in the list of configured tables. Removes triggers and restores sequences of those tables.

  • configured_table_pairs: An array of table pairs (e. g. [{:left => ‘xy’, :right => ‘xy2’}]).



256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/rubyrep/replication_initializer.rb', line 256

# Cleans up tables that are present in a database but not part of the
# configured table pairs: drops an existing replication trigger, deletes the
# table's entries from the "<rep_prefix>_pending_changes" table and clears
# the sequence setup.
# * +configured_table_pairs+: an array of hashes holding the :left and :right
#   table names; defaults to the pairs configured in the session
def restore_unconfigured_tables(configured_table_pairs = session.configured_table_pairs)
  [:left, :right].each do |database|
    configured = configured_table_pairs.map { |pair| pair[database] }
    connection = session.send(database)

    (connection.tables - configured).each do |table|
      if trigger_exists?(database, table)
        drop_trigger(database, table)
        # NOTE(review): the table name is interpolated into the SQL unquoted.
        purge_sql = "delete from #{options[:rep_prefix]}_pending_changes where change_table = '#{table}'"
        connection.execute(purge_sql)
      end
      # Sequence setup is cleared for every unconfigured table, with or
      # without a trigger.
      clear_sequence_setup(database, table)
    end
  end
end

#silence_ddl_notices(database) ⇒ Object

Ensures that create_table and related statements don’t print notices to stdout. Then restores the original message setting.

  • database: either :left or :right



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rubyrep/replication_initializer.rb', line 136

# Runs the given block with DDL notices suppressed and returns the block's value.
#
# If the adapter configured for the database matches /postgres/, the current
# client_min_messages level is read, the level is set to warning for the
# duration of the block, and the previous level is restored afterwards (also
# when the block raises). For other adapters the block is simply executed.
#
# * +database+: either :left or :right
def silence_ddl_notices(database)
  old_message_level = nil
  if session.configuration.send(database)[:adapter] =~ /postgres/
    old_message_level = session.send(database).
      select_one("show client_min_messages")['client_min_messages']
    session.send(database).execute "set client_min_messages = warning"
  end
  yield
ensure
  # Restore only when the previous level was actually captured. If reading it
  # failed, there is nothing to restore, and issuing
  # "set client_min_messages = " with an empty value would raise a second
  # error that hides the original one.
  if old_message_level
    session.send(database).execute "set client_min_messages = #{old_message_level}"
  end
end

#trigger_exists?(database, table) ⇒ Boolean

Returns true if the replication trigger for the given table exists.

  • database: either :left or :right

  • table: name of the table

Returns:

  • (Boolean)


50
51
52
53
# File 'lib/rubyrep/replication_initializer.rb', line 50

# Asks the connection of the given database whether the replication trigger
# named "<rep_prefix>_<table>" exists for the named table.
# * +database+: either :left or :right
# * +table+: name of the table
def trigger_exists?(database, table)
  prefix = options(table)[:rep_prefix]
  session.send(database).replication_trigger_exists?("#{prefix}_#{table}", table)
end