Marj - Minimal ActiveRecord Jobs

A minimal database-backed ActiveJob queueing backend.

API docs: https://gemdocs.org/gems/marj/6.1.0/
RubyGems: https://rubygems.org/gems/marj
Changelog: https://github.com/nicholasdower/marj/releases
Issues: https://github.com/nicholasdower/marj/issues
Development: https://github.com/nicholasdower/marj/blob/master/CONTRIBUTING.md

Motivation

There are already several great database-backed ActiveJob queueing backends:

Any of these which support your RDBMS is likely to work well for you. But you may find them to be more featureful and complex than what you require.

Marj aims to be a minimal alternative.

Goal

To be the database-backend ActiveJob queueing backend with:

  • The simplest setup
  • The fewest configuration options
  • The fewest features
  • The fewest backend-specific APIs
  • The fewest lines of code

Features

Marj supports and has been tested with MySQL, PostgreSQL and SQLite.

It provides the following features:

  • Enqueued jobs are written to the database.
  • Enqueued jobs can be queried, executed and discarded.
  • Executed jobs are re-enqueued or discarded, depending on the result.
  • Compatibility with Mission Control Jobs.

Extensions

Marj does not provide the following features by default, but each can easily be added to your application with a few lines of code. See Example Usage and Extension Examples for suggestions.

API

Marj relies on existing ActiveJob APIs, for example:

job.enqueue         # Enqueues a job
job.perform_now     # Executes a job

Additionally, it extends the ActiveJob API with methods required for a minimal queueing backend implementaion:

SomeJob.query(args) # Queries for enqeueued jobs
job.discard         # Runs discard callbacks and, by default, deletes the job
job.delete          # Deletes the job

Requirements

  • activejob >= 7.1
  • activerecord >= 7.1

Setup

1. Install

bundle add marj

2. Create the database table

class CreateJobs < ActiveRecord::Migration[7.1]
  def self.up
    create_table :jobs, id: :string, primary_key: :job_id do |table|
      table.string   :job_class,            null: false
      table.text     :arguments,            null: false
      table.string   :queue_name,           null: false
      table.integer  :priority
      table.integer  :executions,           null: false
      table.text     :exception_executions, null: false
      table.datetime :enqueued_at,          null: false
      table.datetime :scheduled_at
      table.string   :locale,               null: false
      table.string   :timezone,             null: false
    end

    add_index :jobs, %i[job_class]
    add_index :jobs, %i[queue_name]
    add_index :jobs, %i[enqueued_at]
    add_index :jobs, %i[scheduled_at]
    add_index :jobs, %i[priority scheduled_at enqueued_at]
  end

  def self.down
    drop_table :jobs
  end
end

3. Configure the queue adapter

require 'marj'

# Choose one of the following:
Rails.configuration.active_job.queue_adapter = :marj # Globally, with Rails
ActiveJob::Base.queue_adapter = :marj                # Globally, without Rails
SomeJob.queue_adapter = :marj                        # Single job

4. Optionally, include the Marj module

Without any additional setup, jobs can be queried and discarded via the Marj module:

Marj.query(:all)
Marj.discard(job)

But it is also convenient to query or discard via job classes:

SomeJob.query(:all)
SomeJob.discard(job)
job.discard

In order to enable this functionality, you must include the Marj module:

class SomeJob < ActiveJob::Base
  include Marj

  def perform; end
end

Example Usage

# Enqueue a job
job = SomeJob.perform_later('foo')

# Query jobs
Marj.query(:all)
Marj.query(:due)
Marj.query(:due, queue: :foo)
Marj.query('8720417d-8fff-4fcf-bc16-22aaef8543d2')

# Execute a job
job.perform_now

# Execute jobs which are due (single query)
Marj.query(:due).map(&:perform_now)

# Execute jobs which are due (multiple queries)
loop do
  Marj.query(:due, :first)&.tap(&:perform_now) || break
end

# Execute jobs as they become due
loop do
  Marj.query(:due).each(&:perform_now) rescue logger.error($!)
ensure
  sleep 5.seconds
end

Testing

By default, jobs enqeued during tests will be written to the database. Enqueued jobs can be executed via:

SomeJob.query(:due).map(&:perform_now)

Alternatively, to use ActiveJob's TestAdapter:

ActiveJob::Base.queue_adapter = :test

Extension Examples

Most missing features can easily be added to your application with a few lines of code.

Concurrency Protection

To prevent two threads from executing the same job simultaneously, consider moving jobs to a different queue before executing them.

class ApplicationJob < ActiveJob::Base
  around_perform do |job, block|
    if job.queue_name.start_with?('claimed')
      raise "Job #{job.job_id} already claimed"
    end

    updated = Marj::Record
      .where(job_id: job.job_id, queue_name: job.queue_name)
      .update_all(
        queue_name: "claimed-#{job.queue_name}",
        scheduled_at: Time.now.utc
      )
    unless updated == 1
      raise "Failed to claim job #{job.job_id}. #{updated} records updated"
    end

    begin
      block.call
    rescue StandardError
      Marj::Record
        .where(job_id: job.job_id, queue_name: "claimed-#{job.queue_name}")
        .update_all(queue_name: job.queue_name, scheduled_at: job.scheduled_at)
      raise
    end
  end
end

Job Timeouts

class ApplicationJob < ActiveJob::Base
  def self.timeout_after(duration)
    @timeout = duration
  end

  around_perform do |job, block|
    if (timeout = job.class.instance_variable_get(:@timeout))
      ::Timeout.timeout(timeout, StandardError, 'execution expired') do
        block.call
      end
    else
      block.call
    end
  end
end

Error Storage

class AddLastErrorToJobs < ActiveRecord::Migration[7.1]
  def self.up
    add_column :jobs, :last_error, :text
  end

  def self.down
    remove_column :jobs, :last_error
  end
end

class ApplicationJob < ActiveJob::Base
  attr_reader :last_error

  def last_error=(error)
    if error.is_a?(Exception)
      backtrace = error.backtrace&.map { |line| "\t#{line}" }&.join("\n")
      error = backtrace ?
        "#{error.class}: #{error.message}\n#{backtrace}" :
        "#{error.class}: #{error.message}"
    end

    @last_error = error&.truncate(10_000, omission: '… (truncated)')
  end

  def set(options = {})
    super.tap { self.last_error = options[:error] if options[:error] }
  end

  def serialize
    super.merge('last_error' => @last_error)
  end

  def deserialize(job_data)
    super.tap { self.last_error = job_data['last_error'] }
  end
end

Discarded Job Retention

By default, discarded jobs are deleted from the database. If desired, this behavior can be overridden, for instance to move jobs to a differnt queue:

ActiveJob::Base.queue_adapter = MarjAdapter.new(
  discard: proc { _1.enqueue(queue: 'discarded') }
)

# To retrieve a discarded job, query the discarded queue:
job = Marj.query(:first, queue_name: 'discarded')

# To delete, use one of the following:
Marj.delete(job)
job.delete

Observability

For instance with Prometheus metrics:

class ApplicationJob < ActiveJob::Base
  counter = Prometheus::Client::Counter.new(
    :job_events_total, docstring: '...', labels: [:job, :event]
  )

  around_enqueue do |job, block|
    counter.increment(labels: { job: job.class.name, event: 'before_enqueue' })
    block.call
    counter.increment(labels: { job: job.class.name, event: 'after_enqueue' })
  rescue Exception
    counter.increment(labels: { job: job.class.name, event: 'enqueue_error' })
    raise
  end

  around_perform do |job, block|
    counter.increment(labels: { job: job.class.name, event: 'before_perform' })
    block.call
    counter.increment(labels: { job: job.class.name, event: 'after_perform' })
  rescue Exception => e # rubocop:disable Lint/RescueException
    counter.increment(labels: { job: job.class.name, event: 'perform_error' })
    raise
  end

  after_discard do |job|
    counter.increment(labels: { job: job.class.name, event: 'after_discard' })
  end
end

Multi-DB Support

It is possible to create a custom record class in order to, for instance, use a different class or table name, or write jobs to multiple databases/tables within a single application.

Assuming you have a jobs tabled named my_jobs:

class MyRecord < Marj::Record
  self.table_name = 'my_jobs'
end

class MyJob < ActiveJob::Base
  include Marj

  self.queue_adapter = MarjAdapter.new('MyRecord')

  def perform; end
end

MyJob.perform_later('oh, hi')
MyJob.query(:due, :first).perform_now

ActiveJob Cheatsheet

For more information on ActiveJob, see:

Configuring a Queue Adapter

# With Rails
Rails.configuration.active_job.queue_adapter = :foo # Instantiates FooAdapter
Rails.configuration.active_job.queue_adapter = FooAdapter.new

# Without Rails
ActiveJob::Base.queue_adapter = :foo               # Instantiates FooAdapter
ActiveJob::Base.queue_adapter = FooAdapter.new     # Uses FooAdapter directly

# Single Job
SomeJob.queue_adapter = :foo                       # Instantiates FooAdapter
SomeJob.queue_adapter = FooAdapter.new             # Uses FooAdapter directly

Configuration

config.active_job.default_queue_name
config.active_job.queue_name_prefix
config.active_job.queue_name_delimiter
config.active_job.retry_jitter
SomeJob.queue_name
SomeJob.queue_as
SomeJob.queue_name_prefix
SomeJob.queue_name_delimiter
SomeJob.retry_jitter

Options

:wait       # Enqueues the job with the specified delay
:wait_until # Enqueues the job at the time specified
:queue      # Enqueues the job on the specified queue
:priority   # Enqueues the job with the specified priority

Callbacks

SomeJob.before_enqueue
SomeJob.after_enqueue
SomeJob.around_enqueue
SomeJob.before_perform
SomeJob.after_perform
SomeJob.around_perform
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :before, &block)
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :after, &block)
ActiveJob::Callbacks.singleton_class.set_callback(:execute, :around, &block)

Handling Exceptions

SomeJob.retry_on
SomeJob.discard_on
SomeJob.after_discard

Creating Jobs

# Create without enqueueing
job = SomeJob.new
job = SomeJob.new(args)
job = SomeJob.new.deserialize(other_job.serialize)

# Create and enqueue
job = SomeJob.perform_later
job = SomeJob.perform_later(args)

# Create without enqueueing and run (only enqueued on retryable failure)
SomeJob.perform_now
SomeJob.perform_now(args)

Enqueueing Jobs

Jobs are enqueued via the ActiveJob::Base#enqueue method. This method returns the job on success. If an error is raised during enqueueing, that error will propagate to the caller, unless the error is an ActiveJob::EnqueueError. In this case, enqueue will return false and job.enqueue_error will be set.

SomeJob.new(args).enqueue
SomeJob.new(args).enqueue(options)

# Via perform_later
SomeJob.perform_later(SomeJob.new(args))
SomeJob.perform_later(args)
SomeJob.set(options).perform_later(args)

# After a retryable execution failure
SomeJob.perform_now(args)
ActiveJob::Base.execute(SomeJob.new(args).serialize)

# Enqueue multiple
ActiveJob.perform_all_later(SomeJob.new, SomeJob.new)
ActiveJob.perform_all_later(SomeJob.new, SomeJob.new, options:)
SomeJob.set(options).perform_all_later(SomeJob.new, SomeJob.new)
SomeJob.set(options).perform_all_later(SomeJob.new, SomeJob.new, options:)

Executing Jobs

# Executed without enqueueing, enqueued on retryable failure
SomeJob.new(args).perform_now
SomeJob.perform_now(args)
ActiveJob::Base.execute(SomeJob.new(args).serialize)

# Executed after enqueueing
SomeJob.perform_later(args).perform_now
ActiveJob::Base.execute(SomeJob.perform_later(args).serialize)