Class: ActiveJob::JobsRelation

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/active_job/jobs_relation.rb

Overview

A relation of jobs that can be filtered and acted on.

Relations of jobs are normally fetched via ActiveJob.jobs or through a given queue (+ActiveJob::Queue#jobs+).

This class offers a fluid interface to query a subset of jobs. For example:

queue = ActiveJob.queues[:default]
queue.jobs.limit(10).where(job_class_name: "DummyJob").last

Relations are enumerable, so you can use Enumerable methods on them. Notice however that using these methods will imply loading all the relation in memory, which could introduce performance concerns.

Internally, ActiveJob will always use paginated queries to the underlying queue adapter. The page size can be controlled via config.active_job.default_page_size (1000 by default).

There are additional performance concerns depending on the configured adapter. Please check ActiveJob::Relation#where, ActiveJob::Relation#count.

Constant Summary collapse

STATUSES =
%i[ pending failed in_progress blocked scheduled finished ]
FILTERS =
%i[ queue_name job_class_name ]
PROPERTIES =
%i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at ]
ALL_JOBS_LIMIT =

When no limit value it defaults to “all jobs”

100_000_000

Instance Method Summary collapse

Constructor Details

#initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size) ⇒ JobsRelation

Returns a new instance of JobsRelation.



36
37
38
39
40
41
# File 'lib/active_job/jobs_relation.rb', line 36

def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size)
  @queue_adapter = queue_adapter
  @default_page_size = default_page_size

  set_defaults
end

Instance Method Details

#countObject Also known as: length, size

Returns the number of jobs in the relation.

When filtering jobs, if the adapter doesn’t support the filter(s) directly, this will load all the jobs in memory to filter them.



99
100
101
102
103
104
105
# File 'lib/active_job/jobs_relation.rb', line 99

def count
  if loaded? || filtering_needed?
    to_a.length
  else
    query_count
  end
end

#discard_allObject

Discard all the jobs in the relation.



148
149
150
151
# File 'lib/active_job/jobs_relation.rb', line 148

def discard_all
  queue_adapter.discard_all_jobs(self)
  nil
end

#discard_job(job) ⇒ Object

Discard the provided job.



154
155
156
# File 'lib/active_job/jobs_relation.rb', line 154

def discard_job(job)
  queue_adapter.discard_job(job, self)
end

#dispatch_job(job) ⇒ Object

Dispatch the provided job.

This operation is only valid for blocked or scheduled jobs. It will raise an error ActiveJob::Errors::InvalidOperation otherwise.



162
163
164
165
166
# File 'lib/active_job/jobs_relation.rb', line 162

def dispatch_job(job)
  raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on blocked or scheduled jobs, but this job is #{job.status}" unless job.blocked? || job.scheduled?

  queue_adapter.dispatch_job(job, self)
end

#each(&block) ⇒ Object



124
125
126
# File 'lib/active_job/jobs_relation.rb', line 124

def each(&block)
  loaded_jobs&.each(&block) || load_jobs(&block)
end

#empty?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/active_job/jobs_relation.rb', line 110

def empty?
  count == 0
end

#filtering_needed?Boolean

Returns:

  • (Boolean)


216
217
218
# File 'lib/active_job/jobs_relation.rb', line 216

def filtering_needed?
  filters.any?
end

#find_by_id(job_id) ⇒ Object

Find a job by id.

Returns nil when not found.



171
172
173
# File 'lib/active_job/jobs_relation.rb', line 171

def find_by_id(job_id)
  queue_adapter.find_job(job_id, self)
end

#find_by_id!(job_id) ⇒ Object

Find a job by id.

Raises ActiveJob::Errors::JobNotFoundError when not found.



178
179
180
# File 'lib/active_job/jobs_relation.rb', line 178

def find_by_id!(job_id)
  queue_adapter.find_job(job_id, self) or raise ActiveJob::Errors::JobNotFoundError.new(job_id, self)
end

#in_batches(of: default_page_size, order: :asc, &block) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/active_job/jobs_relation.rb', line 195

def in_batches(of: default_page_size, order: :asc, &block)
  validate_looping_in_batches_is_possible

  case order
  when :asc
    in_ascending_batches(of: of, &block)
  when :desc
    in_descending_batches(of: of, &block)
  else
    raise "Unsupported order: #{order}. Valid values: :asc, :desc."
  end
end

#job_class_names(from_first: 500) ⇒ Object

Returns an array of jobs class names in the first from_first jobs.



183
184
185
# File 'lib/active_job/jobs_relation.rb', line 183

def job_class_names(from_first: 500)
  first(from_first).collect(&:job_class_name).uniq
end

#limit(limit) ⇒ Object

Sets the max number of jobs to fetch in the query.



91
92
93
# File 'lib/active_job/jobs_relation.rb', line 91

def limit(limit)
  clone_with limit_value: limit
end

#limit_value_provided?Boolean

Returns:

  • (Boolean)


212
213
214
# File 'lib/active_job/jobs_relation.rb', line 212

def limit_value_provided?
  limit_value.present? && limit_value != ActiveJob::JobsRelation::ALL_JOBS_LIMIT
end

#offset(offset) ⇒ Object

Sets an offset for the jobs-fetching query. The first position is 0.



86
87
88
# File 'lib/active_job/jobs_relation.rb', line 86

def offset(offset)
  clone_with offset_value: offset
end

#paginated?Boolean

Returns:

  • (Boolean)


208
209
210
# File 'lib/active_job/jobs_relation.rb', line 208

def paginated?
  offset_value > 0 || limit_value_provided?
end

#reloadObject



187
188
189
190
191
192
193
# File 'lib/active_job/jobs_relation.rb', line 187

def reload
  @count = nil
  @loaded_jobs = nil
  @filters = nil

  self
end

#retry_allObject

Retry all the jobs in the queue.

This operation is only valid for sets of failed jobs. It will raise an error ActiveJob::Errors::InvalidOperation otherwise.



132
133
134
135
136
# File 'lib/active_job/jobs_relation.rb', line 132

def retry_all
  ensure_failed_status
  queue_adapter.retry_all_jobs(self)
  nil
end

#retry_job(job) ⇒ Object

Retry the provided job.

This operation is only valid for sets of failed jobs. It will raise an error ActiveJob::Errors::InvalidOperation otherwise.



142
143
144
145
# File 'lib/active_job/jobs_relation.rb', line 142

def retry_job(job)
  ensure_failed_status
  queue_adapter.retry_job(job, self)
end

#to_sObject Also known as: inspect



114
115
116
117
118
119
120
# File 'lib/active_job/jobs_relation.rb', line 114

def to_s
  properties_with_values = PROPERTIES.collect do |name|
    value = public_send(name)
    "#{name}: #{value}" unless value.nil?
  end.compact.join(", ")
  "<Jobs with [#{properties_with_values}]> (loaded: #{loaded?})"
end

#where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil) ⇒ Object

Returns a ActiveJob::JobsRelation with the configured filtering options.

Options

  • :job_class_name - To only include the jobs of a given class. Depending on the configured queue adapter, this will perform the filtering in memory, which could introduce performance concerns for large sets of jobs.

  • :queue_name - To only include the jobs in the provided queue.

  • :worker_id - To only include the jobs processed by the provided worker.

  • :recurring_task_id - To only include the jobs corresponding to runs of a recurring task.

  • :finished_at - (Range) To only include the jobs finished between the provided range



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/active_job/jobs_relation.rb', line 55

def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil)
  # Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
  arguments = { job_class_name: job_class_name,
    queue_name: queue_name&.to_s,
    worker_id: worker_id,
    recurring_task_id: recurring_task_id,
    finished_at: finished_at
  }.compact

  clone_with **arguments
end

#with_status(status) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/active_job/jobs_relation.rb', line 67

def with_status(status)
  if status.to_sym.in? STATUSES
    clone_with status: status.to_sym
  else
    self
  end
end