Class: GoodJob::JobPerformer

Inherits:
Object
  • Object
Defined in:
lib/good_job/job_performer.rb,
lib/good_job/job_performer/metrics.rb

Overview

JobPerformer queries the database for jobs and performs them on behalf of a Scheduler. It mainly functions as glue between a Scheduler and the jobs it should be executing.

The JobPerformer must be safe to execute across multiple threads.

Defined Under Namespace

Classes: Metrics

Instance Method Summary

Constructor Details

#initialize(queue_string, capsule: GoodJob.capsule) ⇒ JobPerformer

Returns a new instance of JobPerformer.

Parameters:

  • queue_string (String)

    Queues to execute jobs from



# File 'lib/good_job/job_performer.rb', line 17

def initialize(queue_string, capsule: GoodJob.capsule)
  @queue_string = queue_string
  @capsule = capsule
  @metrics = Metrics.new
end

Instance Method Details

#cleanup ⇒ void

This method returns an undefined value.

Destroy expired preserved jobs



# File 'lib/good_job/job_performer.rb', line 88

def cleanup
  GoodJob.cleanup_preserved_jobs
end

#name ⇒ String

A meaningful name to identify the performer in logs and for debugging.

Returns:

  • (String)

    The queues from which Jobs are worked



# File 'lib/good_job/job_performer.rb', line 25

def name
  @queue_string
end

#next {|Execution| ... } ⇒ Object?

Perform the next eligible job

Yields:

  • (Execution)

    Yields the execution, if one is dequeued

Returns:

  • (Object, nil)

    Returns job result or nil if no job was found



# File 'lib/good_job/job_performer.rb', line 32

def next
  active_job_id = nil
  @capsule.tracker.register do
    job_query.perform_with_advisory_lock(lock_id: @capsule.tracker.id_for_lock, parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit) do |execution|
      @metrics.touch_check_queue_at

      if execution
        active_job_id = execution.active_job_id
        performing_active_job_ids << active_job_id
        @metrics.touch_execution_at
        yield(execution) if block_given?
      else
        @metrics.increment_empty_executions
      end
    end.tap do |result|
      if result
        result.succeeded? ? @metrics.increment_succeeded_executions : @metrics.increment_errored_executions
      end
    end
  end
ensure
  performing_active_job_ids.delete(active_job_id) if active_job_id
end
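
The bookkeeping in #next can be illustrated without a database. The following is a minimal single-threaded sketch, not GoodJob's implementation: `TrackingSketch`, its in-memory queue, and its counter names are all hypothetical stand-ins. It shows the same shape as the method above: record the job id while the job is in flight, count empty, succeeded, and errored attempts, and always clear the id in an `ensure` clause.

```ruby
require "set"

# Hypothetical stand-in for a performer; an Array plays the role of the job table.
class TrackingSketch
  attr_reader :in_flight, :succeeded, :errored, :empty

  def initialize(queue)
    @queue = queue          # Array of { id:, work: } hashes (assumed shape)
    @in_flight = Set.new    # ids of jobs currently being performed
    @succeeded = 0
    @errored = 0
    @empty = 0
  end

  def next
    job_id = nil
    job = @queue.shift
    if job.nil?
      @empty += 1           # nothing to dequeue
      return nil
    end

    job_id = job[:id]
    @in_flight << job_id
    yield(job) if block_given?
    result = job[:work].call
    @succeeded += 1
    result
  rescue StandardError
    @errored += 1
    nil
  ensure
    # Runs on success, error, and early return alike.
    @in_flight.delete(job_id) if job_id
  end
end

queue = [
  { id: "a", work: -> { :done } },
  { id: "b", work: -> { raise "boom" } }
]
performer = TrackingSketch.new(queue)
seen_in_flight = []
first = performer.next { |job| seen_in_flight << performer.in_flight.include?(job[:id]) }
second = performer.next
third = performer.next
```

Because the cleanup sits in `ensure`, the in-flight set is empty again after every call, whether the job succeeded, raised, or was never found. A real performer would also need thread-safe collections, which this sketch omits.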

#next?(state = {}) ⇒ Boolean

Tests whether this performer should be used in GoodJob’s current state.

For example, state may be a LISTEN/NOTIFY message passed down from the Notifier to the Scheduler. The Scheduler can ask its performer “does this message relate to you?” and, if not, ignore the message to minimize thread wake-ups, database queries, and thundering herds.

Returns:

  • (Boolean)

    whether the performer’s #next method should be called in the current state.



# File 'lib/good_job/job_performer.rb', line 65

def next?(state = {})
  return true unless state[:queue_name]

  if parsed_queues[:exclude]
    parsed_queues[:exclude].exclude?(state[:queue_name])
  elsif parsed_queues[:include]
    parsed_queues[:include].include?(state[:queue_name])
  else
    true
  end
end
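
To make the include/exclude decision concrete, here is a small sketch of the same branching in plain Ruby. `QueueFilterSketch` is a hypothetical class, and the shape of `parsed_queues` (`{ include: [...] }`, `{ exclude: [...] }`, or an empty hash) is an assumption inferred from the method body above. It uses `!include?` because `Array#exclude?` comes from ActiveSupport rather than core Ruby.

```ruby
# Hypothetical stand-in for the performer's queue filter; not GoodJob's own class.
class QueueFilterSketch
  def initialize(parsed_queues)
    @parsed_queues = parsed_queues
  end

  def next?(state = {})
    queue_name = state[:queue_name]
    # A message without a queue name cannot be filtered, so always proceed.
    return true unless queue_name

    if @parsed_queues[:exclude]
      !@parsed_queues[:exclude].include?(queue_name)
    elsif @parsed_queues[:include]
      @parsed_queues[:include].include?(queue_name)
    else
      true
    end
  end
end

mailers_only = QueueFilterSketch.new({ include: ["mailers"] })
no_reports   = QueueFilterSketch.new({ exclude: ["reports"] })
all_queues   = QueueFilterSketch.new({})
```

With these three filters, a notification for the `reports` queue would wake only `all_queues`, while one for `mailers` would wake all three.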

#next_at(after: nil, limit: nil, now_limit: nil) ⇒ Array<DateTime, Time>?

Returns timestamps of when the next tasks may be available.

Parameters:

  • after (DateTime, Time, nil) (defaults to: nil)

    future jobs scheduled after this time

  • limit (Integer) (defaults to: nil)

    number of future timestamps to return

  • now_limit (Integer) (defaults to: nil)

    number of past timestamps to return

Returns:

  • (Array<DateTime, Time>, nil)


# File 'lib/good_job/job_performer.rb', line 82

def next_at(after: nil, limit: nil, now_limit: nil)
  job_query.next_scheduled_at(after: after, limit: limit, now_limit: now_limit)
end

#reset_stats ⇒ void

This method returns an undefined value.

Reset metrics about this performer



# File 'lib/good_job/job_performer.rb', line 102

def reset_stats
  @metrics.reset
end

#stats ⇒ Hash

Metrics about this performer

Returns:

  • (Hash)


# File 'lib/good_job/job_performer.rb', line 94

def stats
  {
    name: name,
  }.merge(@metrics.to_h)
end
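
Since the performer must be safe across threads, the metrics object that #stats merges in has to tolerate concurrent updates. The sketch below shows one way that could look, using only the standard library. `MetricsSketch` and the `succeeded_executions_count` key are hypothetical; the real `Metrics` class may store different fields and use a different synchronization mechanism.

```ruby
# Hypothetical metrics holder; a Mutex guards the counter against lost updates.
class MetricsSketch
  def initialize
    @mutex = Mutex.new
    reset
  end

  def increment_succeeded_executions
    @mutex.synchronize { @succeeded += 1 }
  end

  def reset
    @mutex.synchronize { @succeeded = 0 }
  end

  def to_h
    # The key name is an assumption made for illustration.
    @mutex.synchronize { { succeeded_executions_count: @succeeded } }
  end
end

metrics = MetricsSketch.new

# Four threads each record 100 successes.
threads = 4.times.map do
  Thread.new { 100.times { metrics.increment_succeeded_executions } }
end
threads.each(&:join)

# Same merge shape as #stats: the performer's name plus the metrics hash.
stats = { name: "mailers" }.merge(metrics.to_h)

metrics.reset
after_reset = metrics.to_h
```

Here `stats` combines the name with a count of 400, and `after_reset` reports zero again, mirroring what #reset_stats is documented to do.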