Class: SideJob::ServerMiddleware
- Inherits:
-
Object
- Object
- SideJob::ServerMiddleware
- Defined in:
- lib/sidejob/server_middleware.rb
Overview
This middleware is primarily responsible for changing job status depending on events Job sets status to terminating or queued when a job is queued All other job status changes happen here For simplicity, a job is allowed to be queued multiple times in the Sidekiq queue Only when it gets pulled out to be run, i.e. here, we decide if we want to actually run it
Class Attribute Summary collapse
-
.raise_errors ⇒ Object
If true, we do not rescue or log errors and instead propagate errors (useful for testing).
Instance Method Summary collapse
-
#call(worker, msg, queue) ⇒ Object
Called by sidekiq as a server middleware to handle running a worker.
Class Attribute Details
.raise_errors ⇒ Object
If true, we do not rescue or log errors and instead propagate errors (useful for testing)
10 11 12 |
# File 'lib/sidejob/server_middleware.rb', line 10 def raise_errors @raise_errors end |
Instance Method Details
#call(worker, msg, queue) ⇒ Object
Called by sidekiq as a server middleware to handle running a worker
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/sidejob/server_middleware.rb', line 17 def call(worker, msg, queue) # handle non SideJob workers (i.e. normal Sidekiq jobs) unless worker.is_a?(SideJob::Worker) yield return end @worker = worker return unless @worker.exists? # make sure the job has not been deleted # only run if status is queued or terminating case @worker.status when 'queued', 'terminating' else return end # We use the presence of this lock:worker key to indicate that a worker is trying to the get the job lock. # No other worker needs to also wait and no calls to {SideJob::Job#run} need to queue a new run. return unless SideJob.redis.set("#{@worker.redis_key}:lock:worker", 1, {nx: true, ex: 2}) # Obtain a lock to allow only one worker to run at a time to simplify workers from having to deal with concurrency token = @worker.lock(CONFIGURATION[:lock_expiration]) if token begin SideJob.redis.del "#{@worker.redis_key}:lock:worker" SideJob.context(job: @worker.id) do case @worker.status when 'queued' run_worker { yield } when 'terminating' terminate_worker # else no longer need running end end ensure @worker.unlock(token) @worker.run(parent: true) # run the parent every time worker runs end else SideJob.redis.del "#{@worker.redis_key}:lock:worker" # Unable to obtain job lock which may indicate another worker thread is running # Schedule another run # Note that the actual time before requeue depends on sidekiq poll_interval (default 15 seconds) case @worker.status when 'queued', 'terminating' @worker.run(wait: 1) # else no longer need running end end end |