Module: Faktory::Job::ClassMethods
- Defined in: lib/faktory/testing.rb, lib/faktory/job.rb
Overview
The Faktory testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.
This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.
Example:
require 'faktory/testing'
assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]
assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Faktory::Extensions::DelayedMailer.jobs.size
You can also clear and drain all workers’ jobs:
assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 0, Faktory::Extensions::DelayedModel.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard
assert_equal 1, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 1, Faktory::Extensions::DelayedModel.jobs.size
Faktory::Worker.clear_all # or .drain_all
assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 0, Faktory::Extensions::DelayedModel.jobs.size
This can be useful to make sure jobs don’t linger between tests:
RSpec.configure do |config|
  config.before(:each) do
    Faktory::Worker.clear_all
  end
end
or for acceptance testing, e.g. with Cucumber:
AfterStep do
  Faktory::Worker.drain_all
end
When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"
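The per-class job array described in this overview can be modelled in a few lines of plain Ruby. The sketch below is a toy illustration only: FakeJobQueue, GreetingJob and the log helper are hypothetical names, and the code does not load or reproduce Faktory's own testing module.

```ruby
# Hypothetical stand-in for a "record instead of send" testing mode.
# None of these names come from Faktory itself.
module FakeJobQueue
  # Shared store: worker class name => array of recorded job hashes.
  def self.jobs_by_worker
    @jobs_by_worker ||= Hash.new { |hash, key| hash[key] = [] }
  end

  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Record the job locally instead of pushing it over the network.
    def perform_async(*args)
      FakeJobQueue.jobs_by_worker[to_s] << { "jobtype" => to_s, "args" => args }
    end

    # Jobs recorded for this class.
    def jobs
      FakeJobQueue.jobs_by_worker[to_s]
    end

    # Forget recorded jobs without running them.
    def clear
      jobs.clear
    end

    # Run recorded jobs in order until none remain, including any job
    # that is enqueued while the drain is in progress.
    def drain
      new.perform(*jobs.shift["args"]) while jobs.any?
    end
  end
end

class GreetingJob
  include FakeJobQueue

  # Collects side effects so a test can inspect them.
  def self.log
    @log ||= []
  end

  def perform(name)
    self.class.log << "hello #{name}"
  end
end

GreetingJob.perform_async("ada")
GreetingJob.jobs.size # one job recorded, nothing executed yet
GreetingJob.drain     # executes the job and empties the array
```

Keeping the store keyed by class name is what lets a test assert on one worker's jobs without seeing jobs recorded for another.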
Constant Summary
- ACCESSOR_MUTEX = Mutex.new
Instance Method Summary
- #clear ⇒ Object
  Clear all jobs for this worker.
- #drain ⇒ Object
  Drain and run all jobs for this worker.
- #execute_job(worker, args) ⇒ Object
- #faktory_class_attribute(*attrs) ⇒ Object
- #faktory_options(opts = {}) ⇒ Object
  Allows customization of Faktory features for this type of Job.
- #get_faktory_options ⇒ Object
  :nodoc:.
- #jobs ⇒ Object
  Jobs queued for this worker.
- #perform_async(*args) ⇒ Object
- #perform_in(interval, *args) ⇒ Object (also: #perform_at)
  interval must be a timestamp, numeric or something that acts numeric (like an activesupport time interval).
- #perform_one ⇒ Object
  Pop out a single job and perform it.
- #process_job(job) ⇒ Object
- #queue ⇒ Object
  Queue for this worker.
- #set(options) ⇒ Object
Instance Method Details
#clear ⇒ Object
Clear all jobs for this worker
# File 'lib/faktory/testing.rb', line 273
def clear
  Queues.clear_for(queue, to_s)
end
#drain ⇒ Object
Drain and run all jobs for this worker
# File 'lib/faktory/testing.rb', line 278
def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
    process_job(next_job)
  end
end
#execute_job(worker, args) ⇒ Object
# File 'lib/faktory/testing.rb', line 303
def execute_job(worker, args)
  worker.perform(*args)
end
#faktory_class_attribute(*attrs) ⇒ Object
# File 'lib/faktory/job.rb', line 162
def faktory_class_attribute(*attrs)
  instance_reader = true
  instance_writer = true

  attrs.each do |name|
    synchronized_getter = "__synchronized_#{name}"

    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end

    define_singleton_method(synchronized_getter) { nil }
    singleton_class.class_eval do
      private(synchronized_getter)
    end

    define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end
    define_singleton_method(:"#{name}=") do |val|
      singleton_class.class_eval do
        ACCESSOR_MUTEX.synchronize do
          undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
          define_method(synchronized_getter) { val }
        end
      end

      if singleton_class?
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    if instance_reader
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
      define_method(name) do
        if instance_variable_defined?(ivar)
          instance_variable_get ivar
        else
          self.class.public_send name
        end
      end
    end

    if instance_writer
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
      attr_writer name
    end
  end
end
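The core idea in the method above, a class-level attribute whose writer redefines the reader under a mutex so that subclasses inherit a value until they assign their own, can be shown in a much smaller form. This is a simplified sketch, not the Faktory implementation: InheritableAttribute, LOCK, BaseJob, ReportJob and settings are hypothetical names, and the undef_method bookkeeping and the lock around reads are left out.

```ruby
# Simplified, hypothetical sketch of a mutex-guarded, inheritable
# class attribute. Not the Faktory implementation.
module InheritableAttribute
  LOCK = Mutex.new

  def inheritable_attribute(name)
    # Default reader on the declaring class returns nil.
    define_singleton_method(name) { nil }

    # The writer redefines the reader on the receiving class only, so an
    # assignment on a subclass never changes what the parent returns.
    define_singleton_method("#{name}=") do |value|
      LOCK.synchronize do
        define_singleton_method(name) { value }
      end
      value
    end

    # Instances fall back to the class-level value.
    define_method(name) { self.class.public_send(name) }
  end
end

class BaseJob
  extend InheritableAttribute
  inheritable_attribute :settings
end

class ReportJob < BaseJob; end

BaseJob.settings = { "queue" => "default" }
ReportJob.settings # inherited from BaseJob until ReportJob assigns its own
ReportJob.settings = { "queue" => "reports" }
BaseJob.settings   # still the value assigned on BaseJob
```

Redefining the reader rather than storing the value in a class instance variable is what makes the lookup follow Ruby's normal method inheritance.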
#faktory_options(opts = {}) ⇒ Object
Allows customization of Faktory features for this type of Job. Legal options:
queue - use a named queue for this Job, default 'default'
retry - enable automatic retry for this Job, *Integer* count, default 25
backtrace - whether to save the error backtrace in the job payload to display in web UI,
an integer number of lines to save, default *0*
# File 'lib/faktory/job.rb', line 153
def faktory_options(opts = {})
  # stringify
  self.faktory_options_hash = get_faktory_options.merge(opts.map { |k, v| [k.to_s, v] }.to_h)
end
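The stringify-then-merge step can be tried in isolation. In the sketch below, DEFAULTS and merge_options are hypothetical names; the default values are the ones given in the option list above.

```ruby
# Hypothetical defaults, taken from the option list above.
DEFAULTS = { "queue" => "default", "retry" => 25, "backtrace" => 0 }.freeze

# Convert every key to a string, then merge over the current options.
# Returns a new hash and leaves `current` untouched.
def merge_options(current, opts = {})
  current.merge(opts.map { |key, value| [key.to_s, value] }.to_h)
end

merged = merge_options(DEFAULTS, { queue: "critical", "retry" => 3 })
# merged["queue"] is "critical", merged["retry"] is 3, merged["backtrace"] is 0
```

Normalising keys to strings means symbol-keyed and string-keyed options end up as one entry instead of two.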
#get_faktory_options ⇒ Object
:nodoc:
# File 'lib/faktory/job.rb', line 158
def get_faktory_options # :nodoc:
  self.faktory_options_hash ||= Faktory.default_job_options
end
#jobs ⇒ Object
Jobs queued for this worker
# File 'lib/faktory/testing.rb', line 268
def jobs
  Queues.jobs_by_worker[to_s]
end
#perform_async(*args) ⇒ Object
# File 'lib/faktory/job.rb', line 133
def perform_async(*args)
  set({}).perform_async(*args)
end
#perform_in(interval, *args) ⇒ Object
Also known as: perform_at
interval must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
# File 'lib/faktory/job.rb', line 139
def perform_in(interval, *args)
  set({}).perform_in(interval, *args)
end
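One way a single argument can serve as either a relative delay or an absolute timestamp is to treat small numbers as seconds from now and large ones as epoch seconds. The sketch below illustrates that convention under stated assumptions: resolve_run_time and RELATIVE_CUTOFF are hypothetical names, and the cutoff value is an assumption rather than something this page specifies.

```ruby
# Assumed cutoff: values below it are read as "seconds from now",
# values at or above it as absolute epoch timestamps.
RELATIVE_CUTOFF = 1_000_000_000

# Accepts a Time, a number, or anything that responds to to_f.
def resolve_run_time(interval, now: Time.now)
  seconds = interval.to_f
  epoch = seconds < RELATIVE_CUTOFF ? now.to_f + seconds : seconds
  Time.at(epoch).utc
end

base = Time.utc(2024, 1, 1, 12, 0, 0)
resolve_run_time(300, now: base)         # five minutes after base
resolve_run_time(base + 3600, now: base) # the absolute time base + 1 hour
```

With this convention the same method can back both perform_in (a delay) and perform_at (a point in time).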
#perform_one ⇒ Object
Pop out a single job and perform it
# File 'lib/faktory/testing.rb', line 287
def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], queue, to_s)
  process_job(next_job)
end
#process_job(job) ⇒ Object
# File 'lib/faktory/testing.rb', line 294
def process_job(job)
  worker = new
  worker.jid = job["jid"]
  worker.bid = job["bid"] if worker.respond_to?(:bid=)
  # Faktory::Testing.server_middleware.invoke(worker, job, job['queue']) do
  execute_job(worker, job["args"])
  # end
end
#queue ⇒ Object
Queue for this worker
# File 'lib/faktory/testing.rb', line 263
def queue
  faktory_options["queue"]
end
#set(options) ⇒ Object
# File 'lib/faktory/job.rb', line 127
def set(options)
  opts = get_faktory_options.dup
  opts["jobtype"] = self
  Setter.new(Util.deep_merge(opts, options))
end
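The effect of layering per-call options over class-level options with a deep merge can be sketched as follows. The deep_merge helper here is a hypothetical stand-in for the Util.deep_merge call above, whose actual behaviour is not shown on this page; the option hashes are invented for illustration.

```ruby
# Hypothetical recursive merge: nested hashes are merged key by key,
# any other value from `overrides` replaces the one in `base`.
def deep_merge(base, overrides)
  base.merge(overrides) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge(old_value, new_value)
    else
      new_value
    end
  end
end

class_options = { "queue" => "default", "custom" => { "track" => true, "locale" => "en" } }
call_options  = { "queue" => "bulk", "custom" => { "locale" => "fr" } }

payload = deep_merge(class_options, call_options)
# payload["queue"] is "bulk"; payload["custom"] keeps "track" and takes the new "locale"
```

A shallow merge would have replaced the whole "custom" hash and dropped the "track" key, which is the case a deep merge avoids.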