Class: Sq::Dbsync::Manager

Inherits:
Object
  • Object
show all
Includes:
Sq::Dbsync
Defined in:
lib/sq/dbsync/manager.rb

Overview

The manager orchestrates the high level functions of the sync, such as keeping the database up-to-date and batch loading.

This is the main entry point for the application.

Constant Summary collapse

EPOCH =
Date.new(2000, 1, 1).to_time
MAX_RETRIES =
10

Constants included from Sq::Dbsync

VERSION

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, plans) ⇒ Manager

Returns a new instance of Manager.



20
21
22
23
24
# File 'lib/sq/dbsync/manager.rb', line 20

def initialize(config, plans)
  @config        = Sq::Dbsync::Config.make(config)
  @plans         = plans
  @error_handler = ErrorHandler.new(config)
end

Instance Attribute Details

#configObject

Returns the value of attribute config.



131
132
133
# File 'lib/sq/dbsync/manager.rb', line 131

def config
  @config
end

#error_handlerObject

Returns the value of attribute error_handler.



131
132
133
# File 'lib/sq/dbsync/manager.rb', line 131

def error_handler
  @error_handler
end

#plansObject

Returns the value of attribute plans.



131
132
133
# File 'lib/sq/dbsync/manager.rb', line 131

def plans
  @plans
end

Instance Method Details

#batch(tables = :all) ⇒ Object



26
27
28
29
30
31
# File 'lib/sq/dbsync/manager.rb', line 26

def batch(tables = :all)
  error_handler.wrap do
    batch_nonactive(tables)
    refresh_recent(tables)
  end
end

#batch_nonactive(tables = :all) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/sq/dbsync/manager.rb', line 39

def batch_nonactive(tables = :all)
  registry.ensure_storage_exists

  measure(:batch_total) do
    raise_all_if_pipeline_failure(
      run_load(BatchLoadAction, Pipeline::ThreadedContext, tables)
    )
  end
end

#incrementObject



33
34
35
36
37
# File 'lib/sq/dbsync/manager.rb', line 33

def increment
  error_handler.wrap do
    incremental
  end
end

#increment_checkpointObject

Actions that need to be performed regularly, but not every cycle. Please do suggest a better name for this method.



96
97
98
99
100
101
# File 'lib/sq/dbsync/manager.rb', line 96

def increment_checkpoint
  # No need to do this every cycle, 100 is chosen to be as good as any
  # other number. It should run on the very first cycle however so that
  # our specs will cover it.
  purge_registry
end

#incrementalObject



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/sq/dbsync/manager.rb', line 59

def incremental
  @running = true
  counter = 0

  loop_with_retry_on(->{ @running }, transient_exceptions) do
    incremental_once

    counter = (counter + 1) % 100
    if counter == 1
      # No need to do this every cycle, 100 is chosen to be as good as any
      # other number. It should run on the very first cycle however so that
      # the specs will cover it.
      increment_checkpoint
    end
  end
end

#incremental_onceObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/sq/dbsync/manager.rb', line 76

def incremental_once
  # In theory, this ensures that any changes to the source IP (such as from a
  # virtual IP flip) are picked up.
  sources.each do |_, db|
    db.disconnect
  end

  raise_if_pipeline_failure(
    # ThreadedContext would be ideal here, but it leaks memory in JRuby. Not
    # sure why yet, but mass creation of threads seems like an obvious
    # candidate for brokenness.
    #
    # TODO: Above comment probably isn't true with 1.7 and ThreadedContext
    # fixes.
    run_load(incremental_action, Pipeline::SimpleContext)
  )
end

#plans_with_sourcesObject



119
120
121
122
123
# File 'lib/sq/dbsync/manager.rb', line 119

def plans_with_sources
  @plans_with_sources ||= plans.map do |plan, source_name|
    [plan, sources.fetch(source_name)]
  end
end

#refresh_recent(tables = :all) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/sq/dbsync/manager.rb', line 49

def refresh_recent(tables = :all)
  registry.ensure_storage_exists

  measure(:refresh_recent_total) do
    raise_all_if_pipeline_failure(
      run_load(RefreshRecentLoadAction, Pipeline::ThreadedContext, tables)
    )
  end
end

#sourcesObject



125
126
127
128
129
# File 'lib/sq/dbsync/manager.rb', line 125

def sources
  @sources ||= Hash[config[:sources].map do |name, opts|
    [name, Sq::Dbsync::Database::Connection.create(opts)]
  end]
end

#stop!Object



103
104
105
# File 'lib/sq/dbsync/manager.rb', line 103

def stop!
  @running = false
end

#tables_to_loadObject



111
112
113
114
115
116
117
# File 'lib/sq/dbsync/manager.rb', line 111

def tables_to_load
  plans_with_sources.map do |plan, source|
    plan.tables(source).map do |x|
      x.update(source_db: source)
    end
  end.reduce([], :+).uniq {|x| x[:table_name] }
end

#targetObject



107
108
109
# File 'lib/sq/dbsync/manager.rb', line 107

def target
  @target ||= Sq::Dbsync::Database::Connection.create(config[:target])
end