Class: Sidekiq::Middleware::Server::MaxJobs

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/middleware/server/max_jobs.rb

Constant Summary collapse

VERSION =

Version

File.read(
  File.join(
    File.dirname(__FILE__),
    '..',
    '..',
    '..',
    '..',
    'VERSION'
  )
).strip
COUNTER_FOR_QUEUE_KEY_TEMPLATE =

Constant(s)

'COUNTER_%s'
COUNTER_KEY =
'COUNTER'
LOG_INITIALIZATION_TEMPLATE =
'Max-Jobs middleware enabled, shutting down pid: %d when quota is reached'
LOG_MAX_JOBS_QUOTA_MET_FOR_QUEUE_TEMPLATE =
'Max-Jobs queue quota met for: "%s", shutting down pid: %d'
LOG_MAX_JOBS_QUOTA_MET_TEMPLATE =
'Max-Jobs total quota met, shutting down pid: %d'
LOG_MAX_JOBS_RUNTIME_QUOTA_MET_TEMPLATE =
'Max-Jobs runtime quota met, shutting down pid: %d'
MAX_JOBS_FOR_QUEUE_KEY_TEMPLATE =
'MAX_JOBS_%s'
MAX_JOBS_JITTER_FOR_QUEUE_KEY_TEMPLATE =
'MAX_JOBS_JITTER_%s'
MAX_JOBS_JITTER_KEY =
'MAX_JOBS_JITTER'
MAX_JOBS_KEY =
'MAX_JOBS'
MAX_JOBS_RUNTIME_JITTER_KEY =
'MAX_JOBS_RUNTIME_JITTER'
MAX_JOBS_RUNTIME_WITH_JITTER_KEY =
'MAX_JOBS_RUNTIME_WITH_JITTER'
MAX_JOBS_RUNTIME_KEY =
'MAX_JOBS_RUNTIME'
MAX_JOBS_WITH_JITTER_FOR_QUEUE_KEY_TEMPLATE =
'MAX_JOBS_WITH_JITTER_%s'
MAX_JOBS_WITH_JITTER_KEY =
'MAX_JOBS_WITH_JITTER'
MUTEX_KEY =
'MUTEX'
PID_KEY =
'PID'
START_TIME_KEY =
'START_TIME'
TERM =
'TERM'
TERMINATING_KEY =
'TERMINATING'
DEFAULT_MAX_JOBS =

Default(s)

500
DEFAULT_MAX_JOBS_FOR_QUEUE =
-1
DEFAULT_MAX_JOBS_JITTER =
-1
DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE =
-1
DEFAULT_MAX_JOBS_RUNTIME =
-1
DEFAULT_MAX_JOBS_RUNTIME_JITTER =
-1

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.cache ⇒ Object

Helper Method(s)



60
61
62
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 60

# Memo store used by the helper methods on this class.
#
# @return [Hash] the same Hash instance on every call (created on first use)
def cache
  @cache = {} unless @cache
  @cache
end

.counter ⇒ Object



64
65
66
67
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 64

# Reads the total job counter kept in the cache.
#
# @return [Integer] the stored count; the slot is initialised to 0 when unset
def counter
  store = cache
  store[COUNTER_KEY] ||= 0
end

.counter_for_queue(queue) ⇒ Object



69
70
71
72
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 69

# Reads the job counter kept for a single queue.
#
# @param queue [String] queue name; upcased to build the cache key
# @return [Integer] the stored count; the slot is initialised to 0 when unset
def counter_for_queue(queue)
  slot = format(COUNTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  store = cache
  store[slot] ||= 0
end

.increment_counter! ⇒ Object



74
75
76
77
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 74

# Advances the total job counter by one, treating an unset slot as 0.
#
# @return [Integer] the new counter value
def increment_counter!
  store = cache
  current = store[COUNTER_KEY] || 0
  store[COUNTER_KEY] = current.succ
end

.increment_counter_for_queue!(queue) ⇒ Object



79
80
81
82
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 79

# Advances the counter of a single queue by one, treating an unset slot as 0.
#
# @param queue [String] queue name; upcased to build the cache key
# @return [Integer] the new counter value for that queue
def increment_counter_for_queue!(queue)
  slot = format(COUNTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  store = cache
  current = store[slot] || 0
  store[slot] = current.succ
end

.log_info(message) ⇒ Object



84
85
86
87
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 84

# Emits an informational message through the Sidekiq logger when
# `::Sidekiq.logger` is defined, and through `puts` otherwise.
#
# @param message [String] text to log
# @return [Object] whatever the chosen sink returns (`nil` for `puts`)
def log_info(message)
  if defined?(::Sidekiq.logger)
    ::Sidekiq.logger.info(message)
  else
    puts(message)
  end
end

.log_initialization! ⇒ Object



89
90
91
92
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 89

# Logs the "middleware enabled" line, interpolating the cached pid.
#
# @return [Object] the return value of {log_info}
def log_initialization!
  log_info(format(LOG_INITIALIZATION_TEMPLATE, pid))
end

.log_max_jobs_quota_met! ⇒ Object



94
95
96
97
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 94

# Logs that the total job quota was reached, interpolating the cached pid.
#
# @return [Object] the return value of {log_info}
def log_max_jobs_quota_met!
  log_info(format(LOG_MAX_JOBS_QUOTA_MET_TEMPLATE, pid))
end

.log_max_jobs_quota_met_for_queue!(queue) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 99

# Logs that the quota of one queue was reached, interpolating the queue
# name (as given, not upcased) and the cached pid.
#
# @param queue [String] queue name to mention in the message
# @return [Object] the return value of {log_info}
def log_max_jobs_quota_met_for_queue!(queue)
  template = LOG_MAX_JOBS_QUOTA_MET_FOR_QUEUE_TEMPLATE
  log_info(format(template, queue, pid))
end

.log_max_jobs_runtime_quota_met! ⇒ Object



108
109
110
111
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 108

# Logs that the runtime quota was reached, interpolating the cached pid.
#
# @return [Object] the return value of {log_info}
def log_max_jobs_runtime_quota_met!
  log_info(format(LOG_MAX_JOBS_RUNTIME_QUOTA_MET_TEMPLATE, pid))
end

.max_jobs ⇒ Object



113
114
115
116
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 113

# Total job quota, taken from the `MAX_JOBS_KEY` environment variable or
# `DEFAULT_MAX_JOBS` when that variable is unset. Memoized in the cache.
#
# @return [Integer] the configured quota (`String#to_i` semantics)
def max_jobs
  store = cache
  store[MAX_JOBS_KEY] ||= ENV.fetch(MAX_JOBS_KEY, DEFAULT_MAX_JOBS).to_i
end

.max_jobs_for_queue(queue) ⇒ Object



118
119
120
121
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 118

# Job quota for a single queue, taken from the environment variable named
# after the upcased queue, or `DEFAULT_MAX_JOBS_FOR_QUEUE` when that
# variable is unset. Memoized in the cache under the same name.
#
# @param queue [String] queue name; upcased to build the key
# @return [Integer] the configured quota (`String#to_i` semantics)
def max_jobs_for_queue(queue)
  name = format(MAX_JOBS_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  store = cache
  store[name] ||= ENV.fetch(name, DEFAULT_MAX_JOBS_FOR_QUEUE).to_i
end

.max_jobs_jitter ⇒ Object



123
124
125
126
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 123

# Random jitter added to the total job quota. Drawn once and memoized.
#
# The upper bound comes from the `MAX_JOBS_JITTER_KEY` environment variable,
# or `DEFAULT_MAX_JOBS_JITTER` when unset. As with `Kernel#rand`, the sign of
# the bound is ignored, so a bound of -1 or 1 always yields 0.
#
# A bound of 0 (including non-numeric values, which `to_i` turns into 0) now
# yields 0. `Kernel#rand(0)` would return a Float in [0, 1), which made the
# quota fractional so the equality check against the counter never matched.
#
# @return [Integer] a value in `0...bound.abs`, or 0 when the bound is 0
def max_jobs_jitter
  key = MAX_JOBS_JITTER_KEY
  cache[key] ||= begin
    bound = (ENV[key] || DEFAULT_MAX_JOBS_JITTER).to_i.abs
    bound.zero? ? 0 : rand(bound)
  end
end

.max_jobs_jitter_for_queue(queue) ⇒ Object



128
129
130
131
132
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 128

# Random jitter added to the quota of a single queue. Drawn once per queue
# and memoized.
#
# The upper bound comes from the environment variable named after the
# upcased queue, or `DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE` when unset. As with
# `Kernel#rand`, the sign of the bound is ignored, so -1 or 1 always yields 0.
#
# A bound of 0 (including non-numeric values, which `to_i` turns into 0) now
# yields 0. `Kernel#rand(0)` would return a Float in [0, 1), which made the
# queue quota fractional so the equality check against the counter never
# matched.
#
# @param queue [String] queue name; upcased to build the key
# @return [Integer] a value in `0...bound.abs`, or 0 when the bound is 0
def max_jobs_jitter_for_queue(queue)
  key = format(MAX_JOBS_JITTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  cache[key] ||= begin
    bound = (ENV[key] || DEFAULT_MAX_JOBS_JITTER_FOR_QUEUE).to_i.abs
    bound.zero? ? 0 : rand(bound)
  end
end

.max_jobs_quota_met? ⇒ Boolean

Returns:

  • (Boolean)


134
135
136
137
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 134

# Tells whether the total counter has landed exactly on the jittered quota.
# A non-positive quota disables the check.
#
# @return [Boolean] true only when the quota is positive and equals the counter
def max_jobs_quota_met?
  limit = max_jobs_with_jitter
  return false unless limit.positive?

  counter == limit
end

.max_jobs_quota_met_for_queue?(queue) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
142
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 139

# Tells whether the counter of one queue has landed exactly on that queue's
# jittered quota. A non-positive quota disables the check.
#
# @param queue [String] queue name passed on to the quota and counter lookups
# @return [Boolean] true only when the quota is positive and equals the counter
def max_jobs_quota_met_for_queue?(queue)
  limit = max_jobs_with_jitter_for_queue(queue)
  return false unless limit.positive?

  counter_for_queue(queue) == limit
end

.max_jobs_runtime ⇒ Object



144
145
146
147
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 144

# Runtime quota, taken from the `MAX_JOBS_RUNTIME_KEY` environment variable
# or `DEFAULT_MAX_JOBS_RUNTIME` when that variable is unset. Memoized in the
# cache. The value is compared against elapsed whole seconds elsewhere.
#
# @return [Integer] the configured quota (`String#to_i` semantics)
def max_jobs_runtime
  store = cache
  store[MAX_JOBS_RUNTIME_KEY] ||=
    ENV.fetch(MAX_JOBS_RUNTIME_KEY, DEFAULT_MAX_JOBS_RUNTIME).to_i
end

.max_jobs_runtime_jitter ⇒ Object



149
150
151
152
153
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 149

# Random jitter added to the runtime quota. Drawn once and memoized.
#
# The upper bound comes from the `MAX_JOBS_RUNTIME_JITTER_KEY` environment
# variable, or `DEFAULT_MAX_JOBS_RUNTIME_JITTER` when unset. As with
# `Kernel#rand`, the sign of the bound is ignored, so -1 or 1 always yields 0.
#
# A bound of 0 (including non-numeric values, which `to_i` turns into 0) now
# yields 0. `Kernel#rand(0)` would return a Float in [0, 1), which turned a
# disabled runtime quota of 0 into a small positive one.
#
# @return [Integer] a value in `0...bound.abs`, or 0 when the bound is 0
def max_jobs_runtime_jitter
  key = MAX_JOBS_RUNTIME_JITTER_KEY
  cache[key] ||= begin
    bound = (ENV[key] || DEFAULT_MAX_JOBS_RUNTIME_JITTER).to_i.abs
    bound.zero? ? 0 : rand(bound)
  end
end

.max_jobs_runtime_quota_met? ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
158
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 155

# Tells whether the time elapsed since `start_time` has reached the jittered
# runtime quota. A non-positive quota disables the check.
#
# @return [Boolean] true when the quota is positive and the elapsed whole
#   seconds are greater than or equal to it
def max_jobs_runtime_quota_met?
  limit = max_jobs_runtime_with_jitter
  return false unless limit.positive?

  elapsed = ::Time.now.to_i - start_time
  elapsed >= limit
end

.max_jobs_runtime_with_jitter ⇒ Object



160
161
162
163
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 160

# Effective runtime quota: the configured runtime plus its jitter. The sum
# is computed on first use and memoized in the cache.
#
# @return [Numeric] `max_jobs_runtime + max_jobs_runtime_jitter`
def max_jobs_runtime_with_jitter
  store = cache
  memoized = store[MAX_JOBS_RUNTIME_WITH_JITTER_KEY]
  return memoized if memoized

  store[MAX_JOBS_RUNTIME_WITH_JITTER_KEY] =
    max_jobs_runtime + max_jobs_runtime_jitter
end

.max_jobs_with_jitter ⇒ Object



165
166
167
168
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 165

# Effective total quota: the configured job quota plus its jitter. The sum
# is computed on first use and memoized in the cache.
#
# @return [Numeric] `max_jobs + max_jobs_jitter`
def max_jobs_with_jitter
  store = cache
  memoized = store[MAX_JOBS_WITH_JITTER_KEY]
  return memoized if memoized

  store[MAX_JOBS_WITH_JITTER_KEY] = max_jobs + max_jobs_jitter
end

.max_jobs_with_jitter_for_queue(queue) ⇒ Object



170
171
172
173
174
175
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 170

# Effective quota for one queue: that queue's configured quota plus its
# jitter. The sum is computed on first use and memoized per queue.
#
# @param queue [String] queue name; upcased to build the cache key
# @return [Numeric] `max_jobs_for_queue(queue) + max_jobs_jitter_for_queue(queue)`
def max_jobs_with_jitter_for_queue(queue)
  slot = format(MAX_JOBS_WITH_JITTER_FOR_QUEUE_KEY_TEMPLATE, queue.upcase)
  store = cache
  memoized = store[slot]
  return memoized if memoized

  store[slot] = max_jobs_for_queue(queue) + max_jobs_jitter_for_queue(queue)
end

.mutex ⇒ Object



177
178
179
180
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 177

# Mutex kept in the cache; created on first use and reused afterwards.
#
# @return [Mutex] the memoized mutex
def mutex
  store = cache
  existing = store[MUTEX_KEY]
  return existing if existing

  store[MUTEX_KEY] = ::Mutex.new
end

.pid ⇒ Object



182
183
184
185
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 182

# Process id captured from `Process.pid` on first use and memoized in the
# cache; later calls return the stored value without re-reading it.
#
# @return [Integer] the memoized process id
def pid
  store = cache
  known = store[PID_KEY]
  return known if known

  store[PID_KEY] = ::Process.pid
end

.start_time ⇒ Object



187
188
189
190
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 187

# Epoch second recorded the first time this method is called, memoized in
# the cache and returned unchanged on later calls.
#
# @return [Integer] the memoized `Time.now.to_i` value
def start_time
  store = cache
  recorded = store[START_TIME_KEY]
  return recorded if recorded

  store[START_TIME_KEY] = ::Time.now.to_i
end

.terminate! ⇒ Object



192
193
194
195
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 192

# Marks the process as terminating and sends it the `TERM` signal.
#
# The flag is stored as a literal `true` before the signal is sent. The
# previous form, `cache[key] = true && Process.kill(...)`, stored the
# Integer returned by `Process.kill` (the number of processes signalled),
# so a check of the flag against `true` never succeeded.
#
# @return [Integer] the result of `Process.kill`
def terminate!
  cache[TERMINATING_KEY] = true
  ::Process.kill(TERM, pid)
end

.terminating? ⇒ Boolean

Returns:

  • (Boolean)


197
198
199
200
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 197

# Tells whether the terminating flag has been set in the cache.
#
# Any truthy stored value counts. The previous strict `== true` comparison
# returned false when the flag held an Integer such as the result of
# `Process.kill`, so termination was never detected.
#
# @return [Boolean] true when the flag is set, false otherwise
def terminating?
  cache[TERMINATING_KEY] ? true : false
end

Instance Method Details

#call(_, _, queue) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/sidekiq/middleware/server/max_jobs.rb', line 203

# Middleware entry point. Runs the wrapped job, then, only if the job did
# not raise and the class is not already terminating, updates the counters
# and checks the three quotas while holding the class-level mutex.
#
# Check order: runtime quota, then total quota, then queue quota. Both
# counters are always incremented; only the first quota found to be met is
# logged. `terminate!` is called once at the end if any quota was met.
#
# @param _ [Object] worker instance (unused)
# @param _ [Object] job item (unused)
# @param queue [String] name of the queue the job came from
# @yield runs the wrapped job
# @return [Object] the value of the yielded block
# @raise [Exception] re-raises whatever the yielded block raised
def call(
  _,     # worker-instance
  _,     # item
  queue
)
  failed = false
  yield
rescue Exception
  # Only remember that the job failed, so the `ensure` section leaves the
  # counters alone, then hand the very same exception back to the caller.
  failed = true
  raise
ensure
  middleware = self.class

  unless failed || middleware.terminating?
    middleware.mutex.synchronize do
      # Runtime quota is evaluated before any counter changes
      should_terminate = false
      if middleware.max_jobs_runtime_quota_met?
        middleware.log_max_jobs_runtime_quota_met!
        should_terminate = true
      end

      # Total counter, then total quota (skipped once a quota is already met)
      middleware.increment_counter!
      if !should_terminate && middleware.max_jobs_quota_met?
        middleware.log_max_jobs_quota_met!
        should_terminate = true
      end

      # Queue counter, then queue quota (skipped once a quota is already met)
      middleware.increment_counter_for_queue!(queue)
      if !should_terminate && middleware.max_jobs_quota_met_for_queue?(queue)
        middleware.log_max_jobs_quota_met_for_queue!(queue)
        should_terminate = true
      end

      middleware.terminate! if should_terminate
    end
  end
end