Class: Exekutor::Internal::Reserver

Inherits:
Object
  • Object
show all
Defined in:
lib/exekutor/internal/reserver.rb

Overview

Reserves jobs to be executed by the current worker

Constant Summary collapse

ACTION_NAME =

The name to use for the SQL log message

"Exekutor::Reserve"

Instance Method Summary collapse

Constructor Details

#initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil) ⇒ Reserver

Creates a new Reserver

Parameters:

  • worker_id (String)

    the id of the worker

  • queues (Array<String>) (defaults to: nil)

    the queues to watch



14
15
16
17
18
# File 'lib/exekutor/internal/reserver.rb', line 14

def initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil)
  @worker_id = worker_id
  @reserve_filter_sql = build_filter_sql(queues: queues, min_priority: min_priority, max_priority: max_priority)
  @json_serializer = Exekutor.config.load_json_serializer
end

Instance Method Details

#earliest_scheduled_atTime?

Gets the earliest scheduled at of all pending jobs in the watched queues

Returns:

  • (Time, nil)

    The earliest scheduled at, or nil if the queues are empty



52
53
54
55
56
# File 'lib/exekutor/internal/reserver.rb', line 52

def earliest_scheduled_at
  jobs = Exekutor::Job.pending
  jobs.where! @reserve_filter_sql unless @reserve_filter_sql.nil?
  jobs.minimum(:scheduled_at)
end

#get_abandoned_jobs(active_job_ids) ⇒ Array<Hash>

Gets the jobs that are assigned to this worker and have an id that is not included in active_job_ids

Parameters:

  • active_job_ids (Array<String>)

    The ids of the jobs that should be excluded

Returns:

  • (Array<Hash>)

    the jobs



43
44
45
46
47
48
# File 'lib/exekutor/internal/reserver.rb', line 43

def get_abandoned_jobs(active_job_ids)
  jobs = Exekutor::Job.executing.where(worker_id: @worker_id)
  jobs = jobs.where.not(id: active_job_ids) if active_job_ids.present?
  attrs = %i[id payload options scheduled_at]
  jobs.pluck(*attrs).map { |p| attrs.zip(p).to_h }
end

#reserve(limit) ⇒ Array<Hash>?

Reserves pending jobs

Parameters:

  • limit (Integer)

    the number of jobs to reserve

Returns:

  • (Array<Hash>, nil)

    the reserved jobs, or nil if no jobs were reserved



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/exekutor/internal/reserver.rb', line 23

def reserve(limit)
  return unless limit.positive?

  results = Exekutor::Job.connection.exec_query <<~SQL, ACTION_NAME, [@worker_id, limit], prepare: true
    UPDATE exekutor_jobs SET worker_id = $1, status = 'e' WHERE id IN (
       SELECT id FROM exekutor_jobs
          WHERE scheduled_at <= now() AND "status"='p'#{" AND #{@reserve_filter_sql}" if @reserve_filter_sql}
          ORDER BY priority#{" DESC" if Exekutor.config.inverse_priority?}, scheduled_at, enqueued_at
          FOR UPDATE SKIP LOCKED
          LIMIT $2
    ) RETURNING "id", "payload", "options", "scheduled_at"
  SQL
  return unless results&.length&.positive?

  parse_jobs results
end