Class: SizedQueue

Inherits:
Queue show all
Defined in:
lib/extensions/thread/thread.rb

Overview

This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Methods inherited from Queue

#clear, #empty?, #length

Constructor Details

#initialize(max) ⇒ SizedQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError)


247
248
249
250
251
252
253
# File 'lib/extensions/thread/thread.rb', line 247

def initialize(max)
  raise ArgumentError, "queue size must be positive" unless max > 0
  @max = max
  @queue_wait = []
  @queue_wait.taint		# enable tainted comunication
  super()
end

Instance Method Details

#maxObject

Returns the maximum size of the queue.



258
259
260
# File 'lib/extensions/thread/thread.rb', line 258

def max
  @max
end

#max=(max) ⇒ Object

Sets the maximum size of the queue.



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/extensions/thread/thread.rb', line 265

def max=(max)
  diff = nil
  @mutex.synchronize {
    if max <= @max
      @max = max
    else
      diff = max - @max
      @max = max
    end
  }
  if diff
    diff.times do
	begin
 t = @queue_wait.shift
 t.run if t
	rescue ThreadError
 retry
	end
    end
  end
  max
end

#num_waitingObject

Returns the number of threads waiting on the queue.



351
352
353
# File 'lib/extensions/thread/thread.rb', line 351

def num_waiting
  @waiting.size + @queue_wait.size
end

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue and runs a waiting thread, if any.



323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/extensions/thread/thread.rb', line 323

def pop(*args)
  retval = super
  @mutex.synchronize {
    if @que.length < @max
      begin
        t = @queue_wait.shift
        t.wakeup if t
      rescue ThreadError
        retry
      end
    end
  }
  retval
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available.



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/extensions/thread/thread.rb', line 292

def push(obj)
  @mutex.synchronize{
    while true
      break if @que.length < @max
      @queue_wait.push Thread.current
      @mutex.sleep
    end

    @que.push obj
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  }
end