Class: Webhookdb::Replicator::Base

Inherits:
Object
Includes:
Appydays::Loggable, DBAdapter::ColumnTypes
Defined in:
lib/webhookdb/replicator/base.rb

Defined Under Namespace

Classes: CredentialVerificationResult, ServiceBackfiller

Constant Summary

MAX_INDEX_NAME_LENGTH =
63

Constants included from DBAdapter::ColumnTypes

DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(service_integration) ⇒ Base

Returns a new instance of Base.



# File 'lib/webhookdb/replicator/base.rb', line 31

def initialize(service_integration)
  @service_integration = service_integration
end

Instance Attribute Details

#service_integration ⇒ Webhookdb::ServiceIntegration (readonly)



# File 'lib/webhookdb/replicator/base.rb', line 29

def service_integration
  @service_integration
end

Class Method Details

.chunked_row_update_bounds(max_pk, chunk_size: 1_000_000) ⇒ Object

Return an array of tuples used for splitting UPDATE queries, so locks are not held on the entire table while backfilling values for newly added columns. See ensure_all_columns_modification.

The returned chunks are like `[[0, 100], [100, 200], [200]]`, and are meant to be used like `0 < pk <= 100`, `100 < pk <= 200`, `pk > 200`.

Note that the final value in the array is a single item, used like `pk > chunks[-1][0]`.



# File 'lib/webhookdb/replicator/base.rb', line 594

def self.chunked_row_update_bounds(max_pk, chunk_size: 1_000_000)
  result = []
  chunk_lower_pk = 0
  chunk_upper_pk = chunk_size
  while chunk_upper_pk <= max_pk
    # Get chunks like 0 < pk <= 100, 100 < pk <= 200, etc.
    # Each loop we advance by one chunk size, stopping once the next upper bound would pass max_pk.
    # Ie if row chunk size is 100, and max_pk is 450, the final chunk here is 300-400.
    result << [chunk_lower_pk, chunk_upper_pk]
    chunk_lower_pk += chunk_size
    chunk_upper_pk += chunk_size
  end
  # Finally, one final chunk for all rows greater than our biggest bounded chunk.
  # For example, with a row chunk size of 100, and max_pk of 450, the last bounded chunk was 300-400.
  # Rows above 400, plus any rows written after max_pk was read (say a new max pk of 550),
  # are caught by this final 'pk > 400' chunk.
  result << [chunk_lower_pk]
end
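
For illustration (not part of the library source), here is the shape of the returned bounds for a hypothetical max_pk of 450 with a chunk size of 100, and how each tuple pairs with an UPDATE condition:

bounds = Webhookdb::Replicator::Base.chunked_row_update_bounds(450, chunk_size: 100)
# => [[0, 100], [100, 200], [200, 300], [300, 400], [400]]
bounds[...-1].each do |(lower, upper)|
  # UPDATE ... WHERE pk > lower AND pk <= upper
end
# The trailing single-element chunk covers everything above the last bound:
# UPDATE ... WHERE pk > bounds[-1][0]   (ie, pk > 400)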

.descriptor ⇒ Webhookdb::Replicator::Descriptor

This method is abstract.

Return the descriptor for this service.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 24

def self.descriptor
  raise NotImplementedError, "#{self.class}: must return a descriptor that is used for registration purposes"
end

Instance Method Details

#_any_subscriptions_to_notify? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 722

def _any_subscriptions_to_notify?
  return !self.service_integration.all_webhook_subscriptions_dataset.to_notify.empty?
end

#_backfill_state_change_fields ⇒ Object

If we support backfilling, these are the service integration fields used for it. Override if other fields are used instead. There cannot be overlap between these and the webhook state change fields.



# File 'lib/webhookdb/replicator/base.rb', line 145

def _backfill_state_change_fields = ["backfill_key", "backfill_secret", "api_url"]

#_backfillers ⇒ Array<Webhookdb::Backfiller>

Return backfillers for the replicator. We must use an array for ‘data-based’ backfillers, like when we need to paginate for each row in another table.

By default, return a ServiceBackfiller, which will call _fetch_backfill_page on the receiver.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 1091

def _backfillers
  return [ServiceBackfiller.new(self)]
end

#_clear_backfill_information ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 314

def _clear_backfill_information
  self.service_integration.set(api_url: "", backfill_key: "", backfill_secret: "")
end

#_clear_webook_information ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 301

def _clear_webook_information
  self.service_integration.set(webhook_secret: "")
end

#_coalesce_excluded_on_update(update, column_names) ⇒ Object

Have a column set itself only on insert or if nil.

Given the payload to DO UPDATE, mutate it so that the column names included in `column_names` use what is already in the table, and fall back to what's being inserted. This new payload should be passed to the `update` kwarg of `insert_conflict`:

ds.insert_conflict(update: self._coalesce_excluded_on_update(payload, [:created_at])).insert(payload)

Parameters:

  • update (Hash)
  • column_names (Array<Symbol>)


# File 'lib/webhookdb/replicator/base.rb', line 878

def _coalesce_excluded_on_update(update, column_names)
  # Now replace just the specific columns we're overriding.
  column_names.each do |c|
    update[c] = Sequel.function(:coalesce, self.qualified_table_sequel_identifier[c], Sequel[:excluded][c])
  end
end

#_denormalized_columns ⇒ Array<Webhookdb::Replicator::Column>

When an integration needs denormalized columns, specify them here. Indices are created for each column. Modifiers can be used if columns should have a default or whatever. See Webhookdb::Replicator::Column for more details about column fields.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 480

def _denormalized_columns
  return []
end

#_enqueue_backfill_jobs(incremental:, criteria: nil, recursive: true, enqueue: true) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 215

def _enqueue_backfill_jobs(incremental:, criteria: nil, recursive: true, enqueue: true)
  m = recursive ? :create_recursive : :create
  j = Webhookdb::BackfillJob.send(
    m,
    service_integration:,
    incremental:,
    criteria: criteria || {},
    created_by: Webhookdb.request_user_and_admin[0],
  )
  j.enqueue if enqueue
  return j
end

#_extra_index_specs ⇒ Array<Webhookdb::Replicator::IndexSpec>

Names of columns for multi-column indices. Each one must be in denormalized_columns.

Returns:

  • (Array<Webhookdb::Replicator::IndexSpec>)


# File 'lib/webhookdb/replicator/base.rb', line 432

def _extra_index_specs
  return []
end
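
As a sketch of an override (the IndexSpec constructor arguments are an assumption, inferred from how #indices reads spec.columns and spec.where; they are not confirmed by this page):

def _extra_index_specs
  return [
    # Hypothetical multi-column index over two denormalized columns.
    Webhookdb::Replicator::IndexSpec.new(columns: [:customer_id, :created_at]),
  ]
end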

#_fetch_enrichment(resource, event, request) ⇒ *

Given the resource that is going to be inserted and an optional event, make an API call to enrich it with further data if needed. The result of this is passed to _prepare_for_insert.

Parameters:

Returns:

  • (*)


# File 'lib/webhookdb/replicator/base.rb', line 759

def _fetch_enrichment(resource, event, request)
  return nil
end

#_find_dependency_candidate(value) ⇒ Object

Parameters:

  • value (String)

Raises:



# File 'lib/webhookdb/replicator/base.rb', line 229

def _find_dependency_candidate(value)
  int_val = value.strip.blank? ? 1 : value.to_i
  idx = int_val - 1
  dep_candidates = self.service_integration.dependency_candidates
  raise Webhookdb::InvalidPrecondition, "no dependency candidates" if dep_candidates.empty?
  raise Webhookdb::InvalidInput, "'#{value}' is not a valid dependency" if
    idx.negative? || idx >= dep_candidates.length
  return dep_candidates[idx]
end

#_notify_dependents(inserting, changed) ⇒ Object

Parameters:

  • changed (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 716

def _notify_dependents(inserting, changed)
  self.service_integration.dependents.each do |d|
    d.replicator.on_dependency_webhook_upsert(self, inserting, changed:)
  end
end

#_parallel_backfill ⇒ Object

If this replicator supports backfilling in parallel (running multiple backfillers at a time), return the degree of parallelism (or nil if not running in parallel). We leave parallelism up to the replicator, not CPU count, since most work involves waiting on APIs to return.

NOTE: These threads are in addition to any worker threads, so it’s important to pay attention to memory use.



# File 'lib/webhookdb/replicator/base.rb', line 1079

def _parallel_backfill
  return nil
end
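
A replicator that wants, say, three concurrent backfillers can override this (a sketch; the right degree depends on the API being called):

def _parallel_backfill
  return 3
end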

#_prepare_for_insert(resource, event, request, enrichment) ⇒ Hash

Return the hash that should be inserted into the database, based on the denormalized columns and data given.

Parameters:

Returns:

  • (Hash)


# File 'lib/webhookdb/replicator/base.rb', line 811

def _prepare_for_insert(resource, event, request, enrichment)
  h = [self._remote_key_column].concat(self._denormalized_columns).each_with_object({}) do |col, memo|
    value = col.to_ruby_value(resource:, event:, enrichment:, service_integration:)
    skip = value.nil? && col.skip_nil?
    memo[col.name] = value unless skip
  end
  return h
end

#_publish_rowupsert(row, check_for_subscriptions: true) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 726

def _publish_rowupsert(row, check_for_subscriptions: true)
  return unless check_for_subscriptions && self._any_subscriptions_to_notify?
  payload = [
    self.service_integration.id,
    {
      row:,
      external_id_column: self._remote_key_column.name,
      external_id: row[self._remote_key_column.name],
    },
  ]
  # We AVOID pubsub here because we do NOT want to go through the router
  # and audit logger for this.
  event = Amigo::Event.create("webhookdb.serviceintegration.rowupsert", payload.as_json)
  Webhookdb::Jobs::SendWebhook.perform_async(event.as_json)
end

#_remote_key_column ⇒ Webhookdb::Replicator::Column

This method is abstract.

Each integration needs a single remote key, like the Shopify order id for shopify orders, or sid for Twilio resources. This column must be unique for the table, like a primary key.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 470

def _remote_key_column
  raise NotImplementedError
end
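
As a sketch of how the abstract column methods fit together in a hypothetical subclass (the Column constructor arguments here, a name, a column type, and options like index:, are assumptions, not confirmed by this page):

class Webhookdb::Replicator::AcmeOrderV1 < Webhookdb::Replicator::Base
  def _remote_key_column
    return Webhookdb::Replicator::Column.new(:acme_id, TEXT)
  end

  def _denormalized_columns
    return [
      Webhookdb::Replicator::Column.new(:status, TEXT),
      Webhookdb::Replicator::Column.new(:updated_at, TIMESTAMP, index: true),
    ]
  end

  def _timestamp_column_name
    return :updated_at
  end
end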

#_resource_and_event(request) ⇒ Array<Hash>?

This method is abstract.

Given a webhook/backfill item payload, return the resource hash, and an optional event hash. If ‘body’ is the resource itself, this method returns [body, nil]. If ‘body’ is an event, this method returns [body.resource-key, body]. Columns can check for whether there is an event and/or body when converting.

If this returns nil, the upsert is skipped.

For example, a Stripe customer backfill upsert would be the customer hash itself (id 'cus_123') when we backfill, but an event hash wrapping it (like `data: {id: 'cus_123'}`) when handling an event.

Parameters:

Returns:

  • (Array<Hash>, nil)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 800

def _resource_and_event(request)
  raise NotImplementedError
end

#_resource_to_data(resource, event, request, enrichment) ⇒ Hash

Given the resource, return the value for the :data column. Only needed in rare situations where fields should be stored on the row, but not in :data. To skip :data column updates, return nil.

Parameters:

Returns:

  • (Hash)


# File 'lib/webhookdb/replicator/base.rb', line 829

def _resource_to_data(resource, event, request, enrichment)
  return resource
end

#_store_enrichment_body? ⇒ Boolean

Use this to determine whether we should add an enrichment column in the create table modification to store the enrichment body.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 338

def _store_enrichment_body?
  return false
end

#_timestamp_column_name ⇒ Symbol

This method is abstract.

The name of the timestamp column in the schema. This column is used primarily for conditional upserts (ie to know if a row has changed), but also as a general way of auditing changes.

Returns:

  • (Symbol)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 461

def _timestamp_column_name
  raise NotImplementedError
end

#_to_json(v) ⇒ Object

The NULL ASCII character (\u0000), when present in a string ("\u0000"), and then encoded into JSON ("\\u0000"), is invalid in PG JSONB- its strings cannot contain NULLs (note that JSONB does not store the encoded string verbatim, it parses it into PG types, and a PG string cannot contain NULL since C strings are NULL-terminated).

So we remove the "\u0000" sequence from encoded JSON- for example, for the hash {"x" => "\u0000"}, if we #to_json, we end up with '"x":"\u0000"'. The removal of the encoded NULL gives us '"x":""'.

HOWEVER, if the encoded null is itself escaped, we MUST NOT remove it. For example, for the hash {"x" => "\u0000"} passed through .to_json.to_json (ie, a JSON string which contains another JSON string), we end up with '"x":"\\u0000"'. That is, a string containing the escaped null character. This is valid for PG, because it's not a NULL- it's an escaped "\", followed by "u0000". If we were to remove the string "\u0000" there, we'd end up with '"x":"\"', which creates an invalid document.

So we remove only "\u0000" and not "\\u0000"- replace all occurrences of "<any one character except backslash>\u0000" with "<character before backslash>".



# File 'lib/webhookdb/replicator/base.rb', line 711

def _to_json(v)
  return v.to_json.gsub(/(\\\\u0000|\\u0000)/, {"\\\\u0000" => "\\\\u0000", "\\u0000" => ""})
end
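
A worked illustration of that rule (repl stands for any replicator instance; the values follow directly from the gsub above):

repl._to_json({"x" => "\u0000"})   # => '{"x":""}'         the encoded NULL is stripped
repl._to_json({"x" => "\\u0000"})  # => '{"x":"\\u0000"}'   the escaped sequence is preserved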

#_update_where_expr ⇒ Sequel::SQL::Expression

This method is abstract.

The argument for insert_conflict update_where clause. Used to conditionally update, like updating only if a row is newer than what’s stored. We must always have an ‘update where’ because we never want to overwrite with the same data as exists.

If an integration does not have any way to detect if a resource changed, it can compare data columns.

Examples:

With a meaningful timestamp

self.qualified_table_sequel_identifier[:updated_at] < Sequel[:excluded][:updated_at]

Without a meaningful timestamp

self.qualified_table_sequel_identifier[:data] !~ Sequel[:excluded][:data]

Returns:

  • (Sequel::SQL::Expression)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 779

def _update_where_expr
  raise NotImplementedError
end

#_upsert_update_expr(inserting, enrichment: nil) ⇒ Object

Given the hash that is passed to the Sequel insert (so contains all columns, including those from _prepare_for_insert), return the hash used for the insert_conflict(update:) keyword args.

Rather than sending over the literal values in the inserting statement (which is pretty verbose, like the large ‘data’ column), make a smaller statement by using ‘EXCLUDED’.

This can be overridden when the service requires different values for inserting vs. updating, such as when a column's update value must use the EXCLUDED table in the upsert expression.

Most commonly, the use case for this is when you want to provide a row a value, but ONLY on insert, OR on update ONLY if the column is nil. In that case, pass the result of this base method to _coalesce_excluded_on_update (see also for more details).

You can also use this method to merge :data columns together. For example: `super_result[:data] = Sequel.lit("#{self.service_integration.table_name}.data || excluded.data")`

By default, this will use the same values for UPDATE as are used for INSERT, like `email = EXCLUDED.email` (the 'EXCLUDED' row being the one that failed to insert).



# File 'lib/webhookdb/replicator/base.rb', line 855

def _upsert_update_expr(inserting, enrichment: nil)
  result = inserting.each_with_object({}) { |(c, _), h| h[c] = Sequel[:excluded][c] }
  return result
end

#_upsert_webhook(request, upsert: true) ⇒ Object

Hook to be overridden, while still retaining top-level upsert_webhook functionality like error handling.

Parameters:

Raises:



# File 'lib/webhookdb/replicator/base.rb', line 667

def _upsert_webhook(request, upsert: true)
  resource, event = self._resource_and_event(request)
  return nil if resource.nil?
  enrichment = self._fetch_enrichment(resource, event, request)
  prepared = self._prepare_for_insert(resource, event, request, enrichment)
  raise Webhookdb::InvalidPostcondition if prepared.key?(:data)
  inserting = {}
  data_col_val = self._resource_to_data(resource, event, request, enrichment)
  inserting[:data] = self._to_json(data_col_val)
  inserting[:enrichment] = self._to_json(enrichment) if self._store_enrichment_body?
  inserting.merge!(prepared)
  return inserting unless upsert
  remote_key_col = self._remote_key_column
  updating = self._upsert_update_expr(inserting, enrichment:)
  update_where = self._update_where_expr
  upserted_rows = self.admin_dataset(timeout: :fast) do |ds|
    ds.insert_conflict(
      target: remote_key_col.name,
      update: updating,
      update_where:,
    ).insert(inserting)
  end
  row_changed = upserted_rows.present?
  self._notify_dependents(inserting, row_changed)
  self._publish_rowupsert(inserting) if row_changed
  return inserting
end

#_verify_backfill_err_msg ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 978

def _verify_backfill_err_msg
  raise NotImplementedError, "each integration must provide an error message for unanticipated errors"
end

#_webhook_response(request) ⇒ Webhookdb::WebhookResponse

This method is abstract.

Return the response for the webhook. We must do this immediately in the endpoint itself, since verification may include info specific to the request content (for example, it can be whitespace sensitive).

Parameters:

  • request (Rack::Request)

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 134

def _webhook_response(request)
  raise NotImplementedError
end

#_webhook_state_change_fields ⇒ Object

If we support webhooks, these fields correspond to the webhook state machine. Override them if some other fields are also needed for webhooks.



# File 'lib/webhookdb/replicator/base.rb', line 140

def _webhook_state_change_fields = ["webhook_secret"]

#admin_dataset(**kw) ⇒ Sequel::Dataset

Yield to a dataset using the admin connection.

Returns:

  • (Sequel::Dataset)


# File 'lib/webhookdb/replicator/base.rb', line 887

def admin_dataset(**kw, &)
  self.with_dataset(self.service_integration.organization.admin_connection_url_raw, **kw, &)
end

#backfill(job) ⇒ Object

In order to backfill, we need to:

  • Iterate through pages of records from the external service

  • Upsert each record

The caveats/complexities are:

  • The backfill method should take care of retrying fetches for failed pages.

  • That means it needs to keep track of some pagination token.



# File 'lib/webhookdb/replicator/base.rb', line 991

def backfill(job)
  raise Webhookdb::InvalidPrecondition, "job is for different service integration" unless
    job.service_integration === self.service_integration

  raise Webhookdb::InvariantViolation, "manual backfill not supported" unless self.descriptor.supports_backfill?

  sint = self.service_integration
  raise Webhookdb::Replicator::CredentialsMissing if
    sint.backfill_key.blank? && sint.backfill_secret.blank? && sint.depends_on.blank?
  last_backfilled = job.incremental? ? sint.last_backfilled_at : nil
  new_last_backfilled = Time.now
  job.update(started_at: Time.now)

  backfillers = self._backfillers(**job.criteria.symbolize_keys)
  begin
    if self._parallel_backfill && self._parallel_backfill > 1
      _do_parallel_backfill(backfillers, last_backfilled)
    else
      _do_serial_backfill(backfillers, last_backfilled)
    end
  rescue StandardError => e
    if self.on_backfill_error(e) == true
      job.update(finished_at: Time.now)
      return
    end
    raise e
  end

  sint.update(last_backfilled_at: new_last_backfilled) if job.incremental?
  job.update(finished_at: Time.now)
  job.enqueue_children
end

#backfill_not_supported_message ⇒ Object

When backfilling is not supported, this message is used. It can be overridden for custom explanations, or descriptor#documentation_url can be provided, which will use a default message. If no documentation is available, a fallback message is used.



# File 'lib/webhookdb/replicator/base.rb', line 277

def backfill_not_supported_message
  du = self.documentation_url
  if du.blank?
    msg = %(Sorry, you cannot backfill this integration. You may be looking for one of the following:

webhookdb integrations reset #{self.service_integration.table_name}
    )
    return msg
  end
  msg = %(Sorry, you cannot manually backfill this integration.
Please refer to the documentation at #{du}
for information on how to refresh data.)
  return msg
end

#calculate_and_backfill_state_machine(incremental:, criteria: nil, recursive: true, enqueue: true) ⇒ Array<Webhookdb::StateMachineStep, Webhookdb::BackfillJob>

Run calculate_backfill_state_machine. Then create and enqueue a new BackfillJob if it’s successful. Returns a tuple of the StateMachineStep and BackfillJob. If the BackfillJob is returned, the StateMachineStep was successful; otherwise no job is created and the second item is nil.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 265

def calculate_and_backfill_state_machine(incremental:, criteria: nil, recursive: true, enqueue: true)
  step = self.calculate_backfill_state_machine
  bfjob = nil
  bfjob = self._enqueue_backfill_jobs(incremental:, criteria:, recursive:, enqueue:) if step.successful?
  return step, bfjob
end
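
For example (a usage sketch), a caller that wants an incremental backfill and only needs to know whether a job was actually created might do:

step, bfjob = replicator.calculate_and_backfill_state_machine(incremental: true)
if bfjob.nil?
  # The state machine still needs input; show step.output to the user.
else
  # Credentials were sufficient; the BackfillJob has been created (and enqueued by default).
end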

#calculate_backfill_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

Return the state machine that is used when adding backfill support to an integration. Usually this sets one or both of the backfill key and secret.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 254

def calculate_backfill_state_machine
  # This is a pure function that can be tested on its own--the endpoints just need to return a state machine step
  raise NotImplementedError
end

#calculate_dependency_state_machine_step(dependency_help:) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 1151

def calculate_dependency_state_machine_step(dependency_help:)
  raise Webhookdb::InvalidPrecondition, "#{self.descriptor.name} does not have a dependency" if
    self.class.descriptor.dependency_descriptor.nil?
  return nil if self.service_integration.depends_on_id
  step = Webhookdb::Replicator::StateMachineStep.new
  dep_descr = self.descriptor.dependency_descriptor
  candidates = self.service_integration.dependency_candidates
  if candidates.empty?
    step.output = %(This integration requires #{dep_descr.resource_name_plural} to sync.

You don't have any #{dep_descr.resource_name_singular} integrations yet. You can run:

webhookdb integrations create #{dep_descr.name}

to set one up. Then once that's complete, you can re-run:

webhookdb integrations create #{self.descriptor.name}

to keep going.
)
    step.error_code = "no_candidate_dependency"
    return step.completed
  end
  choice_lines = candidates.each_with_index.
    map { |si, idx| "#{idx + 1} - #{si.table_name}" }.
    join("\n")
  step.output = %(This integration requires #{dep_descr.resource_name_plural} to sync.
#{dependency_help.blank? ? '' : "\n#{dependency_help}\n"}
Enter the number for the #{dep_descr.resource_name_singular} integration you want to use,
or leave blank to choose the first option.

#{choice_lines}
)
  step.prompting("Parent integration number")
  step.post_to_url = self.service_integration.authed_api_path + "/transition/dependency_choice"
  return step
end

#calculate_preferred_create_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

See preferred_create_state_machine_method. If we prefer backfilling, and it’s successful, we also want to enqueue jobs; that is, use calculate_and_backfill_state_machine, not just calculate_backfill_state_machine.



# File 'lib/webhookdb/replicator/base.rb', line 209

def calculate_preferred_create_state_machine
  m = self.preferred_create_state_machine_method
  return self.calculate_and_backfill_state_machine(incremental: true)[0] if m == :calculate_backfill_state_machine
  return self.calculate_webhook_state_machine
end

#calculate_webhook_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

This method is abstract.

Return the state machine that is used when setting up this integration. Usually this entails providing the user the webhook url, and providing or asking for a webhook secret. In some cases, this can be a lot more complex though.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 246

def calculate_webhook_state_machine
  raise NotImplementedError
end

#clear_backfill_information ⇒ Object

Remove all the information needed for backfilling from the integration so that it can be re-entered



# File 'lib/webhookdb/replicator/base.rb', line 306

def clear_backfill_information
  self._clear_backfill_information
  # If we don't support both webhooks and backfilling, we are safe to clear ALL fields
  # and get back into an initial state.
  self._clear_webook_information unless self.descriptor.supports_webhooks_and_backfill?
  self.service_integration.save_changes
end

#clear_webhook_information ⇒ Object

Remove all the information used in the initial creation of the integration so that it can be re-entered



# File 'lib/webhookdb/replicator/base.rb', line 293

def clear_webhook_information
  self._clear_webook_information
  # If we don't support both webhooks and backfilling, we are safe to clear ALL fields
  # and get back into an initial state.
  self._clear_backfill_information unless self.descriptor.supports_webhooks_and_backfill?
  self.service_integration.save_changes
end

#create_table(if_not_exists: false) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 342

def create_table(if_not_exists: false)
  cmd = self.create_table_modification(if_not_exists:)
  self.admin_dataset(timeout: :fast) do |ds|
    cmd.execute(ds.db)
  end
end

#create_table_modification(if_not_exists: false) ⇒ Webhookdb::Replicator::SchemaModification

Return the schema modification used to create the table where it does not exist.



# File 'lib/webhookdb/replicator/base.rb', line 351

def create_table_modification(if_not_exists: false)
  table = self.dbadapter_table
  columns = [self.primary_key_column, self.remote_key_column]
  columns.concat(self.storable_columns)
  # 'data' column should be last, since it's very large, we want to see other columns in psql/pgcli first
  columns << self.data_column
  adapter = Webhookdb::DBAdapter::PG.new
  result = Webhookdb::Replicator::SchemaModification.new
  result.transaction_statements << adapter.create_table_sql(table, columns, if_not_exists:)
  self.indices(table).each do |dbindex|
    result.transaction_statements << adapter.create_index_sql(dbindex, concurrently: false)
  end
  result.application_database_statements << self.service_integration.ensure_sequence_sql if self.requires_sequence?
  return result
end

#data_column ⇒ Webhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 413

def data_column
  return Webhookdb::DBAdapter::Column.new(name: :data, type: OBJECT, nullable: false)
end

#dbadapter_table ⇒ Webhookdb::DBAdapter::Table

Return a DBAdapter table based on the schema_and_table_symbols.



# File 'lib/webhookdb/replicator/base.rb', line 99

def dbadapter_table
  sch, tbl = self.schema_and_table_symbols
  schema = Webhookdb::DBAdapter::Schema.new(name: sch)
  table = Webhookdb::DBAdapter::Table.new(name: tbl, schema:)
  return table
end

#denormalized_columns ⇒ Array<Webhookdb::DBAdapter::Column>

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 425

def denormalized_columns
  return self._denormalized_columns.map(&:to_dbadapter)
end

#descriptor ⇒ Webhookdb::Replicator::Descriptor



# File 'lib/webhookdb/replicator/base.rb', line 36

def descriptor
  return @descriptor ||= self.class.descriptor
end

#dispatch_request_to(request) ⇒ Webhookdb::Replicator::Base

A given HTTP request may not be handled by the service integration it was sent to, for example where the service integration is part of some ‘root’ hierarchy. This method is called in the webhook endpoint, and should return the replicator used to handle the webhook request. The request is validated by the returned instance, and it is enqueued for processing.

By default, the service called by the webhook is the one we want to use, so return self.

Parameters:

  • request (Rack::Request)

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 635

def dispatch_request_to(request)
  return self
end
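
A hypothetical 'root' integration that always hands webhooks off to a dependent child could override it like this (the service name is made up):

def dispatch_request_to(request)
  # Process the webhook with the dependent integration's replicator instead of self.
  return self.find_dependent!("acme_order_v1").replicator
end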

#documentation_url ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 982

def documentation_url = nil

#enqueue_sync_targets ⇒ Object

Some replicators support ‘instant sync’, because they are upserted en-masse rather than row-by-row. That is, usually we run sync targets on a cron, because otherwise we’d need to run the sync target for every row. But if inserting is always done through backfilling, we know we have a useful set of results to sync, so don’t need to wait for cron.



# File 'lib/webhookdb/replicator/base.rb', line 932

def enqueue_sync_targets
  self.service_integration.sync_targets.each do |stgt|
    Webhookdb::Jobs::SyncTargetRunSync.perform_async(stgt.id)
  end
end

#enrichment_column ⇒ Webhookdb::DBAdapter::Column?

Column used to store enrichments. Return nil if the service does not use enrichments.



# File 'lib/webhookdb/replicator/base.rb', line 419

def enrichment_column
  return nil unless self._store_enrichment_body?
  return Webhookdb::DBAdapter::Column.new(name: :enrichment, type: OBJECT, nullable: true)
end

#ensure_all_columns ⇒ Object

We support adding columns to existing integrations without having to bump the version; changing types, or removing/renaming columns, is not supported and should bump the version or must be handled out-of-band (like deleting the integration then backfilling). To figure out what columns we need to add, we can check what are currently defined, check what exists, and add denormalized columns and indices for those that are missing.



# File 'lib/webhookdb/replicator/base.rb', line 509

def ensure_all_columns
  modification = self.ensure_all_columns_modification
  return if modification.noop?
  self.admin_dataset(timeout: :slow_schema) do |ds|
    modification.execute(ds.db)
    # We need to clear cached columns on the data since we know we're adding more.
    # It's probably not a huge deal but may as well keep it in sync.
    ds.send(:clear_columns_cache)
  end
  self.readonly_dataset { |ds| ds.send(:clear_columns_cache) }
end

#ensure_all_columns_modification ⇒ Webhookdb::Replicator::SchemaModification



# File 'lib/webhookdb/replicator/base.rb', line 522

def ensure_all_columns_modification
  existing_cols, existing_indices = nil
  max_pk = 0
  sint = self.service_integration
  self.admin_dataset do |ds|
    return self.create_table_modification unless ds.db.table_exists?(self.qualified_table_sequel_identifier)
    existing_cols = ds.columns.to_set
    existing_indices = ds.db[:pg_indexes].where(
      schemaname: sint.organization.replication_schema,
      tablename: sint.table_name,
    ).select_map(:indexname).to_set
    max_pk = ds.max(:pk) || 0
  end
  adapter = Webhookdb::DBAdapter::PG.new
  table = self.dbadapter_table
  result = Webhookdb::Replicator::SchemaModification.new

  missing_columns = self._denormalized_columns.delete_if { |c| existing_cols.include?(c.name) }
  # Add missing columns
  missing_columns.each do |whcol|
    # Don't bother bulking the ADDs into a single ALTER TABLE, it won't really matter.
    result.transaction_statements << adapter.add_column_sql(table, whcol.to_dbadapter)
  end
  # Easier to handle this explicitly than use storable_columns, but it's a duplicated concept so be careful.
  if (enrich_col = self.enrichment_column) && !existing_cols.include?(enrich_col.name)
    result.transaction_statements << adapter.add_column_sql(table, enrich_col)
  end

  # Backfill values for new columns.
  if missing_columns.any?
    # We need to backfill values into the new column, but we don't want to lock the entire table
    # as we update each row. So we need to update in chunks of rows.
    # Chunk size should be large for speed (and sending over fewer queries), but small enough
    # to induce a viable delay if another query is updating the same row.
    # Note that the delay will only be for writes to those rows; reads will not block,
    # so something a bit longer should be ok.
    #
    # Note that at the point these UPDATEs are running, we have the new column AND the new code inserting
    # into that new column. We could in theory skip all the PKs that were added after this modification
    # started to run. However considering the number of rows in this window will always be relatively low
    # (though not absolutely low), and the SQL backfill operation should yield the same result
    # as the Ruby operation, this doesn't seem too important.
    result.nontransaction_statements.concat(missing_columns.filter_map(&:backfill_statement))
    update_expr = missing_columns.to_h { |c| [c.name, c.backfill_expr || c.to_sql_expr] }
    self.admin_dataset do |ds|
      chunks = Webhookdb::Replicator::Base.chunked_row_update_bounds(max_pk)
      chunks[...-1].each do |(lower, upper)|
        update_query = ds.where { pk > lower }.where { pk <= upper }.update_sql(update_expr)
        result.nontransaction_statements << update_query
      end
      final_update_query = ds.where { pk > chunks[-1][0] }.update_sql(update_expr)
      result.nontransaction_statements << final_update_query
    end
  end

  # Add missing indices. This should happen AFTER the UPDATE calls so the UPDATEs don't have to update indices.
  self.indices(table).map do |index|
    next if existing_indices.include?(index.name.to_s)
    result.nontransaction_statements << adapter.create_index_sql(index, concurrently: true)
  end

  result.application_database_statements << sint.ensure_sequence_sql if self.requires_sequence?
  return result
end

#find_dependent(service_name) ⇒ Webhookdb::ServiceIntegration?

Find a dependent service integration with the given service name. If none are found, return nil. If multiple are found, raise, as this should only be used for automatically managed integrations.



# File 'lib/webhookdb/replicator/base.rb', line 322

def find_dependent(service_name)
  sints = self.service_integration.dependents.filter { |si| si.service_name == service_name }
  raise Webhookdb::InvalidPrecondition, "there are multiple #{service_name} integrations in dependents" if
    sints.length > 1
  return sints.first
end

#find_dependent!(service_name) ⇒ Webhookdb::ServiceIntegration



# File 'lib/webhookdb/replicator/base.rb', line 330

def find_dependent!(service_name)
  sint = self.find_dependent(service_name)
  raise Webhookdb::InvalidPrecondition, "there is no #{service_name} integration in dependents" if sint.nil?
  return sint
end

#indices(table) ⇒ Array<Webhookdb::DBAdapter::Index>

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 485

def indices(table)
  dba_columns = [self.primary_key_column, self.remote_key_column]
  dba_columns.concat(self.storable_columns)
  dba_cols_by_name = dba_columns.index_by(&:name)

  result = []
  dba_columns.select(&:index?).each do |c|
    targets = [c]
    idx_name = self.index_name(targets)
    result << Webhookdb::DBAdapter::Index.new(name: idx_name.to_sym, table:, targets:, where: c.index_where)
  end
  self._extra_index_specs.each do |spec|
    targets = spec.columns.map { |n| dba_cols_by_name.fetch(n) }
    idx_name = self.index_name(targets)
    result << Webhookdb::DBAdapter::Index.new(name: idx_name.to_sym, table:, targets:, where: spec.where)
  end
  return result
end

#on_backfill_error(e) ⇒ Object

Called when the #backfill method errors. This can do something like dispatch a developer alert. The handler can raise a new error to stop the job with that error, or return true to swallow the error and mark the job finished. If it returns anything else (the default noop returns nil), the original exception is raised.

Parameters:

  • e (Exception)


# File 'lib/webhookdb/replicator/base.rb', line 1070

def on_backfill_error(e) = nil

#on_dependency_webhook_upsert(replicator, payload, changed:) ⇒ Object

Called when the upstream dependency upserts. In most cases, you can noop; but in some cases, you may want to update or fetch rows. One example would be a ‘db only’ integration, where values are taken from the parent service and added to this service’s table. We may want to upsert rows in our table whenever a row in our parent table changes.

Parameters:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 1147

def on_dependency_webhook_upsert(replicator, payload, changed:)
  raise NotImplementedError, "this must be overridden for replicators that have dependencies"
end
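
A sketch of a 'db only' child replicator that re-upserts its own row whenever the parent row actually changes (assuming the parent's row payload can be fed straight to upsert_webhook_body):

def on_dependency_webhook_upsert(_replicator, payload, changed:)
  return unless changed
  self.upsert_webhook_body(payload)
end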

#preferred_create_state_machine_method ⇒ Symbol

If the integration supports webhooks, then we want to do that on create. If it’s backfill only, then we fall back to that instead. Things like choosing dependencies are webhook-vs-backfill agnostic, so which machine we choose isn’t that important (but it does happen during creation).

Returns:

  • (Symbol)


# File 'lib/webhookdb/replicator/base.rb', line 201

def preferred_create_state_machine_method
  return self.descriptor.supports_webhooks? ? :calculate_webhook_state_machine : :calculate_backfill_state_machine
end

#preprocess_headers_for_logging(headers) ⇒ Object

In some cases, services may send us sensitive headers we do not want to log. This should be very rare, but some services are designed really badly and send auth info in the webhook. Remove or obfuscate the sensitive values in the passed header hash.



# File 'lib/webhookdb/replicator/base.rb', line 76

def preprocess_headers_for_logging(headers); end

#primary_key_column ⇒ Webhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 403

def primary_key_column
  return Webhookdb::DBAdapter::Column.new(name: :pk, type: BIGINT, pk: true)
end

#process_state_change(field, value, attr: nil) ⇒ Webhookdb::Replicator::StateMachineStep

Set the new service integration field and return the newly calculated state machine.

Subclasses can override this method and then super, to change the field or value.

Parameters:

  • field (String)

    Like ‘webhook_secret’, ‘backfill_key’, etc.

  • value (String)

    The value of the field.

  • attr (String) (defaults to: nil)

    Subclasses can pass in a custom field that does not correspond to a service integration column. When doing that, they must pass in attr, which is what will be set during the state change.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 159

def process_state_change(field, value, attr: nil)
  attr ||= field
  desc = self.descriptor
  value = value.strip if value.respond_to?(:strip)
  case field
    when *self._webhook_state_change_fields
      # If we don't support webhooks, then the backfill state machine may be using it.
      meth = desc.supports_webhooks? ? :calculate_webhook_state_machine : :calculate_backfill_state_machine
    when *self._backfill_state_change_fields
      # If we don't support backfilling, then the create state machine may be using them.
      meth = desc.supports_backfill? ? :calculate_backfill_state_machine : :calculate_webhook_state_machine
    when "dependency_choice"
      # Choose an upstream dependency for an integration.
      # See where this is used for more details.
      meth = self.preferred_create_state_machine_method
      value = self._find_dependency_candidate(value)
      attr = "depends_on"
    when "noop_create"
      # Use this to just recalculate the state machine,
      # not make any changes to the data.
      return self.calculate_preferred_create_state_machine
    else
      raise ArgumentError, "Field '#{field}' is not valid for a state change"
  end
  self.service_integration.db.transaction do
    self.service_integration.send(:"#{attr}=", value)
    self.service_integration.save_changes
    step = self.send(meth)
    if step.successful? && meth == :calculate_backfill_state_machine
      # If we are processing the backfill state machine, and we finish successfully,
      # we always want to start syncing.
      self._enqueue_backfill_jobs(incremental: true)
    end
    return step
  end
end
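
For example (a usage sketch, with a hypothetical params hash), the API layer might apply a user-provided webhook secret and render the next prompt from the returned step:

step = replicator.process_state_change("webhook_secret", params[:value])
puts step.output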

#process_webhooks_synchronously? ⇒ Boolean

Return true if the service should process webhooks in the actual endpoint, rather than asynchronously through the job system. This should ONLY be used where we have important order-of-operations in webhook processing and/or need to return data to the webhook sender.

NOTE: You MUST implement synchronous_processing_response_body if this returns true.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 56

def process_webhooks_synchronously?
  return false
end

#qualified_table_sequel_identifier(schema: nil, table: nil) ⇒ Sequel::SQL::QualifiedIdentifier

Return a Sequel identifier using schema_and_table_symbols, or schema or table as overrides if given.

Returns:

  • (Sequel::SQL::QualifiedIdentifier)


# File 'lib/webhookdb/replicator/base.rb', line 92

def qualified_table_sequel_identifier(schema: nil, table: nil)
  sch, tbl = self.schema_and_table_symbols
  return Sequel[schema || sch][table || tbl]
end

#readonly_dataset(**kw) ⇒ Sequel::Dataset

Yield to a dataset using the readonly connection.

Returns:

  • (Sequel::Dataset)


# File 'lib/webhookdb/replicator/base.rb', line 893

def readonly_dataset(**kw, &)
  self.with_dataset(self.service_integration.organization.readonly_connection_url_raw, **kw, &)
end

#remote_key_column ⇒ Webhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 408

def remote_key_column
  return self._remote_key_column.to_dbadapter(unique: true, nullable: false)
end

#requires_sequence? ⇒ Boolean

Some integrations require sequences, like when upserting rows with numerical unique ids (if they were random values like UUIDs we could generate them and not use a sequence). In those cases, the integrations can mark themselves as requiring a sequence.

The sequence will be created in the *application database*, but it is used primarily when inserting rows into the *organization/replication database*. This is necessary because things like sequences are not possible to migrate when moving replication databases.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 620

def requires_sequence?
  return false
end

#resource_name_plural ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 44

def resource_name_plural
  return @resource_name_plural ||= self.descriptor.resource_name_plural
end

#resource_name_singular ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 40

def resource_name_singular
  return @resource_name_singular ||= self.descriptor.resource_name_singular
end

#schema_and_table_symbols ⇒ Array<Symbol>

Return a tuple of (schema, table) based on the organization’s replication schema, and the service integration’s table name.

Returns:

  • (Array<Symbol>)


# File 'lib/webhookdb/replicator/base.rb', line 82

def schema_and_table_symbols
  sch = self.service_integration.organization&.replication_schema&.to_sym || :public
  tbl = self.service_integration.table_name.to_sym
  return [sch, tbl]
end
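
For instance (the schema and table names are hypothetical), an organization with replication schema org_42 and a table named acme_order_v1_abc123 yields:

replicator.schema_and_table_symbols
# => [:org_42, :acme_order_v1_abc123]
replicator.qualified_table_sequel_identifier
# => Sequel[:org_42][:acme_order_v1_abc123]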

#storable_columns ⇒ Array<Webhookdb::DBAdapter::Column>

Denormalized columns, plus the enrichment column if supported. Does not include the data or external id columns, though perhaps it should.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 439

def storable_columns
  cols = self.denormalized_columns
  if (enr = self.enrichment_column)
    cols << enr
  end
  return cols
end

#synchronous_processing_response_body(upserted:, request:) ⇒ String

Call with the value that was inserted by synchronous processing. Takes the row values being upserted (the result of upsert_webhook), and the arguments used to upsert it (the arguments to upsert_webhook), and should return the body string to respond back with.

Parameters:

Returns:

  • (String)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 68

def synchronous_processing_response_body(upserted:, request:)
  return {message: "process synchronously"}.to_json if Webhookdb::Replicator.always_process_synchronously
  raise NotImplementedError, "must be implemented if process_webhooks_synchronously? is true"
end

#timestamp_column ⇒ Webhookdb::DBAdapter::Column

Column to use as the ‘timestamp’ for the row. This is usually some created or updated at timestamp.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 450

def timestamp_column
  got = self._denormalized_columns.find { |c| c.name == self._timestamp_column_name }
  raise NotImplementedError, "#{self.descriptor.name} has no timestamp column #{self._timestamp_column_name}" if
    got.nil?
  return got.to_dbadapter
end

#upsert_has_deps? ⇒ Boolean

Return true if the integration requires making an API call to upsert. This puts the sync into a lower-priority queue so it is less likely to block other processing. This is usually true if enrichments are involved.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 747

def upsert_has_deps?
  return false
end

#upsert_webhook(request, **kw) ⇒ Object

Upsert a webhook request into the database. Note this is a WebhookRequest, NOT a Rack::Request.



# File 'lib/webhookdb/replicator/base.rb', line 652

def upsert_webhook(request, **kw)
  return self._upsert_webhook(request, **kw)
rescue Amigo::Retry::Error
  # Do not log this since it's expected/handled by Amigo
  raise
rescue StandardError => e
  self.logger.error("upsert_webhook_error", request: request.as_json, error: e)
  raise
end

#upsert_webhook_body(body, **kw) ⇒ Object

Upsert a webhook using only a body. This is not valid for the rare integration that relies on other request info, like when we have to take different action based on the request method.

Parameters:

  • body (Hash)


# File 'lib/webhookdb/replicator/base.rb', line 644

def upsert_webhook_body(body, **kw)
  return self.upsert_webhook(Webhookdb::Replicator::WebhookRequest.new(body:), **kw)
end
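
For example (a sketch; raw_body here is hypothetical), a test or backfiller that already has the decoded JSON body can upsert it directly, or pass upsert: false to get the prepared row back without writing it:

replicator.upsert_webhook_body(JSON.parse(raw_body))
replicator.upsert_webhook_body(JSON.parse(raw_body), upsert: false) # returns the row hash without upserting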

#verify_backfill_credentials ⇒ Webhookdb::CredentialVerificationResult

Try to verify backfill credentials, by fetching the first page of items. Only relevant for integrations supporting backfilling.

If an error is received, the error message from `_verify_backfill_<http status>_err_msg` is used, if such a method is defined. So for example, a 401 will call the method _verify_backfill_401_err_msg if defined. If no status-specific method is defined, call and return _verify_backfill_err_msg.

Returns:

  • (Webhookdb::CredentialVerificationResult)


# File 'lib/webhookdb/replicator/base.rb', line 951

def verify_backfill_credentials
  backfiller = self._backfillers.first
  if backfiller.nil?
    # If for some reason we do not have a backfiller,
    # we can't verify credentials. This should never happen in practice,
    # because we wouldn't call this method if the integration doesn't support it.
    raise "No backfiller available for #{self.service_integration.inspect}"
  end
  begin
    # begin backfill attempt but do not return backfill result
    backfiller.fetch_backfill_page(nil, last_backfilled: nil)
  rescue Webhookdb::Http::Error => e
    msg = if self.respond_to?(:"_verify_backfill_#{e.status}_err_msg")
            self.send(:"_verify_backfill_#{e.status}_err_msg")
    else
      self._verify_backfill_err_msg
    end
    return CredentialVerificationResult.new(verified: false, message: msg)
  rescue TypeError, NoMethodError => e
    # if we don't incur an HTTP error, but do incur an Error due to differences in the shapes of anticipated
    # response data in the `fetch_backfill_page` function, we can assume that the credentials are okay
    self.logger.info "verify_backfill_credentials_expected_failure", error: e
    return CredentialVerificationResult.new(verified: true, message: "")
  end
  return CredentialVerificationResult.new(verified: true, message: "")
end
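
Following the convention described above, a subclass can provide a status-specific message alongside the generic one (the wording here is hypothetical):

def _verify_backfill_401_err_msg
  return "It looks like that API key is invalid. Please reenter your credentials:"
end

def _verify_backfill_err_msg
  return "An error occurred while verifying those credentials. Please try entering them again:"
end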

#webhook_endpoint ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 1189

def webhook_endpoint
  return self._webhook_endpoint
end

#webhook_response(request) ⇒ Webhookdb::WebhookResponse

Given a Rack request, return the webhook response object. Usually this performs verification of the request based on the webhook secret configured on the service integration. Note that if skip_webhook_verification is true on the service integration, this method always returns 201.

Parameters:

  • request (Rack::Request)

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 122

def webhook_response(request)
  return Webhookdb::WebhookResponse.ok(status: 201) if self.service_integration.skip_webhook_verification
  return self._webhook_response(request)
end