Class: Que::JobBuffer
- Inherits:
-
Object
- Object
- Que::JobBuffer
- Defined in:
- lib/que/job_buffer.rb
Defined Under Namespace
Classes: PriorityQueue
Instance Attribute Summary collapse
-
#maximum_size ⇒ Object
readonly
Returns the value of attribute maximum_size.
-
#priority_queues ⇒ Object
readonly
Returns the value of attribute priority_queues.
Instance Method Summary collapse
- #accept?(metajobs) ⇒ Boolean
- #available_priorities ⇒ Object
- #buffer_space ⇒ Object
- #clear ⇒ Object
-
#initialize(maximum_size:, priorities:) ⇒ JobBuffer
constructor
Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it.
- #job_available?(priority) ⇒ Boolean
- #push(*metajobs) ⇒ Object
- #shift(priority = nil) ⇒ Object
- #shift_job(priority = nil) ⇒ Object
- #size ⇒ Object
- #stop ⇒ Object
- #stopping? ⇒ Boolean
- #to_a ⇒ Object
- #waiting_count ⇒ Object
Constructor Details
#initialize(maximum_size:, priorities:) ⇒ JobBuffer
Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/que/job_buffer.rb', line 18 def initialize( maximum_size:, priorities: ) @maximum_size = Que.assert(Integer, maximum_size) Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" } @stop = false @array = [] @mutex = Mutex.new @priority_queues = Hash[ # Make sure that priority = nil sorts highest. priorities.sort_by{|p| p || MAXIMUM_PRIORITY}.map do |p| [p, PriorityQueue.new(priority: p, job_buffer: self)] end ].freeze end |
Instance Attribute Details
#maximum_size ⇒ Object (readonly)
Returns the value of attribute maximum_size.
9 10 11 |
# File 'lib/que/job_buffer.rb', line 9 def maximum_size @maximum_size end |
#priority_queues ⇒ Object (readonly)
Returns the value of attribute priority_queues.
9 10 11 |
# File 'lib/que/job_buffer.rb', line 9 def priority_queues @priority_queues end |
Instance Method Details
#accept?(metajobs) ⇒ Boolean
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/que/job_buffer.rb', line 74 def accept?() .sort! sync do return [] if _stopping? start_index = _buffer_space final_index = .length - 1 return if start_index > final_index index_to_lose = @array.length - 1 start_index.upto(final_index) do |index| if index_to_lose >= 0 && ([index] <=> @array[index_to_lose]) < 0 return if index == final_index index_to_lose -= 1 else return .slice(0...index) end end [] end end |
#available_priorities ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/que/job_buffer.rb', line 107 def available_priorities hash = {} lowest_priority = true priority_queues.reverse_each do |priority, pq| count = pq.waiting_count if lowest_priority count += buffer_space lowest_priority = false end hash[priority || MAXIMUM_PRIORITY] = count if count > 0 end hash end |
#buffer_space ⇒ Object
125 126 127 |
# File 'lib/que/job_buffer.rb', line 125 def buffer_space sync { _buffer_space } end |
#clear ⇒ Object
142 143 144 |
# File 'lib/que/job_buffer.rb', line 142 def clear sync { pop(_size) } end |
#job_available?(priority) ⇒ Boolean
150 151 152 |
# File 'lib/que/job_buffer.rb', line 150 def job_available?(priority) (job = @array.first) && job.priority_sufficient?(priority) end |
#push(*metajobs) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/que/job_buffer.rb', line 37 def push(*) Que.internal_log(:job_buffer_push, self) do { maximum_size: maximum_size, ids: .map(&:id), current_queue: to_a, } end sync do return if _stopping? @array.concat().sort! # Relying on the hash's contents being sorted, here. priority_queues.reverse_each do |_, pq| pq.populate do _shift_job(pq.priority) end end # If we passed the maximum buffer size, drop the lowest sort keys and # return their ids to be unlocked. overage = -_buffer_space pop(overage) if overage > 0 end end |
#shift(priority = nil) ⇒ Object
65 66 67 68 |
# File 'lib/que/job_buffer.rb', line 65 def shift(priority = nil) queue = priority_queues.fetch(priority) { raise Error, "not a permitted priority! #{priority}" } queue.pop || shift_job(priority) end |
#shift_job(priority = nil) ⇒ Object
70 71 72 |
# File 'lib/que/job_buffer.rb', line 70 def shift_job(priority = nil) sync { _shift_job(priority) } end |
#size ⇒ Object
129 130 131 |
# File 'lib/que/job_buffer.rb', line 129 def size sync { _size } end |
#stop ⇒ Object
137 138 139 140 |
# File 'lib/que/job_buffer.rb', line 137 def stop sync { @stop = true } priority_queues.each_value(&:stop) end |
#stopping? ⇒ Boolean
146 147 148 |
# File 'lib/que/job_buffer.rb', line 146 def stopping? sync { _stopping? } end |
#to_a ⇒ Object
133 134 135 |
# File 'lib/que/job_buffer.rb', line 133 def to_a sync { @array.dup } end |
#waiting_count ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/que/job_buffer.rb', line 99 def waiting_count count = 0 priority_queues.each_value do |pq| count += pq.waiting_count end count end |