Class: Hookshot

Inherits:
Object
Defined in:
lib/hookshot.rb,
lib/hookshot/version.rb

Constant Summary

PREFIX =
"hookshot"
NEW_JOBS_LIST =
"#{PREFIX}:jobs"
DELAYED_SET =
"#{PREFIX}:delayed"
FAILURES_LIST =
"#{PREFIX}:failures"
BLACKLIST =
"#{PREFIX}:throttle:blacklist"
WHITELIST =
"#{PREFIX}:throttle:whitelist"
FINAL_FAILURE =

Special sentinel value returned from get_next_failure when the job will no longer be retried.

-1
JOB_KEY_LIFETIME =

Jobs are retried for up to 48 hours. Though we delete the job info when hookshot is done with each job, it’s still a good idea to clean up any keys that have managed to stick around somehow.

86400 * 4
FailureQueueEmpty =

Raised by get_next_failure if there are no pending failures.

Class.new(StandardError)
VERSION =
"0.3.2"

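The constants above determine which Redis keys hookshot uses. Judging by the commands the methods below issue, NEW_JOBS_LIST and FAILURES_LIST are Redis lists, DELAYED_SET is a sorted set, and BLACKLIST and WHITELIST are sets. The snippet below is only a local sketch: it redefines the constants without loading the gem, to show the key names you would look for when inspecting Redis directly and to work out the JOB_KEY_LIFETIME arithmetic.

```ruby
# Local copies of the constant definitions above (the gem itself is not
# loaded), showing the Redis key names and the job key lifetime they yield.
PREFIX           = "hookshot"
NEW_JOBS_LIST    = "#{PREFIX}:jobs"
DELAYED_SET      = "#{PREFIX}:delayed"
FAILURES_LIST    = "#{PREFIX}:failures"
BLACKLIST        = "#{PREFIX}:throttle:blacklist"
WHITELIST        = "#{PREFIX}:throttle:whitelist"
JOB_KEY_LIFETIME = 86400 * 4

puts NEW_JOBS_LIST            # prints hookshot:jobs
puts BLACKLIST                # prints hookshot:throttle:blacklist
puts JOB_KEY_LIFETIME         # prints 345600 (seconds)
puts JOB_KEY_LIFETIME / 3600  # prints 96 (hours), twice the 48-hour retry window
```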
Constructor Details

#initialize(redis) ⇒ Hookshot

Hookshot expects to be initialized with an instance of Redis provided by the redis rubygem.

Example:

require 'hookshot'
require 'redis'
hookshot = Hookshot.new(Redis.new(port: 6379, host: 'localhost'))


# File 'lib/hookshot.rb', line 37

def initialize(redis)
  @redis = redis
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



# File 'lib/hookshot.rb', line 27

def redis
  @redis
end

Instance Method Details

#blacklist ⇒ Object

blacklist returns an array of currently-blacklisted domains.

Example:

hookshot.blacklist
#=> ["example.com"]


# File 'lib/hookshot.rb', line 203

def blacklist
  redis.smembers(BLACKLIST)
end

#blacklist!(domain) ⇒ Object

blacklist! adds a domain to the list of currently blacklisted domains. Any jobs submitted with a domain in this list will be automatically dropped by hookshot.

The format of the domain should be the full domain. The port should not be included if it is 80 or 443. For example:

| URL                      | Domain           |
|--------------------------|------------------|
| http://example.com/post  | example.com      |
| https://example.com/post | example.com      |
| http://example.com:8000  | example.com:8000 |
| http://example.com:80    | example.com      |

Example:

hookshot.blacklist!("example.com")
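The domain-format rule in the table above can be applied mechanically with Ruby's standard URI library. The helper below, hookshot_domain, is a hypothetical name and not part of Hookshot; it is a minimal sketch that assumes absolute http or https URLs (for a URL without a scheme, URI cannot find a host) and follows the stated rule of omitting the port only when it is 80 or 443.

```ruby
require 'uri'

# Hypothetical helper (not part of Hookshot): converts a webhook URL into
# the domain format that blacklist!/whitelist! expect. Keeps the host and
# appends the port only when it is not 80 or 443. URI fills in the default
# port (80 for http, 443 for https) when the URL does not specify one.
def hookshot_domain(url)
  uri = URI.parse(url)
  if [80, 443].include?(uri.port)
    uri.host
  else
    "#{uri.host}:#{uri.port}"
  end
end

hookshot_domain("http://example.com/post")  # => "example.com"
hookshot_domain("https://example.com/post") # => "example.com"
hookshot_domain("http://example.com:8000")  # => "example.com:8000"
hookshot_domain("http://example.com:80")    # => "example.com"
```

With a real client, this could be used as hookshot.blacklist!(hookshot_domain(url)), where url is whatever endpoint your application wants to block.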


# File 'lib/hookshot.rb', line 172

def blacklist!(domain)
  redis.sadd(BLACKLIST, domain)
end

#enqueue(url:, headers:, context:, payload:, activate_at: nil) ⇒ Object

enqueue takes a URL, a hash of headers, a payload (request body), and a context value. It submits these values to hookshot for processing.

The context value can be any string, and will be returned to you via get_next_failure if the job can’t be successfully completed after about 48 hours of retries.

In Shopify, we pass our WebhookSubscription object ID for the context value, so that we can notify merchants when their webhooks are failing, and delete subscriptions that fail consistently.

activate_at is an optional parameter that specifies the time, as a Unix timestamp in seconds, at which this job becomes active in hookshot. You should normally call enqueue_in, which computes this timestamp for you, rather than pass activate_at explicitly.
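In the source listing below, activate_at only changes where the job's UUID is recorded: without it, the UUID is pushed onto NEW_JOBS_LIST; with it, the UUID is added to the DELAYED_SET sorted set using activate_at as the score (enqueue_in passes an absolute Unix timestamp there). The sketch below mirrors just that branch. RecordingRedis and record_job are hypothetical names; RecordingRedis is not the redis gem, it only records commands so the two outcomes can be compared without a Redis server.

```ruby
NEW_JOBS_LIST = "hookshot:jobs"
DELAYED_SET   = "hookshot:delayed"

# Hypothetical stand-in for a Redis client: records the commands it receives.
class RecordingRedis
  attr_reader :commands

  def initialize
    @commands = []
  end

  def lpush(key, value)
    @commands << [:lpush, key, value]
  end

  def zadd(key, score, member)
    @commands << [:zadd, key, score, member]
  end
end

# Mirrors only the activate_at branch of Hookshot#enqueue.
def record_job(redis, uuid, activate_at: nil)
  if activate_at
    redis.zadd(DELAYED_SET, activate_at, uuid) # delayed: scored by Unix timestamp
  else
    redis.lpush(NEW_JOBS_LIST, uuid)           # immediate: new jobs list
  end
end

redis = RecordingRedis.new
record_job(redis, "job-a")
record_job(redis, "job-b", activate_at: 1_700_000_060)
redis.commands
# => [[:lpush, "hookshot:jobs", "job-a"], [:zadd, "hookshot:delayed", 1700000060, "job-b"]]
```

Storing delayed jobs in a sorted set keyed by activation time means the jobs that are due can be found with a score range query, rather than by scanning every delayed job.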

Example:

hookshot.enqueue(
  url: 'http://localhost:8080/post',
  headers: {"X-My-Header" => "value"},
  context: "42",
  payload: "request body")


# File 'lib/hookshot.rb', line 63

def enqueue(url:, headers:, context:, payload:, activate_at: nil)
  uuid = SecureRandom.uuid
  redis.pipelined do
    redis.hmset(
      job_key(uuid),
      "url", url,
      "headers", serialize_headers(headers),
      "context", context,
      "payload", payload,
      "failures", 0)
    redis.expire(job_key(uuid), JOB_KEY_LIFETIME)
    if activate_at
      redis.zadd(DELAYED_SET, activate_at, uuid)
    else
      redis.lpush(NEW_JOBS_LIST, uuid)
    end
  end

  uuid
end

#enqueue_in(duration, url:, headers:, context:, payload:) ⇒ Object

enqueue_in calls enqueue with activate_at set to the current time plus duration seconds, delaying the job's execution.
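The only work enqueue_in does is convert a relative delay into the absolute Unix timestamp that enqueue expects as activate_at, using (Time.now + duration).to_i. The sketch below isolates that computation in a hypothetical method, activate_at_for, with an injectable clock so the result is predictable.

```ruby
# Same computation enqueue_in performs: (Time.now + duration).to_i.
# `now` defaults to the real clock but can be fixed for predictable output.
def activate_at_for(duration, now: Time.now)
  (now + duration).to_i
end

now = Time.at(1_700_000_000)
activate_at_for(60, now: now)   # => 1700000060
activate_at_for(3600, now: now) # => 1700003600
activate_at_for(1.5, now: now)  # => 1700000001 (to_i truncates to whole seconds)
```

So hookshot.enqueue_in(60, ...) behaves like hookshot.enqueue(..., activate_at: (Time.now + 60).to_i). Because to_i truncates, the stored timestamp can be up to one second earlier than the exact moment now + duration.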

Example:

hookshot.enqueue_in(60, # seconds
  url: 'http://localhost:8080/post',
  headers: {"X-My-Header" => "value"},
  context: "42",
  payload: "request body")


# File 'lib/hookshot.rb', line 94

def enqueue_in(duration, url:, headers:, context:, payload:)
  enqueue_time = (Time.now + duration).to_i
  enqueue(
    url: url,
    headers: headers,
    context: context,
    payload: payload,
    activate_at: enqueue_time)
end

#get_next_failure ⇒ Object

Jobs that fail many times in a row are reported back to the application. Specifically, get_next_failure returns the context value that was passed in via enqueue or enqueue_in.

get_next_failure returns two values: the number of failures so far for this job, and the context passed in with the job. If nfailures is equal to Hookshot::FINAL_FAILURE (-1), the job will not be retried. Any other value means the job will still be retried; it is only an advisory that the job has been failing for at least 24 hours.

This method is non-blocking: if there is no item present in the failures queue, it will raise FailureQueueEmpty.
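Each entry in the failures list is a string of the form "nfailures|context"; this format is inferred from the parsing code in the source listing below, since the side that writes these entries is not shown here. The sketch below reproduces that parsing as a standalone method with a hypothetical name, parse_failure_line. Splitting with a limit of 2 means a context that itself contains "|" comes back intact.

```ruby
FINAL_FAILURE = -1

# Sketch of the parsing in Hookshot#get_next_failure: each failures-list
# entry has the form "<nfailures>|<context>".
def parse_failure_line(line)
  # A limit of 2 keeps any "|" characters inside the context intact.
  # context is nil when the entry contains no "|" at all.
  nfailures, context = line.split('|', 2)
  if nfailures.to_i == 0 || context.nil? || context.empty?
    raise "Invalid line from hookshot: #{line}"
  end
  [nfailures.to_i, context]
end

parse_failure_line("7|42")        # => [7, "42"]
parse_failure_line("-1|42")       # => [-1, "42"], i.e. FINAL_FAILURE
parse_failure_line("3|shop|hook") # => [3, "shop|hook"]
```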

Example:

loop do
  begin
    nfailures, context = hookshot.get_next_failure
  rescue Hookshot::FailureQueueEmpty
    break # stop once the failures queue is empty
  end

  if nfailures == Hookshot::FINAL_FAILURE
    delete_webhook_subscription(context)
  end
end

Raises:

FailureQueueEmpty if there are no pending failures.

# File 'lib/hookshot.rb', line 125

def get_next_failure
  # Non-blocking: LPOP returns nil immediately if the failures list is empty.
  line = redis.lpop(FAILURES_LIST)
  raise FailureQueueEmpty if line.nil?

  nfailures, failed_id = line.split('|', 2)

  if nfailures.to_i == 0 || failed_id.nil? || failed_id.empty?
    raise "Invalid line from hookshot: #{line}"
  end

  [nfailures.to_i, failed_id]
end

#queue_stats ⇒ Object

Hookshot writes a lot of statistics to statsd/datadog, but to quickly check the current queue sizes, use queue_stats.

Example:

hookshot.queue_stats
# => { pending: 42, delayed: 42, failures: 42 }
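Because queue_stats only reports queue sizes, one way to use it is to poll until enqueued work has been picked up, for example in an integration test. The sketch below rests on several assumptions: wait_until_drained and StubStatsClient are hypothetical names; "drained" is taken to mean no pending and no delayed jobs, ignoring failures, since those are reports for the application to consume with get_next_failure; and an empty queue does not necessarily mean that in-flight deliveries have finished. StubStatsClient stands in for a Hookshot instance so the code runs without Redis.

```ruby
# Hypothetical helper: polls queue_stats until no jobs are pending or
# delayed, or until `timeout` seconds pass. Returns true if drained.
def wait_until_drained(client, timeout:, interval: 0.5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    stats = client.queue_stats
    return true if stats[:pending].zero? && stats[:delayed].zero?
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

# Hypothetical stub standing in for a Hookshot instance.
class StubStatsClient
  def initialize(*snapshots)
    @snapshots = snapshots
  end

  # Returns each snapshot in turn, then keeps returning the last one.
  def queue_stats
    @snapshots.length > 1 ? @snapshots.shift : @snapshots.first
  end
end

client = StubStatsClient.new(
  { pending: 2, delayed: 1, failures: 0 },
  { pending: 0, delayed: 0, failures: 3 })
wait_until_drained(client, timeout: 1, interval: 0) # => true
```

With a real client you would pass hookshot instead of the stub.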


# File 'lib/hookshot.rb', line 146

def queue_stats
  pending, delayed, failures = redis.pipelined do
    redis.llen NEW_JOBS_LIST
    redis.zcard DELAYED_SET
    redis.llen FAILURES_LIST
  end
  { pending: pending, delayed: delayed, failures: failures }
end

#remove_blacklist(domain) ⇒ Object

remove_blacklist removes a currently-blacklisted domain from the blacklist. If the domain was not blacklisted, this method has no effect.

Example:

hookshot.remove_blacklist("google.com")


# File 'lib/hookshot.rb', line 223

def remove_blacklist(domain)
  redis.srem(BLACKLIST, domain)
end

#remove_whitelist(domain) ⇒ Object

remove_whitelist removes a currently-whitelisted domain from the whitelist. If the domain was not whitelisted, this method has no effect.

Example:

hookshot.remove_whitelist("google.com")


# File 'lib/hookshot.rb', line 233

def remove_whitelist(domain)
  redis.srem(WHITELIST, domain)
end

#whitelist ⇒ Object

whitelist returns an array of currently-whitelisted domains.

Example:

hookshot.whitelist
#=> ["example.com"]


# File 'lib/hookshot.rb', line 213

def whitelist
  redis.smembers(WHITELIST)
end

#whitelist!(domain) ⇒ Object

whitelist! adds a domain to the list of currently whitelisted domains. Normally, jobs are throttled to a maximum number of requests per second per domain. Whitelisted domains are granted a much higher initial rate.

The format of the domain should be the full domain. The port should not be included if it is 80 or 443. For example:

| URL                      | Domain           |
|--------------------------|------------------|
| http://example.com/post  | example.com      |
| https://example.com/post | example.com      |
| http://example.com:8000  | example.com:8000 |
| http://example.com:80    | example.com      |

Example:

hookshot.whitelist!("google.com")
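whitelist returns the current members, and whitelist! and remove_whitelist map to Redis SADD and SREM, so repeating them is harmless. An application that keeps its own authoritative list of high-volume domains could therefore reconcile hookshot's whitelist against it. The sketch below assumes such a list exists; FakeWhitelistClient and sync_whitelist are hypothetical names, and the fake only stands in for a Hookshot instance so the code runs without Redis.

```ruby
require 'set'

# Hypothetical in-memory stand-in for a Hookshot instance, exposing only
# the whitelist methods documented here.
class FakeWhitelistClient
  def initialize(domains = [])
    @domains = Set.new(domains)
  end

  def whitelist
    @domains.to_a
  end

  def whitelist!(domain)
    @domains.add(domain)
  end

  def remove_whitelist(domain)
    @domains.delete(domain)
  end
end

# Hypothetical helper: makes the whitelist match `desired`, issuing only
# the additions and removals that are actually needed.
def sync_whitelist(client, desired)
  current = client.whitelist.to_set
  desired = desired.to_set
  (desired - current).each { |domain| client.whitelist!(domain) }
  (current - desired).each { |domain| client.remove_whitelist(domain) }
end

client = FakeWhitelistClient.new(["example.com", "old.example.com"])
sync_whitelist(client, ["example.com", "example.com:8000"])
client.whitelist.sort # => ["example.com", "example.com:8000"]
```

With a real client you would call sync_whitelist(hookshot, desired_domains). The read and the writes are separate Redis calls, so the reconciliation is not atomic: changes made by another process in between are not reconciled until the next run. The same pattern applies to blacklist, blacklist!, and remove_blacklist.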


# File 'lib/hookshot.rb', line 193

def whitelist!(domain)
  redis.sadd(WHITELIST, domain)
end