Class: Webhookdb::SyncTarget

Inherits:
Object
Includes:
Appydays::Configurable, Dbutil
Defined in:
lib/webhookdb/sync_target.rb

Overview

Supports exporting WebhookDB data into external services, such as another Postgres instance or a data warehouse (Snowflake, etc.).

At a high level, sync targets work as follows:

  • A user registers a sync target for a specific integration through the CLI, using a connection string for a supported database (e.g., postgres://).

  • The user specifies a period (how often the target is synced) and, optionally, a schema and table. If these are omitted, we use the default schema and the service integration's table name.

  • Every minute or so, we look for sync targets that are “due” and enqueue a sync for them. Customers can also enqueue their own sync request, but a sync cannot run more often than the organization's minimum allowed sync period.

For the sync logic itself, see run_sync.

Defined Under Namespace

Classes: DatabaseRoutine, Deleted, HttpRoutine, InvalidConnection, Routine, SyncInProgress

Constant Summary

ADVISORY_LOCK_KEYSPACE = 2_000_000_000
  Advisory locks for sync targets use this as the first int, and the id as the second.
HTTP_VERIFY_TIMEOUT = 3
DB_VERIFY_TIMEOUT = 2000
DB_VERIFY_STATEMENT = "SELECT 1"
RAND = Random.new

Constants included from Dbutil

Dbutil::MOCK_CONN


Methods included from Dbutil

borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn

Instance Attribute Details

#connection_url ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 306

#service_integration ⇒ Webhookdb::ServiceIntegration



# File 'lib/webhookdb/sync_target.rb', line 303

Class Method Details

.default_valid_period ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 72

def self.default_valid_period
  return self.valid_period(Webhookdb::SyncTarget.default_min_period_seconds)
end

.valid_period(beginval) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 64

def self.valid_period(beginval)
  return beginval..self.max_period_seconds
end

.valid_period_for(org) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 68

def self.valid_period_for(org)
  return self.valid_period(org.minimum_sync_seconds)
end
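The period helpers return an inclusive Range, so validating a requested period is a `Range#cover?` check. The sketch below assumes a fixed `MAX_PERIOD_SECONDS` (24 hours, an illustrative value) in place of the configurable `max_period_seconds` setting, and passes the organization's `minimum_sync_seconds` in directly rather than reading it from an org record.

```ruby
# Assumed stand-in for Webhookdb::SyncTarget.max_period_seconds, which is
# configured elsewhere; 24 hours is an illustrative value only.
MAX_PERIOD_SECONDS = 24 * 60 * 60

# Mirrors SyncTarget.valid_period: an inclusive Range from the given minimum
# up to the configured maximum.
def valid_period(beginval)
  beginval..MAX_PERIOD_SECONDS
end

# Mirrors SyncTarget.valid_period_for, with the org's minimum passed directly.
def valid_period_for(minimum_sync_seconds)
  valid_period(minimum_sync_seconds)
end

range = valid_period_for(60)
range.cover?(30)     # => false (more frequent than the org allows)
range.cover?(600)    # => true
range.cover?(86_400) # => true (the upper bound is inclusive)
```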

.validate_db_url(s) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 108

def self.validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  protocols = ["postgres", "snowflake"]
  unless protocols.include?(url.scheme)
    protostr = protocols.join(", ")
    # rubocop:disable Layout/LineLength
    msg = "The '#{url.scheme}' protocol is not supported for database sync targets. Supported protocols are: #{protostr}."
    # rubocop:enable Layout/LineLength
    return msg
  end
  return nil
end
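This check only inspects the URL scheme; it does not try to connect (that is `verify_db_connection`'s job). The standalone copy below makes it easy to try inputs outside the app. Note that the common `postgresql://` spelling is rejected, since only the exact scheme `postgres` is in the list, and a scheme-less string produces a message with an empty protocol name.

```ruby
require "uri"

SUPPORTED_DB_PROTOCOLS = ["postgres", "snowflake"].freeze

# Standalone copy of the checks in SyncTarget.validate_db_url. Returns nil when
# the URL is acceptable, otherwise a human-readable error message.
def validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  return nil if SUPPORTED_DB_PROTOCOLS.include?(url.scheme)
  "The '#{url.scheme}' protocol is not supported for database sync targets. " \
    "Supported protocols are: #{SUPPORTED_DB_PROTOCOLS.join(', ')}."
end

validate_db_url("postgres://user:pass@db.example.com:5432/app")   # => nil
validate_db_url("postgresql://user:pass@db.example.com:5432/app") # rejected: scheme must be exactly "postgres"
validate_db_url("db.example.com/app")                             # rejected: no scheme, so "The '' protocol ..."
validate_db_url("not a url")                                      # => "That's not a valid URL."
```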

.validate_http_url(s) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 125

def self.validate_http_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  case url.scheme
    when "https"
      return nil if url.user.present? || url.password.present?
      url.user = "user"
      url.password = "pass"
      return "https urls must include a Basic Auth username and/or password, like '#{url}'"
    when "http"
      # http does not require a username/pass since it's only for internal use.
      return Webhookdb::SyncTarget.allow_http ? nil : "Url must be https, not http."
    else
      return "Must be an https url."
  end
end
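The https branch is the subtle one: a URL without credentials is rejected, and the error message echoes the URL back with placeholder credentials so the user can see the expected shape. The sketch below is a dependency-free version for experimenting; ActiveSupport's `present?` is replaced with a nil/empty check, and the `allow_http` setting is passed in as a keyword argument (an assumption made for this sketch, since the original reads it from configuration).

```ruby
require "uri"

# Standalone version of SyncTarget.validate_http_url. allow_http: stands in
# for the Webhookdb::SyncTarget.allow_http configuration setting.
def validate_http_url(s, allow_http: false)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  case url.scheme
  when "https"
    has_creds = [url.user, url.password].any? { |v| v && !v.empty? }
    return nil if has_creds
    # Fill in placeholder credentials to show the expected URL shape.
    url.user = "user"
    url.password = "pass"
    "https urls must include a Basic Auth username and/or password, like '#{url}'"
  when "http"
    allow_http ? nil : "Url must be https, not http."
  else
    "Must be an https url."
  end
end

validate_http_url("https://me:secret@example.com/hook") # => nil
validate_http_url("https://example.com/hook")           # message suggests 'https://user:pass@example.com/hook'
```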

.verify_db_connection(url) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 145

def self.verify_db_connection(url)
  adapter = Webhookdb::DBAdapter.adapter(url)
  begin
    adapter.verify_connection(url, timeout: DB_VERIFY_TIMEOUT, statement: DB_VERIFY_STATEMENT)
  rescue StandardError => e
    # noinspection RailsParamDefResolve
    msg = e.try(:wrapped_exception).try(:to_s) || e.to_s
    raise InvalidConnection, "Could not SELECT 1: #{msg.strip}"
  end
end
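The rescue clause prefers the message of a wrapped driver error (Sequel's `DatabaseError`, for example, exposes the original driver exception as `wrapped_exception`) and re-raises everything as `InvalidConnection`. The sketch below reproduces that normalization without ActiveSupport's `try`; the adapter is any object with a `verify_connection(url, timeout:, statement:)` method, and `WrappedError` and `FailingAdapter` are hypothetical test doubles.

```ruby
# Stand-in for SyncTarget::InvalidConnection.
class InvalidConnection < StandardError; end

# Sketch of verify_db_connection's error handling with the adapter injected.
def verify_db_connection(adapter, url, timeout: 2000, statement: "SELECT 1")
  adapter.verify_connection(url, timeout: timeout, statement: statement)
rescue StandardError => e
  # Prefer the underlying driver error when the exception wraps one.
  inner = e.respond_to?(:wrapped_exception) ? e.wrapped_exception : nil
  msg = inner.nil? ? e.to_s : inner.to_s
  raise InvalidConnection, "Could not SELECT 1: #{msg.strip}"
end

# Hypothetical wrapper error and an adapter that always fails, for illustration.
WrappedError = Class.new(StandardError) { attr_accessor :wrapped_exception }

class FailingAdapter
  def verify_connection(_url, timeout:, statement:)
    err = WrappedError.new("wrapper message")
    err.wrapped_exception = RuntimeError.new("  connection refused\n")
    raise err
  end
end

begin
  verify_db_connection(FailingAdapter.new, "postgres://localhost/app")
rescue InvalidConnection => e
  puts e.message # prints "Could not SELECT 1: connection refused"
end
```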

.verify_http_connection(url) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 156

def self.verify_http_connection(url)
  cleanurl, authparams = Webhookdb::Http.extract_url_auth(url)
  body = {
    rows: [],
    integration_id: "svi_test",
    integration_service: "httpsync_test",
    table: "test",
  }
  begin
    Webhookdb::Http.post(
      cleanurl,
      body,
      logger: self.logger,
      basic_auth: authparams,
      timeout: HTTP_VERIFY_TIMEOUT,
      follow_redirects: true,
    )
  rescue Timeout::Error => e
    raise InvalidConnection, "POST to #{cleanurl} timed out: #{e.message}"
  rescue Webhookdb::Http::Error => e
    raise InvalidConnection, "POST to #{cleanurl} failed: #{e.message}"
  end
end
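The verification request is an empty batch (no rows) posted to the target URL, with the URL's credentials sent as HTTP Basic Auth. The sketch below rebuilds that request with Ruby's standard `Net::HTTP` rather than `Webhookdb::Http`; it assumes that `Webhookdb::Http.extract_url_auth` splits the userinfo off the URL for use as Basic Auth, which matches how `basic_auth: authparams` is passed in the listing. Unlike the original, it does not follow redirects, and percent-decoding of credentials is omitted.

```ruby
require "json"
require "net/http"
require "uri"

# The same empty test payload verify_http_connection sends.
VERIFY_BODY = {
  rows: [],
  integration_id: "svi_test",
  integration_service: "httpsync_test",
  table: "test",
}.freeze

# Builds the verification POST; credentials in the URL become a Basic Auth header.
def build_verify_request(url)
  uri = URI(url)
  req = Net::HTTP::Post.new(uri.request_uri, "Content-Type" => "application/json")
  req.basic_auth(uri.user.to_s, uri.password.to_s) if uri.user || uri.password
  req.body = JSON.generate(VERIFY_BODY)
  [uri, req]
end

# Sends the request with a short timeout, similar to HTTP_VERIFY_TIMEOUT = 3.
def send_verify_request(uri, req, timeout: 3)
  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https",
                                      open_timeout: timeout, read_timeout: timeout) do |http|
    http.request(req)
  end
end

uri, req = build_verify_request("https://me:secret@example.com/sync")
# send_verify_request(uri, req) would perform the network call.
```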

Instance Method Details

#advisory_lock(db) ⇒ Sequel::AdvisoryLock



# File 'lib/webhookdb/sync_target.rb', line 258

def advisory_lock(db)
  return Sequel::AdvisoryLock.new(db, ADVISORY_LOCK_KEYSPACE, self.id)
end
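Postgres exposes two-key advisory locks through functions such as `pg_try_advisory_lock(key1 int, key2 int)` and `pg_advisory_unlock(key1 int, key2 int)`, where both keys are 32-bit integers. Using the fixed `ADVISORY_LOCK_KEYSPACE` as the first key namespaces these locks, so another feature that keys advisory locks by row id can pick a different first key without colliding. The sketch below only renders the SQL and checks the int4 bounds; whether `Sequel::AdvisoryLock` issues exactly this statement is an assumption.

```ruby
# Value of SyncTarget::ADVISORY_LOCK_KEYSPACE.
ADVISORY_LOCK_KEYSPACE = 2_000_000_000
INT4_MIN = -(2**31)
INT4_MAX = 2**31 - 1

# Renders the non-blocking two-key lock call for a sync target id.
# Both keys must fit in a signed 32-bit integer (Postgres int4).
def try_lock_sql(sync_target_id)
  id = Integer(sync_target_id)
  raise ArgumentError, "id #{id} does not fit in int4" unless id.between?(INT4_MIN, INT4_MAX)
  "SELECT pg_try_advisory_lock(#{ADVISORY_LOCK_KEYSPACE}, #{id})"
end

try_lock_sql(42) # => "SELECT pg_try_advisory_lock(2000000000, 42)"
```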

#associated_id ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 273

def associated_id
  # Eventually we need to support orgs
  return self.service_integration.opaque_id
end

#associated_object_display ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 278

def associated_object_display
  return "#{self.service_integration.opaque_id}/#{self.service_integration.table_name}"
end

#associated_type ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 267

def associated_type
  # Eventually we need to support orgs
  return "service_integration"
end

#before_create ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 299

def before_create
  self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("syt")
end

#before_validation ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 294

def before_validation
  self.page_size ||= Webhookdb::SyncTarget.default_page_size
  super
end

#db? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/sync_target.rb', line 104

def db?
  return !self.http?
end

#displaysafe_connection_url ⇒ Object



262
263
264
# File 'lib/webhookdb/sync_target.rb', line 262

def displaysafe_connection_url
  return displaysafe_url(self.connection_url)
end
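`Dbutil#displaysafe_url` is not shown in this listing. As a rough illustration of what "display-safe" can mean for a connection URL, the sketch below masks only the password component using Ruby's `URI`; the helper name `mask_url_password` and the `***` placeholder are hypothetical, and the real implementation may redact differently.

```ruby
require "uri"

# Hypothetical masking helper: replaces the password in a URL with "***" so the
# URL can be shown in logs or the CLI. Not the actual Dbutil#displaysafe_url.
def mask_url_password(url)
  uri = URI(url)
  return url if uri.password.nil? # nothing secret to hide
  uri.password = "***"
  uri.to_s
end

mask_url_password("postgres://admin:hunter2@db.example.com:5432/app")
# => "postgres://admin:***@db.example.com:5432/app"
```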

#http? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/sync_target.rb', line 98

def http?
  url = URI(self.connection_url)
  return true if ["http", "https"].include?(url.scheme)
  return false
end

#jitter ⇒ Object

Returns the jitter, in seconds, used when enqueuing the next sync of the job. It is at most 20 seconds and at most 1/4 of the total period (using integer division), since the sync needs to run at a reasonably predictable time. It is also always >= 1, which takes precedence for periods shorter than 4 seconds, since it is helpful to be able to assert that the next sync will always be in the future.



# File 'lib/webhookdb/sync_target.rb', line 199

def jitter
  max_jitter = [20, self.period_seconds / 4].min
  max_jitter = [1, max_jitter].max
  return RAND.rand(1..max_jitter)
end
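Because the quarter-period bound uses integer division and is then clamped to at least 1, the effective jitter range depends on the period. The sketch below pulls the bound out into a hypothetical `max_jitter` helper, with the period passed in instead of read from `period_seconds`, and works through a few periods.

```ruby
RAND = Random.new

# Hypothetical helper: the upper bound SyncTarget#jitter uses for a period.
def max_jitter(period_seconds)
  bound = [20, period_seconds / 4].min # at most 20s and at most a quarter period
  [1, bound].max                       # but never less than 1
end

# Same sampling as SyncTarget#jitter, with the period passed in.
def jitter(period_seconds)
  RAND.rand(1..max_jitter(period_seconds))
end

# Worked bounds:
#   period 3    -> 3 / 4 = 0, clamped to 1, so jitter is always 1
#   period 60   -> min(20, 15) = 15, so jitter is in 1..15
#   period 3600 -> min(20, 900) = 20, so jitter is in 1..20
```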

#next_possible_sync(now:) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 184

def next_possible_sync(now:)
  return self.next_sync(self.organization.minimum_sync_seconds, now)
end

#next_scheduled_sync(now:) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 180

def next_scheduled_sync(now:)
  return self.next_sync(self.period_seconds, now)
end
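`next_scheduled_sync` and `next_possible_sync` differ only in which period they pass to `next_sync`: the target's own `period_seconds` versus the organization's `minimum_sync_seconds`, which is what bounds customer-requested syncs. `next_sync` itself is not shown in this listing, so the sketch below is hypothetical: it assumes the target records a `last_synced_at` time (nil if it has never synced) and that the next sync falls one period after the last one, but never earlier than `now`.

```ruby
# Hypothetical next_sync: the real method is not shown in this listing.
# Assumes last_synced_at is nil when the target has never synced.
def next_sync(period_seconds, now, last_synced_at:)
  return now if last_synced_at.nil?
  [last_synced_at + period_seconds, now].max
end

last = Time.utc(2024, 1, 1, 12, 0, 0)
now = last + 30
period_seconds = 3600     # the target's configured period
minimum_sync_seconds = 60 # the organization's minimum

next_sync(period_seconds, now, last_synced_at: last)       # scheduled: 13:00:00 UTC
next_sync(minimum_sync_seconds, now, last_synced_at: last) # earliest possible: 12:01:00 UTC
```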

#organization ⇒ Webhookdb::Organization



# File 'lib/webhookdb/sync_target.rb', line 290

def organization
  return self.service_integration.organization
end

#run_sync(now:) ⇒ Object

Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).

First, we take an advisory lock to make sure the same sync target is never synced concurrently, since concurrent syncs can cause correctness and performance issues. If a sync is already in progress, we raise a SyncInProgress error.

If the sync target is an HTTP(S) URL, the sync is handled by HttpRoutine.

If the sync target is a database connection (handled by DatabaseRoutine):

  • Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.

  • Select rows created or updated since the last sync from our ‘source’ database.

  • Write them to disk into a CSV file.

  • Pass this CSV file to the proper sync target adapter.

  • For example, the PG sync target will:

    • Create a temp table in the target database, using the schema from the sync target table.

    • Load the data into that temp table.

    • Insert into the target table any temp table rows that do not already appear in it.

    • Update the target table rows that also appear in the temp table, using the temp table values.

  • The Snowflake sync target will:

    • PUT the CSV file into the stage for the table.

    • Otherwise, follow the same logic as PG: create a temp table and COPY INTO it from the staged CSV file.

    • Purge the staged file.
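The PG load step described above (merge a populated temp table into the target table on the remote key) can be sketched as two SQL statements. The helper `pg_merge_statements` and all table and column names below are hypothetical; identifiers are interpolated as-is, so they must come from trusted, pre-validated names. This sketch runs the UPDATE first so that freshly inserted rows are not rewritten; the end state is the same in either order.

```ruby
# Hypothetical helper that renders the temp-table merge as SQL strings.
# target/temp are table names, key is the remote key column, columns lists
# every synced column (including the key).
def pg_merge_statements(target:, temp:, key:, columns:)
  cols = columns.join(", ")
  assignments = (columns - [key]).map { |c| "#{c} = s.#{c}" }.join(", ")
  # Overwrite rows that already exist in the target.
  update = "UPDATE #{target} AS t SET #{assignments} FROM #{temp} AS s WHERE t.#{key} = s.#{key}"
  # Add rows whose key is not yet present in the target.
  insert = "INSERT INTO #{target} (#{cols}) SELECT #{cols} FROM #{temp} AS s " \
           "WHERE NOT EXISTS (SELECT 1 FROM #{target} AS t WHERE t.#{key} = s.#{key})"
  [update, insert]
end

update_sql, insert_sql = pg_merge_statements(
  target: "public.orders", temp: "tmp_orders", key: "pk", columns: ["pk", "data"],
)
```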

Parameters:

  • now (Time)

    The current time. Rows updated at or after the ‘last updated’ timestamp, and at or before ‘now’, will be synced.

Raises:

  • (SyncInProgress) If this sync target is already being synced.

# File 'lib/webhookdb/sync_target.rb', line 235

def run_sync(now:)
  ran = false
  # Take the advisory lock with a separate connection. This seems to be pretty important:
  # it's possible that (for reasons not clear at this time) using the standard connection pool
  # results in the lock being held since the session remains open for a while on the worker.
  # Opening a separate connection ensures that, once this method exits, the lock will be released
  # since the session will be ended.
  Webhookdb::Dbutil.borrow_conn(Webhookdb::Postgres::Model.uri) do |db|
    self.advisory_lock(db).with_lock? do
      routine = if self.connection_url.start_with?("https://", "http://")
        # Note that http links are not secure and should only be used for development purposes
        HttpRoutine.new(now, self)
      else
        DatabaseRoutine.new(now, self)
      end
      routine.run
      ran = true
    end
  end
  raise SyncInProgress, "SyncTarget[#{self.id}] is already being synced" unless ran
end
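The control flow of `run_sync` is a non-blocking try-lock: if the lock cannot be taken immediately, nothing runs and `SyncInProgress` is raised. The sketch below shows the same shape in-process, substituting a Ruby `Mutex` for the Postgres advisory lock; that swap means it only guards a single Ruby process, whereas the advisory lock works across workers and is released when its database session ends (which is why the original borrows a separate connection). `run_exclusively` is a hypothetical name.

```ruby
# Stand-in for SyncTarget::SyncInProgress.
class SyncInProgress < StandardError; end

# Yields only if +lock+ can be acquired without waiting; otherwise raises
# SyncInProgress, mirroring run_sync when with_lock? does not run its block.
def run_exclusively(lock, id)
  raise SyncInProgress, "SyncTarget[#{id}] is already being synced" unless lock.try_lock
  begin
    yield
  ensure
    lock.unlock # always release, even if the sync raises
  end
end

lock = Mutex.new
run_exclusively(lock, 1) do
  # A second attempt while the first still holds the lock fails fast.
  begin
    run_exclusively(lock, 1) { puts "never reached" }
  rescue SyncInProgress => e
    puts e.message # prints "SyncTarget[1] is already being synced"
  end
end
```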

#schema_and_table_string ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 283

def schema_and_table_string
  schema_name = self.schema.present? ? self.schema : self.class.default_schema
  table_name = self.table.present? ? self.table : self.service_integration.table_name
  return "#{schema_name}.#{table_name}"
end
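The qualified name falls back independently for each part: a blank schema uses the default schema, and a blank table uses the service integration's table name. The dependency-free sketch below replaces ActiveSupport's `present?` with a whitespace-aware blank check; `"public"` as the default schema and the `my_integration_table` name are assumptions for illustration (the real default comes from `SyncTarget.default_schema`).

```ruby
# Treats nil and whitespace-only strings as blank, similar to ActiveSupport.
def blank_str?(s)
  s.nil? || s.strip.empty?
end

# Standalone version of schema_and_table_string with its inputs passed in.
def schema_and_table_string(schema:, table:, integration_table:, default_schema: "public")
  schema_name = blank_str?(schema) ? default_schema : schema
  table_name = blank_str?(table) ? integration_table : table
  "#{schema_name}.#{table_name}"
end

schema_and_table_string(schema: nil, table: "", integration_table: "my_integration_table")
# => "public.my_integration_table"
schema_and_table_string(schema: "analytics", table: "orders", integration_table: "my_integration_table")
# => "analytics.orders"
```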