Class: DistribCore::Leader::QueueWithLease

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/distrib_core/leader/queue_with_lease.rb

Overview

Generic queue with lease.

Additionally it keeps the time of the lease, allowing watchdog to return (repush) timed out entries back to the queue.

Lifecycle of an entry in the queue:

[QUEUED]--(lease)-->[LEASED] -(release)-> out of the queue
      ^---(repush)--/

Constant Summary collapse

SYNC_TIMEOUT_SEC =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(entries = []) ⇒ QueueWithLease

Returns a new instance of QueueWithLease.

Parameters:

  • entries (Array<Object>) (defaults to: [])

    the entries to enqueue



23
24
25
26
27
28
29
30
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 23

def initialize(entries = [])
  # To initialize [MonitorMixin](https://ruby-doc.org/3.2.4/exts/monitor/MonitorMixin.html)
  super()
  @entries = entries.dup
  @leased = {}
  @completed = Set.new
  @initialized_at = Time.now
end

Instance Attribute Details

#initialized_atObject (readonly)

Returns the value of attribute initialized_at.



20
21
22
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 20

def initialized_at
  @initialized_at
end

#last_activity_atObject (readonly)

Returns the value of attribute last_activity_at.



20
21
22
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 20

def last_activity_at
  @last_activity_at
end

Instance Method Details

#completed?(entry) ⇒ TrueClass, FalseClass

Returns ‘true` if `entry` was already completed.

Parameters:

  • entry (String)

Returns:

  • (TrueClass, FalseClass)

    ‘true` if `entry` was already completed



76
77
78
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 76

def completed?(entry)
  synchronize_with_timeout { completed.include?(entry) }
end

#completed_sizeInteger

Returns amount of completed entries.

Returns:

  • (Integer)

    amount of completed entries



91
92
93
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 91

def completed_size
  completed.size
end

#empty?TrueClass, FalseClass

Returns ‘true` if there is no more enqueued or leased entries.

Returns:

  • (TrueClass, FalseClass)

    ‘true` if there is no more enqueued or leased entries



81
82
83
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 81

def empty?
  size.zero?
end

#entries_listArray<String>

Returns Lists of tests in the queue.

Returns:

  • (Array<String>)

    Lists of tests in the queue



108
109
110
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 108

def entries_list
  synchronize_with_timeout { entries.dup }
end

#leaseObject

Returns the next entry in the queue.

Returns:

  • (Object)

    the next entry in the queue



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 33

def lease
  loop do
    sleep 0.1

    entry = synchronize_with_timeout { entries.pop }
    next unless entry

    next if completed?(entry)

    record_lease(entry)
    return entry
  end
end

#leased_sizeInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns amount of leased entries.

Returns:

  • (Integer)

    amount of leased entries



97
98
99
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 97

def leased_size
  leased.size
end

#release(entry) ⇒ NilClass, Set<String>

Returns ‘nil` if was already completed.

Parameters:

  • entry (String)

Returns:

  • (NilClass, Set<String>)

    ‘nil` if was already completed



65
66
67
68
69
70
71
72
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 65

def release(entry)
  return if completed?(entry)

  synchronize_with_timeout do
    leased.delete(entry)
    completed.add(entry)
  end
end

#repush(entry) ⇒ Object

It’s only necessary to remove the entry from the list of leased ones, and this have to be done atomically with pushing to the queue to avoid race conditions when the entry is released by another thread, or there’s an attempt to lease it and we release it immediately after.

Parameters:

  • entry (String)


53
54
55
56
57
58
59
60
61
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 53

def repush(entry)
  synchronize_with_timeout do
    leased.delete(entry)

    # We want to insert an entry before the last one, so it won't be leased again with the same worker
    # If there is no last entry, we just push it to the end
    entries.insert(entries.empty? ? -1 : -2, entry)
  end
end

#select_leasedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Iterate over leased entries



103
104
105
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 103

def select_leased(...)
  synchronize_with_timeout { leased.dup.select(...) }
end

#sizeInteger

Returns amount of not completed entries.

Returns:

  • (Integer)

    amount of not completed entries



86
87
88
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 86

def size
  synchronize_with_timeout { leased.size + entries.size }
end

#visited?TrueClass, FalseClass

Returns ‘true` if there was already some activity in the queue.

Returns:

  • (TrueClass, FalseClass)

    ‘true` if there was already some activity in the queue



113
114
115
# File 'lib/distrib_core/leader/queue_with_lease.rb', line 113

def visited?
  @last_activity_at != nil
end