Class: SizedQueue

Inherits:
Object show all
Defined in:
thread_sync.c,
thread_sync.c

Overview

This class represents queues with a specified maximum capacity. The push operation blocks when the queue is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Constructor Details

#new(max) ⇒ Object

Creates a fixed-length queue with a maximum size of max.



899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
# File 'thread_sync.c', line 899

/*
 * Initializer for a sized queue.
 *
 * Converts vmax with NUM2LONG and raises ArgumentError unless the result is
 * positive, then fills in the four struct members of self: three fresh
 * buffers from ary_buf_new() and the maximum size (the original vmax object).
 *
 * Returns self.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    const long capacity = NUM2LONG(vmax);

    /* Validate before any member of self is written. */
    if (capacity < 1) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Element storage plus the two waiter lists (QUEUE_WAITERS and
     * SZQUEUE_WAITERS -- presumably consumers and producers respectively;
     * confirm against the member definitions). */
    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());

    /* The limit is stored as the caller's object, not the converted long. */
    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);

    return self;
}

Instance Method Details

#clear ⇒ Object

Removes all objects from the queue.



1066
1067
1068
1069
1070
1071
1072
# File 'thread_sync.c', line 1066

/*
 * Empties the queue's element array, then calls wakeup_all_threads() on the
 * SZQUEUE_WAITERS list of self.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    VALUE items;
    VALUE blocked;

    items = GET_QUEUE_QUE(self);
    rb_ary_clear(items);

    /* Fetched only after the clear, matching the original evaluation order. */
    blocked = GET_SZQUEUE_WAITERS(self);
    wakeup_all_threads(blocked);

    return self;
}

#close(exception = false) ⇒ Object

Similar to Queue#close.

The difference is behavior with waiting enqueuing threads.

If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError(‘queue closed’).



929
930
931
932
933
# File 'thread_sync.c', line 929

/*
 * Closes the sized queue by delegating to queue_do_close().
 *
 * Returns whatever queue_do_close() returns.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    /* NOTE(review): the TRUE flag presumably selects sized-queue handling
     * inside queue_do_close(); confirm against its definition. */
    const VALUE result = queue_do_close(self, TRUE);

    return result;
}

#max ⇒ Object

Returns the maximum size of the queue.



941
942
943
944
945
# File 'thread_sync.c', line 941

/*
 * Reader for the queue's maximum size.
 *
 * Returns the value obtained through GET_SZQUEUE_MAX(self) unchanged.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    const VALUE limit = GET_SZQUEUE_MAX(self);

    return limit;
}

#max=(number) ⇒ Object

Sets the maximum size of the queue to the given number.



954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
# File 'thread_sync.c', line 954

/*
 * Writer for the queue's maximum size.
 *
 * Converts vmax with NUM2LONG and raises ArgumentError unless the result is
 * positive.  When the new limit exceeds the current one, up to
 * (new - current) entries are shifted off the SZQUEUE_WAITERS list and each
 * is passed to rb_thread_wakeup_alive().
 *
 * Returns vmax.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    const long new_max = NUM2LONG(vmax);
    long extra_slots = 0;
    VALUE th;

    if (new_max < 1) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Measure the growth against the old limit before overwriting it. */
    if ((unsigned long)new_max > GET_SZQUEUE_ULONGMAX(self)) {
	extra_slots = new_max - GET_SZQUEUE_ULONGMAX(self);
    }

    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);

    /* One wake-up per newly available slot; stop early once the waiter
     * list yields nil. */
    for (; extra_slots > 0; extra_slots--) {
	th = rb_ary_shift(GET_SZQUEUE_WAITERS(self));
	if (NIL_P(th)) {
	    break;
	}
	rb_thread_wakeup_alive(th);
    }

    return vmax;
}

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue.



1080
1081
1082
1083
1084
1085
# File 'thread_sync.c', line 1080

/*
 * Reports how many threads are waiting on the queue.
 *
 * The result is the sum of queue_num_waiting(self) and
 * szqueue_num_waiting_producer(self), converted with ULONG2NUM.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    long total_waiting =
	queue_num_waiting(self) + szqueue_num_waiting_producer(self);

    return ULONG2NUM(total_waiting);
}

#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and an exception is raised.



1053
1054
1055
1056
1057
1058
# File 'thread_sync.c', line 1053

/*
 * Entry point for pop / deq / shift.
 *
 * The blocking mode is derived from the arguments by
 * queue_pop_should_block(); the actual work is done by szqueue_do_pop(),
 * whose result is returned as-is.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    return szqueue_do_pop(self, queue_pop_should_block(argc, argv));
}

#push(object, non_block = false) ⇒ Object #enq(object, non_block = false) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn’t suspended, and an exception is raised.



998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
# File 'thread_sync.c', line 998

/*
 * Entry point for push / enq / <<.
 *
 * While the queue length is at or above the maximum:
 *   - in non-blocking mode, raises ThreadError ("queue full");
 *   - otherwise, if the queue is closed, raises via
 *     raise_closed_queue_error();
 *   - otherwise registers the current thread on the SZQUEUE_WAITERS list
 *     and sleeps until woken, then re-checks the condition.
 * Once there is room, the closed state is checked again before argv[0] is
 * handed to queue_do_push(), whose result is returned.
 *
 * Note the ordering: for a full queue in non-blocking mode the "queue full"
 * error is raised before the closed state is consulted.
 */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct waiting_delete args;
    int should_block = szqueue_push_should_block(argc, argv);
    /* Context handed to the ensure handler below: the waiter list and the
     * thread that is about to be put on it. */
    args.waiting = GET_SZQUEUE_WAITERS(self);
    args.th      = rb_thread_current();

    /* Re-evaluated after every wake-up, so a woken thread only proceeds
     * when the length is actually below the maximum. */
    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
	if (!should_block) {
	    rb_raise(rb_eThreadError, "queue full");
	}
	else if (queue_closed_p(self)) {
	    goto closed;
	}
	else {
	    rb_ary_push(args.waiting, args.th);
	    /* Sleep with queue_delete_from_waiting as the ensure handler, so
	     * it runs however the sleep ends (presumably removing this
	     * thread from args.waiting -- confirm against its definition). */
	    rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
	}
    }

    /* Not full: still refuse to push onto a closed queue.  The label is
     * also the jump target for the closed check inside the loop. */
    if (queue_closed_p(self)) {
      closed:
	raise_closed_queue_error(self);
    }

    return queue_do_push(self, argv[0]);
}