Class: Exekutor::Cleanup

Inherits:
Object
  • Object
show all
Defined in:
lib/exekutor/cleanup.rb

Overview

Helper class to clean up finished jobs and stale workers.

Instance Method Summary collapse

Instance Method Details

#cleanup_jobs(before: 48.hours.ago, status: nil) ⇒ Integer

Purges all jobs where scheduled at is before before. Only purges jobs with the given status, if no status is given all jobs that are not pending are purged.

Parameters:

  • before (ActiveSupport::Duration, Numeric, Time) (defaults to: 48.hours.ago)

    the maximum scheduled at. Default: 48 hours ago

  • status (Array<String,Symbol>, String, Symbol) (defaults to: nil)

    the statuses to purge. Default: All except :pending

Returns:

  • (Integer)

    the number of purged jobs



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/exekutor/cleanup.rb', line 20

def cleanup_jobs(before: 48.hours.ago, status: nil)
  destroy_before = parse_timeout_arg :before, before
  unless [Array, String, Symbol, NilClass].any? { |c| status.is_a? c }
    raise ArgumentError, "Unsupported value for status: #{status.class}"
  end

  jobs = Exekutor::Job.all
  jobs.where!(%("scheduled_at"<?), destroy_before) unless before.nil?
  if status
    jobs.where! status: status
  else
    jobs = jobs.where.not(status: :p)
  end
  jobs.delete_all
end

#cleanup_workers(timeout: 4.hours) ⇒ Array<Exekutor::Info::Worker>

Purges all workers where the last heartbeat is over the timeout ago.

Parameters:

  • timeout (ActiveSupport::Duration, Numeric, Time) (defaults to: 4.hours)

    the timeout. Default: 4 hours

Returns:



9
10
11
12
13
# File 'lib/exekutor/cleanup.rb', line 9

def cleanup_workers(timeout: 4.hours)
  destroy_before = parse_timeout_arg :timeout, timeout
  # TODO: PG-NOTIFY each worker with an EXIT command
  Exekutor::Info::Worker.where(%("last_heartbeat_at"<?), destroy_before).destroy_all
end