Class: SizedQueue

Inherits:
Object show all
Defined in:
thread_sync.c,
thread_sync.c

Overview

This class represents queues with a specified maximum capacity. A push operation may block when the queue is full.

See Queue for an example of how a SizedQueue works.

Instance Method Summary collapse

Constructor Details

#new(max) ⇒ Object

Creates a fixed-length queue with a maximum size of max.


1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
# File 'thread_sync.c', line 1122

/*
 * SizedQueue#initialize(max)
 *
 * Converts vmax to a C long and rejects any value that is not strictly
 * positive with ArgumentError.  On success the queue receives a fresh
 * buffer from ary_buf_new(), both of its wait lists are initialised, and
 * the capacity is recorded in sq->max.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    const long capacity = NUM2LONG(vmax);

    if (capacity < 1) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Install the backing buffer through RB_OBJ_WRITE. */
    RB_OBJ_WRITE(self, &szq->q.que, ary_buf_new());

    /* Start with empty wait lists (the one from szqueue_waitq and the one
     * from szqueue_pushq). */
    list_head_init(szqueue_waitq(szq));
    list_head_init(szqueue_pushq(szq));

    szq->max = capacity;

    return self;
}

Instance Method Details

#clearObject

Removes all objects from the queue.


1305
1306
1307
1308
1309
1310
1311
1312
1313
# File 'thread_sync.c', line 1305

/*
 * SizedQueue#clear
 *
 * Empties the queue's backing array (after passing it through
 * check_array) and then calls wakeup_all on the push wait list.
 * NOTE(review): presumably the wakeup is there because clearing frees
 * capacity for blocked pushers -- confirm against wakeup_all.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    VALUE items = check_array(self, szq->q.que);

    rb_ary_clear(items);
    wakeup_all(szqueue_pushq(szq));

    return self;
}

#closeObject

Similar to Queue#close.

The difference is in how waiting enqueuing threads are handled.

If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError('queue closed').


1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
# File 'thread_sync.c', line 1153

/*
 * SizedQueue#close
 *
 * If the queue is not already closed, sets the QUEUE_CLOSED flag on self
 * and calls wakeup_all on both the pop wait list and the push wait list.
 * Calling it on an already closed queue does nothing.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    struct rb_szqueue *szq;

    if (queue_closed_p(self)) {
        return self;
    }

    szq = szqueue_ptr(self);

    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(szqueue_waitq(szq));
    wakeup_all(szqueue_pushq(szq));

    return self;
}

#empty?Boolean

Returns true if the queue is empty.

Returns:

  • (Boolean)

1353
1354
1355
1356
1357
1358
1359
# File 'thread_sync.c', line 1353

/*
 * SizedQueue#empty?
 *
 * Returns Qtrue when queue_length reports zero for this queue,
 * Qfalse otherwise.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    if (queue_length(self, &szq->q) != 0) {
        return Qfalse;
    }
    return Qtrue;
}

#lengthObject #sizeObject Also known as: size

Returns the length of the queue.


1324
1325
1326
1327
1328
1329
1330
# File 'thread_sync.c', line 1324

/*
 * SizedQueue#length / SizedQueue#size
 *
 * Returns the value reported by queue_length for this queue, converted
 * to a Ruby Integer.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    const long count = queue_length(self, &szq->q);

    return LONG2NUM(count);
}

#maxObject

Returns the maximum size of the queue.


1172
1173
1174
1175
1176
# File 'thread_sync.c', line 1172

/*
 * SizedQueue#max
 *
 * Returns the queue's stored maximum size as a Ruby Integer.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    const struct rb_szqueue *szq = szqueue_ptr(self);

    return LONG2NUM(szq->max);
}

#max=(number) ⇒ Object

Sets the maximum size of the queue to the given number.


1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
# File 'thread_sync.c', line 1185

/*
 * SizedQueue#max=(number)
 *
 * Converts vmax to a C long and raises ArgumentError unless it is
 * strictly positive.  Stores the new maximum and then calls sync_wakeup
 * on the push wait list, passing the amount by which the maximum grew
 * (0 when it shrank or stayed the same).
 *
 * Returns vmax unchanged.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    const long new_max = NUM2LONG(vmax);
    struct rb_szqueue *szq = szqueue_ptr(self);
    long added_slots;

    if (new_max < 1) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Must be computed before the old maximum is overwritten. */
    added_slots = (new_max > szq->max) ? new_max - szq->max : 0;

    szq->max = new_max;
    sync_wakeup(szqueue_pushq(szq), added_slots);

    return vmax;
}

#num_waitingObject

Returns the number of threads waiting on the queue.


1338
1339
1340
1341
1342
1343
1344
# File 'thread_sync.c', line 1338

/*
 * SizedQueue#num_waiting
 *
 * Returns, as a Ruby Integer, the sum of the two waiter counters kept
 * for this queue: q.num_waiting and num_waiting_push.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    const struct rb_szqueue *szq = szqueue_ptr(self);

    return INT2NUM(szq->num_waiting_push + szq->q.num_waiting);
}

#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.


1292
1293
1294
1295
1296
1297
# File 'thread_sync.c', line 1292

/*
 * SizedQueue#pop / #deq / #shift
 *
 * Lets queue_pop_should_block derive the blocking mode from the call's
 * arguments, then hands the actual work to szqueue_do_pop and returns
 * whatever it returns.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    const int blocking = queue_pop_should_block(argc, argv);

    return szqueue_do_pop(self, blocking);
}

#push(object, non_block = false) ⇒ Object #enq(object, non_block = false) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn't suspended, and ThreadError is raised.


1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
# File 'thread_sync.c', line 1228

/*
 * SizedQueue#push / #enq / #<<
 *
 * Appends argv[0] to the queue.  While the queue holds at least sq->max
 * elements the call either raises ThreadError ("queue full") when
 * blocking was not requested, stops waiting if the queue has been
 * closed, or registers the caller on the push wait list and sleeps.
 * After the loop, a closed queue results in a call to
 * raise_closed_queue_error instead of a push.
 *
 * Returns the result of queue_do_push.
 */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    /* Blocking mode is derived from the call's arguments. */
    int should_block = szqueue_push_should_block(argc, argv);

    /* Re-check the length after every wakeup: being woken does not by
     * itself establish that there is room. */
    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            /* Leave the loop; the closed check below reports the error. */
            break;
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            /* NOTE(review): COROUTINE_STACK_LOCAL presumably declares qw as
             * a pointer to storage valid across the sleep -- confirm
             * against the macro definition. */
            COROUTINE_STACK_LOCAL(struct queue_waiter, qw);
            struct list_head *pushq = szqueue_pushq(sq);

            /* Record who is waiting: the queue plus the current thread
             * and fiber taken from the execution context. */
            qw->w.self = self;
            qw->w.th = ec->thread_ptr;
            qw->w.fiber = ec->fiber_ptr;

            /* Enqueue the waiter at the tail of the push wait list and
             * count it. */
            qw->as.sq = sq;
            list_add_tail(pushq, &qw->w.node);
            sq->num_waiting_push++;

            /* Sleep via queue_sleep; rb_ensure guarantees that
             * szqueue_sleep_done runs with qw afterwards, even on a
             * non-local exit.  NOTE(review): presumably that callback
             * undoes the registration above -- confirm. */
            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw);
        }
    }

    /* Covers both a queue closed while waiting and one that was already
     * closed with room left. */
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}