Class: Queue

Inherits:
Object show all
Defined in:
lib/extensions/thread/thread.rb

Overview

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

# Shared queue through which the two threads below exchange values.
queue = Queue.new

# Producer: pushes the integers 0 through 4 onto the queue, one at a time.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate expense
    queue << i
    puts "#{i} produced"
  end
end

# Consumer: pops five values; pop suspends this thread while the queue is empty.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate expense
    puts "consumed #{value}"
  end
end

# Wait for the consumer thread to finish before the script continues.
consumer.join

Direct Known Subclasses

SizedQueue

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Creates a new queue.



140
141
142
143
144
145
146
147
# File 'lib/extensions/thread/thread.rb', line 140

# Creates a new, empty queue.
#
# Sets up the item buffer (@que), the list of threads parked in #pop
# (@waiting) and the mutex that guards both.
def initialize
  @que = []
  @waiting = []
  # Enable tainted communication where the runtime still supports it.
  # Object#taint was removed in Ruby 3.2, so calling it unconditionally
  # would raise NoMethodError there; on older runtimes this behaves exactly
  # as before.
  [@que, @waiting, self].each do |obj|
    obj.taint if obj.respond_to?(:taint)
  end
  @mutex = Mutex.new
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



213
214
215
# File 'lib/extensions/thread/thread.rb', line 213

# Discards every object currently stored in the queue.
#
# Returns the emptied internal item array (the result of Array#clear).
def clear
  items = @que
  items.clear
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


206
207
208
# File 'lib/extensions/thread/thread.rb', line 206

# Reports whether the queue currently holds no objects.
#
# Returns true when nothing is queued, false otherwise.
def empty?
  @que.length.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



220
221
222
# File 'lib/extensions/thread/thread.rb', line 220

# Counts the objects currently stored in the queue.
#
# Returns the number of queued objects as an Integer.
def length
  @que.size
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



232
233
234
# File 'lib/extensions/thread/thread.rb', line 232

# Counts the threads currently registered as waiting on the queue.
#
# Returns the number of entries in the waiting list as an Integer.
def num_waiting
  @waiting.length
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended; a ThreadError is raised instead.



179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/extensions/thread/thread.rb', line 179

# Retrieves data from the queue.
#
# If the queue is empty, the calling thread is suspended until data is
# pushed onto the queue. If +non_block+ is true, the thread is not
# suspended and ThreadError ("queue empty") is raised instead.
#
# Returns the object at the head of the queue.
def pop(non_block=false)
  @mutex.synchronize{
    begin
      while @que.empty?
        raise ThreadError, "queue empty" if non_block
        # Register only once. A wakeup that does not come from #push sends
        # this thread around the loop again while its entry is still in
        # @waiting; pushing unconditionally would leave duplicates behind.
        current = Thread.current
        @waiting.push current unless @waiting.include?(current)
        # Releases @mutex while asleep and re-acquires it on wakeup.
        @mutex.sleep
      end
      @que.shift
    ensure
      # However this thread leaves (value returned, exception raised, or
      # killed while asleep), drop its entry so @waiting never refers to a
      # thread that is no longer waiting.
      @waiting.delete(Thread.current)
    end
  }
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



152
153
154
155
156
157
158
159
160
161
162
# File 'lib/extensions/thread/thread.rb', line 152

# Appends +obj+ to the queue and wakes one waiting thread, if any.
#
# Waiters are taken from the front of the waiting list. When waking a
# waiter raises ThreadError, that waiter is skipped and the next one is
# tried; each attempt removes one entry, so the retry ends once the list
# is exhausted.
#
# Returns the thread that was woken, or nil when no waiter was available.
def push(obj)
  @mutex.synchronize do
    @que << obj
    begin
      sleeper = @waiting.shift
      sleeper && sleeper.wakeup
    rescue ThreadError
      retry
    end
  end
end