Class: Queue
Overview
The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.
The class implements a FIFO (first-in, first-out) queue: the first items added are the first ones retrieved.
Example:
# Shared queue through which the producer hands values to the consumer.
queue = Queue.new
# Producer: pushes the integers 0 through 4, pausing before each push.
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
# Consumer: pops five values; pop suspends the thread while the queue is empty.
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
# Wait for the consumer, which finishes only after it has popped all five values.
consumer.join
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#close ⇒ Object
Closes the queue.
-
#closed? ⇒ Boolean
Returns
true
if the queue is closed. -
#empty? ⇒ Boolean
Returns
true
if the queue is empty. -
#initialize ⇒ Object
constructor
Creates a new queue instance.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
-
#marshal_dump ⇒ Object
:nodoc:.
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(*args) ⇒ Object
(also: #deq, #shift)
Retrieves data from the queue.
-
#push(obj) ⇒ Object
(also: #enq, #<<)
Pushes the given
object
to the queue.
Constructor Details
#initialize ⇒ Object
Creates a new queue instance.
769 770 771 772 773 774 775 776 |
# File 'thread_sync.c', line 769
/*
 * Queue#initialize: prepares a newly allocated queue object.
 *
 * Stores a fresh buffer from ary_buf_new() in the queue's `que` slot
 * and initializes the list head returned by queue_waitq().
 * Returns self.
 */
static VALUE
rb_queue_initialize(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* Assign through RB_OBJ_WRITE rather than a plain store into the struct. */
    RB_OBJ_WRITE(self, &queue->que, ary_buf_new());

    list_head_init(queue_waitq(queue));

    return self;
}
|
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
983 984 985 986 987 988 989 990 |
# File 'thread_sync.c', line 983
/*
 * Queue#clear: removes all objects from the queue.
 *
 * The backing array is passed through check_array() before it is
 * emptied with rb_ary_clear(). Returns self.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    rb_ary_clear(check_array(self, queue->que));

    return self;
}
|
#close ⇒ Object
Closes the queue. A closed queue cannot be re-opened.
After the call to close completes, the following are true:
-
closed?
will return true -
close
will be ignored. -
calling enq/push/<< will raise a
ClosedQueueError
. -
when
empty?
is false, calling deq/pop/shift will return an object from the queue as usual. -
when
empty?
is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ThreadError
.
ClosedQueueError inherits from StopIteration, so raising it breaks out of a loop block.
Example:
q = Queue.new
# Worker thread: keeps taking items until deq returns nil.
Thread.new{
while e = q.deq # wait for nil to break loop
# ...
end
}
# Once the queue is closed and empty, deq returns nil and the worker's loop ends.
q.close
822 823 824 825 826 827 828 829 830 831 832 833 834 |
# File 'thread_sync.c', line 822
/*
 * Queue#close: marks the queue as closed.
 *
 * Closing an already-closed queue is a no-op. Otherwise the
 * QUEUE_CLOSED flag is set on the object and wakeup_all() is called on
 * the queue's wait list. Returns self in both cases.
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* Already closed: nothing left to do. */
    if (queue_closed_p(self)) {
        return self;
    }

    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(queue_waitq(queue));

    return self;
}
|
#closed? ⇒ Boolean
Returns true
if the queue is closed.
843 844 845 846 847 |
# File 'thread_sync.c', line 843
/*
 * Queue#closed?: returns Qtrue if the queue is closed, Qfalse otherwise.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    if (queue_closed_p(self)) {
        return Qtrue;
    }
    return Qfalse;
}
|
#empty? ⇒ Boolean
Returns true
if the queue is empty.
971 972 973 974 975 |
# File 'thread_sync.c', line 971
/*
 * Queue#empty?: returns Qtrue when queue_length() reports zero
 * elements, Qfalse otherwise.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    if (queue_length(self, queue) == 0) {
        return Qtrue;
    }
    return Qfalse;
}
|
#length ⇒ Object #size ⇒ Object Also known as: size
Returns the length of the queue.
1001 1002 1003 1004 1005 |
# File 'thread_sync.c', line 1001
/*
 * Queue#length (also Queue#size): returns the length of the queue,
 * converted to a Ruby integer with LONG2NUM.
 */
static VALUE
rb_queue_length(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return LONG2NUM(queue_length(self, queue));
}
|
#marshal_dump ⇒ Object
:nodoc:
1441 1442 1443 1444 1445 1446 |
# File 'thread_sync.c', line 1441
/*
 * marshal_dump implementation for objects that must not be dumped.
 *
 * Always raises TypeError naming the class of obj, so it never
 * returns normally.
 */
static VALUE
undumpable(VALUE obj)
{
    VALUE klass = rb_obj_class(obj);

    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, klass);

    /* Not reached: the call above raises. */
    UNREACHABLE_RETURN(Qnil);
}
|
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
1013 1014 1015 1016 1017 1018 1019 |
# File 'thread_sync.c', line 1013
/*
 * Queue#num_waiting: returns the number of threads waiting on the
 * queue, read from the num_waiting field and converted with INT2NUM.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    return INT2NUM(queue_ptr(self)->num_waiting);
}
|
#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.
957 958 959 960 961 962 |
# File 'thread_sync.c', line 957
/*
 * Queue#pop (also Queue#deq, Queue#shift): retrieves data from the queue.
 *
 * queue_pop_should_block() turns the optional argument list into a
 * blocking flag, which is handed to queue_do_pop() along with the
 * queue. The value returned by queue_do_pop() is returned unchanged.
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    /* Interpret the arguments first, before the queue is looked up. */
    const int block_p = queue_pop_should_block(argc, argv);
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_pop(self, queue, block_p);
}
|