Class: SizedQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/thread.rb

Overview

This class represents queues with a specified maximum capacity. The push operation blocks while the queue is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Methods inherited from Queue

#clear, #empty?, #length

Constructor Details

#initialize(max) ⇒ SizedQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


263
264
265
266
267
268
269
# File 'lib/thread.rb', line 263

# Sets up the bookkeeping for a bounded queue and then runs the parent
# initializer.
#
# @param max the capacity limit; must compare greater than zero
# @raise [ArgumentError] when +max+ is not greater than zero
def initialize(max)
  unless max > 0
    raise ArgumentError, "queue size must be positive"
  end

  # Number of threads currently parked inside #push waiting for room.
  @num_enqueue_waiting = 0
  # Condition that blocked producers wait on; signalled when room appears.
  @enque_cond = ConditionVariable.new
  @max = max
  super()
end

Instance Method Details

#max ⇒ Object

Returns the maximum size of the queue.



274
275
276
# File 'lib/thread.rb', line 274

# Reader for the queue's capacity limit.
#
# @return the current value of @max, as stored by #initialize or #max=
def max
  @max
end

#max=(max) ⇒ Object

Sets the maximum size of the queue.

Raises:

  • (ArgumentError)


281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/thread.rb', line 281

# Replaces the capacity limit.
#
# When the limit grows, the producer condition is signalled once for every
# newly available slot so that blocked pushers can proceed. Shrinking (or
# keeping) the limit only stores the new value.
#
# @param max the new capacity limit; must compare greater than zero
# @return the +max+ argument
# @raise [ArgumentError] when +max+ is not greater than zero
def max=(max)
  unless max > 0
    raise ArgumentError, "queue size must be positive"
  end

  @mutex.synchronize do
    # Work out how many extra slots open up before overwriting the old limit.
    added_slots = max > @max ? max - @max : 0
    @max = max
    added_slots.times { @enque_cond.signal }
  end

  max
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



357
358
359
# File 'lib/thread.rb', line 357

# Total number of waiting threads: the inherited @num_waiting counter plus
# the threads currently blocked in #push (@num_enqueue_waiting).
#
# @return the sum of the two counters
def num_waiting
  base_waiters = @num_waiting
  push_waiters = @num_enqueue_waiting
  base_waiters + push_waiters
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and, if the queue then has free space, wakes one thread waiting to push.



334
335
336
337
338
339
340
341
342
# File 'lib/thread.rb', line 334

# Removes an item via the parent implementation, then gives a blocked
# producer a chance to run.
#
# @param args forwarded unchanged to the parent #pop
# @return whatever the parent #pop returned
def pop(*args)
  item = super

  # Once the item is out, signal the producer condition if there is room.
  @mutex.synchronize { @enque_cond.signal if @que.length < @max }

  item
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available.



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/thread.rb', line 302

# Appends +obj+ to the queue, blocking on the producer condition for as long
# as the queue already holds @max items.
#
# The whole operation runs under Thread.handle_interrupt with
# RuntimeError => :on_blocking and inside @mutex.
#
# @param obj the item to enqueue
# @return the result of signalling @cond
def push(obj)
  Thread.handle_interrupt(RuntimeError => :on_blocking) do
    @mutex.synchronize do
      until @que.length < @max
        # Count this thread as a waiting producer for the duration of the
        # wait; the ensure keeps the counter accurate if the wait is aborted.
        @num_enqueue_waiting += 1
        begin
          @enque_cond.wait(@mutex)
        ensure
          @num_enqueue_waiting -= 1
        end
      end

      @que.push(obj)
      # Signal the condition the queue keeps in @cond now that an item exists.
      @cond.signal
    end
  end
end