Class: Webhookdb::SyncTarget
- Inherits:
- Object (hierarchy: Object, Webhookdb::SyncTarget)
- Includes:
- Appydays::Configurable, Dbutil
- Defined in:
- lib/webhookdb/sync_target.rb
Overview
Support exporting WebhookDB data into external services, such as another Postgres instance or data warehouse (Snowflake, etc).
At a high level, sync targets work as follows:
- A user uses the CLI to register a sync target for a specific integration, using a database connection string for a supported database (e.g., postgres://).
- They include a period (how often it is synced), and an optional schema and table (if omitted, we use the default schema and the service integration table name).
- Every minute or so, we look for sync targets that are “due” and enqueue a sync for them. Customers can enqueue their own sync request, but it cannot run more often than the minimum allowed sync period.
For the sync logic itself, see run_sync.
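As a rough illustration of the "due" check described above, here is a minimal sketch. It assumes a target is due once a full period has elapsed since its last sync; `SketchTarget`, `sketch_due?`, and `sketch_due_targets` are hypothetical names for this example and are not part of WebhookDB.

```ruby
# Hypothetical stand-in for a sync target row; not the real model.
SketchTarget = Struct.new(:id, :period_seconds, :last_synced_at, keyword_init: true)

# Assumption: a target is due if it has never synced,
# or if a full period has elapsed since the last sync.
def sketch_due?(target, now:)
  return true if target.last_synced_at.nil?
  return target.last_synced_at + target.period_seconds <= now
end

# Select the targets a periodic job would enqueue at time `now`.
def sketch_due_targets(targets, now:)
  return targets.select { |t| sketch_due?(t, now: now) }
end
```

A scheduler running every minute could call `sketch_due_targets` and enqueue one sync job per returned target.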
Defined Under Namespace
Classes: DatabaseRoutine, Deleted, HttpRoutine, InvalidConnection, Routine, SyncInProgress
Constant Summary
- ADVISORY_LOCK_KEYSPACE = 2_000_000_000
  Advisory locks for sync targets use this as the first int, and the id as the second.
- HTTP_VERIFY_TIMEOUT = 3
- DB_VERIFY_TIMEOUT = 2000
- DB_VERIFY_STATEMENT = "SELECT 1"
- RAND = Random.new
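To make the two-integer advisory lock key concrete, the sketch below builds the kind of statement Postgres accepts for a two-key advisory lock, `pg_try_advisory_lock(key1 int, key2 int)`. Whether Sequel::AdvisoryLock issues exactly this statement is an assumption; `sketch_advisory_lock_sql` and the `SKETCH_` constants are hypothetical names.

```ruby
# Same value as ADVISORY_LOCK_KEYSPACE; redefined so the sketch is self-contained.
SKETCH_ADVISORY_LOCK_KEYSPACE = 2_000_000_000
# Both keys of a two-key Postgres advisory lock are 32-bit signed integers.
SKETCH_INT4_MAX = 2_147_483_647

# Build a non-blocking lock statement: the keyspace is the first key
# and the sync target id is the second.
def sketch_advisory_lock_sql(sync_target_id)
  unless sync_target_id.is_a?(Integer) && (0..SKETCH_INT4_MAX).cover?(sync_target_id)
    raise ArgumentError, "id must be a non-negative 32-bit integer"
  end
  return "SELECT pg_try_advisory_lock(#{SKETCH_ADVISORY_LOCK_KEYSPACE}, #{sync_target_id})"
end
```

Using a fixed first key keeps sync target locks from colliding with advisory locks taken elsewhere under a different first key.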
Constants included from Dbutil
Instance Attribute Summary
Class Method Summary
- .default_valid_period ⇒ Object
- .valid_period(beginval) ⇒ Object
- .valid_period_for(org) ⇒ Object
- .validate_db_url(s) ⇒ Object
- .validate_http_url(s) ⇒ Object
- .verify_db_connection(url) ⇒ Object
- .verify_http_connection(url) ⇒ Object
Instance Method Summary
- #advisory_lock(db) ⇒ Sequel::AdvisoryLock
- #associated_id ⇒ String
- #associated_object_display ⇒ Object
- #associated_type ⇒ String
- #before_create ⇒ Object
- #before_validation ⇒ Object
- #db? ⇒ Boolean
- #displaysafe_connection_url ⇒ Object
- #http? ⇒ Boolean
- #jitter ⇒ Object
  Return the jitter used for enqueueing the next sync of the job.
- #next_possible_sync(now:) ⇒ Object
- #next_scheduled_sync(now:) ⇒ Object
- #organization ⇒ Webhookdb::Organization
- #run_sync(now:) ⇒ Object
  Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).
- #schema_and_table_string ⇒ String
Methods included from Dbutil
borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn
Instance Attribute Details
#connection_url ⇒ String
# File 'lib/webhookdb/sync_target.rb', line 304
#service_integration ⇒ Webhookdb::ServiceIntegration
# File 'lib/webhookdb/sync_target.rb', line 301
Class Method Details
.default_valid_period ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 72
    def self.default_valid_period
      return self.valid_period(Webhookdb::SyncTarget.default_min_period_seconds)
    end
.valid_period(beginval) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 64
    def self.valid_period(beginval)
      return beginval..self.max_period_seconds
    end
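Since valid_period returns a Range, callers can check a requested period with standard Range methods. The sketch below is standalone; the 600 and 86_400 second bounds are made-up values for illustration, not WebhookDB's configured defaults, and `sketch_valid_period` is a hypothetical name.

```ruby
# Build an inclusive range of allowed period lengths, in seconds.
def sketch_valid_period(min_seconds, max_seconds)
  return min_seconds..max_seconds
end

# Assumed bounds for illustration only.
period = sketch_valid_period(600, 86_400)

period.cover?(3600) # an hourly sync falls inside the range
period.cover?(30)   # a 30 second sync is below the minimum
```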
.valid_period_for(org) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 68
    def self.valid_period_for(org)
      return self.valid_period(org.minimum_sync_seconds)
    end
.validate_db_url(s) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 106
    def self.validate_db_url(s)
      begin
        url = URI(s)
      rescue URI::InvalidURIError
        return "That's not a valid URL."
      end
      protocols = ["postgres", "snowflake"]
      unless protocols.include?(url.scheme)
        protostr = protocols.join(", ")
        # rubocop:disable Layout/LineLength
        msg = "The '#{url.scheme}' protocol is not supported for database sync targets. Supported protocols are: #{protostr}."
        # rubocop:enable Layout/LineLength
        return msg
      end
      return nil
    end
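The method returns nil for a valid URL and an error string otherwise. The standalone sketch below mirrors that contract using only the standard library, so the three outcomes can be tried outside the application; `sketch_validate_db_url` is a hypothetical name and its error message is shortened.

```ruby
require "uri"

SKETCH_DB_PROTOCOLS = ["postgres", "snowflake"].freeze

# Return nil when the URL is acceptable, or a message describing the problem.
def sketch_validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  return nil if SKETCH_DB_PROTOCOLS.include?(url.scheme)
  return "The '#{url.scheme}' protocol is not supported for database sync targets."
end

sketch_validate_db_url("postgres://user:pass@localhost:5432/mydb") # supported scheme
sketch_validate_db_url("mysql://localhost/mydb")                   # unsupported scheme
sketch_validate_db_url("not a url")                                # unparseable input
```

Returning a message instead of raising lets the caller attach the text to a validation error or a CLI prompt.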
.validate_http_url(s) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 123
    def self.validate_http_url(s)
      begin
        url = URI(s)
      rescue URI::InvalidURIError
        return "That's not a valid URL."
      end
      case url.scheme
        when "https"
          return nil if url.user.present? || url.password.present?
          url.user = "user"
          url.password = "pass"
          return "https urls must include a Basic Auth username and/or password, like '#{url}'"
        when "http"
          # http does not require a username/pass since it's only for internal use.
          return Webhookdb::SyncTarget.allow_http ? nil : "Url must be https, not http."
        else
          return "Must be an https url."
      end
    end
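The branches above can be exercised with a standalone sketch that avoids ActiveSupport's `present?`. Here `sketch_validate_http_url` is a hypothetical name, and the `allow_http:` keyword stands in for the `Webhookdb::SyncTarget.allow_http` setting.

```ruby
require "uri"

# Return nil when the URL is acceptable, or a message describing the problem.
def sketch_validate_http_url(s, allow_http: false)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  case url.scheme
    when "https"
      has_auth = !url.user.to_s.empty? || !url.password.to_s.empty?
      return nil if has_auth
      # Fill in placeholder credentials so the message shows the expected shape.
      url.user = "user"
      url.password = "pass"
      return "https urls must include a Basic Auth username and/or password, like '#{url}'"
    when "http"
      return allow_http ? nil : "Url must be https, not http."
    else
      return "Must be an https url."
  end
end
```

For example, `sketch_validate_http_url("https://example.com/hook")` returns a message that embeds `https://user:pass@example.com/hook` as the suggested form.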
.verify_db_connection(url) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 143
    def self.verify_db_connection(url)
      adapter = Webhookdb::DBAdapter.adapter(url)
      begin
        adapter.verify_connection(url, timeout: DB_VERIFY_TIMEOUT, statement: DB_VERIFY_STATEMENT)
      rescue StandardError => e
        # noinspection RailsParamDefResolve
        msg = e.try(:wrapped_exception).try(:to_s) || e.to_s
        raise InvalidConnection, "Could not SELECT 1: #{msg.strip}"
      end
    end
.verify_http_connection(url) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 154
    def self.verify_http_connection(url)
      cleanurl, authparams = Webhookdb::Http.extract_url_auth(url)
      body = {
        rows: [],
        integration_id: "svi_test",
        integration_service: "httpsync_test",
        table: "test",
      }
      begin
        Webhookdb::Http.post(
          cleanurl,
          body,
          logger: self.logger,
          basic_auth: authparams,
          timeout: HTTP_VERIFY_TIMEOUT,
          follow_redirects: true,
        )
      rescue Timeout::Error => e
        raise InvalidConnection, "POST to #{cleanurl} timed out: #{e.message}"
      rescue Webhookdb::Http::Error => e
        raise InvalidConnection, "POST to #{cleanurl} failed: #{e.message}"
      end
    end
Instance Method Details
#advisory_lock(db) ⇒ Sequel::AdvisoryLock
    # File 'lib/webhookdb/sync_target.rb', line 256
    def advisory_lock(db)
      return Sequel::AdvisoryLock.new(db, ADVISORY_LOCK_KEYSPACE, self.id)
    end
#associated_id ⇒ String
    # File 'lib/webhookdb/sync_target.rb', line 271
    def associated_id
      # Eventually we need to support orgs
      return self.service_integration.opaque_id
    end
#associated_object_display ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 276
    def associated_object_display
      return "#{self.service_integration.opaque_id}/#{self.service_integration.table_name}"
    end
#associated_type ⇒ String
    # File 'lib/webhookdb/sync_target.rb', line 265
    def associated_type
      # Eventually we need to support orgs
      return "service_integration"
    end
#before_create ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 297
    def before_create
      self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("syt")
    end
#before_validation ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 292
    def before_validation
      self.page_size ||= Webhookdb::SyncTarget.default_page_size
      super
    end
#db? ⇒ Boolean
    # File 'lib/webhookdb/sync_target.rb', line 102
    def db?
      return !self.http?
    end
#displaysafe_connection_url ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 260
    def displaysafe_connection_url
      return displaysafe_url(self.connection_url)
    end
#http? ⇒ Boolean
    # File 'lib/webhookdb/sync_target.rb', line 96
    def http?
      url = URI(self.connection_url)
      return true if ["http", "https"].include?(url.scheme)
      return false
    end
#jitter ⇒ Object
Return the jitter used for enqueueing the next sync of the job. It should never be more than 20 seconds, nor more than 1/4 of the total period, since the sync needs to run at a reasonably predictable time. Jitter is always >= 1, since it is helpful to be able to assert that the next sync will always be in the future.

    # File 'lib/webhookdb/sync_target.rb', line 197
    def jitter
      max_jitter = [20, self.period_seconds / 4].min
      max_jitter = [1, max_jitter].max
      return RAND.rand(1..max_jitter)
    end
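To see how the bounds behave for different periods, here is a standalone version of the same arithmetic that takes the period as an argument. `sketch_jitter` and the injectable `rng:` parameter are hypothetical additions for this example.

```ruby
# Compute a jitter in seconds for a given period.
# The upper bound is the smaller of 20 and a quarter of the period, but never below 1.
def sketch_jitter(period_seconds, rng: Random.new)
  max_jitter = [20, period_seconds / 4].min
  max_jitter = [1, max_jitter].max
  return rng.rand(1..max_jitter)
end

sketch_jitter(600) # between 1 and 20: a quarter of the period is 150, capped at 20
sketch_jitter(40)  # between 1 and 10: a quarter of the period is 10
sketch_jitter(2)   # always 1: integer division gives 0, raised to the floor of 1
```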
#next_possible_sync(now:) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 182
    def next_possible_sync(now:)
      return self.next_sync(self.organization.minimum_sync_seconds, now)
    end
#next_scheduled_sync(now:) ⇒ Object
    # File 'lib/webhookdb/sync_target.rb', line 178
    def next_scheduled_sync(now:)
      return self.next_sync(self.period_seconds, now)
    end
#organization ⇒ Webhookdb::Organization
    # File 'lib/webhookdb/sync_target.rb', line 288
    def organization
      return self.service_integration.organization
    end
#run_sync(now:) ⇒ Object
Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).
First, we lock using an advisory lock to make sure we never sync the same sync target concurrently, since concurrent syncs can cause correctness and performance issues. Raise a SyncInProgress error if we’re currently syncing.
If the sync target is against an HTTP URL, see _run_http_sync.
If the sync target is a database connection:
- Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.
- Select rows created/updated since our last update in our ‘source’ database.
- Write them to disk into a CSV file.
- Pass this CSV file to the proper sync target adapter.
- For example, the PG sync target will:
  - Create a temp table in the target database, using the schema from the sync target table.
  - Load the data into that temp table.
  - Insert into the target table the temp table rows that do not appear in the target table.
  - Update rows in the target table from the temp table rows that already appear in the target table.
- The snowflake sync target will:
  - PUT the CSV file into the stage for the table.
  - Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.
  - Purge the staged file.
-
will be synced.
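The insert-then-update step of the PG adapter can be illustrated with an in-memory sketch, where arrays of hashes stand in for the target table and the temp table. This is an analogy for the merge semantics, not the adapter's SQL; `sketch_merge` and the `:remote_id` key name are hypothetical.

```ruby
# Merge temp rows into target rows by a unique key.
# Rows whose key is new are inserted; rows whose key exists replace the old row.
def sketch_merge(target_rows, temp_rows, key:)
  merged = target_rows.to_h { |row| [row[key], row] } # index existing rows by key
  inserted = 0
  updated = 0
  temp_rows.each do |row|
    if merged.key?(row[key])
      updated += 1
    else
      inserted += 1
    end
    merged[row[key]] = row
  end
  return {rows: merged.values, inserted: inserted, updated: updated}
end

target = [{remote_id: "a", value: 1}, {remote_id: "b", value: 2}]
temp = [{remote_id: "b", value: 20}, {remote_id: "c", value: 3}]
result = sketch_merge(target, temp, key: :remote_id)
```

The unique constraint on the remote key column plays the role of the hash key here: it is what lets the adapter tell an insert from an update.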
    # File 'lib/webhookdb/sync_target.rb', line 233
    def run_sync(now:)
      ran = false
      # Take the advisory lock with a separate connection. This seems to be pretty important-
      # it's possible that (for reasons not clear at this time) using the standard connection pool
      # results in the lock being held since the session remains open for a while on the worker.
      # Opening a separate connection ensures that, once this method exits, the lock will be released
      # since the session will be ended.
      Webhookdb::Dbutil.borrow_conn(Webhookdb::Postgres::Model.uri) do |db|
        self.advisory_lock(db).with_lock? do
          routine = if self.connection_url.start_with?("https://", "http://")
            # Note that http links are not secure and should only be used for development purposes
            HttpRoutine.new(now, self)
          else
            DatabaseRoutine.new(now, self)
          end
          routine.run
          ran = true
        end
      end
      raise SyncInProgress, "SyncTarget[#{self.id}] is already being synced" unless ran
    end
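The "try the lock, run, and raise if we never ran" shape of run_sync can be tried in isolation with a process-local Mutex in place of the Postgres advisory lock. This is only an analogy: a Mutex guards a single process, whereas the advisory lock coordinates across workers. `SketchSyncInProgress` and `sketch_run_exclusively` are hypothetical names.

```ruby
class SketchSyncInProgress < StandardError; end

# Run the block only if the lock can be taken without waiting.
# Raise if another holder already has the lock.
def sketch_run_exclusively(lock, name)
  ran = false
  if lock.try_lock
    begin
      yield
      ran = true
    ensure
      # Always release the lock, even if the block raised.
      lock.unlock
    end
  end
  raise SketchSyncInProgress, "#{name} is already being synced" unless ran
end
```

Tracking `ran` separately, as the original does, keeps the "lock not acquired" case distinct from a failure inside the block, which propagates as its own exception.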
#schema_and_table_string ⇒ String
    # File 'lib/webhookdb/sync_target.rb', line 281
    def schema_and_table_string
      schema_name = self.schema.present? ? self.schema : self.class.default_schema
      table_name = self.table.present? ? self.table : self.service_integration.table_name
      return "#{schema_name}.#{table_name}"
    end
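The fallback behavior is easiest to see with concrete values. The standalone sketch below passes the defaults in as arguments; `sketch_schema_and_table`, the `"public"` default schema, and the table names are assumptions for illustration.

```ruby
# Use the explicit schema/table when given, otherwise fall back to the defaults.
# Blank strings and nil are both treated as "not given".
def sketch_schema_and_table(schema:, table:, default_schema:, default_table:)
  schema_name = schema.to_s.strip.empty? ? default_schema : schema
  table_name = table.to_s.strip.empty? ? default_table : table
  return "#{schema_name}.#{table_name}"
end

defaults = {default_schema: "public", default_table: "stripe_charge_v1_abcd"}
sketch_schema_and_table(schema: nil, table: nil, **defaults)
sketch_schema_and_table(schema: "analytics", table: "", **defaults)
sketch_schema_and_table(schema: "analytics", table: "charges", **defaults)
```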