Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller

Inherits:
Backfiller
  • Object
show all
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Instance Method Summary collapse

Methods inherited from Backfiller

#_fetch_backfill_page_with_retry, #backfill, do_retry_wait, #max_backfill_retry_attempts, #wait_for_retry_attempt

Constructor Details

#initialize(replicator) ⇒ Backfiller

Returns a new instance of Backfiller.



236
237
238
239
240
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 236

# Build a backfiller bound to +replicator+.
#
# The integration that the replicator's service integration depends on is kept
# in @signalwire_sint; the other methods of this class read its +api_url+,
# +backfill_key+ and +backfill_secret+ when sending SMS.
#
# @param replicator the replicator whose rows this backfiller processes
def initialize(replicator)
  super()
  @replicator = replicator
  integration = replicator.service_integration
  @signalwire_sint = integration.depends_on
end

Instance Method Details

#_send_sms(idempotency, from:, to:, body:) ⇒ Object



294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 294

# Send an SMS through SignalWire, running the send inside +idempotency.execute+.
#
# The SignalWire space URL, project id and API key are read from @signalwire_sint.
#
# @param idempotency an object responding to +execute+ with a block
# @param from sender phone number
# @param to recipient phone number
# @param body text of the message
# @return whatever +idempotency.execute+ returns on success, or +nil+ when the
#   send failed with an error response carrying a 'code' (an alert is dispatched
#   to the organization instead of raising)
# @raise [Webhookdb::Http::Error] when the failed response has no usable 'code'
def _send_sms(idempotency, from:, to:, body:)
  idempotency.execute do
    Webhookdb::Signalwire.send_sms(
      from:,
      to:,
      body:,
      space_url: @signalwire_sint.api_url,
      project_id: @signalwire_sint.backfill_key,
      api_key: @signalwire_sint.backfill_secret,
      logger: @replicator.logger,
    )
  end
rescue Webhookdb::Http::Error => e
  failed_body = e.body
  failed_status = e.status
  failed_url = e.uri.to_s
  @replicator.logger.warn(
    "signalwire_send_sms_error",
    response_body: failed_body,
    response_status: failed_status,
    request_url: failed_url,
    sms_from: from,
    sms_to: to,
  )
  # Pull the error 'code' out of the parsed response. Any failure while doing so
  # is treated the same as a missing code.
  error_code =
    begin
      e.response.parsed_response["code"]
    rescue StandardError
      nil
    end
  # Without a code we cannot attribute the failure, so the original error propagates.
  # All known codes are for the integrator, not on the webhookdb code side.
  # https://developer.signalwire.com/guides/how-to-troubleshoot-common-messaging-issues
  raise e if error_code.nil?

  alert = Webhookdb::Messages::ErrorSignalwireSendSms.new(
    @replicator.service_integration,
    response_status: failed_status,
    response_body: failed_body,
    request_url: failed_url,
    request_method: "POST",
  )
  @replicator.service_integration.organization.alerting.dispatch_alert(alert)
  nil
end

#_sync_front_inbound(sender:, texted_at:, db_row:, body:) ⇒ Object



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 333

# POST an inbound message to the Front channel's +inbound_messages+ endpoint.
#
# The request is authorized with a JWT that expires 10 seconds from now, signed
# with the Front channel app secret.
#
# @param sender handle of the message sender
# @param texted_at time of the message; sent as +delivered_at+ via +to_i+
# @param db_row row supplying +:external_id+ and +:external_conversation_id+
# @param body message text; "<no body>" is sent when this is nil
# @return the parsed response of the Front API call
# @raise [KeyError] when +db_row+ lacks one of the required keys
def _sync_front_inbound(sender:, texted_at:, db_row:, body:)
  payload = {
    sender: {handle: sender},
    body: body || "<no body>",
    delivered_at: texted_at.to_i,
    metadata: {
      external_id: db_row.fetch(:external_id),
      external_conversation_id: db_row.fetch(:external_conversation_id),
    },
  }
  claims = {
    iss: Webhookdb::Front.signalwire_channel_app_id,
    jti: Webhookdb::Front.channel_jwt_jti,
    sub: @replicator.front_channel_id,
    exp: 10.seconds.from_now.to_i,
  }
  token = JWT.encode(claims, Webhookdb::Front.signalwire_channel_app_secret)
  url = "https://api2.frontapp.com/channels/#{@replicator.front_channel_id}/inbound_messages"
  headers = {"Authorization" => "Bearer #{token}"}
  response = Webhookdb::Http.post(
    url,
    payload,
    headers:,
    timeout: Webhookdb::Front.http_timeout,
    logger: @replicator.logger,
  )
  response.parsed_response
end

#fetch_backfill_page(*) ⇒ Object



362
363
364
365
366
367
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 362

# Fetch every row that still lacks a SignalWire sid or a Front message id.
#
# Any arguments are accepted and ignored; all matching rows are returned at once.
#
# @return [Array] a two-element array: the matching rows, and +nil+ in the
#   second position
def fetch_backfill_page(*)
  unsynced_rows = @replicator.admin_dataset do |dataset|
    needs_sync = Sequel[signalwire_sid: nil] | Sequel[front_message_id: nil]
    dataset.where(needs_sync).all
  end
  [unsynced_rows, nil]
end

#handle_item(db_row) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 242

# Sync a single row to whichever side it has not reached yet, then upsert the result.
#
# A row with no Front message id is pushed into Front; otherwise it is sent out as
# an SMS via SignalWire. Rows older than the Front channel sync freshness cutoff
# are not synced and are marked "skipped_due_to_age" instead.
#
# @param db_row [Hash] row with symbol column keys, including a +:data+ hash
# @raise [Webhookdb::InvariantViolation] when the row has both ids set, or neither
# @raise [KeyError] when a required column or data key is missing
def handle_item(db_row)
  front_id = db_row.fetch(:front_message_id)
  sw_id = db_row.fetch(:signalwire_sid)
  # The upsert works on 'data', but the stored 'data' alone is incomplete.
  # Rebuild a fuller payload from the row itself: copy it, drop the columns that
  # do not belong in 'data', then fold the 'data' column's contents back in.
  upserting_data = db_row.dup
  %i[pk row_updated_at].each { |column| upserting_data.delete(column) }
  stored_data = upserting_data.delete(:data)
  upserting_data.merge!(**stored_data)
  # Exactly one of the two ids must be present.
  if !front_id == !sw_id
    msg = "row should have a front id OR signalwire id, should not have been inserted, or selected: #{db_row}"
    raise Webhookdb::InvariantViolation, msg
  end
  sender = @replicator.format_phone(db_row.fetch(:sender))
  recipient = @replicator.format_phone(db_row.fetch(:recipient))
  body = db_row.fetch(:body)
  idempotency_key = "fsmca-fims-#{db_row.fetch(:external_id)}"
  idempotency = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(idempotency_key)
  if front_id.nil?
    texted_at = Time.parse(db_row.fetch(:data).fetch("date_created"))
    oldest_syncable = Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    upserting_data[:front_message_id] =
      if texted_at >= oldest_syncable
        # Fresh enough: sync the message into Front and record its uid.
        idempotency.execute do
          _sync_front_inbound(sender:, texted_at:, db_row:, body:)
        end.fetch("message_uid")
      else
        # Too old to sync; just mark it as handled.
        "skipped_due_to_age"
      end
  else
    messaged_at = Time.at(db_row.fetch(:data).fetch("payload").fetch("created_at"))
    oldest_syncable = Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    if messaged_at >= oldest_syncable
      # Fresh enough: send the SMS via SignalWire. A nil result leaves the sid unset.
      sms_result = _send_sms(idempotency, from: sender, to: recipient, body:)
      upserting_data[:signalwire_sid] = sms_result.fetch("sid") if sms_result
    else
      # Too old to send; just mark it as handled.
      upserting_data[:signalwire_sid] = "skipped_due_to_age"
    end
  end
  @replicator.upsert_webhook_body(upserting_data.deep_stringify_keys)
end