Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1
- Includes: DBAdapter::ColumnTypes
- Defined in: lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb
Overview
Front has a system of ‘channels’, but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contracts.
Note: In the future, we can abstract this to support other channels, with minimal changes.
We have the following concepts to keep in mind:
- The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).
- The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).
- For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.
- This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages), or SignalWire message id (query SignalWire for all inbound messages). When a row has both ids, it means it has been “delivered”, so to speak.
- We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.
This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code.
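The row states described above can be sketched in a few lines. This is an illustration only: rows are assumed to be plain Hashes keyed by the `front_message_id` and `signalwire_sid` columns, and the `delivery_state` helper is hypothetical, not part of the replicator.

```ruby
# Hypothetical helper: classify a replication row by which ids it carries.
# The keys mirror the replicator's denormalized columns; the helper itself
# exists only for this sketch.
def delivery_state(row)
  has_front = !row[:front_message_id].nil?
  has_signalwire = !row[:signalwire_sid].nil?
  return :delivered if has_front && has_signalwire
  return :needs_sms if has_front # outbound Front message awaiting an SMS
  return :needs_front_message if has_signalwire # inbound SMS awaiting a Front message
  :invalid
end

rows = [
  {front_message_id: "msg_1", signalwire_sid: nil},
  {front_message_id: nil, signalwire_sid: "sid_1"},
  {front_message_id: "msg_2", signalwire_sid: "sid_2"},
]
states = rows.map { |r| delivery_state(r) }
```

Because the state is derived from the row alone, the same check gives the same answer no matter which event caused the row to be written.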
The order of operations is:
- The channel description instructs the user to go to /v1/install/front_signalwire/setup.
- This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).
- The state machine also asks for the phone number to use to send messages.
  - The phone number used to send messages is stored in the api_url.
- The state machine prints out the API token to use in Front.
  - The API token is stored in the ‘webhookdb_api_key’ field, which is searchable.
- The user is directed to Front, to install the WebhookDB SignalWire channel.
- The user inputs their API token and connects the channel.
- Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.
  - This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.
  - This stores the channel_id on the integration as the backfill_key.
- Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.
  - This upserts a DB row into the front_message_v1 replicator.
  - It also enqueues a backfill of this replicator.
- Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.
  - This deletes this service integration.
- Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a SignalWire row is updated, this replicator will be triggered and enqueue a backfill.
- When this replicator backfills, it will:
  - Look for inbound SMS, and upsert a row into this replication table.
  - Look for outbound Front messages, and upsert a row into this replication table.
  - Find replication table rows without a SignalWire id, and send an SMS.
  - Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message
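The last two backfill steps can be sketched as a single pass over in-memory rows. This is a minimal sketch, not the Backfiller's actual code: `converge`, `send_sms`, and `create_front_message` are hypothetical names, and the two lambdas stand in for the real SignalWire and Front API calls.

```ruby
# Sketch of one convergence pass. Each callable is a hypothetical stand-in
# for an API call and returns the id of the message it created.
def converge(rows, send_sms:, create_front_message:)
  rows.each do |row|
    if row[:signalwire_sid].nil?
      # Outbound Front message that has not been sent as an SMS yet.
      row[:signalwire_sid] = send_sms.call(row)
    elsif row[:front_message_id].nil?
      # Inbound SMS that has not been synced into Front yet.
      row[:front_message_id] = create_front_message.call(row)
    end
  end
  rows
end

rows = [
  {external_id: "msg_1", front_message_id: "msg_1", signalwire_sid: nil, body: "hi"},
  {external_id: "sid_9", front_message_id: nil, signalwire_sid: "sid_9", body: "hello"},
]
converge(
  rows,
  send_sms: ->(row) { "sid_for_#{row[:external_id]}" },
  create_front_message: ->(row) { "front_for_#{row[:external_id]}" },
)
```

Running the pass again on the same rows makes no further calls, since every row now carries both ids.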
Defined Under Namespace
Classes: Backfiller
Constant Summary
Constants included from DBAdapter::ColumnTypes
DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Class Method Summary
Instance Method Summary
- #_backfillers ⇒ Object
- #_denormalized_columns ⇒ Object
- #_front_recipient_phone(payload) ⇒ Object
- #_notify_dependents(inserting, changed) ⇒ Object
- #_remote_key_column ⇒ Object
- #_resource_and_event(request) ⇒ Object
- #_timestamp_column_name ⇒ Object
- #_update_where_expr ⇒ Object
- #_webhook_response(request) ⇒ Object
- #calculate_backfill_state_machine ⇒ Object
- #calculate_webhook_state_machine ⇒ Object
- #clear_webhook_information ⇒ Object
- #format_phone(s) ⇒ Object
- #front_channel_id ⇒ Object
- #front_channel_id=(c) ⇒ Object
- #on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object
- #process_webhooks_synchronously? ⇒ Boolean
- #support_phone ⇒ Object
- #synchronous_processing_response_body(upserted:, request:) ⇒ Object
Methods inherited from Base
#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_update_expr, #_upsert_webhook, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #find_dependent, #find_dependent!, #indices, #initialize, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response
Constructor Details
This class inherits a constructor from Webhookdb::Replicator::Base
Class Method Details
.descriptor ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 64
def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end
Instance Method Details
#_backfillers ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 233
def _backfillers = [Backfiller.new(self)]
#_denormalized_columns ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 81
def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end
#_front_recipient_phone(payload) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 205
def _front_recipient_phone(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipient.nil?
  return self.format_phone(recipient.fetch("handle"))
end
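A standalone sketch of the recipient lookup may help when reading this method. The payload below is sample data, and `format_phone` is a hypothetical, simplified stand-in for `Webhookdb::PhoneNumber.format_e164` that assumes US numbers; the real formatter may behave differently.

```ruby
# Hypothetical, simplified stand-in for Webhookdb::PhoneNumber.format_e164.
# It strips punctuation and assumes a 10-digit input is a US number.
def format_phone(s)
  digits = s.gsub(/\D/, "")
  digits = "1#{digits}" if digits.length == 10
  "+#{digits}"
end

# Same lookup as the method above, raising ArgumentError in place of
# Webhookdb::InvariantViolation so the sketch has no dependencies.
def front_recipient_phone(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise ArgumentError, "no recipient found in #{payload}" if recipient.nil?
  format_phone(recipient.fetch("handle"))
end

payload = {
  "recipients" => [
    {"role" => "from", "handle" => "+15550001111"},
    {"role" => "to", "handle" => "(555) 222-3333"},
  ],
}
phone = front_recipient_phone(payload)
```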
#_notify_dependents(inserting, changed) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 227
def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end
#_remote_key_column ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 77
def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end
#_resource_and_event(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 175
def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  return nil, nil unless ["message", "message_autoreply"].include?(type)

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid =
    if type == "message"
      payload.fetch("id")
    else
      replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
      "#{replied_to_id}_autoreply"
    end
  resource["front_message_id"] = mid
  # Use the Front ID to identify this outbound message.
  resource["external_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resource["recipient"] = self._front_recipient_phone(payload)
  # All messages get the same conversation with SMS/chat, unlike email.
  resource["external_conversation_id"] = resource["recipient"]
  return resource, nil
end
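The id derivation for autoreplies can be tried in isolation. In this sketch the `front_message_id` helper is hypothetical and the link URL is made up; only the `_links.related.message_replied_to` path and the `_autoreply` suffix come from the method above.

```ruby
# Hypothetical helper mirroring how the method above picks the message id.
def front_message_id(type, payload)
  return payload.fetch("id") if type == "message"
  # Autoreplies reuse the id of the message they reply to, with a suffix.
  replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
  "#{replied_to_id}_autoreply"
end

message = {"id" => "msg_abc"}
autoreply = {
  # Illustrative URL; only the last path segment matters here.
  "_links" => {"related" => {"message_replied_to" => "https://example.test/messages/msg_abc"}},
}
ids = [
  front_message_id("message", message),
  front_message_id("message_autoreply", autoreply),
]
```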
#_timestamp_column_name ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 94
def _timestamp_column_name
  return :row_updated_at
end
#_update_where_expr ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 98
def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end
#_webhook_response(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 171
def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end
#calculate_backfill_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 130
def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end
#calculate_webhook_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 106
def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav),
   Connect a Channel, and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end
#clear_webhook_information ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 137
def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end
#format_phone(s) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 103
def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)
#front_channel_id ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 165
def front_channel_id = self.service_integration.backfill_key
#front_channel_id=(c) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 167
def front_channel_id=(c)
  self.service_integration.backfill_key = c
end
#on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 211
def on_dependency_webhook_upsert(_replicator, payload, changed:)
  return unless changed
  return unless payload.fetch(:direction) == "inbound"
  return unless payload.fetch(:to) == self.support_phone
  body = JSON.parse(payload.fetch(:data))
  body.merge!(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end
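To see what body this method would upsert for an inbound SMS, the mapping can be run against a sample row. This is a sketch under assumptions: `SUPPORT_PHONE`, `body_for_signalwire_row`, and the sample row values are hypothetical, and the sketch returns the body instead of calling `upsert_webhook_body`.

```ruby
require "json"

SUPPORT_PHONE = "+15550001111" # assumed support number for this sketch

# Returns the body that would be upserted, or nil when the row is ignored.
def body_for_signalwire_row(payload, changed:)
  return nil unless changed
  return nil unless payload.fetch(:direction) == "inbound"
  return nil unless payload.fetch(:to) == SUPPORT_PHONE
  body = JSON.parse(payload.fetch(:data))
  body.merge(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => SUPPORT_PHONE,
    "external_conversation_id" => payload.fetch(:from),
  )
end

# Sample SignalWire replicator row; the values are illustrative.
row = {
  direction: "inbound",
  to: "+15550001111",
  from: "+15552223333",
  signalwire_id: "SM123",
  data: '{"body":"hello"}',
}
body = body_for_signalwire_row(row, changed: true)
```

Outbound rows, unchanged rows, and rows addressed to a different number all return nil, which matches the early returns in the method above.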
#process_webhooks_synchronously? ⇒ Boolean
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 143
def process_webhooks_synchronously? = true
#support_phone ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 104
def support_phone = self.format_phone(self.service_integration.api_url)
#synchronous_processing_response_body(upserted:, request:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 145
def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
  when "authorization"
    self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
    self.service_integration.save_changes
    return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
  when "delete"
    self.service_integration.destroy_self_and_all_dependents
    return "{}"
  when "message", "message_autoreply"
    return {
      type: "success",
      external_id: upserted.fetch(:external_id),
      external_conversation_id: upserted.fetch(:external_conversation_id),
    }.to_json
  else
    return "{}"
  end
end
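The response shapes per request type can be previewed without a running integration. In this sketch `response_body` is a hypothetical helper, `api_url` defaults to a placeholder host, and the side effects (saving the channel id, destroying the integration) are left out, so 'delete' simply falls through to the empty body.

```ruby
require "json"

# Hypothetical helper returning only the JSON bodies; side effects are omitted.
def response_body(type, upserted: nil, api_url: "https://api.example.test")
  case type
  when "authorization"
    {type: "success", webhook_url: "#{api_url}/v1/install/front_signalwire/channel"}.to_json
  when "message", "message_autoreply"
    {
      type: "success",
      external_id: upserted.fetch(:external_id),
      external_conversation_id: upserted.fetch(:external_conversation_id),
    }.to_json
  else
    "{}"
  end
end

auth = JSON.parse(response_body("authorization"))
sample_row = {external_id: "msg_abc", external_conversation_id: "+15552223333"}
message = JSON.parse(response_body("message", upserted: sample_row))
deleted = response_body("delete")
```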