Class: QueuedTask

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Plok::Loggable
Defined in:
app/models/queued_task.rb

Constant Summary collapse

DEFAULT_PRIORITY =
0
HIGH_PRIORITY =
10

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Plok::Loggable

#log

Class Method Details

.queue(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object

TODO: Might be good to use named parameters for data and weight here. Might be able to use the data var to store weight like the perform_at key.

TODO: Refactor to a separate class.



36
37
38
39
40
41
42
43
44
45
46
# File 'app/models/queued_task.rb', line 36

def self.queue(klass, data, weight = DEFAULT_PRIORITY)
  task = create!(
    klass: klass.to_s,
    weight: weight,
    attempts: 0,
    data: data&.except(:perform_at)
  )

  task.update(perform_at: data[:perform_at]) if data&.dig(:perform_at).present?
  task
end

.queue_unless_already_queued(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object



48
49
50
51
52
# File 'app/models/queued_task.rb', line 48

def self.queue_unless_already_queued(klass, data, weight = DEFAULT_PRIORITY)
  task = find_by(klass: klass, data: data, weight: weight)
  return task if task.present?
  self.queue(klass, data, weight)
end

Instance Method Details

#dequeue!Object



54
55
56
# File 'app/models/queued_task.rb', line 54

def dequeue!
  destroy
end

#execute!Object



28
29
30
# File 'app/models/queued_task.rb', line 28

def execute!
  klass.to_s.constantize.new(data).execute!
end

#increase_attempts!Object



65
66
67
# File 'app/models/queued_task.rb', line 65

def increase_attempts!
  update_column(:attempts, attempts + 1)
end

#lock!Object



16
17
18
# File 'app/models/queued_task.rb', line 16

def lock!
  update_attribute(:locked, true)
end

#process!Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'app/models/queued_task.rb', line 69

def process!
  lock!

  begin
    execute!
    dequeue!
  rescue
    raise
  ensure
    if persisted?
      increase_attempts!
      unlock!
    end
  end
end

#reset_attempts!Object

You want this separate from #unlock so the ensure block in #process! does not keep resetting attempts when it shouldn’t. This should be called from the controller where you manually unlock tasks in your backend.



61
62
63
# File 'app/models/queued_task.rb', line 61

def reset_attempts!
  update_column(:attempts, 0)
end

#stuck?Boolean

Returns:

  • (Boolean)


85
86
87
88
89
90
# File 'app/models/queued_task.rb', line 85

def stuck?
  return false if locked?
  # Make sure task is past its perform_at timestamp.
  return perform_at <= 30.minutes.ago if perform_at.present?
  created_at <= 30.minutes.ago
end

#unlock!Object



20
21
22
# File 'app/models/queued_task.rb', line 20

def unlock!
  update_attribute(:locked, false)
end

#unlocked?Boolean

Returns:

  • (Boolean)


24
25
26
# File 'app/models/queued_task.rb', line 24

def unlocked?
  !locked?
end