Class: SizedQueue
Overview
This class represents queues with a specified maximum capacity. A push operation may block when the queue is full.
See Queue for an example of how a SizedQueue works.
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#close ⇒ Object
Similar to Queue#close.
-
#empty? ⇒ Boolean
Returns
true
if the queue is empty. -
#new(max) ⇒ Object
constructor
Creates a fixed-length queue with a maximum size of
max
. -
#length ⇒ Object
(also: #size)
Returns the length of the queue.
-
#max ⇒ Object
Returns the maximum size of the queue.
-
#max=(number) ⇒ Object
Sets the maximum size of the queue to the given
number
. -
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(*args) ⇒ Object
(also: #deq, #shift)
Retrieves data from the queue.
-
#push(*args) ⇒ Object
(also: #enq, #<<)
Pushes
object
to the queue.
Constructor Details
#new(max) ⇒ Object
Creates a fixed-length queue with a maximum size of max
.
1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 |
# File 'thread_sync.c', line 1037
/*
 * SizedQueue#initialize(max)
 *
 * Sets up a sized queue whose capacity is +vmax+ converted to a C long.
 * Raises ArgumentError ("queue size must be positive") when the converted
 * value is zero or negative.  On success the backing buffer and both wait
 * lists are initialised, the capacity is stored, and +self+ is returned.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    /* Fetch the struct first, then convert the argument. */
    struct rb_szqueue *szq = szqueue_ptr(self);
    const long capacity = NUM2LONG(vmax);

    if (capacity < 1) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Install the item buffer through the write-barrier macro. */
    RB_OBJ_WRITE(self, &szq->q.que, ary_buf_new());

    list_head_init(szqueue_waitq(szq));
    list_head_init(szqueue_pushq(szq));

    szq->max = capacity;

    return self;
}
|
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
1217 1218 1219 1220 1221 1222 1223 1224 1225 |
# File 'thread_sync.c', line 1217
/*
 * SizedQueue#clear
 *
 * Empties the queue's backing array (after passing it through check_array)
 * and then calls wakeup_all on the push wait list.  Returns +self+.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, szq->q.que));

    /* The queue just shrank, so notify everything on the push wait list. */
    wakeup_all(szqueue_pushq(szq));

    return self;
}
|
#close ⇒ Object
Similar to Queue#close.
The difference is in how waiting enqueuing threads are handled.
If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError('queue closed').
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 |
# File 'thread_sync.c', line 1068
/*
 * SizedQueue#close
 *
 * Marks the queue as closed by setting the QUEUE_CLOSED flag on +self+,
 * then calls wakeup_all on both the regular wait list and the push wait
 * list.  Does nothing when the queue is already closed.  Always returns
 * +self+.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    struct rb_szqueue *szq;

    if (queue_closed_p(self)) {
        /* Already closed: nothing to flag, nobody to wake. */
        return self;
    }

    szq = szqueue_ptr(self);
    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(szqueue_waitq(szq));
    wakeup_all(szqueue_pushq(szq));

    return self;
}
|
#empty? ⇒ Boolean
Returns true
if the queue is empty.
1265 1266 1267 1268 1269 1270 1271 |
# File 'thread_sync.c', line 1265
/*
 * SizedQueue#empty?
 *
 * Returns Qtrue when queue_length reports zero items, Qfalse otherwise.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    if (queue_length(self, &szq->q) == 0) {
        return Qtrue;
    }
    return Qfalse;
}
|
#length ⇒ Object #size ⇒ Object Also known as: size
Returns the length of the queue.
1236 1237 1238 1239 1240 1241 1242 |
# File 'thread_sync.c', line 1236
/*
 * SizedQueue#length (also #size)
 *
 * Returns the value reported by queue_length for this queue, converted to
 * a Ruby Integer with LONG2NUM.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &szq->q));
}
|
#max ⇒ Object
Returns the maximum size of the queue.
1087 1088 1089 1090 1091 |
# File 'thread_sync.c', line 1087
/*
 * SizedQueue#max
 *
 * Returns the stored capacity (the +max+ field) as a Ruby Integer.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return LONG2NUM(szq->max);
}
|
#max=(number) ⇒ Object
Sets the maximum size of the queue to the given number
.
1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 |
# File 'thread_sync.c', line 1100
/*
 * SizedQueue#max=(number)
 *
 * Replaces the queue's capacity with +vmax+ converted to a C long.
 * Raises ArgumentError ("queue size must be positive") when the converted
 * value is zero or negative.  When the capacity grows, the amount of growth
 * is passed to sync_wakeup for the push wait list; otherwise 0 is passed.
 * Returns +vmax+ unchanged.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    /* Convert the argument first, then fetch the struct. */
    const long new_max = NUM2LONG(vmax);
    struct rb_szqueue *szq = szqueue_ptr(self);
    long grown_by;

    if (new_max < 1) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Only an increase in capacity yields a non-zero wakeup count. */
    grown_by = (new_max > szq->max) ? (new_max - szq->max) : 0;

    szq->max = new_max;
    sync_wakeup(szqueue_pushq(szq), grown_by);

    return vmax;
}
|
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue, including threads blocked waiting to push.
1250 1251 1252 1253 1254 1255 1256 |
# File 'thread_sync.c', line 1250
/*
 * SizedQueue#num_waiting
 *
 * Returns, as a Ruby Integer, the sum of the base queue's num_waiting
 * counter and the sized queue's num_waiting_push counter.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return INT2NUM(szq->num_waiting_push + szq->q.num_waiting);
}
|
#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.
1204 1205 1206 1207 1208 1209 |
# File 'thread_sync.c', line 1204
/*
 * SizedQueue#pop(non_block = false) (also #deq, #shift)
 *
 * Derives the blocking mode from the argument vector with
 * queue_pop_should_block and hands the actual work to szqueue_do_pop,
 * returning whatever that returns.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int blocking = queue_pop_should_block(argc, argv);

    return szqueue_do_pop(self, blocking);
}
|
#push(object, non_block = false) ⇒ Object #enq(object, non_block = false) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<
Pushes object
to the queue.
If there is no space left in the queue, waits until space becomes available, unless non_block
is true. If non_block
is true, the thread isn’t suspended, and ThreadError
is raised.
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 |
# File 'thread_sync.c', line 1143
/*
 * SizedQueue#push(object, non_block = false) (also #enq, #<<)
 *
 * Pushes argv[0] onto the queue and returns the result of queue_do_push.
 *
 * While the queue length is at or above the stored maximum:
 *   - in non-blocking mode, raises ThreadError ("queue full");
 *   - otherwise, if the queue is closed, raises via raise_closed_queue_error;
 *   - otherwise, appends the current thread to the push wait list and sleeps.
 *
 * Once there is room, a closed queue still raises via
 * raise_closed_queue_error instead of accepting the object.
 *
 * Note the check order inside the loop: a queue that is both full and closed
 * raises ThreadError, not the closed-queue error, when called non-blocking.
 */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
int should_block = szqueue_push_should_block(argc, argv);
/* Re-check the length after every wakeup; being woken does not by itself
 * guarantee that there is room. */
while (queue_length(self, &sq->q) >= sq->max) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
else if (queue_closed_p(self)) {
/* Share the raise below with the post-loop closed check. */
goto closed;
}
else {
/* The waiter record lives on this stack frame for the duration of the
 * sleep. */
struct queue_waiter qw;
struct list_head *pushq = szqueue_pushq(sq);
qw.w.th = GET_THREAD();
qw.as.sq = sq;
list_add_tail(pushq, &qw.w.node);
sq->num_waiting_push++;
/* rb_ensure runs szqueue_sleep_done(&qw) after queue_sleep, including when
 * the sleep is left via an exception.
 * NOTE(review): szqueue_sleep_done presumably unlinks qw and decrements
 * num_waiting_push -- confirm against its definition. */
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
}
}
if (queue_closed_p(self)) {
/* Also the target of the goto from inside the wait loop. */
closed:
/* NOTE(review): presumably does not return; if it did, control would fall
 * through to the push below -- confirm. */
raise_closed_queue_error(self);
}
return queue_do_push(self, &sq->q, argv[0]);
}
|