Module: Sidekiq::Worker::ClassMethods
- Defined in:
- lib/sidekiq/testing.rb,
lib/sidekiq/worker.rb
Overview
The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.
This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.
Example:
require 'sidekiq/testing'
assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
You can also clear and drain all workers’ jobs:
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size
Sidekiq::Worker.clear_all # or .drain_all
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
This can be useful to make sure jobs don’t linger between tests:
RSpec.configure do |config|
config.before(:each) do
Sidekiq::Worker.clear_all
end
end
or for acceptance testing, i.e. with cucumber:
AfterStep do
Sidekiq::Worker.drain_all
end
When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"
Constant Summary collapse
- ACCESSOR_MUTEX =
Mutex.new
Instance Method Summary collapse
-
#clear ⇒ Object
Clear all jobs for this worker.
-
#client_push(item) ⇒ Object
:nodoc:.
- #delay(*args) ⇒ Object
- #delay_for(*args) ⇒ Object
- #delay_until(*args) ⇒ Object
-
#drain ⇒ Object
Drain and run all jobs for this worker.
- #execute_job(worker, args) ⇒ Object
-
#get_sidekiq_options ⇒ Object
:nodoc:.
-
#jobs ⇒ Object
Jobs queued for this worker.
- #perform_async(*args) ⇒ Object
-
#perform_in(interval, *args) ⇒ Object
(also: #perform_at)
interval
must be a timestamp, numeric or something that acts numeric (like an activesupport time interval). -
#perform_one ⇒ Object
Pop out a single job and perform it.
- #process_job(job) ⇒ Object
-
#queue ⇒ Object
Queue for this worker.
- #set(options) ⇒ Object
- #sidekiq_class_attribute(*attrs) ⇒ Object
-
#sidekiq_options(opts = {}) ⇒ Object
Allows customization for this type of Worker.
- #sidekiq_retries_exhausted(&block) ⇒ Object
- #sidekiq_retry_in(&block) ⇒ Object
Instance Method Details
#clear ⇒ Object
Clear all jobs for this worker
270 271 272 |
# File 'lib/sidekiq/testing.rb', line 270 def clear Queues.clear_for(queue, self.to_s) end |
#client_push(item) ⇒ Object
:nodoc:
142 143 144 145 146 147 148 149 150 |
# File 'lib/sidekiq/worker.rb', line 142 def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || ['pool'] || Sidekiq.redis_pool # stringify item.keys.each do |key| item[key.to_s] = item.delete(key) end Sidekiq::Client.new(pool).push(item) end |
#delay(*args) ⇒ Object
76 77 78 |
# File 'lib/sidekiq/worker.rb', line 76 def delay(*args) raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async" end |
#delay_for(*args) ⇒ Object
80 81 82 |
# File 'lib/sidekiq/worker.rb', line 80 def delay_for(*args) raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in" end |
#delay_until(*args) ⇒ Object
84 85 86 |
# File 'lib/sidekiq/worker.rb', line 84 def delay_until(*args) raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" end |
#drain ⇒ Object
Drain and run all jobs for this worker
275 276 277 278 279 280 281 |
# File 'lib/sidekiq/testing.rb', line 275 def drain while jobs.any? next_job = jobs.first Queues.delete_for(next_job["jid"], next_job["queue"], self.to_s) process_job(next_job) end end |
#execute_job(worker, args) ⇒ Object
300 301 302 |
# File 'lib/sidekiq/testing.rb', line 300 def execute_job(worker, args) worker.perform(*args) end |
#get_sidekiq_options ⇒ Object
:nodoc:
138 139 140 |
# File 'lib/sidekiq/worker.rb', line 138 def # :nodoc: self. ||= Sidekiq. end |
#jobs ⇒ Object
Jobs queued for this worker
265 266 267 |
# File 'lib/sidekiq/testing.rb', line 265 def jobs Queues.jobs_by_worker[self.to_s] end |
#perform_async(*args) ⇒ Object
92 93 94 |
# File 'lib/sidekiq/worker.rb', line 92 def perform_async(*args) client_push('class' => self, 'args' => args) end |
#perform_in(interval, *args) ⇒ Object Also known as: perform_at
interval
must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/sidekiq/worker.rb', line 98 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) item = { 'class' => self, 'args' => args, 'at' => ts } # Optimization to enqueue something now that is scheduled to go out now or in the past item.delete('at') if ts <= now client_push(item) end |
#perform_one ⇒ Object
Pop out a single job and perform it
284 285 286 287 288 289 |
# File 'lib/sidekiq/testing.rb', line 284 def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? next_job = jobs.first Queues.delete_for(next_job["jid"], queue, self.to_s) process_job(next_job) end |
#process_job(job) ⇒ Object
291 292 293 294 295 296 297 298 |
# File 'lib/sidekiq/testing.rb', line 291 def process_job(job) worker = new worker.jid = job['jid'] worker.bid = job['bid'] if worker.respond_to?(:bid=) Sidekiq::Testing.server_middleware.invoke(worker, job, job['queue']) do execute_job(worker, job['args']) end end |
#queue ⇒ Object
Queue for this worker
260 261 262 |
# File 'lib/sidekiq/testing.rb', line 260 def queue self.["queue"] end |
#set(options) ⇒ Object
88 89 90 |
# File 'lib/sidekiq/worker.rb', line 88 def set() Setter.new(self, ) end |
#sidekiq_class_attribute(*attrs) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/sidekiq/worker.rb', line 152 def sidekiq_class_attribute(*attrs) instance_reader = true instance_writer = true attrs.each do |name| synchronized_getter = "__synchronized_#{name}" singleton_class.instance_eval do undef_method(name) if method_defined?(name) || private_method_defined?(name) end define_singleton_method(synchronized_getter) { nil } singleton_class.class_eval do private(synchronized_getter) end define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } } ivar = "@#{name}" singleton_class.instance_eval do m = "#{name}=" undef_method(m) if method_defined?(m) || private_method_defined?(m) end define_singleton_method("#{name}=") do |val| singleton_class.class_eval do ACCESSOR_MUTEX.synchronize do undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter) define_method(synchronized_getter) { val } end end if singleton_class? class_eval do undef_method(name) if method_defined?(name) || private_method_defined?(name) define_method(name) do if instance_variable_defined? ivar instance_variable_get ivar else singleton_class.send name end end end end val end if instance_reader undef_method(name) if method_defined?(name) || private_method_defined?(name) define_method(name) do if instance_variable_defined?(ivar) instance_variable_get ivar else self.class.public_send name end end end if instance_writer m = "#{name}=" undef_method(m) if method_defined?(m) || private_method_defined?(m) attr_writer name end end end |
#sidekiq_options(opts = {}) ⇒ Object
Allows customization for this type of Worker. Legal options:
queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.
In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.
125 126 127 128 |
# File 'lib/sidekiq/worker.rb', line 125 def (opts={}) # stringify self. = .merge(Hash[opts.map{|k, v| [k.to_s, v]}]) end |
#sidekiq_retries_exhausted(&block) ⇒ Object
134 135 136 |
# File 'lib/sidekiq/worker.rb', line 134 def sidekiq_retries_exhausted(&block) self.sidekiq_retries_exhausted_block = block end |
#sidekiq_retry_in(&block) ⇒ Object
130 131 132 |
# File 'lib/sidekiq/worker.rb', line 130 def sidekiq_retry_in(&block) self.sidekiq_retry_in_block = block end |