Class: Que::JobBuffer::PriorityQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job_buffer.rb

Overview

A queue object dedicated to a specific worker priority. It’s basically a Queue object from the standard library, but it’s able to reach into the JobBuffer’s buffer in order to satisfy a pop.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_buffer:, priority:) ⇒ PriorityQueue

Returns a new instance of PriorityQueue.



190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/que/job_buffer.rb', line 190

# Builds the queue for a single worker priority.
#
# @param job_buffer [Object] buffer this queue asks `job_available?(priority)`
#   from inside #pop.
# @param priority [Object] value forwarded to `job_buffer.job_available?`;
#   presumably a worker priority number — confirm against the caller.
def initialize(job_buffer:, priority:)
  # Collaborators handed in by the caller.
  @job_buffer = job_buffer
  @priority   = priority

  # Mutable state read and written by #pop, #populate and #stop.
  @items    = []    # Items pending distribution to waiting threads.
  @waiting  = 0     # Count of threads currently blocked inside #pop.
  @stopping = false # Set to true by #stop; makes #pop return false.

  # Lock and condition variable that waiting threads sleep on.
  @mutex = Mutex.new
  @cv    = ConditionVariable.new
end

Instance Attribute Details

#job_buffer ⇒ Object (readonly)

Returns the value of attribute job_buffer.



188
189
190
# File 'lib/que/job_buffer.rb', line 188

# Reader for the buffer object passed to the constructor as `job_buffer:`.
# Inside this class it is the receiver of `job_available?(priority)` in #pop.
#
# @return [Object] the value stored in @job_buffer
def job_buffer
  @job_buffer
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



188
189
190
# File 'lib/que/job_buffer.rb', line 188

# Reader for the Mutex created in the constructor. #pop hands it to
# ConditionVariable#wait so the lock is released while a thread sleeps.
# Presumably this is also the lock taken by the `sync` helper (not shown
# here) — confirm.
#
# @return [Mutex] the value stored in @mutex
def mutex
  @mutex
end

#priority ⇒ Object (readonly)

Returns the value of attribute priority.



188
189
190
# File 'lib/que/job_buffer.rb', line 188

# Reader for the priority passed to the constructor as `priority:`.
# #pop forwards it to `job_buffer.job_available?`.
#
# @return [Object] the value stored in @priority (type is not established
#   here; presumably a numeric worker priority — TODO confirm)
def priority
  @priority
end

Instance Method Details

#pop ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/que/job_buffer.rb', line 203

# Blocks the calling thread until there is something for it to act on.
#
# The whole body runs inside a `sync` block. Each pass of the loop checks,
# in order:
#
# 1. @stopping is set (see #stop)            -> returns false.
# 2. an item is pending in @items            -> removes and returns it.
# 3. job_buffer.job_available?(priority)     -> returns false; presumably the
#    caller then takes the job from the buffer itself — confirm against
#    JobBuffer.
#
# If none apply, the thread registers itself in @waiting and sleeps on the
# condition variable. Once woken it re-runs the checks, so a wakeup that
# finds nothing to do just goes back to sleep.
#
# @return [Object, false] an item from @items, or false as described above
def pop
  sync do
    loop do
      if @stopping
        return false
      elsif (item = @items.pop)
        return item
      elsif job_buffer.job_available?(priority)
        return false
      end

      @waiting += 1
      begin
        @cv.wait(mutex)
      ensure
        # Decrement in an ensure so that an exception delivered while the
        # thread is asleep (Thread#raise, Timeout, ...) cannot leave
        # @waiting permanently too high. #populate sizes its work from
        # that counter.
        @waiting -= 1
      end
    end
  end
end

#populate ⇒ Object



228
229
230
231
232
233
234
235
236
# File 'lib/que/job_buffer.rb', line 228

# Offers jobs to the threads currently blocked in #pop.
#
# Inside a `sync` block, yields to the caller's block at most
# `waiting_count` times. Every non-nil value yielded is passed to `_push`
# (helper not shown here; presumably it queues the item and wakes a
# waiter — confirm). The first nil ends the loop early.
#
# @yieldreturn [Object, nil] the next job, or nil when there are no more
def populate
  sync do
    waiting_count.times do
      candidate = yield

      # Only nil ends the hand-out; a false is deliberately passed on, since
      # false would mean we're stopping.
      break if candidate.nil?

      _push(candidate)
    end
  end
end

#stop ⇒ Object



221
222
223
224
225
226
# File 'lib/que/job_buffer.rb', line 221

# Puts the queue into its stopping state and wakes every thread blocked in
# #pop.
#
# Inside a `sync` block it sets @stopping and then broadcasts on the
# condition variable. Each woken thread re-runs the loop in #pop, sees
# @stopping and returns false. Nothing in this method resets the flag.
def stop
  sync do
    @stopping = true
    # broadcast rather than signal: every waiter has to notice the stop.
    @cv.broadcast
  end
end

#waiting_count ⇒ Object



238
239
240
# File 'lib/que/job_buffer.rb', line 238

# Number of threads currently blocked in #pop: @waiting is incremented just
# before the condition-variable wait there and decremented right after it.
#
# This method does not take the lock itself; #populate calls it from inside
# a `sync` block.
#
# @return [Integer] the current value of @waiting
def waiting_count
  @waiting
end