Class: Polyphony::Queue

Inherits:
Object show all
Defined in:
ext/polyphony/queue.c,
ext/polyphony/queue.c

Overview

This class implements a FIFO queue that can be used to exchange data between different fibers or threads. The queue can simultaneously service multiple producers and multiple consumers. A consumers trying to remove an item from an empty queue will block at least one item is added to the queue.

A queue can also be capped in order to limit its depth. A producer trying to add an item to a full capped queue will block until at least one item is removed from it.

Direct Known Subclasses

Channel

Instance Method Summary collapse

Constructor Details

#newObject #new(capacity) ⇒ Object

Initializes a queue instance. If the capacity is given, the queue becomes capped, i.e. it cannot contain more elements than its capacity. When trying to add items to a capped queue that is full, the current fiber will block until at least one item is removed from the queue.

Overloads:

  • #new(capacity) ⇒ Object

    Parameters:

    • capacity (Integer)

      maximum items in queue



74
75
76
77
78
79
80
81
82
83
84
85
# File 'ext/polyphony/queue.c', line 74

static VALUE Queue_initialize(int argc, VALUE *argv, VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  queue->closed = 0;
  ring_buffer_init(&queue->values);
  ring_buffer_init(&queue->shift_queue);
  ring_buffer_init(&queue->push_queue);
  queue->capacity = (argc == 1) ?  NUM2UINT(argv[0]) : 0;

  return self;
}

Instance Method Details

#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue

Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.

Parameters:

  • value (any)

    value to be added to the queue

Returns:



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'ext/polyphony/queue.c', line 137

VALUE Queue_push(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  if (queue->closed)
    rb_raise(cClosedQueueError, "queue closed");

  if (queue->capacity) capped_queue_block_push(queue);

  queue_schedule_first_blocked_fiber(&queue->shift_queue);
  ring_buffer_push(&queue->values, value);

  return self;
}

#cap(cap) ⇒ Queue

Sets the capacity for the queue to the given value. If 0 or nil is given, the queue becomes uncapped.

Parameters:

  • cap (Integer, nil)

    new capacity

Returns:



266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'ext/polyphony/queue.c', line 266

VALUE Queue_cap(VALUE self, VALUE cap) {
  unsigned int new_capacity = NUM2UINT(cap);
  Queue_t *queue;
  GetQueue(self, queue);
  queue->capacity = new_capacity;

  if (queue->capacity)
    queue_schedule_blocked_fibers_to_capacity(queue);
  else
    queue_schedule_all_blocked_fibers(&queue->push_queue);

  return self;
}

#capped?boolean

Returns true if the queue is capped.

Returns:

  • (boolean)

    is the queue capped



285
286
287
288
289
290
# File 'ext/polyphony/queue.c', line 285

VALUE Queue_capped_p(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return queue->capacity ? INT2FIX(queue->capacity) : Qnil;
}

#clearQueue

Removes all values from the queue.

Returns:



297
298
299
300
301
302
303
304
305
# File 'ext/polyphony/queue.c', line 297

VALUE Queue_clear(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  ring_buffer_clear(&queue->values);
  if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);

  return self;
}

#closeQueue

Marks the queue as closed. Any fibers currently waiting on the queue are resumed with a nil value. After the queue is closed, trying to remove items from the queue will cause a ClosedQueueError to be raised.

Returns:



440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'ext/polyphony/queue.c', line 440

VALUE Queue_close(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  if (queue->closed) goto end;
  queue->closed = 1;

  // release all fibers waiting on `#shift`
  while (queue->shift_queue.count) {
    VALUE fiber = ring_buffer_shift(&queue->shift_queue);
    if (fiber == Qnil) break;
    Fiber_make_runnable(fiber, Qnil);
  }

end:
  return self;
}

#closed?boolean

Returns true if the queue has been closed.

Returns:

  • (boolean)


426
427
428
429
430
431
# File 'ext/polyphony/queue.c', line 426

VALUE Queue_closed_p(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return (queue->closed) ? Qtrue : Qfalse;
}

#delete(value) ⇒ Queue

Removes the given value from the queue.

Returns:



247
248
249
250
251
252
253
254
255
256
257
# File 'ext/polyphony/queue.c', line 247

VALUE Queue_delete(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  ring_buffer_delete(&queue->values, value);

  if (queue->capacity && (queue->capacity > queue->values.count))
    queue_schedule_first_blocked_fiber(&queue->push_queue);

  return self;
}

#shiftany #shift(true) ⇒ any #popany #pop(true) ⇒ any #deqany #deq(true) ⇒ any

Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.

Parameters:

  • nonblock (boolean)

    non-blocking mode

Returns:

  • (any)

    first value in queue



232
233
234
235
236
237
238
239
240
# File 'ext/polyphony/queue.c', line 232

VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
  int nonblock = argc && RTEST(argv[0]);
  Queue_t *queue;
  GetQueue(self, queue);

  return nonblock ?
    Queue_shift_nonblock(queue) :
    Queue_shift_block(queue);
}

#empty?boolean

Returns true if the queue is empty.

Returns:

  • (boolean)


372
373
374
375
376
377
# File 'ext/polyphony/queue.c', line 372

VALUE Queue_empty_p(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return (!queue->values.count) ? Qtrue : Qfalse;
}

#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue

Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.

Parameters:

  • value (any)

    value to be added to the queue

Returns:



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'ext/polyphony/queue.c', line 137

VALUE Queue_push(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  if (queue->closed)
    rb_raise(cClosedQueueError, "queue closed");

  if (queue->capacity) capped_queue_block_push(queue);

  queue_schedule_first_blocked_fiber(&queue->shift_queue);
  ring_buffer_push(&queue->values, value);

  return self;
}

#flush_waiters(value) ⇒ Queue

Flushes all fibers currently blocked waiting to remove items from the queue, resuming them with the given value.

Parameters:

  • value (any)

    value to resome all waiting fibers with

Returns:



353
354
355
356
357
358
359
360
361
362
363
364
365
# File 'ext/polyphony/queue.c', line 353

VALUE Queue_flush_waiters(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  while(1) {
    VALUE fiber = ring_buffer_shift(&queue->shift_queue);
    if (fiber == Qnil) return self;

    Fiber_make_runnable(fiber, value);
  }

  return self;
}

#sizeInteger #lengthInteger

Returns the number of values currently in the queue.

Overloads:

  • #sizeInteger

    Returns:

    • (Integer)
  • #lengthInteger

    Returns:

    • (Integer)

Returns:

  • (Integer)

    number of values in the queue



414
415
416
417
418
419
# File 'ext/polyphony/queue.c', line 414

VALUE Queue_size_m(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return INT2FIX(queue->values.count);
}

#num_waitingInteger

Returns the number of fibers currently waiting to remove items from the queue.

Returns:

  • (Integer)


398
399
400
401
402
403
# File 'ext/polyphony/queue.c', line 398

VALUE Queue_num_waiting(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return INT2FIX(queue->shift_queue.count);
}

#pending?boolean

Returns true if any fibers are currently waiting to remove items from the queue.

Returns:

  • (boolean)


385
386
387
388
389
390
# File 'ext/polyphony/queue.c', line 385

VALUE Queue_pending_p(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return (queue->shift_queue.count) ? Qtrue : Qfalse;
}

#shiftany #shift(true) ⇒ any #popany #pop(true) ⇒ any #deqany #deq(true) ⇒ any

Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.

Parameters:

  • nonblock (boolean)

    non-blocking mode

Returns:

  • (any)

    first value in queue



232
233
234
235
236
237
238
239
240
# File 'ext/polyphony/queue.c', line 232

VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
  int nonblock = argc && RTEST(argv[0]);
  Queue_t *queue;
  GetQueue(self, queue);

  return nonblock ?
    Queue_shift_nonblock(queue) :
    Queue_shift_block(queue);
}

#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue

Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.

Parameters:

  • value (any)

    value to be added to the queue

Returns:



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'ext/polyphony/queue.c', line 137

VALUE Queue_push(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  if (queue->closed)
    rb_raise(cClosedQueueError, "queue closed");

  if (queue->capacity) capped_queue_block_push(queue);

  queue_schedule_first_blocked_fiber(&queue->shift_queue);
  ring_buffer_push(&queue->values, value);

  return self;
}

#shiftany #shift(true) ⇒ any #popany #pop(true) ⇒ any #deqany #deq(true) ⇒ any

Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.

Parameters:

  • nonblock (boolean)

    non-blocking mode

Returns:

  • (any)

    first value in queue



232
233
234
235
236
237
238
239
240
# File 'ext/polyphony/queue.c', line 232

VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
  int nonblock = argc && RTEST(argv[0]);
  Queue_t *queue;
  GetQueue(self, queue);

  return nonblock ?
    Queue_shift_nonblock(queue) :
    Queue_shift_block(queue);
}

#shift_allArray

Returns all values currently in the queue, clearing the queue.

Returns:

  • (Array)

    all values



335
336
337
338
339
340
341
342
343
344
# File 'ext/polyphony/queue.c', line 335

VALUE Queue_shift_all(VALUE self) {
  Queue_t *queue;
  VALUE result;

  GetQueue(self, queue);

  result = ring_buffer_shift_all(&queue->values);
  if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);
  return result;
}

#shift_each {|any| ... } ⇒ Queue

Iterates over all values in the queue, removing each item and passing it to the given block.

Yields:

  • (any)

    removed item

Returns:



321
322
323
324
325
326
327
328
# File 'ext/polyphony/queue.c', line 321

VALUE Queue_shift_each(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  ring_buffer_shift_each(&queue->values);
  if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);
  return self;
}

#sizeInteger #lengthInteger

Returns the number of values currently in the queue.

Overloads:

  • #sizeInteger

    Returns:

    • (Integer)
  • #lengthInteger

    Returns:

    • (Integer)

Returns:

  • (Integer)

    number of values in the queue



414
415
416
417
418
419
# File 'ext/polyphony/queue.c', line 414

VALUE Queue_size_m(VALUE self) {
  Queue_t *queue;
  GetQueue(self, queue);

  return INT2FIX(queue->values.count);
}

#unshift(value) ⇒ Queue

Adds the given value to the queue's beginning. If the queue is capped and full, the call will block until a value is removed from the queue.

Parameters:

  • value (any)

    value to be added to the queue

Returns:



159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'ext/polyphony/queue.c', line 159

VALUE Queue_unshift(VALUE self, VALUE value) {
  Queue_t *queue;
  GetQueue(self, queue);

  if (queue->closed)
    rb_raise(cClosedQueueError, "queue closed");

  if (queue->capacity) capped_queue_block_push(queue);

  queue_schedule_first_blocked_fiber(&queue->shift_queue);
  ring_buffer_unshift(&queue->values, value);

  return self;
}