Class: Sq::Dbsync::IncrementalLoadAction

Inherits:
LoadAction
  • Object
show all
Defined in:
lib/sq/dbsync/incremental_load_action.rb

Overview

Load action to incrementally keep a table up-to-date by loading deltas from the source system. Note that this technique is unable by itself to detect deletes, but behaviour can be added to delete records based on a separate audit log. See documentation for more details.

Constant Summary

Constants inherited from LoadAction

LoadAction::EPOCH

Instance Method Summary collapse

Methods inherited from LoadAction

#call, #do_prepare, #initialize, stages, #tag

Constructor Details

This class inherits a constructor from Sq::Dbsync::LoadAction

Instance Method Details

#extract_data ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/sq/dbsync/incremental_load_action.rb', line 32

# Runs the extract stage.
#
# Reads the registry entry for the plan's table, records the current time as
# the start of this run, and works out the `since` cutoff handed to
# extract_to_file: the stored :last_row_at (or :last_synced_at when that is
# not set), moved back by `overlap`.
#
# The pair returned by extract_to_file is stored in @file and @last_row_at.
#
# Returns self.
def extract_data
  @metadata   = registry.get(plan.table_name)
  @start_time = now.call

  high_water_mark = @metadata[:last_row_at] || @metadata[:last_synced_at]
  since           = high_water_mark - overlap

  extracted = measure(:extract) do
    extract_to_file(since)
  end
  @file, @last_row_at = extracted

  self
end

#filter_columns ⇒ Object



73
74
75
76
77
78
# File 'lib/sq/dbsync/incremental_load_action.rb', line 73

# Narrows plan.columns to the columns that can actually be loaded.
#
# The columns resolved for the plan (via resolve_columns, given the source
# schema's column names) are intersected with the target's columns. When
# target_columns returns nil, the source columns are used in their place.
#
# Returns the column list assigned to plan.columns.
def filter_columns
  source_columns = plan.source_db.hash_schema(plan).keys
  requested      = resolve_columns(plan, source_columns)
  allowed        = target_columns || source_columns

  plan.columns = requested & allowed
end

#load_data ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/sq/dbsync/incremental_load_action.rb', line 44

# Runs the load stage.
#
# Inside a single db transaction this:
#   1. loads the extracted file into the plan's prefixed table,
#   2. calls the process_deletes hook,
#   3. updates the registry entry with this run's start time and the last
#      row timestamp captured during extraction.
#
# The extract file is released with close! whether or not the transaction
# succeeds, so a failed load does not leave the file behind. Any error from
# the transaction still propagates to the caller.
#
# Returns self.
def load_data
  measure(:load) do
    begin
      db.transaction do
        db.load_incrementally_from_file(
          plan.prefixed_table_name,
          plan.columns,
          @file.path
        )

        process_deletes

        registry.update(plan.table_name, @metadata[:last_batch_synced_at],
          last_synced_at: @start_time,
          last_row_at:    @last_row_at
        )
      end
    ensure
      # @file is presumably a Tempfile (close! closes and unlinks it);
      # guard against nil in case extraction never produced a file.
      @file.close! if @file
    end
  end
  self
end

#operation ⇒ Object



10
# File 'lib/sq/dbsync/incremental_load_action.rb', line 10

# Name identifying this kind of load action.
#
# Returns the String 'increment'.
def operation
  'increment'
end

#post_load ⇒ Object



65
66
67
# File 'lib/sq/dbsync/incremental_load_action.rb', line 65

# Post-load stage. Nothing is done here.
#
# Returns self.
def post_load; self; end

#prefix ⇒ Object



69
70
71
# File 'lib/sq/dbsync/incremental_load_action.rb', line 69

# Prefix used by this action: none.
#
# Returns an empty String.
def prefix; ''; end

#prepare ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/sq/dbsync/incremental_load_action.rb', line 12

# Prepares the action and reports whether it should proceed.
#
# When the inherited prepare returns a falsy value, an always-sync plan has
# its registry entry deleted and its target table dropped, and the result is
# false.
#
# Otherwise an always-sync plan has its registry entry reset (both sync
# timestamps set to EPOCH, :last_row_at cleared), and the result is whether
# a registry entry exists for the plan's table.
#
# Returns true or false.
def prepare
  unless super
    if plan.always_sync
      registry.delete(plan.table_name)
      target.drop_table(plan.table_name)
    end
    return false
  end

  if plan.always_sync
    registry.set(plan.table_name,
      last_synced_at:       EPOCH,
      last_batch_synced_at: EPOCH,
      last_row_at:          nil
    )
  end

  registry.get(plan.table_name) ? true : false
end

#process_deletes ⇒ Object



90
91
92
# File 'lib/sq/dbsync/incremental_load_action.rb', line 90

# Hook for subclasses that need to remove rows as part of a load.
# Intentionally empty here; returns nil.
def process_deletes; end

#target_columns ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/sq/dbsync/incremental_load_action.rb', line 80

# Column names of the target table, if the table exists.
#
# Returns the keys of the target's schema hash for the plan, or nil when
# the target table does not exist.
def target_columns
  # Because we may create the target table later if necessary,
  # we need to check if it *really* exists
  return unless target.table_exists?(plan.table_name)

  target.hash_schema(plan).keys
end