Class: QueuedTask
- Inherits: ActiveRecord::Base
  - Object
    - ActiveRecord::Base
      - QueuedTask
- Includes: Plok::Loggable
- Defined in: app/models/queued_task.rb
Constant Summary
- DEFAULT_PRIORITY = 0
- HIGH_PRIORITY = 10
Class Method Summary
- .queue(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object
  TODO: Might be good to use named parameters for data and weight here.
- .queue_unless_already_queued(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object
Instance Method Summary
- #dequeue! ⇒ Object
- #execute! ⇒ Object
- #increase_attempts! ⇒ Object
- #lock! ⇒ Object
- #process! ⇒ Object
- #reset_attempts! ⇒ Object
  Kept separate from #unlock! so that the ensure block in #process! does not reset attempts when it shouldn't.
- #stuck? ⇒ Boolean
- #unlock! ⇒ Object
- #unlocked? ⇒ Boolean
Methods included from Plok::Loggable
Class Method Details
.queue(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object
TODO: Might be good to use named parameters for data and weight here. Might be able to use the data var to store weight like the perform_at key.
TODO: Refactor to a separate class.
    # File 'app/models/queued_task.rb', line 36
    def self.queue(klass, data, weight = DEFAULT_PRIORITY)
      task = create!(
        klass: klass.to_s,
        weight: weight,
        attempts: 0,
        data: data&.except(:perform_at)
      )

      task.update(perform_at: data[:perform_at]) if data&.dig(:perform_at).present?
      task
    end
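The way .queue treats the data hash can be illustrated without a database. The sketch below is a minimal plain-Ruby approximation, assuming data is an ordinary Hash with symbol keys; `split_task_data` is a hypothetical helper, not part of QueuedTask.

```ruby
# Hypothetical helper mirroring how .queue splits the data hash.
# It is not part of QueuedTask; it only illustrates the behavior.
def split_task_data(data)
  # .queue uses safe navigation, so nil data yields a nil payload.
  return [nil, nil] if data.nil?

  # Everything except :perform_at stays in the stored payload.
  payload = data.reject { |key, _value| key == :perform_at }

  # :perform_at is written to its own column in a second step.
  [payload, data[:perform_at]]
end

payload, perform_at = split_task_data({ user_id: 42, perform_at: Time.at(0) })
```

Under these assumptions, `payload` holds only the `user_id` entry and `perform_at` carries the timestamp that .queue would store separately.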
.queue_unless_already_queued(klass, data, weight = DEFAULT_PRIORITY) ⇒ Object
    # File 'app/models/queued_task.rb', line 48
    def self.queue_unless_already_queued(klass, data, weight = DEFAULT_PRIORITY)
      task = find_by(klass: klass, data: data, weight: weight)
      return task if task.present?
      self.queue(klass, data, weight)
    end
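The find-or-queue behavior can be sketched with an in-memory stand-in for the table. `InMemoryQueue` and its `Task` struct are hypothetical names for illustration only; the real method relies on ActiveRecord's `find_by`, and how it matches serialized data may differ from the plain equality used here.

```ruby
# Hypothetical in-memory stand-in for the queued_tasks table.
class InMemoryQueue
  Task = Struct.new(:klass, :data, :weight)

  attr_reader :tasks

  def initialize
    @tasks = []
  end

  # Always appends a new task.
  def queue(klass, data, weight = 0)
    task = Task.new(klass.to_s, data, weight)
    @tasks << task
    task
  end

  # Returns the existing task when klass, data and weight all match;
  # otherwise queues a new one.
  def queue_unless_already_queued(klass, data, weight = 0)
    existing = @tasks.find do |t|
      t.klass == klass.to_s && t.data == data && t.weight == weight
    end
    existing || queue(klass, data, weight)
  end
end

q = InMemoryQueue.new
first  = q.queue_unless_already_queued("Mailer", { id: 1 })
second = q.queue_unless_already_queued("Mailer", { id: 1 })     # same task
third  = q.queue_unless_already_queued("Mailer", { id: 1 }, 10) # new task
```

Because weight is part of the lookup, queuing the same klass and data with a different weight creates a second task rather than reusing the first.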
Instance Method Details
#dequeue! ⇒ Object
    # File 'app/models/queued_task.rb', line 54
    def dequeue!
      destroy
    end
#execute! ⇒ Object
    # File 'app/models/queued_task.rb', line 28
    def execute!
      klass.to_s.constantize.new(data).execute!
    end
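From the body of #execute!, a task class appears to need only a constructor that accepts the data and an `execute!` instance method. The sketch below shows that contract in plain Ruby; `SendWelcomeEmail` and `run_task` are hypothetical names, and `Object.const_get` stands in for ActiveSupport's `constantize`.

```ruby
# Hypothetical task class: the name and payload are illustrative only.
class SendWelcomeEmail
  def initialize(data)
    @data = data
  end

  def execute!
    "welcome email for user #{@data[:user_id]}"
  end
end

# Plain-Ruby approximation of klass.to_s.constantize.new(data).execute!
# (constantize comes from ActiveSupport; Object.const_get is core Ruby).
def run_task(klass_name, data)
  Object.const_get(klass_name).new(data).execute!
end

result = run_task("SendWelcomeEmail", { user_id: 7 })
```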
#increase_attempts! ⇒ Object
    # File 'app/models/queued_task.rb', line 65
    def increase_attempts!
      update_column(:attempts, attempts + 1)
    end
#lock! ⇒ Object
    # File 'app/models/queued_task.rb', line 16
    def lock!
      update_attribute(:locked, true)
    end
#process! ⇒ Object
    # File 'app/models/queued_task.rb', line 69
    def process!
      lock!

      begin
        execute!
        dequeue!
      rescue
        raise
      ensure
        if persisted?
          increase_attempts!
          unlock!
        end
      end
    end
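The control flow of #process! can be traced with a small stand-in object. `FakeTask` below is hypothetical and keeps its state in instance variables instead of a database row; the bare rescue/raise of the original re-raises unchanged, so it is left out of the sketch.

```ruby
# Hypothetical stand-in for a QueuedTask record; no database involved.
class FakeTask
  attr_reader :attempts, :locked

  def initialize(&work)
    @work = work
    @attempts = 0
    @locked = false
    @persisted = true
  end

  def persisted?
    @persisted
  end

  def process!
    @locked = true          # lock!

    begin
      @work.call            # execute!
      @persisted = false    # dequeue! destroys the record
    ensure
      # Runs on success and on failure; only a surviving record is touched.
      if persisted?
        @attempts += 1      # increase_attempts!
        @locked = false     # unlock!
      end
    end
  end
end

ok = FakeTask.new { :done }
ok.process!

failing = FakeTask.new { raise "boom" }
error = begin
  failing.process!
  nil
rescue RuntimeError => e
  e
end
```

In this sketch a successful task is removed and its attempts stay untouched, while a failing task remains in the queue, unlocked, with one more attempt recorded, and the exception still reaches the caller.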
#reset_attempts! ⇒ Object
Kept separate from #unlock! so that the ensure block in #process! does not reset attempts when it shouldn't. Call this from the backend controller action where tasks are unlocked manually.
    # File 'app/models/queued_task.rb', line 61
    def reset_attempts!
      update_column(:attempts, 0)
    end
#stuck? ⇒ Boolean
    # File 'app/models/queued_task.rb', line 85
    def stuck?
      return false if locked?
      # Make sure task is past its perform_at timestamp.
      return perform_at <= 30.minutes.ago if perform_at.present?
      created_at <= 30.minutes.ago
    end
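The rule in #stuck? can be expressed as a pure function to make the cases easier to see. This is a sketch only: the top-level `stuck?` method, its keyword arguments and the injected `now` are assumptions made so the example runs without Rails and gives deterministic results.

```ruby
STUCK_AFTER_SECONDS = 30 * 60 # mirrors 30.minutes in the original

# Hypothetical pure-function version of #stuck?; `now` is injected so the
# result does not depend on the wall clock.
def stuck?(locked:, created_at:, perform_at: nil, now: Time.now)
  # A locked task is never reported as stuck.
  return false if locked

  cutoff = now - STUCK_AFTER_SECONDS

  # A scheduled task is measured from its perform_at timestamp.
  return perform_at <= cutoff if perform_at

  # Otherwise the age is measured from creation.
  created_at <= cutoff
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
old_unlocked   = stuck?(locked: false, created_at: now - 3600, now: now)
old_locked     = stuck?(locked: true,  created_at: now - 3600, now: now)
scheduled_late = stuck?(locked: false, created_at: now - 3600,
                        perform_at: now - 60, now: now)
```

Here `old_unlocked` is true, `old_locked` is false, and `scheduled_late` is false because its perform_at passed only a minute before `now`.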
#unlock! ⇒ Object
    # File 'app/models/queued_task.rb', line 20
    def unlock!
      update_attribute(:locked, false)
    end
#unlocked? ⇒ Boolean
    # File 'app/models/queued_task.rb', line 24
    def unlocked?
      !locked?
    end