Class: SideJob::ServerMiddleware

Inherits:
Object
  • Object
show all
Defined in:
lib/sidejob/server_middleware.rb

Overview

This middleware is primarily responsible for changing job status depending on events. Job sets the status to terminating or queued when a job is queued; all other job status changes happen here. For simplicity, a job is allowed to be queued multiple times in the Sidekiq queue. Only when it gets pulled out to be run, i.e. here, do we decide whether we actually want to run it.

Class Attribute Summary collapse

Instance Method Summary collapse

Class Attribute Details

.raise_errorsObject

If true, we do not rescue or log errors and instead propagate errors (useful for testing)



10
11
12
# File 'lib/sidejob/server_middleware.rb', line 10

# Reader for the raise_errors class attribute.
# If true, errors are propagated instead of being rescued and logged (useful for testing).
def raise_errors
  @raise_errors
end

Instance Method Details

#call(worker, msg, queue) ⇒ Object

Called by sidekiq as a server middleware to handle running a worker

Parameters:

  • worker (SideJob::Worker)
  • msg (Hash)

    Sidekiq message format

  • queue (String)

    Queue the job was pulled from



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sidejob/server_middleware.rb', line 17

# Called by sidekiq as a server middleware to handle running a worker.
# Workers that are not SideJob workers are simply yielded to.
# @param worker [SideJob::Worker]
# @param msg [Hash] Sidekiq message format
# @param queue [String] Queue the job was pulled from
# @yield Runs the next middleware in the chain / the worker itself
def call(worker, msg, queue)
  # handle non SideJob workers (i.e. normal Sidekiq jobs)
  unless worker.is_a?(SideJob::Worker)
    yield
    return
  end

  @worker = worker
  return unless @worker.exists? # make sure the job has not been deleted

  # only run if status is queued or terminating
  return unless %w[queued terminating].include?(@worker.status)

  # We use the presence of this lock:worker key to indicate that a worker is trying to get the job lock.
  # No other worker needs to also wait and no calls to {SideJob::Job#run} need to queue a new run.
  # The options are passed as keyword arguments (not a positional hash) so the call works both with
  # redis clients that take an options hash and with those that declare nx:/ex: as keywords.
  waiting_key = "#{@worker.redis_key}:lock:worker"
  return unless SideJob.redis.set(waiting_key, 1, nx: true, ex: 2)

  # Obtain a lock to allow only one worker to run at a time to simplify workers from having to deal with concurrency
  token = @worker.lock(CONFIGURATION[:lock_expiration])
  if token
    begin
      SideJob.redis.del waiting_key
      SideJob.context(job: @worker.id) do
        # Re-read the status: it may have changed while we were acquiring the lock
        case @worker.status
          when 'queued'
            run_worker { yield }
          when 'terminating'
            terminate_worker
          # else no longer need running
        end
      end
    ensure
      @worker.unlock(token)
      @worker.run(parent: true) # run the parent every time worker runs
    end
  else
    SideJob.redis.del waiting_key
    # Unable to obtain job lock which may indicate another worker thread is running
    # Schedule another run (unless the job no longer needs running)
    # Note that the actual time before requeue depends on sidekiq poll_interval (default 15 seconds)
    @worker.run(wait: 1) if %w[queued terminating].include?(@worker.status)
  end
end