Class: Queue
Overview
The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.
The class implements FIFO type of queue. In a FIFO queue, the first tasks added are the first retrieved.
Example:
queue = Queue.new
producer = Thread.new do
5.times do |i|
sleep rand(i) # simulate expense
queue << i
puts "#{i} produced"
end
end
consumer = Thread.new do
5.times do |i|
value = queue.pop
sleep rand(i/2) # simulate expense
puts "consumed #{value}"
end
end
consumer.join
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#close ⇒ Object
Closes the queue.
-
#closed? ⇒ Boolean
Returns
true
if the queue is closed. -
#empty? ⇒ Boolean
Returns
true
if the queue is empty. -
#initialize ⇒ Object
constructor
Creates a new queue instance.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
-
#marshal_dump ⇒ Object
:nodoc:.
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(*args) ⇒ Object
(also: #deq, #shift)
Retrieves data from the queue.
-
#push(obj) ⇒ Object
(also: #enq, #<<)
Pushes the given
object
to the queue.
Constructor Details
#initialize ⇒ Object
Creates a new queue instance.
845 846 847 848 849 850 851 852 |
# File 'thread_sync.c', line 845
static VALUE
rb_queue_initialize(VALUE self)
{
struct rb_queue *q = queue_ptr(self);
RB_OBJ_WRITE(self, &q->que, ary_buf_new());
list_head_init(queue_waitq(q));
return self;
}
|
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
1068 1069 1070 1071 1072 1073 1074 1075 |
# File 'thread_sync.c', line 1068
static VALUE
rb_queue_clear(VALUE self)
{
struct rb_queue *q = queue_ptr(self);
rb_ary_clear(check_array(self, q->que));
return self;
}
|
#close ⇒ Object
Closes the queue. A closed queue cannot be re-opened.
After the call to close completes, the following are true:
-
closed?
will return true -
close
will be ignored. -
calling enq/push/<< will raise a
ClosedQueueError
. -
when
empty?
is false, calling deq/pop/shift will return an object from the queue as usual. -
when
empty?
is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise aThreadError
.
ClosedQueueError is inherited from StopIteration, so that you can break loop block.
Example:
q = Queue.new
Thread.new{
while e = q.deq # wait for nil to break loop
# ...
end
}
q.close
898 899 900 901 902 903 904 905 906 907 908 909 910 |
# File 'thread_sync.c', line 898
static VALUE
rb_queue_close(VALUE self)
{
struct rb_queue *q = queue_ptr(self);
if (!queue_closed_p(self)) {
FL_SET(self, QUEUE_CLOSED);
wakeup_all(queue_waitq(q));
}
return self;
}
|
#closed? ⇒ Boolean
Returns true
if the queue is closed.
919 920 921 922 923 |
# File 'thread_sync.c', line 919
static VALUE
rb_queue_closed_p(VALUE self)
{
return queue_closed_p(self) ? Qtrue : Qfalse;
}
|
#empty? ⇒ Boolean
Returns true
if the queue is empty.
1056 1057 1058 1059 1060 |
# File 'thread_sync.c', line 1056
static VALUE
rb_queue_empty_p(VALUE self)
{
return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
}
|
#length ⇒ Object #size ⇒ Object Also known as: size
Returns the length of the queue.
1086 1087 1088 1089 1090 |
# File 'thread_sync.c', line 1086
static VALUE
rb_queue_length(VALUE self)
{
return LONG2NUM(queue_length(self, queue_ptr(self)));
}
|
#marshal_dump ⇒ Object
:nodoc:
1526 1527 1528 1529 1530 1531 |
# File 'thread_sync.c', line 1526
static VALUE
undumpable(VALUE obj)
{
rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
UNREACHABLE_RETURN(Qnil);
}
|
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
1098 1099 1100 1101 1102 1103 1104 |
# File 'thread_sync.c', line 1098
static VALUE
rb_queue_num_waiting(VALUE self)
{
struct rb_queue *q = queue_ptr(self);
return INT2NUM(q->num_waiting);
}
|
#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.
1042 1043 1044 1045 1046 1047 |
# File 'thread_sync.c', line 1042
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
return queue_do_pop(self, queue_ptr(self), should_block);
}
|