Class: ActiveJob::JobsRelation
- Inherits:
-
Object
- Object
- ActiveJob::JobsRelation
- Includes:
- Enumerable
- Defined in:
- lib/active_job/jobs_relation.rb
Overview
A relation of jobs that can be filtered and acted on.
Relations of jobs are normally fetched via ActiveJob.jobs
or through a given queue (+ActiveJob::Queue#jobs+).
This class offers a fluid interface to query a subset of jobs. For example:
queue = ActiveJob.queues[:default]
queue.jobs.limit(10).where(job_class_name: "DummyJob").last
Relations are enumerable, so you can use Enumerable
methods on them. Notice however that using these methods will imply loading all the relation in memory, which could introduce performance concerns.
Internally, ActiveJob
will always use paginated queries to the underlying queue adapter. The page size can be controlled via config.active_job.default_page_size
(1000 by default).
There are additional performance concerns depending on the configured adapter. Please check ActiveJob::Relation#where, ActiveJob::Relation#count.
Constant Summary collapse
- STATUSES =
%i[ pending failed in_progress blocked scheduled finished ]
- FILTERS =
%i[ queue_name job_class_name ]
- PROPERTIES =
%i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at ]
- ALL_JOBS_LIMIT =
When no limit value it defaults to “all jobs”
100_000_000
Instance Method Summary collapse
-
#count ⇒ Object
(also: #length, #size)
Returns the number of jobs in the relation.
-
#discard_all ⇒ Object
Discard all the jobs in the relation.
-
#discard_job(job) ⇒ Object
Discard the provided job.
-
#dispatch_job(job) ⇒ Object
Dispatch the provided job.
- #each(&block) ⇒ Object
- #empty? ⇒ Boolean
- #filtering_needed? ⇒ Boolean
-
#find_by_id(job_id) ⇒ Object
Find a job by id.
-
#find_by_id!(job_id) ⇒ Object
Find a job by id.
- #in_batches(of: default_page_size, order: :asc, &block) ⇒ Object
-
#initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size) ⇒ JobsRelation
constructor
A new instance of JobsRelation.
-
#job_class_names(from_first: 500) ⇒ Object
Returns an array of jobs class names in the first
from_first
jobs. -
#limit(limit) ⇒ Object
Sets the max number of jobs to fetch in the query.
- #limit_value_provided? ⇒ Boolean
-
#offset(offset) ⇒ Object
Sets an offset for the jobs-fetching query.
- #paginated? ⇒ Boolean
- #reload ⇒ Object
-
#retry_all ⇒ Object
Retry all the jobs in the queue.
-
#retry_job(job) ⇒ Object
Retry the provided job.
- #to_s ⇒ Object (also: #inspect)
-
#where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil) ⇒ Object
Returns a
ActiveJob::JobsRelation
with the configured filtering options. - #with_status(status) ⇒ Object
Constructor Details
#initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size) ⇒ JobsRelation
Returns a new instance of JobsRelation.
36 37 38 39 40 41 |
# File 'lib/active_job/jobs_relation.rb', line 36 def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size) @queue_adapter = queue_adapter @default_page_size = default_page_size set_defaults end |
Instance Method Details
#count ⇒ Object Also known as: length, size
Returns the number of jobs in the relation.
When filtering jobs, if the adapter doesn’t support the filter(s) directly, this will load all the jobs in memory to filter them.
99 100 101 102 103 104 105 |
# File 'lib/active_job/jobs_relation.rb', line 99 def count if loaded? || filtering_needed? to_a.length else query_count end end |
#discard_all ⇒ Object
Discard all the jobs in the relation.
148 149 150 151 |
# File 'lib/active_job/jobs_relation.rb', line 148 def discard_all queue_adapter.discard_all_jobs(self) nil end |
#discard_job(job) ⇒ Object
Discard the provided job.
154 155 156 |
# File 'lib/active_job/jobs_relation.rb', line 154 def discard_job(job) queue_adapter.discard_job(job, self) end |
#dispatch_job(job) ⇒ Object
Dispatch the provided job.
This operation is only valid for blocked or scheduled jobs. It will raise an error ActiveJob::Errors::InvalidOperation
otherwise.
162 163 164 165 166 |
# File 'lib/active_job/jobs_relation.rb', line 162 def dispatch_job(job) raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on blocked or scheduled jobs, but this job is #{job.status}" unless job.blocked? || job.scheduled? queue_adapter.dispatch_job(job, self) end |
#each(&block) ⇒ Object
124 125 126 |
# File 'lib/active_job/jobs_relation.rb', line 124 def each(&block) loaded_jobs&.each(&block) || load_jobs(&block) end |
#empty? ⇒ Boolean
110 111 112 |
# File 'lib/active_job/jobs_relation.rb', line 110 def empty? count == 0 end |
#filtering_needed? ⇒ Boolean
216 217 218 |
# File 'lib/active_job/jobs_relation.rb', line 216 def filtering_needed? filters.any? end |
#find_by_id(job_id) ⇒ Object
Find a job by id.
Returns nil when not found.
171 172 173 |
# File 'lib/active_job/jobs_relation.rb', line 171 def find_by_id(job_id) queue_adapter.find_job(job_id, self) end |
#find_by_id!(job_id) ⇒ Object
Find a job by id.
Raises ActiveJob::Errors::JobNotFoundError
when not found.
178 179 180 |
# File 'lib/active_job/jobs_relation.rb', line 178 def find_by_id!(job_id) queue_adapter.find_job(job_id, self) or raise ActiveJob::Errors::JobNotFoundError.new(job_id, self) end |
#in_batches(of: default_page_size, order: :asc, &block) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/active_job/jobs_relation.rb', line 195 def in_batches(of: default_page_size, order: :asc, &block) validate_looping_in_batches_is_possible case order when :asc in_ascending_batches(of: of, &block) when :desc in_descending_batches(of: of, &block) else raise "Unsupported order: #{order}. Valid values: :asc, :desc." end end |
#job_class_names(from_first: 500) ⇒ Object
Returns an array of jobs class names in the first from_first
jobs.
183 184 185 |
# File 'lib/active_job/jobs_relation.rb', line 183 def job_class_names(from_first: 500) first(from_first).collect(&:job_class_name).uniq end |
#limit(limit) ⇒ Object
Sets the max number of jobs to fetch in the query.
91 92 93 |
# File 'lib/active_job/jobs_relation.rb', line 91 def limit(limit) clone_with limit_value: limit end |
#limit_value_provided? ⇒ Boolean
212 213 214 |
# File 'lib/active_job/jobs_relation.rb', line 212 def limit_value_provided? limit_value.present? && limit_value != ActiveJob::JobsRelation::ALL_JOBS_LIMIT end |
#offset(offset) ⇒ Object
Sets an offset for the jobs-fetching query. The first position is 0.
86 87 88 |
# File 'lib/active_job/jobs_relation.rb', line 86 def offset(offset) clone_with offset_value: offset end |
#paginated? ⇒ Boolean
208 209 210 |
# File 'lib/active_job/jobs_relation.rb', line 208 def paginated? offset_value > 0 || limit_value_provided? end |
#reload ⇒ Object
187 188 189 190 191 192 193 |
# File 'lib/active_job/jobs_relation.rb', line 187 def reload @count = nil @loaded_jobs = nil @filters = nil self end |
#retry_all ⇒ Object
Retry all the jobs in the queue.
This operation is only valid for sets of failed jobs. It will raise an error ActiveJob::Errors::InvalidOperation
otherwise.
132 133 134 135 136 |
# File 'lib/active_job/jobs_relation.rb', line 132 def retry_all ensure_failed_status queue_adapter.retry_all_jobs(self) nil end |
#retry_job(job) ⇒ Object
Retry the provided job.
This operation is only valid for sets of failed jobs. It will raise an error ActiveJob::Errors::InvalidOperation
otherwise.
142 143 144 145 |
# File 'lib/active_job/jobs_relation.rb', line 142 def retry_job(job) ensure_failed_status queue_adapter.retry_job(job, self) end |
#to_s ⇒ Object Also known as: inspect
114 115 116 117 118 119 120 |
# File 'lib/active_job/jobs_relation.rb', line 114 def to_s properties_with_values = PROPERTIES.collect do |name| value = public_send(name) "#{name}: #{value}" unless value.nil? end.compact.join(", ") "<Jobs with [#{properties_with_values}]> (loaded: #{loaded?})" end |
#where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil) ⇒ Object
Returns a ActiveJob::JobsRelation
with the configured filtering options.
Options
-
:job_class_name
- To only include the jobs of a given class. Depending on the configured queue adapter, this will perform the filtering in memory, which could introduce performance concerns for large sets of jobs. -
:queue_name
- To only include the jobs in the provided queue. -
:worker_id
- To only include the jobs processed by the provided worker. -
:recurring_task_id
- To only include the jobs corresponding to runs of a recurring task. -
:finished_at
- (Range) To only include the jobs finished between the provided range
55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/active_job/jobs_relation.rb', line 55 def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil) # Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses arguments = { job_class_name: job_class_name, queue_name: queue_name&.to_s, worker_id: worker_id, recurring_task_id: recurring_task_id, finished_at: finished_at }.compact clone_with **arguments end |
#with_status(status) ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/active_job/jobs_relation.rb', line 67 def with_status(status) if status.to_sym.in? STATUSES clone_with status: status.to_sym else self end end |