Class: Webhookdb::Organization

Inherits:
Object
Includes:
Admin::Linked
Defined in:
lib/webhookdb/organization.rb

Defined Under Namespace

Classes: Alerting, DatabaseMigration, DbBuilder, QueryResult, SchemaMigrationError

Methods included from Admin::Linked

#admin_link

Class Method Details

.create_if_unique(params) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 104

def self.create_if_unique(params)
  self.db.transaction(savepoint: true) do
    return Webhookdb::Organization.create(name: params[:name])
  end
rescue Sequel::UniqueConstraintViolation
  return nil
end

.enqueue_migrate_all_replication_tables ⇒ Object

As part of the release process, we enqueue a job that migrates the replication schemas for all organizations. However, this job must run the NEW code being released, not the CURRENT code the workers may still be running when this method is called during the release.

We get around this by enqueuing the jobs with the ‘target’ release creation date. A job performs the migration only when it executes under a release with that same creation date; if it runs under an older release (i.e. the worker is still running old code), it re-enqueues the migration to run later, when a worker will eventually be running the newer code.

For example:

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • The workers, using Release A code (with a release_created_at of 0), run the ReplicationMigration job. They see the target release_created_at of 1 is greater than/after the current release_created_at of 0, so re-enqueue the job.

  • Eventually the workers are using Release B code, which has a release_created_at of 1. This matches the target, so the job is run.

For a more complex example, which involves releases created in quick succession (we need to be careful to avoid jobs that never run):

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • Release C, created at 2, runs this method.

  • Workers are backed up, so nothing is processed until all workers are using Release C.

  • Workers using Release C code process two sets of jobs:

    • Jobs with a target release_created_at of 1

    • Jobs with a target release_created_at of 2

  • Jobs with a target of 2 run the actual migration, because the times match.

  • Jobs with a target of 1 see that the target is less than/before the current release_created_at of 2. This indicates the migration is stale, so the job is discarded.

NOTE: There will always be a race condition where we may process webhooks using the new code, before we’ve migrated the replication schemas into the new code. This will error during the upsert because the column doesn’t yet exist. However these will be retried automatically, and quickly, so we don’t worry about them yet.
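The release comparison described above can be sketched as a small pure function. This is only an illustration of the three outcomes; `migration_action` and its symbols are hypothetical names, not part of the ReplicationMigration job itself.

```ruby
# Hypothetical helper: decide what a migration job should do, given the
# target release timestamp it was enqueued with and the release timestamp
# of the code currently running the job.
def migration_action(target_release_created_at, current_release_created_at)
  if target_release_created_at > current_release_created_at
    # The worker is still on older code: try again later.
    :reenqueue
  elsif target_release_created_at < current_release_created_at
    # A newer release has superseded this job: drop it as stale.
    :discard
  else
    # The worker runs exactly the release that enqueued the job.
    :migrate
  end
end

# Simple example: Release A (0) is running, Release B (1) enqueued the job.
puts migration_action(1, 0) # worker still on Release A
puts migration_action(1, 1) # worker now on Release B

# Quick-succession example: all workers are already on Release C (2).
puts migration_action(1, 2) # job enqueued by Release B
puts migration_action(2, 2) # job enqueued by Release C
```

Because every stale job is discarded and every job from the newest release eventually matches, no job is left re-enqueuing forever.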



# File 'lib/webhookdb/organization.rb', line 326

def self.enqueue_migrate_all_replication_tables
  Webhookdb::Organization.each do |org|
    Webhookdb::Jobs::ReplicationMigration.perform_in(2, org.id, Webhookdb::RELEASE_CREATED_AT)
  end
end

Instance Method Details

#active_subscription? ⇒ Boolean

SUBSCRIPTION PERMISSIONS

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 472

def active_subscription?
  subscription = Webhookdb::Subscription[stripe_customer_id: self.stripe_customer_id]
  # return false if no subscription
  return false if subscription.nil?
  # otherwise check stripe subscription string
  return ["trialing", "active", "past_due"].include? subscription.status
end

#add_membership(opts = {}) ⇒ Object

:section: Memberships



# File 'lib/webhookdb/organization.rb', line 451

def add_membership(opts={})
  if !opts.is_a?(Webhookdb::OrganizationMembership) && !opts.key?(:verified)
    raise ArgumentError, "must pass :verified or a model into add_membership, it is ambiguous otherwise"
  end
  self.associations.delete(opts[:verified] ? :verified_memberships : :invited_memberships)
  return self.add_all_membership(opts)
end

#admin_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 128

def admin_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.admin_connection_url_raw, **kw, &)
end

#admin_connection_url ⇒ Object

Return the admin connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 189

def admin_connection_url
  return self._public_host_connection_url(self.admin_connection_url_raw)
end

#admin_customers ⇒ Object



# File 'lib/webhookdb/organization.rb', line 112

def admin_customers
  return self.verified_memberships.filter(&:admin?).map(&:customer)
end

#admin_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 214

def admin_user
  ur = URI(self.admin_connection_url)
  return ur.user
end

#alerting ⇒ Object



# File 'lib/webhookdb/organization.rb', line 116

def alerting
  return @alerting ||= Alerting.new(self)
end

#available_replicators ⇒ Object



# File 'lib/webhookdb/organization.rb', line 488

def available_replicators
  available = Webhookdb::Replicator.registry.values.filter do |desc|
    # The org must have any of the flags required for the service. In other words,
    # the intersection of desc[:feature_roles] & org.feature_roles must
    # not be empty
    no_restrictions = desc.feature_roles.empty?
    next true if no_restrictions
    org_has_access = (self.feature_roles.map(&:name) & desc.feature_roles).present?
    org_has_access
  end
  return available
end
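The role check in the comment above amounts to an array intersection. A minimal sketch with plain arrays follows; `Descriptor`, `available_for`, and the replicator and role names are invented for illustration and stand in for the registry descriptors and the org's feature roles.

```ruby
# Hypothetical stand-in for a replicator descriptor; only feature_roles matters here.
Descriptor = Struct.new(:name, :feature_roles)

def available_for(org_role_names, descriptors)
  descriptors.select do |desc|
    # No required roles means the replicator is open to every org.
    next true if desc.feature_roles.empty?
    # Otherwise the org needs at least one of the required roles.
    !(org_role_names & desc.feature_roles).empty?
  end
end

descriptors = [
  Descriptor.new("public_v1", []),
  Descriptor.new("beta_v1", ["beta"]),
  Descriptor.new("internal_v1", ["internal", "staff"]),
]

puts available_for(["beta"], descriptors).map(&:name).inspect
puts available_for([], descriptors).map(&:name).inspect
```

The sketch uses `!… .empty?` instead of ActiveSupport's `present?` so that it runs with plain Ruby.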

#before_validation ⇒ Object



# File 'lib/webhookdb/organization.rb', line 97

def before_validation
  self.minimum_sync_seconds ||= Webhookdb::SyncTarget.default_min_period_seconds
  self.key ||= Webhookdb.to_slug(self.name)
  self.replication_schema ||= Webhookdb::Organization::DbBuilder.new(self).default_replication_schema
  super
end

#can_add_new_integration? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 480

def can_add_new_integration?
  # if the sint's organization has an active subscription, return true
  return true if self.active_subscription?
  # if there is no active subscription, check number of integrations against free tier max
  limit = Webhookdb::Subscription.max_free_integrations
  return Webhookdb::ServiceIntegration.where(organization: self).count < limit
end

#cli_editable_fields ⇒ Object



# File 'lib/webhookdb/organization.rb', line 120

def cli_editable_fields
  return ["name", "billing_email"]
end

#close(confirm:) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 459

def close(confirm:)
  raise Webhookdb::InvalidPrecondition, "confirm must be true to close the org" unless confirm
  unless self.service_integrations_dataset.empty?
    msg = "Organization[#{self.key}] cannot close with active service integrations"
    raise Webhookdb::InvalidPrecondition, msg
  end
  memberships = self.all_memberships_dataset.all.each(&:destroy)
  self.destroy
  return [self, memberships]
end

#create_public_host_cname(safe: false) ⇒ Object

Create a CNAME in Cloudflare for the currently configured connection urls.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if the public host is set.



# File 'lib/webhookdb/organization.rb', line 257

def create_public_host_cname(safe: false)
  self.db.transaction do
    self.lock!
    # We must have a host to create a CNAME to.
    raise Webhookdb::InvalidPrecondition, "connection urls must be set" if self.readonly_connection_url_raw.blank?
    # Should only be used once when creating the org DBs.
    if self.public_host.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "public_host must not be set"
    end
    # Use the raw URL, even though we know at this point
    # public_host is empty so raw and public host urls are the same.
    Webhookdb::Organization::DbBuilder.new(self).create_public_host_cname(self.readonly_connection_url_raw)
    self.save_changes
  end
end

#dbname ⇒ Object



# File 'lib/webhookdb/organization.rb', line 208

def dbname
  raise Webhookdb::InvalidPrecondition, "no db has been created, call prepare_database_connections first" if
    self.admin_connection_url.blank?
  return URI(self.admin_connection_url).path.tr("/", "")
end
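As a quick illustration of the URI handling used by #dbname (and by #admin_user and #readonly_user, which read `URI#user`), the snippet below parses a made-up connection URL with the standard library. The URL is an assumption for the example only.

```ruby
require "uri"

# Made-up connection URL, in the general shape of a Postgres URL.
url = "postgres://admin_user:secret@db.example.com:5432/org_db"
uri = URI(url)

# The path is "/org_db"; stripping the slash leaves the database name.
dbname = uri.path.tr("/", "")
# The user portion of the URL is the database username.
user = uri.user

puts dbname
puts user
```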

#display_string ⇒ Object



# File 'lib/webhookdb/organization.rb', line 230

def display_string
  return "#{self.name} (#{self.key})"
end

#execute_readonly_query(sql) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 132

def execute_readonly_query(sql)
  max_rows = self.max_query_rows || self.class.max_query_rows
  return self.readonly_connection do |conn|
    ds = conn.fetch(sql)
    r = QueryResult.new
    r.max_rows_reached = false
    r.columns = ds.columns
    r.rows = []
    # Stream to avoid pulling in all rows of unlimited queries
    ds.stream.each do |row|
      if r.rows.length >= max_rows
        r.max_rows_reached = true
        break
      end
      r.rows << row.values
    end
    return r
  end
end
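The row cap in the loop above can be tried in isolation. In this sketch a lazy enumerator stands in for the streamed dataset, and `QueryResult` is a hypothetical Struct rather than the real Webhookdb::Organization::QueryResult class. Note that the flag is set only when a row beyond the cap is actually seen, so a result with exactly `max_rows` rows is not marked as truncated.

```ruby
# Hypothetical stand-in for the real QueryResult class.
QueryResult = Struct.new(:columns, :rows, :max_rows_reached)

def collect_capped(rows_enum, columns, max_rows)
  result = QueryResult.new(columns, [], false)
  rows_enum.each do |row|
    if result.rows.length >= max_rows
      # There is at least one more row than we are willing to return.
      result.max_rows_reached = true
      break
    end
    result.rows << row.values
  end
  result
end

# An endless lazy source stands in for an unlimited streamed query.
source = (1..Float::INFINITY).lazy.map { |i| {id: i, name: "row#{i}"} }
capped = collect_capped(source, [:id, :name], 3)
puts capped.rows.inspect
puts capped.max_rows_reached

# Exactly max_rows rows: nothing was cut off, so the flag stays false.
exact = collect_capped([{id: 1}, {id: 2}].each, [:id], 2)
puts exact.max_rows_reached
```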

#execute_readonly_query_with_help(sql) ⇒ Array<Webhookdb::Organization::QueryResult,String,nil>

Run the given SQL inside the org, and use special error handling if it fails.

Returns:

  • (Array<Webhookdb::Organization::QueryResult,String,nil>)

    Tuple of query result, and optional message. On query success, return <QueryResult, nil>. On DatabaseError, return <nil, message>. On other types of errors, raise.



# File 'lib/webhookdb/organization.rb', line 157

def execute_readonly_query_with_help(sql)
  result = self.execute_readonly_query(sql)
  return result, nil
rescue Sequel::DatabaseError => e
  self.logger.error("db_query_database_error", error: e)
  # We want to handle InsufficientPrivileges and UndefinedTable explicitly
  # since we can hint the user at what to do.
  # Otherwise, we should just return the Postgres exception.
  msg = ""
  case e.wrapped_exception
    when PG::UndefinedTable
      missing_table = e.wrapped_exception.message.match(/relation (.+) does not/)&.captures&.first
      msg = "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables." if
        missing_table
    when PG::InsufficientPrivilege
      msg = "You do not have permission to perform this query. Queries must be read-only."
    else
      msg = e.wrapped_exception.message
  end
  return [nil, msg]
end
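To see what the PG::UndefinedTable branch produces, the sketch below applies the same regular expression to a sample error string. The sample message and the `undefined_table_hint` helper are assumptions for illustration; note that the captured name keeps the double quotes Postgres puts around the relation.

```ruby
# Same pattern as in the rescue branch above.
RELATION_PATTERN = /relation (.+) does not/

# Hypothetical helper wrapping the hint-building logic.
def undefined_table_hint(pg_message)
  missing_table = pg_message.match(RELATION_PATTERN)&.captures&.first
  # As in the method above, the message stays empty if no name is found.
  return "" unless missing_table
  "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables."
end

# Sample text in the usual shape of a Postgres undefined-table error.
message = 'ERROR:  relation "fake_v1_abc" does not exist'
hint = undefined_table_hint(message)
puts hint
puts undefined_table_hint("some other error").inspect
```

A caller would then destructure the returned pair, for example `result, message = org.execute_readonly_query_with_help(sql)`, and show `message` when `result` is nil.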

#get_stripe_billing_portal_url ⇒ Object



# File 'lib/webhookdb/organization.rb', line 416

def get_stripe_billing_portal_url
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::BillingPortal::Session.create(
    {
      customer: self.stripe_customer_id,
      return_url: Webhookdb.app_url + "/jump/portal-return",
    },
  )

  return session.url
end

#get_stripe_checkout_url(price_id) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 428

def get_stripe_checkout_url(price_id)
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::Checkout::Session.create(
    {
      customer: self.stripe_customer_id,
      cancel_url: Webhookdb.app_url + "/jump/checkout-cancel",
      line_items: [{
        price: price_id, quantity: 1,
      }],
      mode: "subscription",
      payment_method_types: ["card"],
      allow_promotion_codes: true,
      success_url: Webhookdb.app_url + "/jump/checkout-success",
    },
  )

  return session.url
end

#log_tags ⇒ Object



# File 'lib/webhookdb/organization.rb', line 90

def log_tags
  return {
    organization_id: self.id,
    organization_key: self.key,
  }
end

#migrate_replication_schema(schema) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 388

def migrate_replication_schema(schema)
  Webhookdb::DBAdapter.validate_identifier!(schema, type: "schema")
  Webhookdb::Organization::DatabaseMigration.guard_ongoing!(self)
  raise SchemaMigrationError, "destination and target schema are the same" if schema == self.replication_schema
  builder = Webhookdb::Organization::DbBuilder.new(self)
  sql = builder.migration_replication_schema_sql(self.replication_schema, schema)
  self.admin_connection(transaction: true) do |db|
    db << sql
  end
  self.update(replication_schema: schema)
end

#migrate_replication_tables ⇒ Object

Get all the table names and column names for all integrations in the org. Find any of those table/column pairs that are not present in information_schema.columns. Ensure all columns for those integrations/tables.



# File 'lib/webhookdb/organization.rb', line 335

def migrate_replication_tables
  tables = self.service_integrations.map(&:table_name)
  sequences_in_app_db = self.db[Sequel[:information_schema][:sequences]].
    grep(:sequence_name, "replicator_seq_org_#{self.id}_%").
    select_map(:sequence_name).
    to_set
  cols_in_org_db = {}
  indices_in_org_db = Set.new
  self.admin_connection do |db|
    cols_in_org_db = db[Sequel[:information_schema][:columns]].
      where(table_schema: self.replication_schema, table_name: tables).
      select(
        :table_name,
        Sequel.function(:array_agg, :column_name).cast("text[]").as(:columns),
      ).
      group_by(:table_name).
      all.
      to_h { |c| [c[:table_name], c[:columns]] }
    indices_in_org_db = db[Sequel[:pg_indexes]].
      where(schemaname: self.replication_schema, tablename: tables).
      select_map(:indexname).
      to_set
  end

  self.service_integrations.each do |sint|
    svc = sint.replicator
    existing_columns = cols_in_org_db.fetch(sint.table_name) { [] }
    cols_for_sint = svc.storable_columns.map { |c| c.name.to_s }
    all_sint_cols_exist = (cols_for_sint - existing_columns).empty?

    all_indices_exist = svc.indices(svc.dbadapter_table).all? do |ind|
      indices_in_org_db.include?(ind.name.to_s)
    end

    svc.ensure_all_columns unless all_sint_cols_exist && all_indices_exist
    if svc.requires_sequence? && !sequences_in_app_db.include?(sint.sequence_name)
      sint.ensure_sequence(skip_check: true)
    end
  end
end
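The "missing columns" check is an array difference per table. The sketch below reproduces just that step with made-up table and column names; `cols_in_org_db` mirrors the hash built from information_schema.columns, while `wanted` is a hypothetical stand-in for each replicator's storable columns.

```ruby
# table name => columns that already exist in the org database (made-up data)
cols_in_org_db = {
  "stripe_charge_v1_abc" => ["pk", "stripe_id", "data"],
  "fake_v1_xyz" => ["pk", "data"],
}

# Hypothetical: columns each integration's replicator wants to store.
wanted = {
  "stripe_charge_v1_abc" => ["pk", "stripe_id", "data", "amount"],
  "fake_v1_xyz" => ["pk", "data"],
  "twilio_sms_v1_def" => ["pk", "twilio_id", "data"],
}

needing_columns = wanted.select { |table, cols|
  # A table with no entry is treated as having no columns yet.
  existing = cols_in_org_db.fetch(table) { [] }
  !(cols - existing).empty?
}
needs_migration = needing_columns.keys

puts needs_migration.inspect
```

Only the tables in `needs_migration` would need the equivalent of `ensure_all_columns`; the index and sequence checks in the method follow the same look-up-then-compare shape.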

#prepare_database_connections(safe: false) ⇒ Object

Build the org-specific users, database, and set our connection URLs to it.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if connection urls are set.



# File 'lib/webhookdb/organization.rb', line 240

def prepare_database_connections(safe: false)
  self.db.transaction do
    self.lock!
    if self.admin_connection_url.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "connections already set"
    end
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.prepare_database_connections
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#prepare_database_connections? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 234

def prepare_database_connections?
  return self.prepare_database_connections(safe: true)
end

#readonly_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 124

def readonly_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.readonly_connection_url_raw, **kw, &)
end

#readonly_connection_url ⇒ Object

Return the readonly connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 184

def readonly_connection_url
  return self._public_host_connection_url(self.readonly_connection_url_raw)
end

#readonly_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 219

def readonly_user
  ur = URI(self.readonly_connection_url)
  return ur.user
end

#register_in_stripe ⇒ Object



# File 'lib/webhookdb/organization.rb', line 400

def register_in_stripe
  raise Webhookdb::InvalidPrecondition, "org already in Stripe" if self.stripe_customer_id.present?
  stripe_customer = Stripe::Customer.create(
    {
      name: self.name,
      email: self.billing_email,
      metadata: {
        org_id: self.id,
      },
    },
  )
  self.stripe_customer_id = stripe_customer.id
  self.save_changes
  return stripe_customer
end

#remove_related_database ⇒ Object

Delete the org-specific database and remove the org connection strings. Use this when an org is to be deleted (either for real, or in test teardown).



# File 'lib/webhookdb/organization.rb', line 276

def remove_related_database
  self.db.transaction do
    self.lock!
    Webhookdb::Organization::DbBuilder.new(self).remove_related_database
    self.admin_connection_url_raw = ""
    self.readonly_connection_url_raw = ""
    self.save_changes
  end
end

#roll_database_credentials ⇒ Object

Modify the admin and readonly users to have new usernames and passwords.



# File 'lib/webhookdb/organization.rb', line 377

def roll_database_credentials
  self.db.transaction do
    self.lock!
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.roll_connection_credentials
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#single_db_user? ⇒ Boolean

In cases where the readonly and admin user are the same, we sometimes adapt queries to prevent revoking admin db privileges.

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 226

def single_db_user?
  return self.admin_user == self.readonly_user
end

#validate ⇒ Object

:section: Validations



# File 'lib/webhookdb/organization.rb', line 505

def validate
  super
  validates_all_or_none(:admin_connection_url_raw, :readonly_connection_url_raw, predicate: :present?)
  validates_format(/^\D/, :name, message: "can't begin with a digit")
  validates_format(/^[a-z][a-z0-9_]*$/, :key, message: "is not valid as a CNAME")
  validates_max_length 63, :key, message: "is not valid as a CNAME"
end
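The format and length rules can be exercised without a model. This sketch reuses the two regular expressions and the 63-character limit from the method above; `valid_key?` and `valid_name?` are hypothetical helpers, and the all-or-none check on the connection URLs is not covered.

```ruby
KEY_FORMAT = /^[a-z][a-z0-9_]*$/
NAME_FORMAT = /^\D/
MAX_KEY_LENGTH = 63

# Hypothetical helper: a key must start with a lowercase letter, contain only
# lowercase letters, digits, and underscores, and be at most 63 characters.
def valid_key?(key)
  key.match?(KEY_FORMAT) && key.length <= MAX_KEY_LENGTH
end

# Hypothetical helper: a name must not begin with a digit.
def valid_name?(name)
  name.match?(NAME_FORMAT)
end

puts valid_key?("acme_corp")
puts valid_key?("2fast")
puts valid_key?("a" * 64)
puts valid_name?("Acme 2")
puts valid_name?("2 Acme")
```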