Class: Exekutor::Internal::Reserver
- Inherits: Object
  - Object
    - Exekutor::Internal::Reserver
- Defined in: lib/exekutor/internal/reserver.rb
Overview
Reserves jobs to be executed by the current worker
Constant Summary collapse
- ACTION_NAME = "Exekutor::Reserve"
  The name to use for the SQL log message
Instance Method Summary collapse
-
#earliest_scheduled_at ⇒ Time?
Gets the earliest `scheduled_at` time of all pending jobs in the watched queues.
-
#get_abandoned_jobs(active_job_ids) ⇒ Array<Hash>
Gets the jobs that are assigned to this worker and have an id that is not included in `active_job_ids`.
-
#initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil) ⇒ Reserver
constructor
Creates a new Reserver.
-
#reserve(limit) ⇒ Array<Hash>?
Reserves pending jobs.
Constructor Details
#initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil) ⇒ Reserver
Creates a new Reserver
14 15 16 17 18 |
# File 'lib/exekutor/internal/reserver.rb', line 14

# Creates a new Reserver.
#
# @param worker_id the id of the worker this reserver acts for; stored as-is
# @param queues optional queue restriction, forwarded to +build_filter_sql+
# @param min_priority optional priority bound, forwarded to +build_filter_sql+
# @param max_priority optional priority bound, forwarded to +build_filter_sql+
def initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil)
  @worker_id = worker_id

  # The filter fragment is built once here and kept in @reserve_filter_sql.
  filter_options = { queues: queues, min_priority: min_priority, max_priority: max_priority }
  @reserve_filter_sql = build_filter_sql(**filter_options)

  @json_serializer = Exekutor.config.load_json_serializer
end
Instance Method Details
#earliest_scheduled_at ⇒ Time?
Gets the earliest `scheduled_at` time of all pending jobs in the watched queues
52 53 54 55 56 |
# File 'lib/exekutor/internal/reserver.rb', line 52

# Gets the earliest +scheduled_at+ value among the pending jobs. When a
# reserve filter was built by the constructor, only jobs matching that
# filter are considered.
#
# @return [Time, nil] the minimum +scheduled_at+ as returned by the query
def earliest_scheduled_at
  pending_jobs = Exekutor::Job.pending
  # Only narrow the scope when a filter exists; a nil filter means "no restriction".
  pending_jobs = pending_jobs.where(@reserve_filter_sql) unless @reserve_filter_sql.nil?
  pending_jobs.minimum(:scheduled_at)
end
#get_abandoned_jobs(active_job_ids) ⇒ Array<Hash>
Gets the jobs that are assigned to this worker and have an id that is not included in active_job_ids
43 44 45 46 47 48 |
# File 'lib/exekutor/internal/reserver.rb', line 43

# Gets the jobs that are in the executing scope, are assigned to this worker,
# and whose id is not listed in +active_job_ids+.
#
# @param active_job_ids the ids to exclude; when blank, no ids are excluded
# @return [Array<Hash>] one hash per job with the keys +:id+, +:payload+,
#   +:options+ and +:scheduled_at+
def get_abandoned_jobs(active_job_ids)
  columns = %i[id payload options scheduled_at]

  scope = Exekutor::Job.executing.where(worker_id: @worker_id)
  scope = scope.where.not(id: active_job_ids) unless active_job_ids.blank?

  # pluck yields one value array per row; pair each value with its column name.
  scope.pluck(*columns).map { |row| columns.zip(row).to_h }
end
#reserve(limit) ⇒ Array<Hash>?
Reserves pending jobs
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/exekutor/internal/reserver.rb', line 23

# Reserves pending jobs by assigning them to this worker and setting their
# status to 'e' in a single UPDATE statement.
#
# @param limit [Integer] the maximum number of jobs to reserve
# @return [Array<Hash>, nil] the result of +parse_jobs+ for the reserved rows,
#   or nil when +limit+ is not positive or no rows were updated
def reserve(limit)
  return unless limit.positive?

  # The inner SELECT picks due, pending jobs in priority order. It uses
  # FOR UPDATE SKIP LOCKED, so rows currently locked elsewhere are passed over.
  # The optional filter fragment and the priority direction are interpolated;
  # the worker id and limit are bound as $1 and $2.
  sql = <<~SQL
    UPDATE exekutor_jobs SET worker_id = $1, status = 'e' WHERE id IN (
       SELECT id FROM exekutor_jobs
          WHERE scheduled_at <= now() AND "status"='p'#{" AND #{@reserve_filter_sql}" if @reserve_filter_sql}
          ORDER BY priority#{" DESC" if Exekutor.config.inverse_priority?}, scheduled_at, enqueued_at
          FOR UPDATE SKIP LOCKED
          LIMIT $2
    ) RETURNING "id", "payload", "options", "scheduled_at"
  SQL

  results = Exekutor::Job.connection.exec_query(sql, ACTION_NAME, [@worker_id, limit], prepare: true)
  return unless results&.length&.positive?

  parse_jobs(results)
end