Class: ImprovedQueue::ImprovedSizedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/improved-queue.rb

Overview

In most respects this class can be used identically to SizedQueue. It implements all of the methods which are usually available, but bears the important distinction that any thread blocking on a dequeue or enqueue operation can be “woken up” by calling the unblock methods and passing a thread object or an array of thread objects.

This can be done independently of the status of the queue (full, empty, etc.), so it is superior to makeshift resolution mechanisms such as passing a custom message or dummy item into the queue, because there is no need to maintain flags or Mutex objects in your program to ensure that such an operation is not at risk of causing deadlock.

When a blocking thread receives the unblock signal, it will abandon its planned operation and raise UnblockError. As UnblockError inherits from StopIteration, it can be rescued in a variety of ways, including implicitly through Kernel.loop, allowing you to further simplify your program logic:

queue = ImprovedQueue.new 50
threads = []

# a consumer thread
threads << Thread.new do
  loop do
    val = queue.pop
    do_something(val)
  end
end

# another consumer
threads << Thread.new do
  loop do
    val = queue.pop
    do_something(val)
  end
end

# a producer
threads << Thread.new do
  while not_finished_yet == true
    queue << some_stuff
    ...
  end

  queue.unblock_deq [threads[0], threads[1]]
end

threads.each {|thr| thr.join }

One important thing to note: calling #unblock_deq is always safe (will never lose data), but when calling #unblock_enq, it is your responsibility to decide what is to be done with an item which failed to enqueue:

queue = ImprovedQueue.new 10

1.upto(11) do |x|
  begin
    queue << x
  rescue ImprovedQueue::UnblockError
    dont_lose_things(x)
  end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ ImprovedSizedQueue

Returns a new instance of ImprovedQueue::ImprovedSizedQueue. Argument to ImprovedSizedQueue.new must be an integer value greater than 1.

Raises:

  • (TypeError)


137
138
139
140
141
142
143
144
145
146
147
# File 'lib/improved-queue.rb', line 137

def initialize(size)
  raise TypeError unless size.is_a? Integer and size > 1
  @que = []
  @max = size
  @mutex = Mutex.new
  @kill_enq = []
  @kill_deq = []
  @deq_waiting = []
  @enq_waiting = []
  @unblock = UnblockError.new
end

Instance Attribute Details

#maxObject

Returns the current maximum size of the queue.



152
153
154
# File 'lib/improved-queue.rb', line 152

def max
  @max
end

Instance Method Details

#clearObject

Deletes all items from the queue. Threads waiting to dequeue will continue waiting; threads waiting to enqueue will be allowed to proceed (until the queue is full again).



353
354
355
356
357
358
# File 'lib/improved-queue.rb', line 353

def clear
  @mutex.synchronize do
    @que.clear
    enqueue_multiple_waiting
  end
end

#clear_blacklists(thrs = nil) ⇒ Object

Removes a thread or threads from both blacklists and allows them to block on either enqueue or dequeue. Equivalent to calling:

queue.clear_deq thread1
queue.clear_enq thread1


318
319
320
321
322
323
# File 'lib/improved-queue.rb', line 318

def clear_blacklists(thrs=nil)
  @mutex.synchronize do
    enq_allow thrs
    deq_allow thrs
  end
end

#clear_deq(thrs = nil) ⇒ Object

Clear the dequeue blacklist or remove specified threads to allow them to block on dequeue once again:

queue.clear_deq([thread1, thread2, ...])
queue.clear_deq thread3
queue.clear_deq  # allow all threads to block on dequeue

Note: raises a TypeError if you attempt to remove specific threads from the blacklist after unblocking all. i.e. this is okay:

queue.unblock_deq [thread1, thread2]
queue.clear_deq thread2

… but this is not:

queue.unblock_deq
queue.clear_deq thread2


307
308
309
# File 'lib/improved-queue.rb', line 307

def clear_deq(thrs=nil)
  @mutex.synchronize { deq_allow thrs }
end

#clear_enq(thrs = nil) ⇒ Object

Clear the enqueue blacklist or remove specified threads to allow them to block on enqueue once again:

queue.clear_enq([thread1, thread2, ...])
queue.clear_enq thread3
queue.clear_enq  # allow all threads to block on enqueue

Note: raises a TypeError if you attempt to remove specific threads from the blacklist after unblocking all. i.e. this is okay:

queue.unblock_enq [thread1, thread2]
queue.clear_enq thread2

… but this is not:

queue.unblock_enq
queue.clear_enq thread2


284
285
286
# File 'lib/improved-queue.rb', line 284

def clear_enq(thrs=nil)
  @mutex.synchronize { enq_allow thrs }
end

#deqObject Also known as: pop, shift

Removes and returns the next item from the queue. Will block if the queue is empty, until an item becomes available to dequeue, or until #unblock_deq is called, in which case it will abandon the dequeue and raise UnblockError.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/improved-queue.rb', line 160

def deq
  @mutex.lock
  if @que.size > 0
    val = @que.shift
    if next_waiting = @enq_waiting.shift
      @que << next_waiting[:value]
      next_waiting[:pipe] << true
    end
    @mutex.unlock
  else
    if @kill_deq == true or @kill_deq.include? Thread.current
      @mutex.unlock
      raise UnblockError
    end
    pipe = Queue.new
    @deq_waiting << {thread: Thread.current, pipe: pipe}
    @mutex.unlock
    val = pipe.pop
    raise UnblockError if @unblock.equal? val
  end
  val
end

#empty?Boolean

Returns true if the queue is empty; false otherwise.

Returns:

  • (Boolean)


337
338
339
# File 'lib/improved-queue.rb', line 337

def empty?
  @mutex.synchronize { @que.size == 0 }
end

#enq(val) ⇒ Object Also known as: <<, push

Adds an item to the queue. Will block if the queue is full, until an item is removed from the queue, or until #unblock_enq is called, in which case it will abandon the enqueue and raise an UnblockError. Important: If data integrity is important, you will need to include any code to deal with the orphaned item in your rescue clause.



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/improved-queue.rb', line 193

def enq(val)
  @mutex.lock
  if @que.size < @max
    if next_waiting = @deq_waiting.shift
      next_waiting[:pipe] << val
    else
      @que << val
    end
    @mutex.unlock
  else
    if @kill_enq == true or @kill_enq.include? Thread.current
      @mutex.unlock
      raise UnblockError
    end
    pipe = Queue.new
    @enq_waiting << {thread: Thread.current, pipe: pipe, value: val}
    @mutex.unlock
    answer = pipe.pop
    raise UnblockError if @unblock.equal? answer
  end
  nil
end

#lengthObject Also known as: size

Returns the total number of items currently in the queue.



328
329
330
# File 'lib/improved-queue.rb', line 328

def length
  @mutex.synchronize { @que.size }
end

#num_waitingObject

Returns the number of threads currently waiting on the queue.



344
345
346
# File 'lib/improved-queue.rb', line 344

def num_waiting
  @mutex.synchronize { @enq_waiting.size + @deq_waiting.size }
end

#resetObject

Completely resets the queue: deletes all queue contents, raises UnblockError in any waiting threads, and clears all blacklists, such that all threads can once again block on enqueue or dequeue.



365
366
367
368
369
370
371
372
373
# File 'lib/improved-queue.rb', line 365

def reset
  @mutex.synchronize do
    @que.clear
    @kill_enq = []
    @kill_deq = []
    @enq_waiting.each {|waiting| waiting[:pipe] << @unblock }.clear
    @deq_waiting.each {|waiting| waiting[:pipe] << @unblock }.clear
  end
end

#unblock_deq(thrs = nil) ⇒ Object

Call this method to notify blocking threads that they should no longer wait to dequeue:

queue.unblock_deq([thread1, thread2, ...])
queue.unblock_deq thread3
queue.unblock_deq  # unblock all threads

Note that the effect is permanent; once a thread has been unblocked, it is blacklisted and can no longer wait on this queue. To clear the dequeue blacklist, see the #clear_blacklists and #clear_deq methods.



247
248
249
# File 'lib/improved-queue.rb', line 247

def unblock_deq(thrs=nil)
  @mutex.synchronize { deq_shutdown thrs }
end

#unblock_enq(thrs = nil) ⇒ Object

Call this method to notify blocking threads that they should no longer wait to enqueue:

queue.unblock_enq([thread1, thread2, ...])
queue.unblock_enq thread3
queue.unblock_enq  # unblock all threads

Note that the effect is permanent; once a thread has been unblocked, it is blacklisted and can no longer wait on this queue. To clear the enqueue blacklist, see the #clear_blacklists and #clear_enq methods.



231
232
233
# File 'lib/improved-queue.rb', line 231

def unblock_enq(thrs=nil)
  @mutex.synchronize { enq_shutdown thrs }
end

#unblock_waiting(thrs = nil) ⇒ Object

Stops a thread from blocking on either enqueue or dequeue. Equivalent to calling:

queue.unblock_deq thread1
queue.unblock_enq thread1


258
259
260
261
262
263
# File 'lib/improved-queue.rb', line 258

def unblock_waiting(thrs=nil)
  @mutex.synchronize do
    enq_shutdown thrs
    deq_shutdown thrs
  end
end