Module: Amigo
- Defined in:
- lib/amigo.rb,
lib/amigo/job.rb,
lib/amigo/retry.rb,
lib/amigo/router.rb,
lib/amigo/version.rb,
lib/amigo/autoscaler.rb,
lib/amigo/audit_logger.rb,
lib/amigo/spec_helpers.rb,
lib/amigo/scheduled_job.rb,
lib/amigo/deprecated_jobs.rb,
lib/amigo/autoscaler/heroku.rb,
lib/amigo/queue_backoff_job.rb,
lib/amigo/semaphore_backoff_job.rb,
lib/amigo/rate_limited_error_handler.rb
Overview
Wrap another Sidekiq error handler so invoking it is rate limited.
Useful when wrapping a usage-based error reporter like Sentry, which can be hammered in the case of an issue like connectivity that causes all jobs and retries to fail. It is suggested that all errors are still reported to something like application logs, since entirely silencing errors can make debugging problems tricky.
Usage:
Sidekiq.configure_server do |config|
  config.error_handlers << Amigo::RateLimitedErrorHandler.new(
    Sentry::Sidekiq::ErrorHandler.new,
    sample_rate: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_SAMPLE_RATE', '0.5').to_f,
    ttl: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_TTL', '120').to_f,
  )
end
See the notes about sample_rate and ttl, and see fingerprint for how exceptions are fingerprinted for uniqueness.
Rate limiting is done in memory, so it applies per process: threads/workers within a process share rate limiting, but separate processes do not. So if 2 processes have 10 threads each and all 20 jobs error for the same reason, the error handler would be invoked twice (once per process).
Thread-based limiting (20 errors in the case above) or cross-process limiting (1 error in the case above) can be added in the future.
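The per-process behavior described above can be illustrated with a minimal sketch. This is not Amigo's implementation: the class name SketchRateLimitedHandler, the class-plus-message fingerprint, the two-argument handler signature, and the injectable clock are all assumptions made for illustration, and sample_rate is left out entirely.

```ruby
# Hypothetical sketch of process-wide, in-memory rate limiting (not Amigo's code).
class SketchRateLimitedHandler
  # wrapped: any callable error handler taking (exception, context).
  # ttl:     seconds during which repeats of the same fingerprint are suppressed.
  # clock:   injectable time source so the behavior can be exercised without sleeping.
  def initialize(wrapped, ttl:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @wrapped = wrapped
    @ttl = ttl
    @clock = clock
    @last_seen = {}    # fingerprint => time the wrapped handler last ran
    @mutex = Mutex.new # one lock shared by every thread in this process
  end

  # Assumption: class name plus message is a good-enough fingerprint for a sketch.
  def fingerprint(exception)
    "#{exception.class.name}:#{exception.message}"
  end

  # Returns true if the wrapped handler was invoked, false if it was suppressed.
  def call(exception, context = {})
    now = @clock.call
    key = fingerprint(exception)
    allowed = @mutex.synchronize do
      last = @last_seen[key]
      if last.nil? || (now - last) >= @ttl
        @last_seen[key] = now
        true
      else
        false
      end
    end
    @wrapped.call(exception, context) if allowed
    allowed
  end
end

reported = []
now = 0.0
handler = SketchRateLimitedHandler.new(->(e, _ctx) { reported << e.message }, ttl: 120, clock: -> { now })

# Ten threads in one process failing for the same reason produce a single report.
10.times.map { Thread.new { handler.call(RuntimeError.new("redis down")) } }.each(&:join)
first_burst = reported.size

# Once the ttl has elapsed, the same error is reported again.
now = 121.0
handler.call(RuntimeError.new("redis down"))
```

Because the state lives in an ordinary Hash guarded by a Mutex, a second process would hold its own copy and report independently, which is the "invoked twice" case described above.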
Defined Under Namespace
Modules: DeprecatedJobs, Job, QueueBackoffJob, Retry, ScheduledJob, SemaphoreBackoffJob, SpecHelpers
Classes: AuditLogger, Autoscaler, Error, Event, RateLimitedErrorHandler, Router, StartSchedulerFailed
Constant Summary
- VERSION =
"1.8.0"
Class Attribute Summary
-
.audit_logger_class ⇒ Object
Returns the value of attribute audit_logger_class.
-
.log_callback ⇒ Object
Proc called with [job, level, message, params].
-
.on_publish_error ⇒ Object
A single callback to be run when an event publication errors, almost always due to an error in a subscriber.
-
.registered_jobs ⇒ Object
Every subclass of Amigo::Job and Amigo::ScheduledJob goes here.
-
.router_class ⇒ Object
Returns the value of attribute router_class.
-
.structured_logging ⇒ Object
Returns the value of attribute structured_logging.
-
.subscribers ⇒ Object
An Array of callbacks to be run when an event is published.
-
.synchronous_mode ⇒ Object
If true, perform event work synchronously rather than asynchronously.
Class Method Summary
- ._subscriber(event) ⇒ Object
-
.install_amigo_jobs ⇒ Object
Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.
- .log(job, level, message, params) ⇒ Object
-
.publish(eventname, *payload) ⇒ Object
Publish an event with the specified eventname and payload to any configured publishers.
- .register_job(job) ⇒ Object
-
.register_subscriber(&block) ⇒ Object
Register a hook to be called when an event is sent.
-
.registered_event_jobs ⇒ Object
Return an array of all Job subclasses that respond to event publishing (have patterns).
-
.registered_scheduled_jobs ⇒ Object
Return an array of all Job subclasses that are scheduled (have intervals).
- .reset_logging ⇒ Object
-
.start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object
Start the scheduler.
- .unregister_subscriber(block_ref) ⇒ Object
Class Attribute Details
.audit_logger_class ⇒ Object
Returns the value of attribute audit_logger_class.
# File 'lib/amigo.rb', line 109
def audit_logger_class
  @audit_logger_class
end
.log_callback ⇒ Object
Proc called with [job, level, message, params]. By default, logs to the job's logger (or Sidekiq's if job is nil). If structured_logging is true, the message will be an 'event' string (like 'registered_subscriber') without any dynamic info. If structured_logging is false, the params will be rendered into the message, making it suitable for unstructured logging. In that case, the params will also have a :log_message key containing the original log message.
# File 'lib/amigo.rb', line 118
def log_callback
  @log_callback
end
.on_publish_error ⇒ Object
A single callback to be run when an event publication errors, almost always due to an error in a subscriber.
The callback receives the exception, the event being published, and the erroring subscriber.
If this is not set, errors from subscribers will be re-raised immediately, since broken subscribers usually indicate a broken application.
Note also that when an error occurs, Amigo.log is always called first. You do NOT need a callback that just logs and swallows the error. If all you want to do is log, and not propagate the error, you can use `Amigo.on_publish_error = proc {}`.
# File 'lib/amigo.rb', line 158
def on_publish_error
  @on_publish_error
end
.registered_jobs ⇒ Object
Every subclass of Amigo::Job and Amigo::ScheduledJob goes here. It is used for routing and testing isolated jobs.
# File 'lib/amigo.rb', line 141
def registered_jobs
  @registered_jobs
end
.router_class ⇒ Object
Returns the value of attribute router_class.
# File 'lib/amigo.rb', line 109
def router_class
  @router_class
end
.structured_logging ⇒ Object
Returns the value of attribute structured_logging.
# File 'lib/amigo.rb', line 109
def structured_logging
  @structured_logging
end
.subscribers ⇒ Object
An Array of callbacks to be run when an event is published.
# File 'lib/amigo.rb', line 144
def subscribers
  @subscribers
end
.synchronous_mode ⇒ Object
If true, perform event work synchronously rather than asynchronously. Only useful for testing.
# File 'lib/amigo.rb', line 137
def synchronous_mode
  @synchronous_mode
end
Class Method Details
._subscriber(event) ⇒ Object
# File 'lib/amigo.rb', line 212
def _subscriber(event)
  event_json = event.as_json
  begin
    self.audit_logger_class.perform_async(event_json)
  rescue StandardError => e
    # If the audit logger cannot perform, let's say because Redis is down,
    # we can run the job manually. This is pretty important for anything used for auditing;
    # it should be as resilient as possible.
    self.log(nil, :error, "amigo_audit_log_subscriber_error", error: e, event: event_json)
    self.audit_logger_class.new.perform(event_json)
  end
  self.router_class.perform_async(event_json)
end
.install_amigo_jobs ⇒ Object
Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.
# File 'lib/amigo.rb', line 206
def install_amigo_jobs
  return self.register_subscriber do |ev|
    self._subscriber(ev)
  end
end
.log(job, level, message, params) ⇒ Object
# File 'lib/amigo.rb', line 125
def log(job, level, message, params)
  params ||= {}
  if !self.structured_logging && !params.empty?
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  self.log_callback[job, level, message, params]
end
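To make the difference between structured and unstructured logging concrete, here is a small standalone sketch of the message rendering. The function name render_log and its structured: keyword are hypothetical stand-ins for the Amigo.structured_logging setting. Unlike the method above, the sketch copies params rather than mutating the caller's hash.

```ruby
# Hypothetical stand-in for the message rendering performed by the log method.
# It returns the [message, params] pair that would be handed to log_callback.
def render_log(message, params, structured:)
  params = (params || {}).dup # copy so the caller's hash is left untouched
  if !structured && !params.empty?
    # Unstructured: fold the params into the message as key=value pairs,
    # and keep the original message under :log_message.
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  [message, params]
end

input = {queue: "default", attempt: 2}
unstructured = render_log("amigo_installed_subscriber", input, structured: false)
structured = render_log("amigo_installed_subscriber", input, structured: true)

puts unstructured.first
puts structured.first
```

In the unstructured case the message carries the rendered params, while in the structured case the message stays a fixed event string and the params travel separately.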
.publish(eventname, *payload) ⇒ Object
Publish an event with the specified eventname and payload to any configured publishers.
# File 'lib/amigo.rb', line 162
def publish(eventname, *payload)
  ev = Event.new(SecureRandom.uuid, eventname, payload)
  self.subscribers.to_a.each do |hook|
    hook.call(ev)
  rescue StandardError => e
    self.log(nil, :error, "amigo_subscriber_hook_error", error: e, hook: hook, event: ev&.as_json)
    raise e if self.on_publish_error.nil?
    if self.on_publish_error.respond_to?(:arity) && self.on_publish_error.arity == 1
      self.on_publish_error.call(e)
    else
      self.on_publish_error.call(e, ev, hook)
    end
  end
end
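The arity check in the rescue branch decides how many arguments the on_publish_error callback receives. The following standalone sketch isolates that dispatch. The helper name dispatch_publish_error and the placeholder event and hook values are hypothetical, not part of Amigo.

```ruby
# Hypothetical helper mirroring the arity check in publish.
def dispatch_publish_error(callback, error, event, hook)
  if callback.respond_to?(:arity) && callback.arity == 1
    callback.call(error)              # one-argument callbacks get only the exception
  else
    callback.call(error, event, hook) # everything else gets exception, event, subscriber
  end
end

seen = []
one_arg = ->(e) { seen << [:one, e.message] }
three_arg = ->(e, ev, hook) { seen << [:three, e.message, ev, hook] }
swallow = proc {} # arity 0; a non-lambda proc ignores the extra arguments

err = RuntimeError.new("boom")
dispatch_publish_error(one_arg, err, "some.event", :hook_ref)
dispatch_publish_error(three_arg, err, "some.event", :hook_ref)
dispatch_publish_error(swallow, err, "some.event", :hook_ref)
```

This is why an empty proc works as a "log only" callback: it is called with three arguments and silently discards them, whereas a zero-argument lambda would raise an ArgumentError.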
.register_job(job) ⇒ Object
# File 'lib/amigo.rb', line 226
def register_job(job)
  self.registered_jobs << job
  self.registered_jobs.uniq!
end
.register_subscriber(&block) ⇒ Object
Register a hook to be called when an event is sent. If a subscriber errors, on_publish_error is called with the exception, event, and subscriber.
# File 'lib/amigo.rb', line 180
def register_subscriber(&block)
  raise LocalJumpError, "no block given" unless block
  self.log nil, :info, "amigo_installed_subscriber", block: block
  self.subscribers << block
  return block
end
.registered_event_jobs ⇒ Object
Return an array of all Job subclasses that respond to event publishing (have patterns).
# File 'lib/amigo.rb', line 192
def registered_event_jobs
  return self.registered_jobs.select(&:event_job?)
end
.registered_scheduled_jobs ⇒ Object
Return an array of all Job subclasses that are scheduled (have intervals).
# File 'lib/amigo.rb', line 197
def registered_scheduled_jobs
  return self.registered_jobs.select(&:scheduled_job?)
end
.reset_logging ⇒ Object
# File 'lib/amigo.rb', line 120
def reset_logging
  self.log_callback = ->(job, level, msg, _params) { (job || Sidekiq).logger.send(level, msg) }
  self.structured_logging = false
end
.start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object
Start the scheduler. This should generally be run in the Sidekiq worker process, not a webserver process.
# File 'lib/amigo.rb', line 234
def start_scheduler(load_from_hash=Sidekiq::Cron::Job.method(:load_from_hash))
  hash = self.registered_scheduled_jobs.each_with_object({}) do |job, memo|
    self.log(nil, :info, "scheduling_job_cron", {job_name: job.name, job_cron: job.cron_expr})
    memo[job.name] = {
      "class" => job.name,
      "cron" => job.cron_expr,
    }
  end
  load_errs = load_from_hash.call(hash) || {}
  raise StartSchedulerFailed, "Errors loading sidekiq-cron jobs: %p" % [load_errs] unless load_errs.empty?
end
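Because load_from_hash is an injectable callable, the scheduler setup can be exercised without sidekiq-cron. The sketch below is a standalone approximation under stated assumptions: SketchJob, build_cron_hash, start_scheduler_sketch, and the job names are hypothetical, and a plain RuntimeError stands in for StartSchedulerFailed. The loader is assumed to return a hash of errors (or nil), which is what the `|| {}` and `empty?` checks above suggest.

```ruby
# Hypothetical stand-ins for scheduled job classes; only name and cron_expr are used.
SketchJob = Struct.new(:name, :cron_expr)

# Build the hash shape passed to the loader: job name => class and cron expression.
def build_cron_hash(jobs)
  jobs.each_with_object({}) do |job, memo|
    memo[job.name] = {"class" => job.name, "cron" => job.cron_expr}
  end
end

# Call the injected loader and fail loudly if it reports any errors.
def start_scheduler_sketch(jobs, load_from_hash)
  load_errs = load_from_hash.call(build_cron_hash(jobs)) || {}
  raise "Errors loading sidekiq-cron jobs: %p" % [load_errs] unless load_errs.empty?
end

jobs = [SketchJob.new("NightlyReport", "0 3 * * *"), SketchJob.new("Heartbeat", "* * * * *")]

# A capturing loader records what would have been scheduled and reports no errors.
captured = nil
start_scheduler_sketch(jobs, ->(hash) { captured = hash; {} })

# A loader that reports an error causes the start to fail.
failure = begin
  start_scheduler_sketch(jobs, ->(_hash) { {"Heartbeat" => "bad cron"} })
  nil
rescue RuntimeError => e
  e.message
end
```

Passing a capturing lambda like this is one way to assert on the generated schedule in a test without touching Redis.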
.unregister_subscriber(block_ref) ⇒ Object
# File 'lib/amigo.rb', line 187
def unregister_subscriber(block_ref)
  self.subscribers.delete(block_ref)
end
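Since register_subscriber returns the block and unregister_subscriber deletes by reference, the caller needs to keep the returned value in order to remove the hook later. A minimal standalone sketch of that pairing follows. SketchBus is a hypothetical stand-in, not the Amigo module, and its publish omits the Event object and error handling.

```ruby
# Hypothetical stand-in that keeps only the subscriber bookkeeping.
class SketchBus
  attr_reader :subscribers

  def initialize
    @subscribers = []
  end

  # Store the block and hand it back so the caller can unregister it later.
  def register_subscriber(&block)
    raise LocalJumpError, "no block given" unless block
    @subscribers << block
    block
  end

  # Remove a previously registered block by reference.
  def unregister_subscriber(block_ref)
    @subscribers.delete(block_ref)
  end

  # Simplified publish: call each hook with the event name and payload.
  def publish(eventname, *payload)
    @subscribers.to_a.each { |hook| hook.call(eventname, payload) }
  end
end

bus = SketchBus.new
received = []
ref = bus.register_subscriber { |name, payload| received << [name, payload] }

bus.publish("user.created", 1)
bus.unregister_subscriber(ref)
bus.publish("user.deleted", 1) # no subscribers remain, so nothing is recorded
```

The same pattern is useful in specs: register a hook at the start of an example, then unregister it with the saved reference so later examples are unaffected.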