Class: SysVMQ
- Inherits:
-
Object
- Object
- SysVMQ
- Defined in:
- ext/sysvmq.c
Constant Summary collapse
- IPC_CREAT =
Define platform specific constants from headers
INT2NUM(IPC_CREAT)
- IPC_EXCL =
INT2NUM(IPC_EXCL)
- IPC_NOWAIT =
INT2NUM(IPC_NOWAIT)
- IPC_RMID =
INT2NUM(IPC_RMID)
- IPC_SET =
INT2NUM(IPC_SET)
- IPC_STAT =
INT2NUM(IPC_STAT)
- IPC_INFO =
INT2NUM(IPC_INFO)
Instance Method Summary collapse
-
#destroy ⇒ Object
Proxies a call with IPC_RMID to ‘sysvmq_stats` to remove the queue.
-
#initialize(key, buffer_size, flags) ⇒ Object
constructor
other calls that require a ‘msgid`, for convienence and to share the buffer.
-
#receive(*args) ⇒ Object
Receive a message from the message queue.
-
#send(*args) ⇒ Object
Sends a message to the message queue.
-
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
Constructor Details
#initialize(key, buffer_size, flags) ⇒ Object
other calls that require a ‘msgid`, for convienence and to share the buffer.
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'ext/sysvmq.c', line 331
VALUE
sysvmq_initialize(VALUE self, VALUE key, VALUE buffer_size, VALUE flags)
{
sysvmq_t* sysv;
size_t msgbuf_size;
// TODO: Also support string keys, so you can pass '0xDEADC0DE'
Check_Type(key, T_FIXNUM);
Check_Type(flags, T_FIXNUM);
Check_Type(buffer_size, T_FIXNUM);
TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);
// (key_t) is a 32-bit integer (int). It's defined as `int` (at least on OS X
// and Linux). However, `FIX2INT()` (from Ruby) will complain if the key is
// something in the range 2^31-2^32, because of the sign bit. We use UINT to
// trick Ruby, so it won't complain.
sysv->key = (key_t) FIX2UINT(key);
while ((sysv->id = msgget(sysv->key, FIX2INT(flags))) < 0) {
if (errno == EINTR) {
rb_thread_wait_for(polling_interval); // TODO: Really necessary here?
continue;
}
rb_sys_fail("Failed opening the message queue.");
}
// Allocate the msgbuf buffer once for the instance, to not allocate a buffer
// for each message sent. This makes SysVMQ not thread-safe (requiring a
// buffer for each thread), but is a reasonable trade-off for now for the
// performance.
sysv->buffer_size = (size_t) FIX2LONG(buffer_size) + 1;
msgbuf_size = sysv->buffer_size * sizeof(char) + sizeof(long);
// Note that this is a zero-length array, so we size the struct to size of the
// header (long, the mtype) and then the rest of the space for message buffer.
sysv->msgbuf = (sysvmq_msgbuf_t*) xmalloc(msgbuf_size);
return self;
}
|
Instance Method Details
#destroy ⇒ Object
Proxies a call with IPC_RMID to ‘sysvmq_stats` to remove the queue.
137 138 139 140 141 142 143 |
# File 'ext/sysvmq.c', line 137
static VALUE
sysvmq_destroy(VALUE self)
{
VALUE argv[1];
argv[0] = INT2FIX(IPC_RMID);
return sysvmq_stats(1, argv, self);
}
|
#receive(*args) ⇒ Object
Receive a message from the message queue.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'ext/sysvmq.c', line 178
VALUE
sysvmq_receive(int argc, VALUE *argv, VALUE self)
{
VALUE type = INT2FIX(0);
VALUE flags = INT2FIX(0);
sysvmq_t* sysv;
sysvmq_blocking_call_t blocking;
if (argc > 2) {
rb_raise(rb_eArgError, "Wrong number of arguments (0..2)");
}
if (argc >= 1) type = argv[0];
if (argc == 2) flags = argv[1];
TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);
Check_Type(type, T_FIXNUM);
Check_Type(flags, T_FIXNUM);
// Attach blocking call parameters to the struct passed to the blocking
// function wrapper.
blocking.flags = FIX2INT(flags);
blocking.type = FIX2LONG(type);
blocking.sysv = sysv;
// Initialize error so it's never a garbage value, if
// `sysvmq_maybe_blocking_receive` was interrupted at a non-nice time.
blocking.error = UNINITIALIZED_ERROR;
blocking.length = UNINITIALIZED_ERROR;
if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
while(sysvmq_maybe_blocking_receive(&blocking) == NULL && blocking.error < 0) {
if (errno == EINTR) {
continue;
}
rb_sys_fail("Failed receiving message from queue");
}
} else {
// msgrcv(2) can block sending a message, if IPC_NOWAIT is not passed.
// We unlock the GVL waiting for the call so other threads (e.g. signal
// handling) can continue to work. Sets `length` on `blocking` with the size
// of the message returned.
while (WITHOUT_GVL(sysvmq_maybe_blocking_receive, &blocking, RUBY_UBF_IO, NULL) == NULL
&& blocking.error < 0) {
if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
continue;
}
rb_sys_fail("Failed receiving message from queue");
}
}
// Guard it..
assert(blocking.length != UNINITIALIZED_ERROR);
// Reencode with default external encoding
return rb_str_new(sysv->msgbuf->mtext, blocking.length);
}
|
#send(*args) ⇒ Object
Sends a message to the message queue.
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'ext/sysvmq.c', line 253
VALUE
sysvmq_send(int argc, VALUE *argv, VALUE self)
{
VALUE message;
VALUE priority = INT2FIX(1);
VALUE flags = INT2FIX(0);
sysvmq_blocking_call_t blocking;
sysvmq_t* sysv;
if (argc > 3 || argc == 0) {
rb_raise(rb_eArgError, "Wrong number of arguments (1..3)");
}
message = argv[0];
if (argc >= 2) priority = argv[1];
if (argc == 3) flags = argv[2];
message = rb_funcall(message, rb_intern("to_s"), 0);
TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);
Check_Type(flags, T_FIXNUM);
Check_Type(priority, T_FIXNUM);
// TODO: Call to_s on message if it responds to
// Attach blocking call parameters to the struct passed to the blocking
// function wrapper.
blocking.flags = FIX2INT(flags);
blocking.size = RSTRING_LEN(message);
blocking.sysv = sysv;
// See msgrcv(2) wrapper
blocking.error = UNINITIALIZED_ERROR;
blocking.length = UNINITIALIZED_ERROR;
// The buffer can be obtained from `sysvmq_maybe_blocking_send`, instead of
// passing it, set it directly on the instance struct.
sysv->msgbuf->mtype = FIX2INT(priority);
if (blocking.size > sysv->buffer_size) {
rb_raise(rb_eArgError, "Size of message is bigger than buffer size.");
}
// TODO: Can a string copy be avoided?
memcpy(sysv->msgbuf->mtext, RSTRING_PTR(message), blocking.size);
// Non-blocking call, skip the expensive GVL release/acquire
if ((blocking.flags & IPC_NOWAIT) == IPC_NOWAIT) {
while(sysvmq_maybe_blocking_send(&blocking) == NULL && blocking.error < 0) {
if (errno == EINTR) {
continue;
}
rb_sys_fail("Failed sending message to queue");
}
} else {
// msgsnd(2) can block waiting for a message, if IPC_NOWAIT is not passed.
// We unlock the GVL waiting for the call so other threads (e.g. signal
// handling) can continue to work.
while (WITHOUT_GVL(sysvmq_maybe_blocking_send, &blocking, RUBY_UBF_IO, NULL) == NULL
&& blocking.error < 0) {
if (errno == EINTR || blocking.error == UNINITIALIZED_ERROR) {
continue;
}
rb_sys_fail("Failed sending message to queue");
}
}
return message;
}
|
#stats(*args) ⇒ Object
TODO: IPC_SET is currently not supported.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'ext/sysvmq.c', line 88
static VALUE
sysvmq_stats(int argc, VALUE *argv, VALUE self)
{
struct msqid_ds info;
VALUE info_hash;
VALUE cmd;
sysvmq_t* sysv;
// Optional argument handling
if (argc > 1) {
rb_raise(rb_eArgError, "Wrong number of arguments (0..1)");
}
// Default to IPC_STAT
cmd = argc == 1 ? argv[0] : INT2FIX(IPC_STAT);
TypedData_Get_Struct(self, sysvmq_t, &sysvmq_type, sysv);
// TODO: Does FIX2INT actually perform this check already?
Check_Type(cmd, T_FIXNUM);
while (msgctl(sysv->id, FIX2INT(cmd), &info) < 0) {
if (errno == EINTR) {
rb_thread_wait_for(polling_interval);
continue;
}
rb_sys_fail("Failed executing msgctl(2) command.");
}
// Map values from struct to a hash
// TODO: Add all the fields
// TODO: They are probably not ints..
info_hash = rb_hash_new();
rb_hash_aset(info_hash, ID2SYM(rb_intern("count")), INT2FIX(info.msg_qnum));
rb_hash_aset(info_hash, ID2SYM(rb_intern("maximum_size")), INT2FIX(info.msg_qbytes));
// TODO: Can probably make a better checker here for whether the struct
// actually has the member.
// TODO: BSD support?
#ifdef __linux__
rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.__msg_cbytes));
#elif __APPLE__
rb_hash_aset(info_hash, ID2SYM(rb_intern("size")), INT2FIX(info.msg_cbytes));
#endif
return info_hash;
}
|