Class: Queue

Inherits:
Object show all
Defined in:
thread_sync.c,
thread_sync.c

Overview

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

The class implements a FIFO (first-in, first-out) queue: the first tasks added are the first retrieved.

Example:

# Shared queue through which the producer thread hands values to the consumer thread.
queue = Queue.new

# Producer: pushes the integers 0..4 onto the queue, sleeping a random time before each push.
producer = Thread.new do

5.times do |i|
   sleep rand(i) # simulate expense
   queue << i
   puts "#{i} produced"
end

end

# Consumer: pops five values, matching the five pushes made by the producer.
consumer = Thread.new do

5.times do |i|
   value = queue.pop
   sleep rand(i/2) # simulate expense
   puts "consumed #{value}"
end

end

# Only the consumer thread is joined here.
consumer.join

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Object

Creates a new queue instance.


845
846
847
848
849
850
851
852
# File 'thread_sync.c', line 845

/*
 * Queue#initialize
 *
 * Gives the queue a fresh buffer from ary_buf_new() and initializes
 * the list head of its wait queue.  Returns self.
 */
static VALUE
rb_queue_initialize(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);
    VALUE buffer = ary_buf_new();

    /* Store the buffer into the queue's que slot through RB_OBJ_WRITE. */
    RB_OBJ_WRITE(self, &queue->que, buffer);
    list_head_init(queue_waitq(queue));

    return self;
}

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.


1068
1069
1070
1071
1072
1073
1074
1075
# File 'thread_sync.c', line 1068

/*
 * Queue#clear
 *
 * Empties the queue's backing array with rb_ary_clear() after passing
 * it through check_array().  Returns self.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);
    VALUE items = check_array(self, queue->que);

    rb_ary_clear(items);

    return self;
}

#close ⇒ Object

Closes the queue. A closed queue cannot be re-opened.

After the call to close completes, the following are true:

  • closed? will return true

  • close will be ignored.

  • calling enq/push/<< will raise a ClosedQueueError.

  • when empty? is false, calling deq/pop/shift will return an object from the queue as usual.

  • when empty? is true, deq(false) will not suspend the thread and will return nil. deq(true) will raise a ThreadError.

ClosedQueueError inherits from StopIteration, so it can be used to break out of a loop block.

Example:

  	q = Queue.new
    Thread.new{
      # Once the queue is closed and empty, deq returns nil instead of
      # suspending the thread, which ends this while loop.
      while e = q.deq # wait for nil to break loop
        # ...
      end
    }
    q.close

898
899
900
901
902
903
904
905
906
907
908
909
910
# File 'thread_sync.c', line 898

/*
 * Queue#close
 *
 * Marks the queue as closed by setting the QUEUE_CLOSED flag on self and
 * then calls wakeup_all() on the queue's wait queue.  When the queue is
 * already closed nothing is done.  Returns self in either case.
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    /* Closing an already-closed queue is a no-op. */
    if (queue_closed_p(self)) {
        return self;
    }

    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(queue_waitq(queue));

    return self;
}

#closed? ⇒ Boolean

Returns true if the queue is closed.

Returns:

  • (Boolean)

919
920
921
922
923
# File 'thread_sync.c', line 919

/*
 * Queue#closed?
 *
 * Returns Qtrue when queue_closed_p() reports the queue as closed,
 * Qfalse otherwise.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    if (queue_closed_p(self)) {
        return Qtrue;
    }

    return Qfalse;
}

#empty? ⇒ Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)

1056
1057
1058
1059
1060
# File 'thread_sync.c', line 1056

/*
 * Queue#empty?
 *
 * Returns Qtrue when queue_length() reports zero elements,
 * Qfalse otherwise.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    if (queue_length(self, queue) == 0) {
        return Qtrue;
    }

    return Qfalse;
}

#length ⇒ Object #size ⇒ Object Also known as: size

Returns the length of the queue.


1086
1087
1088
1089
1090
# File 'thread_sync.c', line 1086

/*
 * Queue#length, Queue#size
 *
 * Returns the value of queue_length() for this queue, converted to a
 * Ruby number with LONG2NUM.
 */
static VALUE
rb_queue_length(VALUE self)
{
    struct rb_queue *queue = queue_ptr(self);

    return LONG2NUM(queue_length(self, queue));
}

#marshal_dumpObject

:nodoc:


1526
1527
1528
1529
1530
1531
# File 'thread_sync.c', line 1526

/*
 * Raises a TypeError whose message names the class of obj.
 * The trailing UNREACHABLE_RETURN marks the return as unreachable.
 */
static VALUE
undumpable(VALUE obj)
{
    VALUE klass = rb_obj_class(obj);

    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, klass);
    UNREACHABLE_RETURN(Qnil);
}

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.


1098
1099
1100
1101
1102
1103
1104
# File 'thread_sync.c', line 1098

/*
 * Queue#num_waiting
 *
 * Returns the queue's num_waiting counter converted to a Ruby number
 * with INT2NUM.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    return INT2NUM(queue_ptr(self)->num_waiting);
}

#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true and the queue is empty, the thread isn't suspended and ThreadError is raised instead.


1042
1043
1044
1045
1046
1047
# File 'thread_sync.c', line 1042

/*
 * Queue#pop, Queue#deq, Queue#shift
 *
 * Derives the blocking mode from the call arguments with
 * queue_pop_should_block() and delegates the actual work to
 * queue_do_pop(), returning its result.
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    /* Arguments are examined first, before the queue pointer is fetched. */
    const int blocking = queue_pop_should_block(argc, argv);
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_pop(self, queue, blocking);
}

#push(object) ⇒ Object #enq(object) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<

Pushes the given object to the queue.


935
936
937
938
939
# File 'thread_sync.c', line 935

/*
 * Queue#push, Queue#enq, Queue#<<
 *
 * Hands obj to queue_do_push() for this queue and returns its result.
 */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    struct rb_queue *queue = queue_ptr(self);

    return queue_do_push(self, queue, obj);
}