Class: SizedQueue

Inherits:
Object show all
Defined in:
thread_sync.c,
thread_sync.c

Overview

This class represents queues of specified size capacity. The push operation may be blocked if the capacity is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Constructor Details

#new(max) ⇒ Object

Creates a fixed-length queue with a maximum size of max.



1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
# File 'thread_sync.c', line 1037

static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}

Instance Method Details

#clearObject

Removes all objects from the queue.



1217
1218
1219
1220
1221
1222
1223
1224
1225
# File 'thread_sync.c', line 1217

static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}

#closeObject

Similar to Queue#close.

The difference is behavior with waiting enqueuing threads.

If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError(‘queue closed’).



1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
# File 'thread_sync.c', line 1068

static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
	struct rb_szqueue *sq = szqueue_ptr(self);

	FL_SET(self, QUEUE_CLOSED);
	wakeup_all(szqueue_waitq(sq));
	wakeup_all(szqueue_pushq(sq));
    }
    return self;
}

#empty?Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)


1265
1266
1267
1268
1269
1270
1271
# File 'thread_sync.c', line 1265

static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
}

#lengthObject #sizeObject Also known as: size

Returns the length of the queue.



1236
1237
1238
1239
1240
1241
1242
# File 'thread_sync.c', line 1236

static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}

#maxObject

Returns the maximum size of the queue.



1087
1088
1089
1090
1091
# File 'thread_sync.c', line 1087

static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}

#max=(number) ⇒ Object

Sets the maximum size of the queue to the given number.



1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
# File 'thread_sync.c', line 1100

static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long max = NUM2LONG(vmax);
    long diff = 0;
    struct rb_szqueue *sq = szqueue_ptr(self);

    if (max <= 0) {
	rb_raise(rb_eArgError, "queue size must be positive");
    }
    if (max > sq->max) {
	diff = max - sq->max;
    }
    sq->max = max;
    sync_wakeup(szqueue_pushq(sq), diff);
    return vmax;
}

#num_waitingObject

Returns the number of threads waiting on the queue.



1250
1251
1252
1253
1254
1255
1256
# File 'thread_sync.c', line 1250

static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}

#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ThreadError is raised.



1204
1205
1206
1207
1208
1209
# File 'thread_sync.c', line 1204

static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}

#push(object, non_block = false) ⇒ Object #enq(object, non_block = false) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn’t suspended, and ThreadError is raised.



1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
# File 'thread_sync.c', line 1143

static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
	if (!should_block) {
	    rb_raise(rb_eThreadError, "queue full");
	}
	else if (queue_closed_p(self)) {
	    goto closed;
	}
	else {
	    struct queue_waiter qw;
	    struct list_head *pushq = szqueue_pushq(sq);

	    qw.w.th = GET_THREAD();
	    qw.as.sq = sq;
	    list_add_tail(pushq, &qw.w.node);
	    sq->num_waiting_push++;

	    rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
	}
    }

    if (queue_closed_p(self)) {
      closed:
	raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}