Module: RocketJob::Plugins::Job::Persistence::ClassMethods
- Defined in:
- lib/rocket_job/plugins/job/persistence.rb
Instance Method Summary collapse
-
#counts_by_state ⇒ Object
Returns [Hash<String:Integer>] of the number of jobs in each state Queued jobs are separated into :queued_now and :scheduled :queued_now are jobs that are awaiting processing and can be processed now.
-
#rocket_job_retrieve(worker_name, filter) ⇒ Object
Retrieves the next job to work on in priority based order and assigns it to this worker.
Instance Method Details
#counts_by_state ⇒ Object
Returns [Hash<String:Integer>] of the number of jobs in each state Queued jobs are separated into :queued_now and :scheduled
:queued_now are jobs that are awaiting processing and can be processed now.
:scheduled are jobs scheduled to run the future.
Note: If there are no jobs in that particular state then the hash will not have a value for it
Example jobs in every state:
RocketJob::Job.counts_by_state
# => {
:aborted => 1,
:completed => 37,
:failed => 1,
:paused => 3,
:queued => 4,
:running => 1,
:queued_now => 1,
:scheduled => 3
}
Example jobs some states:
RocketJob::Job.counts_by_state
# => {
:failed => 1,
:running => 25,
:completed => 1237
}
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/rocket_job/plugins/job/persistence.rb', line 71 def counts_by_state counts = {} collection.aggregate([ { '$group' => { _id: '$state', count: {'$sum' => 1} } } ] ).each do |result| counts[result['_id'].to_sym] = result['count'] end # Calculate :queued_now and :scheduled if there are queued jobs if queued_count = counts[:queued] scheduled_count = RocketJob::Job.scheduled.count if scheduled_count > 0 queued_now_count = queued_count - scheduled_count counts[:queued_now] = queued_count - scheduled_count if queued_now_count > 0 counts[:scheduled] = scheduled_count else counts[:queued_now] = queued_count end end counts end |
#rocket_job_retrieve(worker_name, filter) ⇒ Object
Retrieves the next job to work on in priority based order and assigns it to this worker
Returns nil if no jobs are available for processing
Parameters
worker_name: [String]
Name of the worker that will be processing this job
filter: [Hash]
Filter to apply to the query.
For example: to exclude jobs from being returned.
Example:
# Skip any job ids from the job_ids_list
filter = {:id.nin => job_ids_list}
job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter)
35 36 37 38 39 40 41 42 |
# File 'lib/rocket_job/plugins/job/persistence.rb', line 35 def rocket_job_retrieve(worker_name, filter) SemanticLogger.silence(:info) do query = queued_now query = query.where(filter) unless filter.blank? update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) end end |