Class: Webhookdb::Organization

Inherits:
Object
  • Object
Defined in:
lib/webhookdb/organization.rb

Defined Under Namespace

Classes: Alerting, DatabaseMigration, DbBuilder, QueryResult, SchemaMigrationError

Class Method Details

.create_if_unique(params) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 102

def self.create_if_unique(params)
  self.db.transaction(savepoint: true) do
    return Webhookdb::Organization.create(name: params[:name])
  end
rescue Sequel::UniqueConstraintViolation
  return nil
end

.enqueue_migrate_all_replication_tables ⇒ Object

As part of the release process, we enqueue a job that will migrate the replication schemas for all organizations. However, this job must use the NEW code being released; it should not use the CURRENT code the workers may be using when this method is run during the release process.

We can get around this by enqueuing the jobs with the ‘target’ release creation date. Only jobs that execute with this release creation date will perform the migration; if the job is running using an older release creation date (i.e., still running old code), it will re-enqueue the migration to run in the future, using a worker that will eventually be using newer code.

For example:

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • The workers, using Release A code (with a release_created_at of 0), run the ReplicationMigration job. They see the target release_created_at of 1 is greater than/after the current release_created_at of 0, so re-enqueue the job.

  • Eventually the workers are using Release B code, which has a release_created_at of 1. This matches the target, so the job is run.

For a more complex example, which involves releases created in quick succession (we need to be careful to avoid jobs that never run):

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • Release C, created at 2, runs this method.

  • Workers are backed up, so nothing is processed until all workers are using Release C.

  • Workers using Release C code process two sets of jobs:

    • Jobs with a target release_created_at of 1

    • Jobs with a target release_created_at of 2

  • Jobs with a target of 2 run the actual migration, because the times match.

  • Jobs with a target of 1 see that the target is less than/before the current release_created_at of 2. This indicates the migration is stale, and the job is discarded.

NOTE: There will always be a race condition where we may process webhooks using the new code before we’ve migrated the replication schemas to match the new code. This will error during the upsert because the column doesn’t yet exist. However, these will be retried automatically, and quickly, so we don’t worry about them yet.



# File 'lib/webhookdb/organization.rb', line 324

def self.enqueue_migrate_all_replication_tables
  Webhookdb::Organization.each do |org|
    Webhookdb::Jobs::ReplicationMigration.perform_in(2, org.id, Webhookdb::RELEASE_CREATED_AT)
  end
end
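
The three outcomes described above (run, re-enqueue, discard) can be sketched as a single comparison. This is a minimal illustration, not the actual ReplicationMigration job: the helper name `migration_action` and the returned symbols are hypothetical, and release timestamps are simplified to integers as in the examples.

```ruby
# Hypothetical helper: decide what a migration job should do, given the
# release timestamp it targets and the timestamp of the code running it.
def migration_action(target_release_created_at, current_release_created_at)
  if target_release_created_at > current_release_created_at
    :reenqueue # the worker is still on older code; try again later
  elsif target_release_created_at < current_release_created_at
    :discard   # a newer release superseded this job; it is stale
  else
    :migrate   # the running code matches the target; run the migration
  end
end

migration_action(1, 0) # => :reenqueue (Release A worker sees a Release B job)
migration_action(1, 1) # => :migrate   (Release B worker sees a Release B job)
migration_action(1, 2) # => :discard   (Release C worker sees a Release B job)
```

Because a stale job is discarded rather than re-enqueued, every job eventually terminates once workers run the newest release.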

Instance Method Details

#active_subscription? ⇒ Boolean

SUBSCRIPTION PERMISSIONS

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 459

def active_subscription?
  subscription = Webhookdb::Subscription[stripe_customer_id: self.stripe_customer_id]
  # return false if no subscription
  return false if subscription.nil?
  # otherwise check stripe subscription string
  return ["trialing", "active", "past due"].include? subscription.status
end
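
The status check is a plain string membership test, so the exact spelling of each status matters. The sketch below assumes that `Subscription#status` returns Stripe's raw status string (that method is not shown on this page); the helper name `status_counts_as_active?` is hypothetical. Stripe documents its subscription statuses in snake case, for example `past_due`.

```ruby
# Subscription statuses as documented by Stripe.
STRIPE_STATUSES = [
  "incomplete", "incomplete_expired", "trialing", "active",
  "past_due", "canceled", "unpaid", "paused"
].freeze

# Hypothetical helper mirroring the membership test in the listing above.
def status_counts_as_active?(status, accepted)
  accepted.include?(status)
end

accepted_in_listing = ["trialing", "active", "past due"]
status_counts_as_active?("trialing", accepted_in_listing) # => true
status_counts_as_active?("past_due", accepted_in_listing) # => false ("past due" is a different string)
```

If the raw Stripe value is what gets compared, it may be worth checking which spelling of the past-due status is intended.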

#add_membership(opts = {}) ⇒ Object

:section: Memberships



# File 'lib/webhookdb/organization.rb', line 449

def add_membership(opts={})
  if !opts.is_a?(Webhookdb::OrganizationMembership) && !opts.key?(:verified)
    raise ArgumentError, "must pass :verified or a model into add_membership, it is ambiguous otherwise"
  end
  self.associations.delete(opts[:verified] ? :verified_memberships : :invited_memberships)
  return self.add_all_membership(opts)
end

#admin_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 126

def admin_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.admin_connection_url_raw, **kw, &)
end

#admin_connection_url ⇒ Object

Return the admin connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 187

def admin_connection_url
  return self._public_host_connection_url(self.admin_connection_url_raw)
end

#admin_customers ⇒ Object



# File 'lib/webhookdb/organization.rb', line 110

def admin_customers
  return self.verified_memberships.filter(&:admin?).map(&:customer)
end

#admin_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 212

def admin_user
  ur = URI(self.admin_connection_url)
  return ur.user
end

#alerting ⇒ Object



# File 'lib/webhookdb/organization.rb', line 114

def alerting
  return @alerting ||= Alerting.new(self)
end

#available_replicators ⇒ Object



# File 'lib/webhookdb/organization.rb', line 475

def available_replicators
  available = Webhookdb::Replicator.registry.values.filter do |desc|
    # The org must have any of the flags required for the service. In other words,
    # the intersection of desc[:feature_roles] & org.feature_roles must
    # not be empty
    no_restrictions = desc.feature_roles.empty?
    next true if no_restrictions
    org_has_access = (self.feature_roles.map(&:name) & desc.feature_roles).present?
    org_has_access
  end
  return available
end
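
The access rule in the comment above (no restrictions, or a non-empty intersection of role names) can be tried in isolation. This is a minimal sketch in plain Ruby: the `Descriptor` struct, the helper `accessible_descriptors`, and the sample role names are hypothetical stand-ins for the replicator registry and the org's feature roles.

```ruby
# Hypothetical stand-in for a replicator descriptor.
Descriptor = Struct.new(:name, :feature_roles)

# Keep descriptors that have no role restrictions, or that share
# at least one role name with the organization.
def accessible_descriptors(descriptors, org_role_names)
  descriptors.select do |desc|
    desc.feature_roles.empty? || !(org_role_names & desc.feature_roles).empty?
  end
end

descriptors = [
  Descriptor.new("open_v1", []),
  Descriptor.new("beta_v1", ["beta"]),
  Descriptor.new("internal_v1", ["internal"]),
]

accessible_descriptors(descriptors, ["beta"]).map(&:name) # => ["open_v1", "beta_v1"]
accessible_descriptors(descriptors, []).map(&:name)       # => ["open_v1"]
```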

#before_validation ⇒ Object



# File 'lib/webhookdb/organization.rb', line 95

def before_validation
  self.minimum_sync_seconds ||= Webhookdb::SyncTarget.default_min_period_seconds
  self.key ||= Webhookdb.to_slug(self.name)
  self.replication_schema ||= Webhookdb::Organization::DbBuilder.new(self).default_replication_schema
  super
end

#can_add_new_integration? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 467

def can_add_new_integration?
  # if the sint's organization has an active subscription, return true
  return true if self.active_subscription?
  # if there is no active subscription, check number of integrations against free tier max
  limit = Webhookdb::Subscription.max_free_integrations
  return Webhookdb::ServiceIntegration.where(organization: self).count < limit
end
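
The gating rule above reduces to two inputs: whether the subscription is active, and how the integration count compares to the free-tier limit. A minimal sketch with the database lookups replaced by plain arguments (the helper name `can_add_integration?` and its keyword arguments are hypothetical):

```ruby
# Hypothetical helper: a paid org can always add integrations;
# a free org can add one only while under the free-tier limit.
def can_add_integration?(active_subscription:, integration_count:, free_limit:)
  return true if active_subscription
  integration_count < free_limit
end

can_add_integration?(active_subscription: true, integration_count: 50, free_limit: 2)  # => true
can_add_integration?(active_subscription: false, integration_count: 1, free_limit: 2)  # => true
can_add_integration?(active_subscription: false, integration_count: 2, free_limit: 2)  # => false
```

Note the strict comparison: an org that already has exactly the free-tier maximum cannot add another integration.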

#cli_editable_fields ⇒ Object



# File 'lib/webhookdb/organization.rb', line 118

def cli_editable_fields
  return ["name", "billing_email"]
end

#create_public_host_cname(safe: false) ⇒ Object

Create a CNAME in Cloudflare for the currently configured connection urls.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if the public host is set.



# File 'lib/webhookdb/organization.rb', line 255

def create_public_host_cname(safe: false)
  self.db.transaction do
    self.lock!
    # We must have a host to create a CNAME to.
    raise Webhookdb::InvalidPrecondition, "connection urls must be set" if self.readonly_connection_url_raw.blank?
    # Should only be used once when creating the org DBs.
    if self.public_host.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "public_host must not be set"
    end
    # Use the raw URL, even though we know at this point
    # public_host is empty so raw and public host urls are the same.
    Webhookdb::Organization::DbBuilder.new(self).create_public_host_cname(self.readonly_connection_url_raw)
    self.save_changes
  end
end

#dbname ⇒ Object



# File 'lib/webhookdb/organization.rb', line 206

def dbname
  raise Webhookdb::InvalidPrecondition, "no db has been created, call prepare_database_connections first" if
    self.admin_connection_url.blank?
  return URI(self.admin_connection_url).path.tr("/", "")
end
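
Both `dbname` and the `*_user` methods read parts of a connection URL via Ruby's standard `URI`. The sketch below shows the same parsing on a made-up URL; the helper names `db_name_from_url` and `db_user_from_url` and the example credentials are hypothetical.

```ruby
require "uri"

# Hypothetical helper: the database name is the URL path without slashes.
def db_name_from_url(url)
  URI(url).path.tr("/", "")
end

# Hypothetical helper: the database user is the userinfo part before the colon.
def db_user_from_url(url)
  URI(url).user
end

url = "postgres://admin_user:secret@db.example.com:5432/org_db"
db_name_from_url(url) # => "org_db"
db_user_from_url(url) # => "admin_user"
```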

#display_string ⇒ Object



# File 'lib/webhookdb/organization.rb', line 228

def display_string
  return "#{self.name} (#{self.key})"
end

#execute_readonly_query(sql) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 130

def execute_readonly_query(sql)
  max_rows = self.max_query_rows || self.class.max_query_rows
  return self.readonly_connection do |conn|
    ds = conn.fetch(sql)
    r = QueryResult.new
    r.max_rows_reached = false
    r.columns = ds.columns
    r.rows = []
    # Stream to avoid pulling in all rows of unlimited queries
    ds.stream.each do |row|
      if r.rows.length >= max_rows
        r.max_rows_reached = true
        break
      end
      r.rows << row.values
    end
    return r
  end
end
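
The row cap in the loop above can be exercised without a database. This sketch applies the same loop to any enumerable, including an endless one, to show why streaming plus an early `break` keeps memory bounded. The helper name `take_capped` is hypothetical.

```ruby
# Hypothetical helper: collect at most max_rows items, and report whether
# the source had more rows than the cap allowed.
def take_capped(rows, max_rows)
  collected = []
  max_rows_reached = false
  rows.each do |row|
    if collected.length >= max_rows
      max_rows_reached = true
      break
    end
    collected << row
  end
  [collected, max_rows_reached]
end

take_capped((1..), 3)     # => [[1, 2, 3], true]   (endless source, stops early)
take_capped([1, 2], 3)    # => [[1, 2], false]
take_capped([1, 2, 3], 3) # => [[1, 2, 3], false]  (exactly at the cap is not "reached")
```

The flag is only set when a row beyond the cap is actually seen, so a result with exactly `max_rows` rows is reported as complete.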

#execute_readonly_query_with_help(sql) ⇒ Array<Webhookdb::Organization::QueryResult,String,nil>

Run the given SQL inside the org, and use special error handling if it fails.

Returns:

  • (Array<Webhookdb::Organization::QueryResult,String,nil>)

    Tuple of query result, and optional message. On query success, return <QueryResult, nil>. On DatabaseError, return <nil, message>. On other types of errors, raise.



# File 'lib/webhookdb/organization.rb', line 155

def execute_readonly_query_with_help(sql)
  result = self.execute_readonly_query(sql)
  return result, nil
rescue Sequel::DatabaseError => e
  self.logger.error("db_query_database_error", error: e)
  # We want to handle InsufficientPrivileges and UndefinedTable explicitly
  # since we can hint the user at what to do.
  # Otherwise, we should just return the Postgres exception.
  msg = ""
  case e.wrapped_exception
    when PG::UndefinedTable
      missing_table = e.wrapped_exception.message.match(/relation (.+) does not/)&.captures&.first
      msg = "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables." if
        missing_table
    when PG::InsufficientPrivilege
      msg = "You do not have permission to perform this query. Queries must be read-only."
    else
      msg = e.wrapped_exception.message
  end
  return [nil, msg]
end
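
The UndefinedTable branch depends on pulling the relation name out of the Postgres error text. The sketch below runs the same regular expression against a sample message. The helper name `missing_table_hint` and the sample message are illustrative, and the exact error wording can vary with the server's locale.

```ruby
# Hypothetical helper: build the user-facing hint from a Postgres error message,
# or return nil when the message does not name a relation.
def missing_table_hint(pg_message)
  table = pg_message.match(/relation (.+) does not/)&.captures&.first
  return nil unless table
  "The table #{table} does not exist. Run `webhookdb db tables` to see available tables."
end

missing_table_hint('ERROR:  relation "fake_table" does not exist')
# => "The table \"fake_table\" does not exist. Run `webhookdb db tables` to see available tables."
missing_table_hint("ERROR:  something else went wrong")
# => nil
```

The captured name keeps the double quotes that Postgres puts around the relation, so they appear in the hint as well.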

#get_stripe_billing_portal_url ⇒ Object



# File 'lib/webhookdb/organization.rb', line 414

def get_stripe_billing_portal_url
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::BillingPortal::Session.create(
    {
      customer: self.stripe_customer_id,
      return_url: Webhookdb.app_url + "/jump/portal-return",
    },
  )

  return session.url
end

#get_stripe_checkout_url(price_id) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 426

def get_stripe_checkout_url(price_id)
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::Checkout::Session.create(
    {
      customer: self.stripe_customer_id,
      cancel_url: Webhookdb.app_url + "/jump/checkout-cancel",
      line_items: [{
        price: price_id, quantity: 1,
      }],
      mode: "subscription",
      payment_method_types: ["card"],
      allow_promotion_codes: true,
      success_url: Webhookdb.app_url + "/jump/checkout-success",
    },
  )

  return session.url
end

#log_tags ⇒ Object



# File 'lib/webhookdb/organization.rb', line 88

def log_tags
  return {
    organization_id: self.id,
    organization_key: self.key,
  }
end

#migrate_replication_schema(schema) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 386

def migrate_replication_schema(schema)
  Webhookdb::DBAdapter.validate_identifier!(schema, type: "schema")
  Webhookdb::Organization::DatabaseMigration.guard_ongoing!(self)
  raise SchemaMigrationError, "destination and target schema are the same" if schema == self.replication_schema
  builder = Webhookdb::Organization::DbBuilder.new(self)
  sql = builder.migration_replication_schema_sql(self.replication_schema, schema)
  self.admin_connection(transaction: true) do |db|
    db << sql
  end
  self.update(replication_schema: schema)
end

#migrate_replication_tables ⇒ Object

Get all the table names and column names for all integrations in the org. Find any of those table/column pairs that are not present in information_schema.columns. Ensure all columns for those integrations/tables.



# File 'lib/webhookdb/organization.rb', line 333

def migrate_replication_tables
  tables = self.service_integrations.map(&:table_name)
  sequences_in_app_db = self.db[Sequel[:information_schema][:sequences]].
    grep(:sequence_name, "replicator_seq_org_#{self.id}_%").
    select_map(:sequence_name).
    to_set
  cols_in_org_db = {}
  indices_in_org_db = Set.new
  self.admin_connection do |db|
    cols_in_org_db = db[Sequel[:information_schema][:columns]].
      where(table_schema: self.replication_schema, table_name: tables).
      select(
        :table_name,
        Sequel.function(:array_agg, :column_name).cast("text[]").as(:columns),
      ).
      group_by(:table_name).
      all.
      to_h { |c| [c[:table_name], c[:columns]] }
    indices_in_org_db = db[Sequel[:pg_indexes]].
      where(schemaname: self.replication_schema, tablename: tables).
      select_map(:indexname).
      to_set
  end

  self.service_integrations.each do |sint|
    svc = sint.replicator
    existing_columns = cols_in_org_db.fetch(sint.table_name) { [] }
    cols_for_sint = svc.storable_columns.map { |c| c.name.to_s }
    all_sint_cols_exist = (cols_for_sint - existing_columns).empty?

    all_indices_exist = svc.indices(svc.dbadapter_table).all? do |ind|
      indices_in_org_db.include?(ind.name.to_s)
    end

    svc.ensure_all_columns unless all_sint_cols_exist && all_indices_exist
    if svc.requires_sequence? && !sequences_in_app_db.include?(sint.sequence_name)
      sint.ensure_sequence(skip_check: true)
    end
  end
end
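
The per-integration decision in the loop above is a set comparison: columns are ensured when any expected column or index is missing. A minimal sketch with plain arrays and a `Set` in place of the database lookups (the helper name `needs_ensure?` and the sample column and index names are hypothetical):

```ruby
require "set"

# Hypothetical helper: true when the table lacks any expected column or index.
def needs_ensure?(expected_columns, existing_columns, expected_indices, existing_indices)
  all_columns_exist = (expected_columns - existing_columns).empty?
  all_indices_exist = expected_indices.all? { |name| existing_indices.include?(name) }
  !(all_columns_exist && all_indices_exist)
end

existing_indices = Set.new(["svc_abc_at_idx"])

needs_ensure?(["pk", "data"], ["pk", "data", "extra"], ["svc_abc_at_idx"], existing_indices) # => false
needs_ensure?(["pk", "data", "at"], ["pk", "data"], ["svc_abc_at_idx"], existing_indices)    # => true
needs_ensure?(["pk"], ["pk"], ["svc_abc_new_idx"], existing_indices)                         # => true
```

Extra columns in the org database do not trigger work; only missing ones do.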

#prepare_database_connections(safe: false) ⇒ Object

Build the org-specific users, database, and set our connection URLs to it.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if connection urls are set.



# File 'lib/webhookdb/organization.rb', line 238

def prepare_database_connections(safe: false)
  self.db.transaction do
    self.lock!
    if self.admin_connection_url.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "connections already set"
    end
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.prepare_database_connections
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#prepare_database_connections? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 232

def prepare_database_connections?
  return self.prepare_database_connections(safe: true)
end

#readonly_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 122

def readonly_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.readonly_connection_url_raw, **kw, &)
end

#readonly_connection_url ⇒ Object

Return the readonly connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 182

def readonly_connection_url
  return self._public_host_connection_url(self.readonly_connection_url_raw)
end

#readonly_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 217

def readonly_user
  ur = URI(self.readonly_connection_url)
  return ur.user
end

#register_in_stripe ⇒ Object



# File 'lib/webhookdb/organization.rb', line 398

def register_in_stripe
  raise Webhookdb::InvalidPrecondition, "org already in Stripe" if self.stripe_customer_id.present?
  stripe_customer = Stripe::Customer.create(
    {
      name: self.name,
      email: self.billing_email,
      metadata: {
        org_id: self.id,
      },
    },
  )
  self.stripe_customer_id = stripe_customer.id
  self.save_changes
  return stripe_customer
end

#remove_related_database ⇒ Object

Delete the org-specific database and remove the org connection strings. Use this when an org is to be deleted (either for real, or in test teardown).



# File 'lib/webhookdb/organization.rb', line 274

def remove_related_database
  self.db.transaction do
    self.lock!
    Webhookdb::Organization::DbBuilder.new(self).remove_related_database
    self.admin_connection_url_raw = ""
    self.readonly_connection_url_raw = ""
    self.save_changes
  end
end

#roll_database_credentials ⇒ Object

Modify the admin and readonly users to have new usernames and passwords.



# File 'lib/webhookdb/organization.rb', line 375

def roll_database_credentials
  self.db.transaction do
    self.lock!
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.roll_connection_credentials
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#single_db_user? ⇒ Boolean

In cases where the readonly and admin user are the same, we sometimes adapt queries to prevent revoking admin db privileges.

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 224

def single_db_user?
  return self.admin_user == self.readonly_user
end

#validate ⇒ Object

:section: Validations



# File 'lib/webhookdb/organization.rb', line 492

def validate
  super
  validates_all_or_none(:admin_connection_url_raw, :readonly_connection_url_raw, predicate: :present?)
  validates_format(/^\D/, :name, message: "can't begin with a digit")
  validates_format(/^[a-z][a-z0-9_]*$/, :key, message: "is not valid as a CNAME")
  validates_max_length 63, :key, message: "is not valid as a CNAME"
end
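
The format and length rules above can be tried outside of the model. This sketch reuses the same regular expressions and length limit in plain Ruby. The constant and helper names are hypothetical, and it does not reproduce Sequel's validation helpers or error messages.

```ruby
NAME_FORMAT = /^\D/                  # name must not begin with a digit
KEY_FORMAT = /^[a-z][a-z0-9_]*$/     # lowercase letter, then lowercase letters, digits, underscores
MAX_KEY_LENGTH = 63

# Hypothetical helper: true when the name passes the format rule.
def valid_name?(name)
  NAME_FORMAT.match?(name)
end

# Hypothetical helper: true when the key passes both the format and length rules.
def valid_key?(key)
  KEY_FORMAT.match?(key) && key.length <= MAX_KEY_LENGTH
end

valid_name?("Acme Corp")  # => true
valid_name?("1 Corp")     # => false
valid_key?("acme_corp")   # => true
valid_key?("Acme")        # => false (uppercase)
valid_key?("a" * 64)      # => false (too long)
```

In Ruby, `^` and `$` anchor at line boundaries rather than string boundaries, so multi-line input is matched per line; `\A` and `\z` are the stricter alternative if that matters.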