Class: Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/thread.rb

Overview

This class provides a way to synchronize communication between threads.

Example:

require 'thread'

# Shared queue that hands values from the producer thread to the consumer thread.
queue = Queue.new

# Producer: pushes the integers 0 through 4, pausing before each push.
producer = Thread.new do
  5.times do |i|
    sleep rand(i) # simulate a variable amount of work before producing
    queue << i
    puts "#{i} produced"
  end
end

# Consumer: pops five values. pop is called without non_block, so the thread
# is suspended whenever the queue is empty until the producer pushes again.
consumer = Thread.new do
  5.times do |i|
    value = queue.pop
    sleep rand(i/2) # simulate a variable amount of work after consuming
    puts "consumed #{value}"
  end
end

# Only the consumer is joined: it cannot finish until it has popped five
# values, which requires the producer to have pushed all five.
consumer.join

Direct Known Subclasses

SizedQueue

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Creates a new queue.



150
151
152
153
154
155
156
157
# File 'lib/thread.rb', line 150

# Creates a new, empty queue.
#
# Sets up the backing array, the counter of threads waiting in #pop, and the
# mutex / condition variable pair used to coordinate producers and consumers.
def initialize
  @que = []
  @num_waiting = 0
  @mutex = Mutex.new
  @cond = ConditionVariable.new

  # Object#taint was removed in Ruby 3.2; calling it unconditionally makes
  # construction fail there. Taint only where the runtime still provides it.
  return unless respond_to?(:taint)

  @que.taint # enable tainted communication
  taint
end

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



229
230
231
# File 'lib/thread.rb', line 229

# Removes all objects from the queue.
#
# The backing array is emptied while holding the queue's mutex, the same lock
# #pop holds across its emptiness check and shift. Without the lock a clear
# could land between those two steps and make a blocking #pop return nil.
#
# Returns the (now empty) backing array.
def clear
  @mutex.synchronize { @que.clear }
end

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


222
223
224
# File 'lib/thread.rb', line 222

# Reports whether the queue currently holds no objects.
#
# Returns true when the backing array has zero elements, false otherwise.
def empty?
  @que.length.zero?
end

#length ⇒ Object Also known as: size

Returns the length of the queue.



236
237
238
# File 'lib/thread.rb', line 236

# Returns how many objects are currently stored in the queue.
def length
  @que.size
end

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



248
249
250
# File 'lib/thread.rb', line 248

# Returns the current value of the waiter counter, i.e. the number of threads
# presently suspended inside #pop waiting for an object to arrive.
def num_waiting
  @num_waiting
end

#pop(non_block = false) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended; instead, a ThreadError is raised when the queue is empty.



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/thread.rb', line 186

# Removes and returns the object at the front of the queue.
#
# With the default non_block (false), the calling thread waits on the queue's
# condition variable until an object is available. With a true non_block, an
# empty queue raises ThreadError ("queue empty") instead of waiting.
def pop(non_block=false)
  Thread.handle_interrupt(StandardError => :on_blocking) do
    @mutex.synchronize do
      # Re-check after every wake-up: the queue may be empty again by the
      # time this thread holds the mutex.
      while @que.empty?
        raise ThreadError, "queue empty" if non_block

        begin
          @num_waiting += 1
          @cond.wait @mutex
        ensure
          # Keep the waiter count accurate even if the wait is interrupted.
          @num_waiting -= 1
        end
      end

      @que.shift
    end
  end
end

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue.



162
163
164
165
166
167
168
169
# File 'lib/thread.rb', line 162

# Appends obj to the queue and wakes one thread waiting in #pop, if any.
#
# The append and the signal happen under the queue's mutex, inside a
# handle_interrupt block that defers asynchronous StandardError interrupts
# to blocking points.
#
# Returns the queue itself, so pushes can be chained (q << a << b). The
# previous implementation returned the value of @cond.signal, which exposed
# the internal condition variable to callers.
def push(obj)
  Thread.handle_interrupt(StandardError => :on_blocking) do
    @mutex.synchronize do
      @que.push obj
      @cond.signal
    end
  end
  self
end