Class: Polyphony::Queue
- Defined in:
- ext/polyphony/queue.c,
ext/polyphony/queue.c
Overview
This class implements a FIFO queue that can be used to exchange data between different fibers or threads. The queue can simultaneously service multiple producers and multiple consumers. A consumers trying to remove an item from an empty queue will block at least one item is added to the queue.
A queue can also be capped in order to limit its depth. A producer trying to add an item to a full capped queue will block until at least one item is removed from it.
Direct Known Subclasses
Instance Method Summary collapse
-
#<<(value) ⇒ Queue
Adds the given value to the queue's end.
-
#cap(cap) ⇒ Queue
Sets the capacity for the queue to the given value.
-
#capped? ⇒ boolean
Returns true if the queue is capped.
-
#clear ⇒ Queue
Removes all values from the queue.
-
#close ⇒ Queue
Marks the queue as closed.
-
#closed? ⇒ boolean
Returns true if the queue has been closed.
-
#delete(value) ⇒ Queue
Removes the given value from the queue.
-
#deq(*args) ⇒ any
Removes the first value in the queue and returns it.
-
#empty? ⇒ boolean
Returns true if the queue is empty.
-
#enq(value) ⇒ Queue
Adds the given value to the queue's end.
-
#flush_waiters(value) ⇒ Queue
Flushes all fibers currently blocked waiting to remove items from the queue, resuming them with the given value.
-
#initialize(*args) ⇒ Object
constructor
Initializes a queue instance.
-
#length ⇒ Integer
Returns the number of values currently in the queue.
-
#num_waiting ⇒ Integer
Returns the number of fibers currently waiting to remove items from the queue.
-
#pending? ⇒ boolean
Returns true if any fibers are currently waiting to remove items from the queue.
-
#pop(*args) ⇒ any
Removes the first value in the queue and returns it.
-
#push(value) ⇒ Queue
Adds the given value to the queue's end.
-
#shift(*args) ⇒ any
Removes the first value in the queue and returns it.
-
#shift_all ⇒ Array
Returns all values currently in the queue, clearing the queue.
-
#shift_each {|any| ... } ⇒ Queue
Iterates over all values in the queue, removing each item and passing it to the given block.
-
#size ⇒ Integer
Returns the number of values currently in the queue.
-
#unshift(value) ⇒ Queue
Adds the given value to the queue's beginning.
Constructor Details
#new ⇒ Object #new(capacity) ⇒ Object
Initializes a queue instance. If the capacity is given, the queue becomes capped, i.e. it cannot contain more elements than its capacity. When trying to add items to a capped queue that is full, the current fiber will block until at least one item is removed from the queue.
74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'ext/polyphony/queue.c', line 74
static VALUE Queue_initialize(int argc, VALUE *argv, VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
queue->closed = 0;
ring_buffer_init(&queue->values);
ring_buffer_init(&queue->shift_queue);
ring_buffer_init(&queue->push_queue);
queue->capacity = (argc == 1) ? NUM2UINT(argv[0]) : 0;
return self;
}
|
Instance Method Details
#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue
Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'ext/polyphony/queue.c', line 137
VALUE Queue_push(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
if (queue->closed)
rb_raise(cClosedQueueError, "queue closed");
if (queue->capacity) capped_queue_block_push(queue);
queue_schedule_first_blocked_fiber(&queue->shift_queue);
ring_buffer_push(&queue->values, value);
return self;
}
|
#cap(cap) ⇒ Queue
Sets the capacity for the queue to the given value. If 0 or nil is given, the queue becomes uncapped.
266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'ext/polyphony/queue.c', line 266
VALUE Queue_cap(VALUE self, VALUE cap) {
unsigned int new_capacity = NUM2UINT(cap);
Queue_t *queue;
GetQueue(self, queue);
queue->capacity = new_capacity;
if (queue->capacity)
queue_schedule_blocked_fibers_to_capacity(queue);
else
queue_schedule_all_blocked_fibers(&queue->push_queue);
return self;
}
|
#capped? ⇒ boolean
Returns true if the queue is capped.
285 286 287 288 289 290 |
# File 'ext/polyphony/queue.c', line 285
VALUE Queue_capped_p(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return queue->capacity ? INT2FIX(queue->capacity) : Qnil;
}
|
#clear ⇒ Queue
Removes all values from the queue.
297 298 299 300 301 302 303 304 305 |
# File 'ext/polyphony/queue.c', line 297
VALUE Queue_clear(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
ring_buffer_clear(&queue->values);
if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);
return self;
}
|
#close ⇒ Queue
Marks the queue as closed. Any fibers currently waiting on the queue are
resumed with a nil
value. After the queue is closed, trying to remove items
from the queue will cause a ClosedQueueError
to be raised.
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 |
# File 'ext/polyphony/queue.c', line 440
VALUE Queue_close(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
if (queue->closed) goto end;
queue->closed = 1;
// release all fibers waiting on `#shift`
while (queue->shift_queue.count) {
VALUE fiber = ring_buffer_shift(&queue->shift_queue);
if (fiber == Qnil) break;
Fiber_make_runnable(fiber, Qnil);
}
end:
return self;
}
|
#closed? ⇒ boolean
Returns true if the queue has been closed.
426 427 428 429 430 431 |
# File 'ext/polyphony/queue.c', line 426
VALUE Queue_closed_p(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return (queue->closed) ? Qtrue : Qfalse;
}
|
#delete(value) ⇒ Queue
Removes the given value from the queue.
247 248 249 250 251 252 253 254 255 256 257 |
# File 'ext/polyphony/queue.c', line 247
VALUE Queue_delete(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
ring_buffer_delete(&queue->values, value);
if (queue->capacity && (queue->capacity > queue->values.count))
queue_schedule_first_blocked_fiber(&queue->push_queue);
return self;
}
|
#shift ⇒ any #shift(true) ⇒ any #pop ⇒ any #pop(true) ⇒ any #deq ⇒ any #deq(true) ⇒ any
Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.
232 233 234 235 236 237 238 239 240 |
# File 'ext/polyphony/queue.c', line 232
VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
int nonblock = argc && RTEST(argv[0]);
Queue_t *queue;
GetQueue(self, queue);
return nonblock ?
Queue_shift_nonblock(queue) :
Queue_shift_block(queue);
}
|
#empty? ⇒ boolean
Returns true if the queue is empty.
372 373 374 375 376 377 |
# File 'ext/polyphony/queue.c', line 372
VALUE Queue_empty_p(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return (!queue->values.count) ? Qtrue : Qfalse;
}
|
#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue
Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'ext/polyphony/queue.c', line 137
VALUE Queue_push(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
if (queue->closed)
rb_raise(cClosedQueueError, "queue closed");
if (queue->capacity) capped_queue_block_push(queue);
queue_schedule_first_blocked_fiber(&queue->shift_queue);
ring_buffer_push(&queue->values, value);
return self;
}
|
#flush_waiters(value) ⇒ Queue
Flushes all fibers currently blocked waiting to remove items from the queue, resuming them with the given value.
353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'ext/polyphony/queue.c', line 353
VALUE Queue_flush_waiters(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
while(1) {
VALUE fiber = ring_buffer_shift(&queue->shift_queue);
if (fiber == Qnil) return self;
Fiber_make_runnable(fiber, value);
}
return self;
}
|
#size ⇒ Integer #length ⇒ Integer
Returns the number of values currently in the queue.
414 415 416 417 418 419 |
# File 'ext/polyphony/queue.c', line 414
VALUE Queue_size_m(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return INT2FIX(queue->values.count);
}
|
#num_waiting ⇒ Integer
Returns the number of fibers currently waiting to remove items from the queue.
398 399 400 401 402 403 |
# File 'ext/polyphony/queue.c', line 398
VALUE Queue_num_waiting(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return INT2FIX(queue->shift_queue.count);
}
|
#pending? ⇒ boolean
Returns true if any fibers are currently waiting to remove items from the queue.
385 386 387 388 389 390 |
# File 'ext/polyphony/queue.c', line 385
VALUE Queue_pending_p(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return (queue->shift_queue.count) ? Qtrue : Qfalse;
}
|
#shift ⇒ any #shift(true) ⇒ any #pop ⇒ any #pop(true) ⇒ any #deq ⇒ any #deq(true) ⇒ any
Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.
232 233 234 235 236 237 238 239 240 |
# File 'ext/polyphony/queue.c', line 232
VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
int nonblock = argc && RTEST(argv[0]);
Queue_t *queue;
GetQueue(self, queue);
return nonblock ?
Queue_shift_nonblock(queue) :
Queue_shift_block(queue);
}
|
#push(value) ⇒ Queue #enq(value) ⇒ Queue #<<(value) ⇒ Queue
Adds the given value to the queue's end. If the queue is capped and full, the call will block until a value is removed from the queue.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'ext/polyphony/queue.c', line 137
VALUE Queue_push(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
if (queue->closed)
rb_raise(cClosedQueueError, "queue closed");
if (queue->capacity) capped_queue_block_push(queue);
queue_schedule_first_blocked_fiber(&queue->shift_queue);
ring_buffer_push(&queue->values, value);
return self;
}
|
#shift ⇒ any #shift(true) ⇒ any #pop ⇒ any #pop(true) ⇒ any #deq ⇒ any #deq(true) ⇒ any
Removes the first value in the queue and returns it. If the optional nonblock parameter is true, the operation is non-blocking. In non-blocking mode, if the queue is empty, a ThreadError exception is raised. In blocking mode, if the queue is empty, the call will block until an item is added to the queue.
232 233 234 235 236 237 238 239 240 |
# File 'ext/polyphony/queue.c', line 232
VALUE Queue_shift(int argc,VALUE *argv, VALUE self) {
int nonblock = argc && RTEST(argv[0]);
Queue_t *queue;
GetQueue(self, queue);
return nonblock ?
Queue_shift_nonblock(queue) :
Queue_shift_block(queue);
}
|
#shift_all ⇒ Array
Returns all values currently in the queue, clearing the queue.
335 336 337 338 339 340 341 342 343 344 |
# File 'ext/polyphony/queue.c', line 335
VALUE Queue_shift_all(VALUE self) {
Queue_t *queue;
VALUE result;
GetQueue(self, queue);
result = ring_buffer_shift_all(&queue->values);
if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);
return result;
}
|
#shift_each {|any| ... } ⇒ Queue
Iterates over all values in the queue, removing each item and passing it to the given block.
321 322 323 324 325 326 327 328 |
# File 'ext/polyphony/queue.c', line 321
VALUE Queue_shift_each(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
ring_buffer_shift_each(&queue->values);
if (queue->capacity) queue_schedule_blocked_fibers_to_capacity(queue);
return self;
}
|
#size ⇒ Integer #length ⇒ Integer
Returns the number of values currently in the queue.
414 415 416 417 418 419 |
# File 'ext/polyphony/queue.c', line 414
VALUE Queue_size_m(VALUE self) {
Queue_t *queue;
GetQueue(self, queue);
return INT2FIX(queue->values.count);
}
|
#unshift(value) ⇒ Queue
Adds the given value to the queue's beginning. If the queue is capped and full, the call will block until a value is removed from the queue.
159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'ext/polyphony/queue.c', line 159
VALUE Queue_unshift(VALUE self, VALUE value) {
Queue_t *queue;
GetQueue(self, queue);
if (queue->closed)
rb_raise(cClosedQueueError, "queue closed");
if (queue->capacity) capped_queue_block_push(queue);
queue_schedule_first_blocked_fiber(&queue->shift_queue);
ring_buffer_unshift(&queue->values, value);
return self;
}
|