Class: Sq::Dbsync::Manager
- Inherits:
- Object
- Includes:
- Sq::Dbsync
- Defined in:
- lib/sq/dbsync/manager.rb
Overview
The manager orchestrates the high level functions of the sync, such as keeping the database up-to-date and batch loading.
This is the main entry point for the application.
Constant Summary
- EPOCH = Date.new(2000, 1, 1).to_time
- MAX_RETRIES = 10
Constants included from Sq::Dbsync
Instance Attribute Summary
- #config ⇒ Object
  Returns the value of attribute config.
- #error_handler ⇒ Object
  Returns the value of attribute error_handler.
- #plans ⇒ Object
  Returns the value of attribute plans.
Instance Method Summary
- #batch(tables = :all) ⇒ Object
- #batch_nonactive(tables = :all) ⇒ Object
- #increment ⇒ Object
- #increment_checkpoint ⇒ Object
  Actions that need to be performed regularly, but not every cycle.
- #incremental ⇒ Object
- #incremental_once ⇒ Object
- #initialize(config, plans) ⇒ Manager (constructor)
  A new instance of Manager.
- #plans_with_sources ⇒ Object
- #refresh_recent(tables = :all) ⇒ Object
- #sources ⇒ Object
- #stop! ⇒ Object
- #tables_to_load ⇒ Object
- #target ⇒ Object
Constructor Details
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
    # File 'lib/sq/dbsync/manager.rb', line 131
    def config
      @config
    end
#error_handler ⇒ Object
Returns the value of attribute error_handler.
    # File 'lib/sq/dbsync/manager.rb', line 131
    def error_handler
      @error_handler
    end
#plans ⇒ Object
Returns the value of attribute plans.
    # File 'lib/sq/dbsync/manager.rb', line 131
    def plans
      @plans
    end
Instance Method Details
#batch(tables = :all) ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 26
    def batch(tables = :all)
      error_handler.wrap do
        batch_nonactive(tables)
        refresh_recent(tables)
      end
    end
#batch_nonactive(tables = :all) ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 39
    def batch_nonactive(tables = :all)
      registry.ensure_storage_exists

      measure(:batch_total) do
        raise_all_if_pipeline_failure(
          run_load(BatchLoadAction, Pipeline::ThreadedContext, tables)
        )
      end
    end
#increment ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 33
    def increment
      error_handler.wrap do
        incremental
      end
    end
#increment_checkpoint ⇒ Object
Actions that need to be performed regularly, but not every cycle. Please do suggest a better name for this method.
    # File 'lib/sq/dbsync/manager.rb', line 96
    def increment_checkpoint
      # No need to do this every cycle, 100 is chosen to be as good as any
      # other number. It should run on the very first cycle however so that
      # our specs will cover it.
      purge_registry
    end
#incremental ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 59
    def incremental
      @running = true
      counter = 0

      loop_with_retry_on(->{ @running }, transient_exceptions) do
        incremental_once

        counter = (counter + 1) % 100
        if counter == 1
          # No need to do this every cycle, 100 is chosen to be as good as any
          # other number. It should run on the very first cycle however so that
          # the specs will cover it.
          increment_checkpoint
        end
      end
    end
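The modulo counter in #incremental can be checked in isolation. The following is a minimal stand-alone sketch, not part of the library: `checkpoint_cycles` is a hypothetical helper that replays the same counter arithmetic and reports the cycles on which `increment_checkpoint` would be called.

```ruby
# Hypothetical helper (not part of Sq::Dbsync): replays the counter
# arithmetic from #incremental and returns the 1-based cycle numbers on
# which increment_checkpoint would be called.
def checkpoint_cycles(total_cycles, period = 100)
  counter = 0
  (1..total_cycles).select do
    counter = (counter + 1) % period
    counter == 1
  end
end

p checkpoint_cycles(250) # prints [1, 101, 201]
```

Comparing against 1 rather than 0 is what makes the checkpoint fire on the very first cycle, as the inline comment requires.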
#incremental_once ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 76
    def incremental_once
      # In theory, this ensures that any changes to the source IP (such as from a
      # virtual IP flip) are picked up.
      sources.each do |_, db|
        db.disconnect
      end

      raise_if_pipeline_failure(
        # ThreadedContext would be ideal here, but it leaks memory in JRuby. Not
        # sure why yet, but mass creation of threads seems like an obvious
        # candidate for brokenness.
        #
        # TODO: Above comment probably isn't true with 1.7 and ThreadedContext
        # fixes.
        run_load(incremental_action, Pipeline::SimpleContext)
      )
    end
#plans_with_sources ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 119
    def plans_with_sources
      @plans_with_sources ||= plans.map do |plan, source_name|
        [plan, sources.fetch(source_name)]
      end
    end
#refresh_recent(tables = :all) ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 49
    def refresh_recent(tables = :all)
      registry.ensure_storage_exists

      measure(:refresh_recent_total) do
        raise_all_if_pipeline_failure(
          run_load(RefreshRecentLoadAction, Pipeline::ThreadedContext, tables)
        )
      end
    end
#sources ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 125
    def sources
      @sources ||= Hash[config[:sources].map do |name, opts|
        [name, Sq::Dbsync::Database::Connection.create(opts)]
      end]
    end
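The method above implies that `config[:sources]` maps a source name to connection options, and that connections are built once and then reused. The sketch below illustrates that shape under stated assumptions: `FakeConnection`, `SourceRegistry`, and the option keys in `CONFIG` are hypothetical stand-ins, and nothing here calls the real `Sq::Dbsync::Database::Connection.create`.

```ruby
# Hypothetical stand-in for a database connection object.
FakeConnection = Struct.new(:opts)

# Hypothetical class that mirrors the memoized #sources pattern.
class SourceRegistry
  attr_reader :create_calls

  def initialize(config)
    @config = config
    @create_calls = 0 # counts how often a connection is built
  end

  # Builds a name => connection hash on first call, then reuses it.
  def sources
    @sources ||= Hash[@config[:sources].map { |name, opts|
      @create_calls += 1
      [name, FakeConnection.new(opts)]
    }]
  end
end

# Assumed config shape; the option keys are made up for illustration.
CONFIG = {
  sources: {
    primary: { database: 'app' },
    replica: { database: 'app_replica' }
  }
}.freeze

registry = SourceRegistry.new(CONFIG)
registry.sources
registry.sources
puts registry.create_calls # prints 2: one build per source, not per call
```

Because of the memoization, #plans_with_sources can call `sources.fetch(source_name)` for every plan without opening additional connections.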
#stop! ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 103
    def stop!
      @running = false
    end
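#stop! only flips the flag that #incremental hands to its loop as a guard lambda. The sketch below shows that interaction in miniature. It rests on an assumption: `loop_with_retry_on` is private and not shown on this page, so a plain `while` loop on the guard stands in for it, and the `Worker` class is hypothetical.

```ruby
# Hypothetical worker illustrating a flag-guarded loop. The `while` loop
# is an assumed stand-in for the private loop_with_retry_on helper.
class Worker
  attr_reader :cycles

  def initialize
    @cycles = 0
  end

  # Runs until the guard lambda reports false, yielding once per cycle.
  def run
    @running = true
    guard = -> { @running }
    while guard.call
      @cycles += 1
      yield self
    end
  end

  # Requests a stop; the loop exits before starting the next cycle.
  def stop!
    @running = false
  end
end

worker = Worker.new
worker.run { |w| w.stop! if w.cycles == 3 }
puts worker.cycles # prints 3
```

In this sketch the cycle in progress always finishes; the flag is only consulted between cycles.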
#tables_to_load ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 111
    def tables_to_load
      plans_with_sources.map do |plan, source|
        plan.tables(source).map do |x|
          x.update(source_db: source)
        end
      end.reduce([], :+).uniq {|x| x[:table_name] }
    end
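The effect of the final `uniq` is easiest to see with two plans that name the same table. The sketch below is a stand-alone illustration: `FakePlan`, `merge_table_plans`, and the sample table and source names are hypothetical, and the only thing assumed about a real plan is that `#tables(source)` returns an array of hashes with a `:table_name` key.

```ruby
# Hypothetical stand-in for a load plan. The real plan classes are not
# shown on this page; this one just returns one hash per table name.
FakePlan = Struct.new(:table_names) do
  def tables(_source)
    table_names.map { |name| { table_name: name } }
  end
end

# Same pipeline as #tables_to_load, with plans_with_sources passed in.
def merge_table_plans(plans_with_sources)
  plans_with_sources.map do |plan, source|
    plan.tables(source).map do |x|
      x.update(source_db: source) # tag each table with its source
    end
  end.reduce([], :+).uniq { |x| x[:table_name] }
end

# Two plans that both claim :orders (sample data, made up).
PAIRS = [
  [FakePlan.new([:users, :orders]), :primary],
  [FakePlan.new([:orders, :events]), :replica]
].freeze

merge_table_plans(PAIRS).each do |t|
  puts "#{t[:table_name]} <- #{t[:source_db]}"
end
# prints:
#   users <- primary
#   orders <- primary
#   events <- replica
```

Since `Array#uniq` keeps the first occurrence, a table listed by several plans is loaded from the source of the earliest plan, so plan order acts as a priority order.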
#target ⇒ Object
    # File 'lib/sq/dbsync/manager.rb', line 107
    def target
      @target ||= Sq::Dbsync::Database::Connection.create(config[:target])
    end