Class: DistribCore::Leader::QueueWithLease
- Inherits:
-
Object
- Object
- DistribCore::Leader::QueueWithLease
- Includes:
- MonitorMixin
- Defined in:
- lib/distrib_core/leader/queue_with_lease.rb
Overview
Generic queue with lease.
Additionally it keeps the time of the lease, allowing watchdog to return (repush) timed out entries back to the queue.
Lifecycle of an entry in the queue:
[QUEUED]--(lease)-->[LEASED] -(release)-> out of the queue
^---(repush)--/
Constant Summary collapse
- SYNC_TIMEOUT_SEC =
60
Instance Attribute Summary collapse
-
#initialized_at ⇒ Object
readonly
Returns the value of attribute initialized_at.
-
#last_activity_at ⇒ Object
readonly
Returns the value of attribute last_activity_at.
Instance Method Summary collapse
-
#completed?(entry) ⇒ TrueClass, FalseClass
‘true` if `entry` was already completed.
-
#completed_size ⇒ Integer
Amount of completed entries.
-
#empty? ⇒ TrueClass, FalseClass
‘true` if there is no more enqueued or leased entries.
-
#entries_list ⇒ Array<String>
Lists of tests in the queue.
-
#initialize(entries = []) ⇒ QueueWithLease
constructor
A new instance of QueueWithLease.
-
#lease ⇒ Object
The next entry in the queue.
-
#leased_size ⇒ Integer
private
Amount of leased entries.
-
#release(entry) ⇒ NilClass, Set<String>
‘nil` if was already completed.
-
#repush(entry) ⇒ Object
It’s only necessary to remove the entry from the list of leased ones, and this have to be done atomically with pushing to the queue to avoid race conditions when the entry is released by another thread, or there’s an attempt to lease it and we release it immediately after.
-
#select_leased ⇒ Object
private
Iterate over leased entries.
-
#size ⇒ Integer
Amount of not completed entries.
-
#visited? ⇒ TrueClass, FalseClass
‘true` if there was already some activity in the queue.
Constructor Details
#initialize(entries = []) ⇒ QueueWithLease
Returns a new instance of QueueWithLease.
23 24 25 26 27 28 29 30 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 23 def initialize(entries = []) # To initialize [MonitorMixin](https://ruby-doc.org/3.2.4/exts/monitor/MonitorMixin.html) super() @entries = entries.dup @leased = {} @completed = Set.new @initialized_at = Time.now end |
Instance Attribute Details
#initialized_at ⇒ Object (readonly)
Returns the value of attribute initialized_at.
20 21 22 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 20 def initialized_at @initialized_at end |
#last_activity_at ⇒ Object (readonly)
Returns the value of attribute last_activity_at.
20 21 22 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 20 def last_activity_at @last_activity_at end |
Instance Method Details
#completed?(entry) ⇒ TrueClass, FalseClass
Returns ‘true` if `entry` was already completed.
76 77 78 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 76 def completed?(entry) synchronize_with_timeout { completed.include?(entry) } end |
#completed_size ⇒ Integer
Returns amount of completed entries.
91 92 93 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 91 def completed_size completed.size end |
#empty? ⇒ TrueClass, FalseClass
Returns ‘true` if there is no more enqueued or leased entries.
81 82 83 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 81 def empty? size.zero? end |
#entries_list ⇒ Array<String>
Returns Lists of tests in the queue.
108 109 110 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 108 def entries_list synchronize_with_timeout { entries.dup } end |
#lease ⇒ Object
Returns the next entry in the queue.
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 33 def lease loop do sleep 0.1 entry = synchronize_with_timeout { entries.pop } next unless entry next if completed?(entry) record_lease(entry) return entry end end |
#leased_size ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns amount of leased entries.
97 98 99 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 97 def leased_size leased.size end |
#release(entry) ⇒ NilClass, Set<String>
Returns ‘nil` if was already completed.
65 66 67 68 69 70 71 72 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 65 def release(entry) return if completed?(entry) synchronize_with_timeout do leased.delete(entry) completed.add(entry) end end |
#repush(entry) ⇒ Object
It’s only necessary to remove the entry from the list of leased ones, and this have to be done atomically with pushing to the queue to avoid race conditions when the entry is released by another thread, or there’s an attempt to lease it and we release it immediately after.
53 54 55 56 57 58 59 60 61 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 53 def repush(entry) synchronize_with_timeout do leased.delete(entry) # We want to insert an entry before the last one, so it won't be leased again with the same worker # If there is no last entry, we just push it to the end entries.insert(entries.empty? ? -1 : -2, entry) end end |
#select_leased ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Iterate over leased entries
103 104 105 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 103 def select_leased(...) synchronize_with_timeout { leased.dup.select(...) } end |
#size ⇒ Integer
Returns amount of not completed entries.
86 87 88 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 86 def size synchronize_with_timeout { leased.size + entries.size } end |
#visited? ⇒ TrueClass, FalseClass
Returns ‘true` if there was already some activity in the queue.
113 114 115 |
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 113 def visited? @last_activity_at != nil end |