Class: SolidQueue::RecurringTask

Inherits:
Record
  • Object
show all
Defined in:
app/models/solid_queue/recurring_task.rb

Defined Under Namespace

Classes: Arguments

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Record

non_blocking_lock

Class Method Details

.create_or_update_all(tasks) ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'app/models/solid_queue/recurring_task.rb', line 38

# Upserts the given tasks with a single +upsert_all+ call.
#
# @param tasks [Enumerable] objects responding to +attributes_for_upsert+
# @return [Object] whatever +upsert_all+ returns
def create_or_update_all(tasks)
  # PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict
  # during two concurrent INSERTs for the same value of a unique index. When the connection
  # supports a conflict target, name it explicitly (unique_by: :key) so duplicate rows are
  # resolved by that value; otherwise pass no extra options.
  conflict_options = connection.supports_insert_conflict_target? ? { unique_by: :key } : {}

  upsert_all(tasks.map(&:attributes_for_upsert), **conflict_options)
end

.from_configuration(key, **options) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'app/models/solid_queue/recurring_task.rb', line 25

# Instantiates a task (via +new+) from one configuration entry.
#
# @param key [Object] identifier stored as the task's +key+
# @param options [Hash] configuration values; reads +:class+, +:command+, +:args+,
#   +:schedule+, +:queue+, +:priority+ and +:description+
# @return [Object] the result of +new+, always built with +static: true+
def from_configuration(key, **options)
  attributes = {
    key: key,
    class_name: options[:class],
    command: options[:command],
    arguments: options[:args],
    schedule: options[:schedule],
    # queue and priority go through +presence+ before being assigned
    queue_name: options[:queue].presence,
    priority: options[:priority].presence,
    description: options[:description],
    static: true
  }

  new(**attributes)
end

.wrap(args) ⇒ Object



21
22
23
# File 'app/models/solid_queue/recurring_task.rb', line 21

# Coerces +args+ into a task.
#
# @param args [Object] either an instance of the receiver (returned untouched) or an
#   object whose +first+ is the key and whose +second+ is the options hash
# @return [Object] +args+ itself, or the result of +from_configuration+
def wrap(args)
  return args if args.is_a?(self)

  from_configuration(args.first, **args.second)
end

Instance Method Details

#attributes_for_upsert ⇒ Object



100
101
102
# File 'app/models/solid_queue/recurring_task.rb', line 100

# Attributes to send in an upsert: everything in +attributes+ except the
# "id", "created_at" and "updated_at" entries.
#
# @return [Hash] the filtered attributes
def attributes_for_upsert
  excluded_columns = [ "id", "created_at", "updated_at" ]

  attributes.without(*excluded_columns)
end

#delay_from_now ⇒ Object



50
51
52
# File 'app/models/solid_queue/recurring_task.rb', line 50

# Seconds from +Time.current+ until +next_time+, never negative.
#
# @return [Float, Integer] the remaining seconds as a Float, or the Integer 0 when
#   +next_time+ is already in the past
def delay_from_now
  seconds_until_next = (next_time - Time.current).to_f

  seconds_until_next < 0 ? 0 : seconds_until_next
end

#enqueue(at:) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'app/models/solid_queue/recurring_task.rb', line 70

# Enqueues this task's job, wrapped in an +:enqueue_recurring_task+ instrumentation event.
#
# The instrumentation payload starts with +task+ (the key) and +at+, and is enriched with:
# * +:active_job_id+   - job id of the enqueued job
# * +:other_adapter+   - true when +using_solid_queue_adapter?+ is false
# * +:enqueue_error+   - error message when the job was not enqueued
# * +:skipped+         - true when RecurringExecution::AlreadyRecorded was raised
#
# @param at [Object] run time, forwarded as +run_at+ to +enqueue_and_record+
# @return [Object, false] the job, or false when AlreadyRecorded / Job::EnqueueError is rescued
def enqueue(at:)
  SolidQueue.instrument(:enqueue_recurring_task, task: key, at: at) do |payload|
    if using_solid_queue_adapter?
      active_job = enqueue_and_record(run_at: at)
    else
      payload[:other_adapter] = true

      active_job = perform_later
      unless active_job.successfully_enqueued?
        payload[:enqueue_error] = active_job.enqueue_error&.message
      end
    end

    payload[:active_job_id] = active_job.job_id
    active_job
  rescue RecurringExecution::AlreadyRecorded
    payload[:skipped] = true
    false
  rescue Job::EnqueueError => error
    payload[:enqueue_error] = error.message
    false
  end
end

#last_enqueued_time ⇒ Object



62
63
64
65
66
67
68
# File 'app/models/solid_queue/recurring_task.rb', line 62

# Largest +run_at+ among this task's recurring executions.
#
# When +recurring_executions+ reports itself as loaded, the maximum is computed from
# the in-memory records; otherwise it is delegated to +maximum(:run_at)+.
#
# @return [Object, nil] the greatest +run_at+, or nil when there are no executions
def last_enqueued_time
  executions = recurring_executions
  return executions.maximum(:run_at) unless executions.loaded?

  executions.map(&:run_at).max
end

#next_time ⇒ Object



54
55
56
# File 'app/models/solid_queue/recurring_task.rb', line 54

# The schedule's next occurrence, converted with +utc+.
#
# @return [Object] result of calling +utc+ on +parsed_schedule.next_time+
def next_time
  upcoming = parsed_schedule.next_time
  upcoming.utc
end

#previous_time ⇒ Object



58
59
60
# File 'app/models/solid_queue/recurring_task.rb', line 58

# The schedule's previous occurrence, converted with +utc+.
#
# @return [Object] result of calling +utc+ on +parsed_schedule.previous_time+
def previous_time
  latest = parsed_schedule.previous_time
  latest.utc
end

#to_s ⇒ Object



96
97
98
# File 'app/models/solid_queue/recurring_task.rb', line 96

# Human-readable summary: the job call with inspected arguments, followed by the
# original schedule expression in brackets.
#
# @return [String] e.g. +Klass.perform_later(1,"a") [ every hour ]+
def to_s
  job_call = "#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")})"

  "#{job_call} [ #{parsed_schedule.original} ]"
end