Class: Webhookdb::WebhookSubscription

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/webhook_subscription.rb

Overview

Webhook subscriptions have a few parts:

  • The WebhookSubscription itself (this model), which represents a user’s desire to receive all webhooks at a URL.

  • The individual Delivery, which is a single ‘rowupsert’ event being delivered to a subscription. That is, if multiple rowupserts are done, there will be multiple Deliveries to a single Subscription, one for each rowupsert. Likewise, if a single rowupsert is done, but there are multiple Subscriptions, there will be multiple Deliveries, one to each Subscription.

  • Async job that listens for rowupsert events and enqueues new deliveries.

  • When a delivery is ‘enqueued’, it is created in the database, and then a sidekiq job is put into Redis. This sidekiq job operates OUTSIDE of our normal job system since we do not want to bother with audit logging or routing (enough history is in the DB already, though we could add it if needed).

  • We attempt the delivery until it succeeds, or we run out of attempts. See #attempt_delivery.

Defined Under Namespace

Classes: Delivery

Constant Summary collapse

TIMEOUT =

Amount of time we wait for a response from the server.

10.seconds
MAX_DELIVERY_ATTEMPTS =

An individual will be delivered this many times before giving up.

25

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.backoff_for_attempt(attempt) ⇒ Object



160
161
162
163
164
165
# File 'lib/webhookdb/webhook_subscription.rb', line 160

def self.backoff_for_attempt(attempt)
  return 1 if attempt <= 1
  return attempt * 2 if attempt <= 10
  return attempt * 3 if attempt <= 20
  return attempt * 4
end

Instance Method Details

#_fatal(d, e) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/webhookdb/webhook_subscription.rb', line 167

def _fatal(d, e)
  Webhookdb::DeveloperAlert.new(
    subsystem: "Webhook Subscriptions",
    emoji: ":hook:",
    fallback: "Error delivering WebhookSubscription::Delivery[id: #{d.id}, subscription_id: #{self.id}]: #{e}",
    fields: [
      {title: "Org", value: self.fetch_organization.display_string, short: true},
      {title: "Creator", value: self.created_by&.email, short: true},
      {title: "Delivery", value: "#{d.id}, Subscription: #{self.id}, Attempts: #{d.attempt_count}"},
      {title: "URL", value: self.deliver_to_url, short: false},
      {title: "Exception", value: e.inspect, short: false},
    ],
  ).emit
end

#_retry(delivery, attempt) ⇒ Object



155
156
157
158
# File 'lib/webhookdb/webhook_subscription.rb', line 155

def _retry(delivery, attempt)
  delay = self.class.backoff_for_attempt(attempt)
  Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_in(delay, delivery.id)
end

#active?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/webhookdb/webhook_subscription.rb', line 50

def active?
  return !self.deactivated?
end

#associated_idObject



188
189
190
191
192
# File 'lib/webhookdb/webhook_subscription.rb', line 188

def associated_id
  return self.organization.key unless self.organization_id.nil?
  return self.service_integration.opaque_id unless self.service_integration_id.nil?
  return ""
end

#associated_typeObject



182
183
184
185
186
# File 'lib/webhookdb/webhook_subscription.rb', line 182

def associated_type
  return "organization" unless self.organization_id.nil?
  return "service_integration" unless self.service_integration_id.nil?
  return ""
end

#attempt_delivery(d) ⇒ Object

Attempt to deliver the payload in d to the configured URL (see #deliver). Noops if the subscription is deactivated.

If the attempt succeeds, no attempts are enqueued.

If the attempt fails, another async job to reattempt delivery will be enqueued for some time in the future based on the number of attempts. The timestamp and http status are stored on the delivery for future analysis.

After too many failures, no more attempts will be enqueued. Instead, a developer alert is emitted.

In the future, we will support manually re-attempting delivery (success of which should clear deactivated subscriptions), and automatic deactivation (after some criteria of abandonment has been met).



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/webhookdb/webhook_subscription.rb', line 128

def attempt_delivery(d)
  return if self.deactivated?
  d.db.transaction do
    d.lock!
    attempt = d.attempt_count + 1
    begin
      r = self.deliver(**d.payload.symbolize_keys, headers: {"Whdb-Attempt" => attempt.to_s})
      d.add_attempt(status: r.code)
    rescue StandardError => e
      self.logger.error(
        "webhook_subscription_delivery_failure",
        error: e,
        webhook_subscription_id: self.id,
        webhook_subscription_delivery_id: d.id,
      )
      d.add_attempt(status: e.is_a?(Webhookdb::Http::Error) ? e.status : 0)
      if attempt < MAX_DELIVERY_ATTEMPTS
        self._retry(d, attempt)
      else
        self._fatal(d, e)
      end
    ensure
      d.save_changes
    end
  end
end

#before_createObject

:Sequel Hooks:



198
199
200
# File 'lib/webhookdb/webhook_subscription.rb', line 198

def before_create
  self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("wsb")
end

#create_delivery(payload) ⇒ Object



102
103
104
# File 'lib/webhookdb/webhook_subscription.rb', line 102

def create_delivery(payload)
  return Webhookdb::WebhookSubscription::Delivery.create(webhook_subscription: self, payload:)
end

#deactivate(at: Time.now) ⇒ Object



58
59
60
61
# File 'lib/webhookdb/webhook_subscription.rb', line 58

def deactivate(at: Time.now)
  self.deactivated_at = at
  return self
end

#deactivated?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/webhookdb/webhook_subscription.rb', line 54

def deactivated?
  return !!self.deactivated_at
end

#deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object

Deliver the webhook payload to the configured URL. This does NOT create or deal with WebhookSubscription::Delivery; it is for the actual delivering.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/webhookdb/webhook_subscription.rb', line 74

def deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {})
  body = {
    service_name:,
    table_name:,
    row:,
    external_id:,
    external_id_column:,
  }
  return Webhookdb::Http.post(
    self.deliver_to_url,
    body,
    headers: {"Whdb-Webhook-Secret" => self.webhook_secret}.merge(headers),
    timeout: TIMEOUT,
    logger: self.logger,
  )
end

#deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object



91
92
93
94
95
96
97
98
99
100
# File 'lib/webhookdb/webhook_subscription.rb', line 91

def deliver_test_event(external_id: SecureRandom.hex(6))
  return self.deliver(
    service_name: "test service",
    table_name: "test_table_name",
    external_id:,
    external_id_column: "external_id",
    row: {data: ["alpha", "beta", "charlie", "delta"]},
    headers: {"Whdb-Test-Event" => "1"},
  )
end

#enqueue_delivery(payload) ⇒ Object

Create a new Delivery and enqueue it for async processing.



107
108
109
110
111
# File 'lib/webhookdb/webhook_subscription.rb', line 107

def enqueue_delivery(payload)
  delivery = self.create_delivery(payload)
  Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_async(delivery.id)
  return delivery
end

#fetch_organizationObject



63
64
65
# File 'lib/webhookdb/webhook_subscription.rb', line 63

def fetch_organization
  return self.organization || self.service_integration.organization
end

#statusObject



67
68
69
# File 'lib/webhookdb/webhook_subscription.rb', line 67

def status
  return self.deactivated? ? "deactivated" : "active"
end