Module: Marj::JobsInterface

Included in:
Marj, Relation
Defined in:
lib/marj/jobs_interface.rb

Overview

The interface provided by Marj and Relation.

To create a custom jobs interface, for example for all job classes in your application:

class ApplicationJob < ActiveJob::Base
  extend Marj::JobsInterface

  def self.all
    Marj::Relation.new(self == ApplicationJob ? Marj::Record.ordered : Marj::Record.where(job_class: self).ordered)
  end
end

class SomeJob < ApplicationJob
  def perform(msg)
    puts msg
  end
end

This will allow you to query jobs via the ApplicationJob class:

ApplicationJob.next # Returns the next job of any type

Or to query jobs via a specific job class:

SomeJob.next # Returns the next SomeJob

Alternatively, to create a jobs interface for a single job class:

class SomeJob < ActiveJob::Base
  extend Marj::JobsInterface

  def self.all
    Marj::Relation.new(Marj::Record.where(job_class: self).ordered)
  end
end

Instance Method Details

#count(column_name = nil, &block) ⇒ Integer

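Returns a count of jobs. If a column name is given, the count is restricted by that column as in the underlying relation's count. If a block is given, each record is converted to a job and only jobs for which the block returns true are counted.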

Parameters:

  • column_name (String, Symbol, NilClass) (defaults to: nil)
  • block (Proc, NilClass)

Returns:

  • (Integer)


# File 'lib/marj/jobs_interface.rb', line 71

def count(column_name = nil, &block)
  block_given? ? all.count(column_name) { |r| block.call(r.as_job) } : all.count(column_name)
end
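
The block form can be illustrated without a database. The following is a minimal sketch, not Marj's implementation: CountRecord, CountJob and CountInterface are hypothetical stand-ins, all returns a plain Array, and the column_name argument is omitted because it only applies to a database-backed relation.

```ruby
# Hypothetical stand-ins; none of these classes are part of Marj.
CountJob = Struct.new(:queue_name)

CountRecord = Struct.new(:queue_name) do
  # Plays the role of a record's as_job: turn a stored row into a job object.
  def as_job
    CountJob.new(queue_name)
  end
end

class CountInterface
  def initialize(records)
    @records = records
  end

  # A plain Array here; in Marj this would be a relation over job records.
  def all
    @records
  end

  # Same shape as the documented method: the block sees jobs, not records.
  def count(&block)
    block ? all.count { |record| block.call(record.as_job) } : all.count
  end
end

interface = CountInterface.new(
  [CountRecord.new('default'), CountRecord.new('mailers'), CountRecord.new('default')]
)
puts interface.count                                         # prints 3
puts interface.count { |job| job.queue_name == 'default' }   # prints 2
```

The point of the sketch is that the block receives job objects rather than raw records, which is why the documented method wraps the block with as_job.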

#discard_all ⇒ Numeric

Discards all jobs in this interface's scope by deleting their records.

Returns:

  • (Numeric)

    the number of discarded jobs



# File 'lib/marj/jobs_interface.rb', line 109

def discard_all
  all.delete_all
end

#due ⇒ Marj::Relation

Returns a Relation for enqueued jobs with a scheduled_at that is either null or in the past.

Returns:

  • (Marj::Relation)


# File 'lib/marj/jobs_interface.rb', line 86

def due
  Marj::Relation.new(all.due)
end
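
The "null or in the past" rule can be sketched over in-memory rows. This is an illustration only: DueRow and due_rows are hypothetical names, and treating a scheduled_at equal to the current time as due is an assumption of this sketch rather than something the text above specifies.

```ruby
# Hypothetical in-memory row; DueRow is not a Marj class.
DueRow = Struct.new(:id, :scheduled_at)

# A row is due when it has no scheduled_at, or when scheduled_at is not in
# the future. Counting "exactly now" as due is an assumption of this sketch.
def due_rows(rows, now: Time.now)
  rows.select { |row| row.scheduled_at.nil? || row.scheduled_at <= now }
end

current = Time.now
rows = [
  DueRow.new(1, nil),           # never scheduled: due immediately
  DueRow.new(2, current - 60),  # scheduled a minute ago: due
  DueRow.new(3, current + 60)   # scheduled a minute from now: not due
]
p due_rows(rows, now: current).map(&:id)  # prints [1, 2]
```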

#next(limit = nil) ⇒ ActiveJob::Base, NilClass

Returns the next job, or nil if no jobs exist. If limit is specified, returns an array of up to the next N jobs instead, which is empty when no jobs exist.

Parameters:

  • limit (Integer, NilClass) (defaults to: nil)

Returns:

  • (ActiveJob::Base, NilClass)


# File 'lib/marj/jobs_interface.rb', line 61

def next(limit = nil)
  all.first(limit)&.then { _1.is_a?(Array) ? _1.map(&:as_job) : _1.as_job }
end
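
The two return shapes can be seen in a small stand-alone sketch. NextRow and next_job are hypothetical names, and a plain Array stands in for the relation. Unlike the relation's first, Ruby's Array#first does not accept nil, so the sketch branches on limit explicitly.

```ruby
# Hypothetical row type; NextRow is not part of Marj.
NextRow = Struct.new(:name) do
  # Stand-in for as_job: here a job is just a labelled string.
  def as_job
    "job:#{name}"
  end
end

def next_job(rows, limit = nil)
  # Array#first(nil) raises TypeError, so pick the call form by hand.
  found = limit ? rows.first(limit) : rows.first
  # &. skips the conversion when nothing was found (found is nil).
  found&.then { |result| result.is_a?(Array) ? result.map(&:as_job) : result.as_job }
end

rows = [NextRow.new('a'), NextRow.new('b'), NextRow.new('c')]
p next_job(rows)      # prints "job:a"
p next_job(rows, 2)   # prints ["job:a", "job:b"]
p next_job([])        # prints nil
p next_job([], 2)     # prints []
```

Callers that pass a limit should therefore check for an empty array rather than nil.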

#perform_all(batch_size: nil) ⇒ Array

Calls perform_now on each job, running each call inside ActiveJob's execute callbacks.

Parameters:

  • batch_size (Integer, NilClass) (defaults to: nil)

    the number of jobs to fetch at a time, or nil to fetch all jobs at once

Returns:

  • (Array)

    the results returned by each job



# File 'lib/marj/jobs_interface.rb', line 94

def perform_all(batch_size: nil)
  if batch_size
    [].tap do |results|
      while (jobs = all.limit(batch_size).map(&:as_job)).any?
        results.concat(jobs.map { |job| ActiveJob::Callbacks.run_callbacks(:execute) { job.perform_now } })
      end
    end
  else
    all.map(&:as_job).map { |job| ActiveJob::Callbacks.run_callbacks(:execute) { job.perform_now } }
  end
end
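
The batched branch re-queries for the first batch_size jobs on every pass, so it only terminates if performing a job removes it from the result set. The sketch below makes that dependency explicit. BatchStore and perform_in_batches are hypothetical names, and the assumption that performing removes the payload belongs to the sketch, not to anything stated above.

```ruby
# Hypothetical in-memory store; BatchStore is not part of Marj.
class BatchStore
  attr_reader :fetches

  def initialize(payloads)
    @payloads = payloads.dup
    @fetches = 0
  end

  # Stand-in for all.limit(batch_size): peek at up to n pending payloads.
  def limit(n)
    @fetches += 1
    @payloads.first(n)
  end

  # Stand-in for perform_now. Assumption: performing removes the payload,
  # which is what lets the loop below reach an empty batch and stop.
  def perform(payload)
    @payloads.delete(payload)
    payload * 2
  end
end

def perform_in_batches(store, batch_size)
  [].tap do |results|
    # Always fetch from the front; performed payloads are already gone.
    while (batch = store.limit(batch_size)).any?
      results.concat(batch.map { |payload| store.perform(payload) })
    end
  end
end

store = BatchStore.new([1, 2, 3, 4, 5])
p perform_in_batches(store, 2)  # prints [2, 4, 6, 8, 10]
p store.fetches                 # prints 4 (three non-empty batches, one empty)
```

A smaller batch size trades more fetches for fewer jobs held in memory at once.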

#queue(queue, *queues) ⇒ Marj::Relation

Returns a Relation for jobs in the specified queue(s).

Parameters:

  • queue (String, Symbol)

    the queue to query

  • queues (Array<String>, Array<Symbol>)

    more queues to query

Returns:

  • (Marj::Relation)


# File 'lib/marj/jobs_interface.rb', line 53

def queue(queue, *queues)
  Marj::Relation.new(all.where(queue_name: queues.dup.unshift(queue)))
end
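
The signature takes one required queue plus any number of extras, so a call with no arguments is an error rather than an unfiltered query. A minimal in-memory sketch of the same argument handling follows. QueueRow and in_queues are hypothetical names, and the to_s conversion stands in for whatever string/symbol handling the database layer performs.

```ruby
# Hypothetical in-memory row; QueueRow is not a Marj class.
QueueRow = Struct.new(:id, :queue_name)

def in_queues(rows, queue, *queues)
  # Same list-building as the documented method: copy the extras, then
  # put the required queue at the front. to_s is an assumption of this
  # sketch so that symbols and strings compare equal.
  names = queues.dup.unshift(queue).map(&:to_s)
  rows.select { |row| names.include?(row.queue_name) }
end

rows = [QueueRow.new(1, 'default'), QueueRow.new(2, 'mailers'), QueueRow.new(3, 'low')]
p in_queues(rows, :default, 'mailers').map(&:id)  # prints [1, 2]
p in_queues(rows, 'low').map(&:id)                # prints [3]
```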

#where(*args) ⇒ Marj::Relation

Returns a Relation for jobs matching the specified criteria.

Parameters:

  • args (Array)

Returns:

  • (Marj::Relation)


# File 'lib/marj/jobs_interface.rb', line 79

def where(*args)
  Marj::Relation.new(all.where(*args))
end