Module: Faktory::Job::ClassMethods

Defined in:
lib/faktory/testing.rb,
lib/faktory/job.rb

Overview

The Faktory testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer’s :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'faktory/testing'

assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]

assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('[email protected]')
assert_equal 1, Faktory::Extensions::DelayedMailer.jobs.size

You can also clear and drain all workers’ jobs:

assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 0, Faktory::Extensions::DelayedModel.jobs.size

MyMailer.delay.send_welcome_email('[email protected]')
MyModel.delay.do_something_hard

assert_equal 1, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 1, Faktory::Extensions::DelayedModel.jobs.size

Faktory::Worker.clear_all # or .drain_all

assert_equal 0, Faktory::Extensions::DelayedMailer.jobs.size
assert_equal 0, Faktory::Extensions::DelayedModel.jobs.size

This can be useful to make sure jobs don’t linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Faktory::Worker.clear_all
  end
end

or for acceptance testing, i.e. with cucumber:

AfterStep do
  Faktory::Worker.drain_all
end

When I sign up as "[email protected]"
Then I should receive a welcome email to "[email protected]"

Constant Summary collapse

ACCESSOR_MUTEX =
Mutex.new

Instance Method Summary collapse

Instance Method Details

#clearObject

Clear all jobs for this worker



273
274
275
# File 'lib/faktory/testing.rb', line 273

def clear
  Queues.clear_for(queue, to_s)
end

#drainObject

Drain and run all jobs for this worker



278
279
280
281
282
283
284
# File 'lib/faktory/testing.rb', line 278

def drain
  while jobs.any?
    next_job = jobs.first
    Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
    process_job(next_job)
  end
end

#execute_job(worker, args) ⇒ Object



303
304
305
# File 'lib/faktory/testing.rb', line 303

def execute_job(worker, args)
  worker.perform(*args)
end

#faktory_class_attribute(*attrs) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/faktory/job.rb', line 162

def faktory_class_attribute(*attrs)
  instance_reader = true
  instance_writer = true

  attrs.each do |name|
    synchronized_getter = "__synchronized_#{name}"

    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end

    define_singleton_method(synchronized_getter) { nil }
    singleton_class.class_eval do
      private(synchronized_getter)
    end

    define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }

    ivar = "@#{name}"

    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end
    define_singleton_method(:"#{name}=") do |val|
      singleton_class.class_eval do
        ACCESSOR_MUTEX.synchronize do
          undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
          define_method(synchronized_getter) { val }
        end
      end

      if singleton_class?
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    if instance_reader
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
      define_method(name) do
        if instance_variable_defined?(ivar)
          instance_variable_get ivar
        else
          self.class.public_send name
        end
      end
    end

    if instance_writer
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
      attr_writer name
    end
  end
end

#faktory_options(opts = {}) ⇒ Object

Allows customization of Faktory features for this type of Job. Legal options:

queue - use a named queue for this Job, default 'default'
retry - enable automatic retry for this Job, *Integer* count, default 25
backtrace - whether to save the error backtrace in the job payload to display in web UI,
   an integer number of lines to save, default *0*


153
154
155
156
# File 'lib/faktory/job.rb', line 153

def faktory_options(opts = {})
  # stringify
  self.faktory_options_hash = get_faktory_options.merge(opts.map { |k, v| [k.to_s, v] }.to_h)
end

#get_faktory_optionsObject

:nodoc:



158
159
160
# File 'lib/faktory/job.rb', line 158

def get_faktory_options # :nodoc:
  self.faktory_options_hash ||= Faktory.default_job_options
end

#jobsObject

Jobs queued for this worker



268
269
270
# File 'lib/faktory/testing.rb', line 268

def jobs
  Queues.jobs_by_worker[to_s]
end

#perform_async(*args) ⇒ Object



133
134
135
# File 'lib/faktory/job.rb', line 133

def perform_async(*args)
  set({}).perform_async(*args)
end

#perform_in(interval, *args) ⇒ Object Also known as: perform_at

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).


139
140
141
# File 'lib/faktory/job.rb', line 139

def perform_in(interval, *args)
  set({}).perform_in(interval, *args)
end

#perform_oneObject

Pop out a single job and perform it

Raises:



287
288
289
290
291
292
# File 'lib/faktory/testing.rb', line 287

def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], queue, to_s)
  process_job(next_job)
end

#process_job(job) ⇒ Object



294
295
296
297
298
299
300
301
# File 'lib/faktory/testing.rb', line 294

def process_job(job)
  worker = new
  worker.jid = job["jid"]
  worker.bid = job["bid"] if worker.respond_to?(:bid=)
  # Faktory::Testing.server_middleware.invoke(worker, job, job['queue']) do
  execute_job(worker, job["args"])
  # end
end

#queueObject

Queue for this worker



263
264
265
# File 'lib/faktory/testing.rb', line 263

def queue
  faktory_options["queue"]
end

#set(options) ⇒ Object



127
128
129
130
131
# File 'lib/faktory/job.rb', line 127

def set(options)
  opts = get_faktory_options.dup
  opts["jobtype"] = self
  Setter.new(Util.deep_merge(opts, options))
end