Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller

Inherits:
Backfiller
  • Object
show all
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Instance Method Summary collapse

Methods inherited from Backfiller

#_fetch_backfill_page_with_retry, #backfill, do_retry_wait, #max_backfill_retry_attempts, #wait_for_retry_attempt

Constructor Details

#initialize(replicator) ⇒ Backfiller

Returns a new instance of Backfiller.



236
237
238
239
240
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 236

# Build a backfiller bound to +replicator+.
#
# The replicator itself is kept as +@replicator+, and whatever its service
# integration +depends_on+ is kept as +@signalwire_sint+.
#
# @param replicator [#service_integration] replicator this backfiller works for
def initialize(replicator)
  super()
  @replicator = replicator
  own_sint = replicator.service_integration
  @signalwire_sint = own_sint.depends_on
end

Instance Method Details

#_send_sms(idempotency, from:, to:, body:) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 285

# Send one SMS through Signalwire, wrapped in the given idempotency guard.
#
# Credentials and the space URL are read from +@signalwire_sint+. When the send
# raises a Webhookdb::Http::Error, the failure is logged as a warning. If the
# error response carries a "code", an ErrorSignalwireSendSms alert is dispatched
# to the organization of the replicator's service integration and nil is returned.
# If no "code" can be read, the original error is raised again.
#
# @param idempotency [#execute] guard whose +execute+ receives the send as a block
# @param from [Object] forwarded as the +from+ of the SMS
# @param to [Object] forwarded as the +to+ of the SMS
# @param body [Object] forwarded as the +body+ of the SMS
# @return [Object, nil] the result of +idempotency.execute+, or nil when an alert was dispatched
# @raise [Webhookdb::Http::Error] when the failed response has no readable "code"
def _send_sms(idempotency, from:, to:, body:)
  idempotency.execute do
    Webhookdb::Signalwire.send_sms(
      from:,
      to:,
      body:,
      space_url: @signalwire_sint.api_url,
      project_id: @signalwire_sint.backfill_key,
      api_key: @signalwire_sint.backfill_secret,
      logger: @replicator.logger,
    )
  end
rescue Webhookdb::Http::Error => e
  failed_body = e.body
  failed_status = e.status
  failed_url = e.uri.to_s
  @replicator.logger.warn(
    "signalwire_send_sms_error",
    response_body: failed_body,
    response_status: failed_status,
    request_url: failed_url,
    sms_from: from,
    sms_to: to,
  )
  # Any failure while digging out the code is treated the same as "no code".
  signalwire_code =
    begin
      e.response.parsed_response["code"]
    rescue StandardError
      nil
    end
  # All known codes are for the integrator, not on the webhookdb code side.
  # https://developer.signalwire.com/guides/how-to-troubleshoot-common-messaging-issues
  # Without a code there is nothing to alert the integrator about, so surface the original error.
  raise e if signalwire_code.nil?

  sint = @replicator.service_integration
  alert = Webhookdb::Messages::ErrorSignalwireSendSms.new(
    sint,
    response_status: failed_status,
    response_body: failed_body,
    request_url: failed_url,
    request_method: "POST",
  )
  @replicator.service_integration.organization.alerting.dispatch_alert(alert)
  nil
end

#_sync_front_inbound(sender:, texted_at:, item:, body:) ⇒ Object



324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 324

# Post one inbound message to the Front channel API for the replicator's channel.
#
# The request carries a bearer JWT built from the Front channel app id, JTI and
# secret, with the channel id as subject and an expiry ten seconds from now.
#
# @param sender [Object] sent as the sender handle
# @param texted_at [#to_i] sent as +delivered_at+ after conversion with +to_i+
# @param item [#fetch] must provide +:external_id+ and +:external_conversation_id+
# @param body [Object, nil] message text; "<no body>" is sent when this is nil or false
# @return [Object] the +parsed_response+ of the HTTP response
def _sync_front_inbound(sender:, texted_at:, item:, body:)
  inbound_message = {
    sender: {handle: sender},
    body: body || "<no body>",
    delivered_at: texted_at.to_i,
    metadata: {
      external_id: item.fetch(:external_id),
      external_conversation_id: item.fetch(:external_conversation_id),
    },
  }
  claims = {
    iss: Webhookdb::Front.signalwire_channel_app_id,
    jti: Webhookdb::Front.channel_jwt_jti,
    sub: @replicator.front_channel_id,
    exp: 10.seconds.from_now.to_i,
  }
  bearer = JWT.encode(claims, Webhookdb::Front.signalwire_channel_app_secret)
  endpoint = "https://api2.frontapp.com/channels/#{@replicator.front_channel_id}/inbound_messages"
  Webhookdb::Http.post(
    endpoint,
    inbound_message,
    headers: {"Authorization" => "Bearer #{bearer}"},
    timeout: Webhookdb::Front.http_timeout,
    logger: @replicator.logger,
  ).parsed_response
end

#fetch_backfill_page(*) ⇒ Object



353
354
355
356
357
358
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 353

# Load every row that is missing either its Signalwire SID or its Front message id.
#
# Any arguments are accepted and ignored. Everything is returned as one page,
# so the second element of the result is always nil.
#
# @return [Array] two-element array of the matching rows and nil
def fetch_backfill_page(*)
  pending = @replicator.admin_dataset do |dataset|
    missing_either_id = Sequel[signalwire_sid: nil] | Sequel[front_message_id: nil]
    dataset.where(missing_either_id).all
  end
  [pending, nil]
end

#handle_item(item) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 242

# Sync one row to whichever side it is missing from, then upsert the row.
#
# A row must carry exactly one of +:front_message_id+ / +:signalwire_sid+:
# - no Front id: the message is pushed into Front and the returned "message_uid" is stored;
# - a Front id: the message is sent as an SMS and the returned "sid" is stored
#   (left unset when the send returns nothing).
# Rows older than the Front channel sync cutoff are not synced; the missing id
# is set to "skipped_due_to_age" instead.
#
# @param item [Hash] row with symbol keys; mutated in place with the new id
# @return [Object] the result of upserting the stringified row
# @raise [Webhookdb::InvariantViolation] when the row has both ids or neither
def handle_item(item)
  front_id = item.fetch(:front_message_id)
  sw_id = item.fetch(:signalwire_sid)
  # Both present or both absent: the two negations agree.
  if !front_id == !sw_id
    violation = "row should have a front id OR signalwire id, should not have been inserted, or selected: #{item}"
    raise Webhookdb::InvariantViolation, violation
  end
  from_phone = @replicator.format_phone(item.fetch(:sender))
  to_phone = @replicator.format_phone(item.fetch(:recipient))
  text = item.fetch(:body)
  once = Webhookdb::Idempotency.
    once_ever.
    stored.
    using_seperate_connection.
    under_key("fsmca-fims-#{item.fetch(:external_id)}")
  data = item.fetch(:data)
  if front_id.nil?
    texted_at = Time.parse(data.fetch("date_created"))
    too_old = texted_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    item[:front_message_id] =
      if too_old
        # Do not sync old rows, just mark them synced
        "skipped_due_to_age"
      else
        # sync the message into Front
        synced = once.execute { _sync_front_inbound(sender: from_phone, texted_at:, item:, body: text) }
        synced.fetch("message_uid")
      end
  else
    messaged_at = Time.at(data.fetch("payload").fetch("created_at"))
    too_old = messaged_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    if too_old
      # Do not sync old rows, just mark them synced
      item[:signalwire_sid] = "skipped_due_to_age"
    else
      # send the SMS via signalwire
      sms = _send_sms(once, from: from_phone, to: to_phone, body: text)
      item[:signalwire_sid] = sms.fetch("sid") if sms
    end
  end
  @replicator.upsert_webhook_body(item.deep_stringify_keys)
end