Class: Webhookdb::WebhookSubscription
- Inherits: Object
  - Object
  - Webhookdb::WebhookSubscription
- Defined in: lib/webhookdb/webhook_subscription.rb
Overview
Webhook subscriptions have a few parts:
- The WebhookSubscription itself (this model), which represents a user’s desire to receive all webhooks at a URL.
- The individual Delivery, which is a single ‘rowupsert’ event being delivered to a subscription. That is, if multiple rowupserts are done, there will be multiple Deliveries to a single Subscription, one for each rowupsert. Likewise, if a single rowupsert is done, but there are multiple Subscriptions, there will be multiple Deliveries, one to each Subscription.
- Async job that listens for rowupsert events and enqueues new deliveries.
- When a delivery is ‘enqueued’, it is created in the database, and then a Sidekiq job is put into Redis. This Sidekiq job operates OUTSIDE of our normal job system since we do not want to bother with audit logging or routing (enough history is in the DB already, though we could add it if needed).
- We attempt the delivery until it succeeds, or we run out of attempts. See #attempt_delivery.
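The fan-out described in the list above (one Delivery per rowupsert per Subscription) can be sketched with plain Ruby structs. This is an illustrative in-memory model only: `SketchSubscription`, `SketchDelivery`, and `fan_out` are hypothetical names and are not part of WebhookDB, where the real models are database-backed.

```ruby
# Hypothetical in-memory stand-ins for the database-backed models.
SketchSubscription = Struct.new(:id, :url)
SketchDelivery = Struct.new(:subscription_id, :payload)

# Create one delivery per (rowupsert payload, subscription) pair.
def fan_out(payloads, subscriptions)
  payloads.flat_map do |payload|
    subscriptions.map { |sub| SketchDelivery.new(sub.id, payload) }
  end
end

subs = [
  SketchSubscription.new(1, "https://example.com/a"),
  SketchSubscription.new(2, "https://example.com/b"),
]
upserts = [{external_id: "x1"}, {external_id: "x2"}, {external_id: "x3"}]

deliveries = fan_out(upserts, subs)
puts deliveries.size # => 6 (3 rowupserts x 2 subscriptions)
```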
Defined Under Namespace
Classes: Delivery
Constant Summary
- TIMEOUT = 10.seconds
  Amount of time we wait for a response from the server.
- MAX_DELIVERY_ATTEMPTS = 25
  An individual delivery will be attempted up to this many times before giving up.
Class Method Summary
- .backoff_for_attempt(attempt) ⇒ Object
Instance Method Summary
- #_fatal(d, e) ⇒ Object
- #_retry(delivery, attempt) ⇒ Object
- #active? ⇒ Boolean
- #associated_id ⇒ Object
- #associated_type ⇒ Object
- #attempt_delivery(d) ⇒ Object
  Attempt to deliver the payload in d to the configured URL (see #deliver).
- #before_create ⇒ Object
  :Sequel Hooks:.
- #create_delivery(payload) ⇒ Object
- #deactivate(at: Time.now) ⇒ Object
- #deactivated? ⇒ Boolean
- #deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object
  Deliver the webhook payload to the configured URL.
- #deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object
- #enqueue_delivery(payload) ⇒ Object
  Create a new Delivery and enqueue it for async processing.
- #fetch_organization ⇒ Object
- #status ⇒ Object
Class Method Details
.backoff_for_attempt(attempt) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 160

    def self.backoff_for_attempt(attempt)
      return 1 if attempt <= 1
      return attempt * 2 if attempt <= 10
      return attempt * 3 if attempt <= 20
      return attempt * 4
    end
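To get a feel for the resulting schedule, the backoff rule can be copied into a standalone script and tabulated. This is a sketch, not part of the library: the top-level `backoff_for_attempt` and `retry_delays` are local to the example. It also assumes a retry is scheduled after every failed attempt except the final one, which matches the `attempt < MAX_DELIVERY_ATTEMPTS` check in #attempt_delivery.

```ruby
# Standalone copy of the backoff rule, for experimentation only.
def backoff_for_attempt(attempt)
  return 1 if attempt <= 1
  return attempt * 2 if attempt <= 10
  return attempt * 3 if attempt <= 20
  attempt * 4
end

MAX_DELIVERY_ATTEMPTS = 25

# Attempts 1..24 can each schedule a retry; attempt 25 is the last one.
retry_delays = (1...MAX_DELIVERY_ATTEMPTS).map { |attempt| backoff_for_attempt(attempt) }

puts retry_delays.first(3).inspect # => [1, 4, 6]
puts retry_delays.last             # => 96
puts retry_delays.sum              # => 934
```

If the delay is interpreted as seconds (the usual meaning of a small numeric interval passed to Sidekiq's `perform_in`), a delivery that never succeeds exhausts its attempts after roughly 15 and a half minutes of accumulated backoff, not counting request time.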
Instance Method Details
#_fatal(d, e) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 167

    def _fatal(d, e)
      Webhookdb::DeveloperAlert.new(
        subsystem: "Webhook Subscriptions",
        emoji: ":hook:",
        fallback: "Error delivering WebhookSubscription::Delivery[id: #{d.id}, subscription_id: #{self.id}]: #{e}",
        fields: [
          {title: "Org", value: self.fetch_organization.display_string, short: true},
          {title: "Creator", value: self.created_by&.email, short: true},
          {title: "Delivery", value: "#{d.id}, Subscription: #{self.id}, Attempts: #{d.attempt_count}"},
          {title: "URL", value: self.deliver_to_url, short: false},
          {title: "Exception", value: e.inspect, short: false},
        ],
      ).emit
    end
#_retry(delivery, attempt) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 155

    def _retry(delivery, attempt)
      delay = self.class.backoff_for_attempt(attempt)
      Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_in(delay, delivery.id)
    end
#active? ⇒ Boolean
    # File 'lib/webhookdb/webhook_subscription.rb', line 50

    def active?
      return !self.deactivated?
    end
#associated_id ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 188

    def associated_id
      return self.organization.key unless self.organization_id.nil?
      return self.service_integration.opaque_id unless self.service_integration_id.nil?
      return ""
    end
#associated_type ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 182

    def associated_type
      return "organization" unless self.organization_id.nil?
      return "service_integration" unless self.service_integration_id.nil?
      return ""
    end
#attempt_delivery(d) ⇒ Object
Attempt to deliver the payload in d to the configured URL (see #deliver). No-ops if the subscription is deactivated.
If the attempt succeeds, no further attempts are enqueued.
If the attempt fails, another async job to reattempt delivery will be enqueued for some time in the future based on the number of attempts. The timestamp and HTTP status are stored on the delivery for future analysis.
After too many failures, no more attempts will be enqueued. Instead, a developer alert is emitted.
In the future, we will support manually re-attempting delivery (success of which should clear deactivated subscriptions), and automatic deactivation (after some criteria of abandonment has been met).
    # File 'lib/webhookdb/webhook_subscription.rb', line 128

    def attempt_delivery(d)
      return if self.deactivated?
      d.db.transaction do
        d.lock!
        attempt = d.attempt_count + 1
        begin
          r = self.deliver(**d.payload.symbolize_keys, headers: {"Whdb-Attempt" => attempt.to_s})
          d.add_attempt(status: r.code)
        rescue StandardError => e
          self.logger.error(
            "webhook_subscription_delivery_failure",
            error: e,
            webhook_subscription_id: self.id,
            webhook_subscription_delivery_id: d.id,
          )
          d.add_attempt(status: e.is_a?(Webhookdb::Http::Error) ? e.status : 0)
          if attempt < MAX_DELIVERY_ATTEMPTS
            self._retry(d, attempt)
          else
            self._fatal(d, e)
          end
        ensure
          d.save_changes
        end
      end
    end
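The success, retry, and give-up branches of this method can be exercised in isolation with a small in-memory simulation. The sketch below is not the library's code: `SketchAttemptLog`, `attempt_once`, and `MAX_ATTEMPTS` are hypothetical stand-ins, there is no database, locking, or Sidekiq involved, and every failure is recorded as status 0 (the real method records the HTTP status when the error is a `Webhookdb::Http::Error`).

```ruby
# Hypothetical in-memory stand-in for a Delivery row: a list of recorded statuses.
SketchAttemptLog = Struct.new(:statuses) do
  def attempt_count
    statuses.size
  end
end

MAX_ATTEMPTS = 25 # mirrors MAX_DELIVERY_ATTEMPTS

# Run a single attempt. `send_payload` is any callable that takes the attempt
# number and either returns an HTTP status or raises.
# Returns :delivered, :retry, or :fatal.
def attempt_once(log, send_payload)
  attempt = log.attempt_count + 1
  status = send_payload.call(attempt)
  log.statuses << status
  :delivered
rescue StandardError
  log.statuses << 0 # simplification: this sketch always records 0 on failure
  attempt < MAX_ATTEMPTS ? :retry : :fatal
end

# A receiver that refuses the first two attempts, then accepts.
flaky = ->(attempt) { attempt < 3 ? raise(IOError, "connection refused") : 200 }

log = SketchAttemptLog.new([])
outcomes = []
loop do
  outcome = attempt_once(log, flaky)
  outcomes << outcome
  break unless outcome == :retry
end

puts outcomes.inspect     # => [:retry, :retry, :delivered]
puts log.statuses.inspect # => [0, 0, 200]
```

In the real method the `:retry` branch corresponds to `_retry` (a delayed job) and the `:fatal` branch to `_fatal` (a developer alert); the loop here simply stands in for the job being run again later.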
#before_create ⇒ Object
:Sequel Hooks:
    # File 'lib/webhookdb/webhook_subscription.rb', line 198

    def before_create
      self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("wsb")
    end
#create_delivery(payload) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 102

    def create_delivery(payload)
      return Webhookdb::WebhookSubscription::Delivery.create(webhook_subscription: self, payload:)
    end
#deactivate(at: Time.now) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 58

    def deactivate(at: Time.now)
      self.deactivated_at = at
      return self
    end
#deactivated? ⇒ Boolean
    # File 'lib/webhookdb/webhook_subscription.rb', line 54

    def deactivated?
      return !!self.deactivated_at
    end
#deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object
Deliver the webhook payload to the configured URL. This does NOT create or otherwise deal with WebhookSubscription::Delivery; it only performs the actual HTTP delivery.
    # File 'lib/webhookdb/webhook_subscription.rb', line 74

    def deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {})
      body = {
        service_name:,
        table_name:,
        row:,
        external_id:,
        external_id_column:,
      }
      return Webhookdb::Http.post(
        self.deliver_to_url,
        body,
        headers: {"Whdb-Webhook-Secret" => self.webhook_secret}.merge(headers),
        timeout: TIMEOUT,
        logger: self.logger,
      )
    end
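On the receiving side, an endpoint would typically check the `Whdb-Webhook-Secret` header before trusting the body. The following is a minimal sketch of such a receiver as a Rack-style handler, under two assumptions that the source does not confirm: that `Webhookdb::Http.post` sends the body as JSON, and that the receiver knows the subscription's secret. `EXPECTED_SECRET`, `secure_equal?`, and `handle_webhook` are hypothetical names.

```ruby
require "json"
require "stringio"

# Hypothetical shared secret; assumed to match the subscription's webhook secret.
EXPECTED_SECRET = "my-webhook-secret"

# Compare two strings without returning early on the first differing byte.
def secure_equal?(a, b)
  return false unless a.bytesize == b.bytesize
  diff = 0
  a.bytes.zip(b.bytes) { |x, y| diff |= x ^ y }
  diff.zero?
end

# Rack-style handler: takes an env hash, returns [status, headers, body].
# Rack exposes the "Whdb-Webhook-Secret" header as HTTP_WHDB_WEBHOOK_SECRET.
def handle_webhook(env)
  secret = env["HTTP_WHDB_WEBHOOK_SECRET"].to_s
  return [401, {}, ["bad secret"]] unless secure_equal?(secret, EXPECTED_SECRET)
  # Assumption: the body is JSON with the keys built in #deliver.
  payload = JSON.parse(env["rack.input"].read)
  [200, {}, ["received #{payload['external_id']}"]]
end

# Simulate a request carrying the same keys that #deliver puts in the body.
body = JSON.generate(
  service_name: "test service",
  table_name: "test_table_name",
  row: {data: ["alpha"]},
  external_id: "abc123",
  external_id_column: "external_id",
)
env = {"HTTP_WHDB_WEBHOOK_SECRET" => EXPECTED_SECRET, "rack.input" => StringIO.new(body)}
status, _headers, response = handle_webhook(env)
puts status         # => 200
puts response.first # => received abc123
```

A receiver may also want to read the `Whdb-Attempt` header (set by #attempt_delivery) to detect redeliveries, and to respond within the 10-second TIMEOUT so the attempt is not counted as a failure.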
#deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 91

    def deliver_test_event(external_id: SecureRandom.hex(6))
      return self.deliver(
        service_name: "test service",
        table_name: "test_table_name",
        external_id:,
        external_id_column: "external_id",
        row: {data: ["alpha", "beta", "charlie", "delta"]},
        headers: {"Whdb-Test-Event" => "1"},
      )
    end
#enqueue_delivery(payload) ⇒ Object
Create a new Delivery and enqueue it for async processing.
    # File 'lib/webhookdb/webhook_subscription.rb', line 107

    def enqueue_delivery(payload)
      delivery = self.create_delivery(payload)
      Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_async(delivery.id)
      return delivery
    end
#fetch_organization ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 63

    def fetch_organization
      return self.organization || self.service_integration.organization
    end
#status ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 67

    def status
      return self.deactivated? ? "deactivated" : "active"
    end