Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1

Inherits:
Base
  • Object
show all
Includes:
DBAdapter::ColumnTypes
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Overview

Front has a system of ‘channels’ but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contrWcts.

Note: In the future, we can abstract this to support other channels, with minimal changes.

We have the following concepts to keep in mind:

  • The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).

  • The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).

  • For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.

  • This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages), or SignalWire message id (query signalwire for all inbound messages). When a row has both ids, it means it has been “delivered”, so to speak.

  • We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.

This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code,

The order of operations is:

  • The channel description instructs the user to go to /v1/install/front_signalwire/setup.

  • This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).

  • The state machine also asks for the phone number to use to send messages.

    • The phone number used to send messages is stored in the api_url.

  • The state machine prints out the API token to use in Front.

    • The api token is stored in the ‘webhookdb_api_key’ field, which is searchable.

  • The user is directed to Front, to install the WebhookDB SignalWire channel.

  • The user inputs their API token and connects the channel.

  • Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.

    • This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.

    • This stores the channel_id on the integration as the api_url.

  • Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.

    • This upserts a DB row into the front_message_v1 replicator.

    • It also enqueues a backfill of this replicator.

  • Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.

    • This deletes deletes this service integration.

  • Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a signalwire row is updated, this replicator will be triggered and enqueue a backfill.

  • When this replicator backfills, it will:

    • Look for inbound SMS, and upsert a row into this replication table.

    • Look for outbound Front messages, and upsert a row into this replication table.

    • Find replication table rows without a signalwire id, and send an SMS.

    • Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message

Defined Under Namespace

Classes: Backfiller

Constant Summary

Constants included from DBAdapter::ColumnTypes

DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID

Constants inherited from Base

Base::MAX_INDEX_NAME_LENGTH

Instance Attribute Summary

Attributes inherited from Base

#service_integration

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_update_expr, #_upsert_webhook, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #find_dependent, #find_dependent!, #indices, #initialize, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response

Constructor Details

This class inherits a constructor from Webhookdb::Replicator::Base

Class Method Details

.descriptorObject



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 64

def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end

Instance Method Details

#_backfillersObject



233
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 233

def _backfillers = [Backfiller.new(self)]

#_denormalized_columnsObject



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 81

def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end

#_front_recipient_phone(payload) ⇒ Object



205
206
207
208
209
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 205

def _front_recipient_phone(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipient.nil?
  return self.format_phone(recipient.fetch("handle"))
end

#_notify_dependents(inserting, changed) ⇒ Object



227
228
229
230
231
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 227

def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end

#_remote_key_columnObject



77
78
79
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 77

def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end

#_resource_and_event(request) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 175

def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  is_message_type = ["message", "message_autoreply"].include?(type)
  return nil, nil unless is_message_type

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid = if type == "message"
          payload.fetch("id")
    else
      replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
      "#{replied_to_id}_autoreply"
  end
  resource["front_message_id"] = mid
  # Use the Front ID to identify this outbound message.
  resource["external_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resource["recipient"] = self._front_recipient_phone(payload)
  # All messages get the same conversation with SMS/chat, unlike email.
  resource["external_conversation_id"] = resource["recipient"]
  return resource, nil
end

#_timestamp_column_nameObject



94
95
96
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 94

def _timestamp_column_name
  return :row_updated_at
end

#_update_where_exprObject



98
99
100
101
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 98

def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end

#_webhook_response(request) ⇒ Object



171
172
173
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 171

def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end

#calculate_backfill_state_machineObject



130
131
132
133
134
135
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 130

def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end

#calculate_webhook_state_machineObject



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 106

def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav), Connect a Channel,
 and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end

#clear_webhook_informationObject



137
138
139
140
141
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 137

def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end

#format_phone(s) ⇒ Object



103
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 103

def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)

#front_channel_idObject



165
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 165

def front_channel_id = self.service_integration.backfill_key

#front_channel_id=(c) ⇒ Object



167
168
169
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 167

def front_channel_id=(c)
  self.service_integration.backfill_key = c
end

#on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 211

def on_dependency_webhook_upsert(_replicator, payload, changed:)
  return unless changed
  return unless payload.fetch(:direction) == "inbound"
  return unless payload.fetch(:to) == self.support_phone
  body = JSON.parse(payload.fetch(:data))
  body.merge!(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end

#process_webhooks_synchronously?Boolean

Returns:

  • (Boolean)


143
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 143

def process_webhooks_synchronously? = true

#support_phoneObject



104
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 104

def support_phone = self.format_phone(self.service_integration.api_url)

#synchronous_processing_response_body(upserted:, request:) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 145

def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
    when "authorization"
      self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
      self.service_integration.save_changes
      return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
    when "delete"
      self.service_integration.destroy_self_and_all_dependents
      return "{}"
    when "message", "message_autoreply"
      return {
        type: "success",
        external_id: upserted.fetch(:external_id),
        external_conversation_id: upserted.fetch(:external_conversation_id),
      }.to_json
    else
      return "{}"
  end
end