Class: POSIX_MQ
- Inherits:
-
Object
- Object
- POSIX_MQ
- Defined in:
- lib/posix_mq.rb,
ext/posix_mq/posix_mq.c
Defined Under Namespace
Classes: Attr
Constant Summary collapse
- VERSION =
version of POSIX_MQ, currently 0.5.0
'0.5.0'
- OPEN_MAX =
The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.
LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
- PRIO_MAX =
The maximum priority that may be specified for POSIX_MQ#send On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.
LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))
Class Method Summary collapse
-
.open(*args) ⇒ Object
Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit.
-
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1.
Instance Method Summary collapse
-
#<<(buffer) ⇒ Object
mq << string => mq.
-
#attr ⇒ Object
mq.attr => mq_attr.
-
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr.
-
#close ⇒ Object
mq.close => nil.
-
#closed? ⇒ Boolean
mq.closed? => true or false.
-
#dup ⇒ Object
(also: #clone)
There’s no point in ever duping a POSIX_MQ object.
-
#initialize(*args) ⇒ Object
constructor
POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq.
-
#name ⇒ Object
mq.name => string.
-
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean.
-
#nonblock? ⇒ Boolean
mq.nonblock? => true or false.
-
#notify(&block) ⇒ Object
Executes the given block upon reception of the next message in an empty queue.
-
#notify=(arg) ⇒ Object
mq.notify = signal => signal.
-
#notify_thread=(thread) ⇒ Object
:nodoc:.
-
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ].
-
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => nil.
-
#shift(*args) ⇒ Object
mq.shift([buffer, [timeout]]) => message.
-
#to_io ⇒ Object
mq.to_io => IO.
-
#unlink ⇒ Object
mq.unlink => mq.
Constructor Details
#initialize(*args) ⇒ Object
POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
Opens a POSIX message queue given by name
. name
should start with a slash (“/”) for portable applications.
If a Symbol is given in place of integer flags
, then:
-
:r
is equivalent to IO::RDONLY -
:w
is equivalent to IO::CREAT|IO::WRONLY -
:rw
is equivalent to IO::CREAT|IO::RDWR
mode
is an integer and only used when IO::CREAT is used. mq_attr
is a POSIX_MQ::Attr and only used if IO::CREAT is used. If mq_attr
is not specified when creating a queue, then the system defaults will be used.
See the manpage for mq_open(3) for more details on this function.
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'ext/posix_mq/posix_mq.c', line 292
static VALUE init(int argc, VALUE *argv, VALUE self)
{
struct posix_mq *mq = get(self, 0);
struct open_args x;
VALUE name, oflags, mode, attr;
rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
if (TYPE(name) != T_STRING)
rb_raise(rb_eArgError, "name must be a string");
switch (TYPE(oflags)) {
case T_NIL:
x.oflags = O_RDONLY;
break;
case T_SYMBOL:
if (oflags == sym_r)
x.oflags = O_RDONLY;
else if (oflags == sym_w)
x.oflags = O_CREAT|O_WRONLY;
else if (oflags == sym_rw)
x.oflags = O_CREAT|O_RDWR;
else {
RB_GC_GUARD(oflags) = oflags;
rb_raise(rb_eArgError,
"symbol must be :r, :w, or :rw: %s",
RSTRING_PTR(oflags));
}
break;
case T_BIGNUM:
case T_FIXNUM:
x.oflags = NUM2INT(oflags);
break;
default:
rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
}
x.name = RSTRING_PTR(name);
x.argc = 2;
switch (TYPE(mode)) {
case T_FIXNUM:
x.argc = 3;
x.mode = NUM2UINT(mode);
break;
case T_NIL:
if (x.oflags & O_CREAT) {
x.argc = 3;
x.mode = 0666;
}
break;
default:
rb_raise(rb_eArgError, "mode not an integer");
}
switch (TYPE(attr)) {
case T_STRUCT:
x.argc = 4;
attr_from_struct(&x.attr, attr, 1);
/* principle of least surprise */
if (x.attr.mq_flags & O_NONBLOCK)
x.oflags |= O_NONBLOCK;
break;
case T_NIL:
break;
default:
RB_GC_GUARD(attr) = rb_inspect(attr);
rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
RSTRING_PTR(attr));
}
mq->des = (mqd_t)xopen(&x);
if (mq->des == MQD_INVALID)
rb_sys_fail("mq_open");
mq->name = rb_str_dup(name);
if (x.oflags & O_NONBLOCK)
mq->attr.mq_flags = O_NONBLOCK;
return self;
}
|
Class Method Details
.open(*args) ⇒ Object
Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit. All all arguments are passed to POSIX_MQ.new.
20 21 22 23 24 25 26 27 28 |
# File 'lib/posix_mq.rb', line 20 def open(*args) mq = new(*args) block_given? or return mq begin yield mq ensure mq.close unless mq.closed? end end |
.unlink(name) ⇒ Object
POSIX_MQ.unlink(name) => 1
Unlinks the message queue given by name
. The queue will be destroyed when the last process with the queue open closes its queue descriptors.
382 383 384 385 386 387 388 389 390 391 392 393 394 |
# File 'ext/posix_mq/posix_mq.c', line 382
static VALUE s_unlink(VALUE self, VALUE name)
{
mqd_t rv;
if (TYPE(name) != T_STRING)
rb_raise(rb_eArgError, "argument must be a string");
rv = mq_unlink(RSTRING_PTR(name));
if (rv == MQD_INVALID)
rb_sys_fail("mq_unlink");
return INT2NUM(1);
}
|
Instance Method Details
#<<(buffer) ⇒ Object
mq << string => mq
Inserts the given string
into the message queue with a default priority of 0 and no timeout.
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 |
# File 'ext/posix_mq/posix_mq.c', line 470
static VALUE send0(VALUE self, VALUE buffer)
{
struct posix_mq *mq = get(self, 1);
struct rw_args x;
mqd_t rv;
setup_send_buffer(&x, buffer);
x.des = mq->des;
x.timeout = NULL;
x.msg_prio = 0;
if (mq->attr.mq_flags & O_NONBLOCK)
rv = (mqd_t)xsend(&x);
else
rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
RUBY_UBF_IO, 0);
if (rv == MQD_INVALID)
rb_sys_fail("mq_send");
return self;
}
|
#attr ⇒ Object
mq.attr => mq_attr
Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 |
# File 'ext/posix_mq/posix_mq.c', line 608
static VALUE getattr(VALUE self)
{
struct posix_mq *mq = get(self, 1);
VALUE astruct;
VALUE *ptr;
if (mq_getattr(mq->des, &mq->attr) < 0)
rb_sys_fail("mq_getattr");
astruct = rb_struct_alloc_noinit(cAttr);
ptr = RSTRUCT_PTR(astruct);
ptr[0] = LONG2NUM(mq->attr.mq_flags);
ptr[1] = LONG2NUM(mq->attr.mq_maxmsg);
ptr[2] = LONG2NUM(mq->attr.mq_msgsize);
ptr[3] = LONG2NUM(mq->attr.mq_curmsgs);
return astruct;
}
|
#attr=(astruct) ⇒ Object
mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.
Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.
637 638 639 640 641 642 643 644 645 646 647 648 |
# File 'ext/posix_mq/posix_mq.c', line 637
static VALUE setattr(VALUE self, VALUE astruct)
{
struct posix_mq *mq = get(self, 1);
struct mq_attr newattr;
attr_from_struct(&newattr, astruct, 0);
if (mq_setattr(mq->des, &newattr, NULL) < 0)
rb_sys_fail("mq_setattr");
return astruct;
}
|
#close ⇒ Object
mq.close => nil
Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.
660 661 662 663 664 665 666 667 668 669 670 671 |
# File 'ext/posix_mq/posix_mq.c', line 660
static VALUE _close(VALUE self)
{
struct posix_mq *mq = get(self, 1);
if (mq_close(mq->des) < 0)
rb_sys_fail("mq_close");
mq->des = MQD_INVALID;
MQ_IO_SET(mq, Qnil);
return Qnil;
}
|
#closed? ⇒ Boolean
mq.closed? => true or false
Returns true
if the message queue descriptor is closed and therefore unusable, otherwise false
680 681 682 683 684 685 |
# File 'ext/posix_mq/posix_mq.c', line 680
static VALUE closed(VALUE self)
{
struct posix_mq *mq = get(self, 0);
return mq->des == MQD_INVALID ? Qtrue : Qfalse;
}
|
#dup ⇒ Object Also known as: clone
There’s no point in ever duping a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time
71 72 73 |
# File 'lib/posix_mq.rb', line 71 def dup self end |
#name ⇒ Object
mq.name => string
Returns the string name of message queue associated with mq
693 694 695 696 697 698 |
# File 'ext/posix_mq/posix_mq.c', line 693
static VALUE name(VALUE self)
{
struct posix_mq *mq = get(self, 0);
return rb_str_dup(mq->name);
}
|
#nonblock=(nb) ⇒ Object
mq.nonblock = boolean => boolean
Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout
arguments to POSIX_MQ#send and POSIX_MQ#receive.
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 |
# File 'ext/posix_mq/posix_mq.c', line 849
static VALUE setnonblock(VALUE self, VALUE nb)
{
struct mq_attr newattr;
struct posix_mq *mq = get(self, 1);
if (nb == Qtrue)
newattr.mq_flags = O_NONBLOCK;
else if (nb == Qfalse)
newattr.mq_flags = 0;
else
rb_raise(rb_eArgError, "must be true or false");
if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
rb_sys_fail("mq_setattr");
mq->attr.mq_flags = newattr.mq_flags;
return nb;
}
|
#nonblock? ⇒ Boolean
mq.nonblock? => true or false
Returns the current non-blocking state of the message queue descriptor.
833 834 835 836 837 838 |
# File 'ext/posix_mq/posix_mq.c', line 833
static VALUE getnonblock(VALUE self)
{
struct posix_mq *mq = get(self, 1);
return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
}
|
#notify(&block) ⇒ Object
Executes the given block upon reception of the next message in an empty queue. If the message queue is not empty, then this block will only be fired after the queue is emptied and repopulated with one message.
This block will only be executed upon the arrival of the first message and must be reset/reenabled for subsequent notifications. This block will execute in a separate Ruby Thread (and thus will safely have the GVL by default).
This method is only supported on platforms that implement SIGEV_THREAD functionality in mq_notify(3). So far we only know of glibc + Linux supporting this. Please let us know if your platform can support this functionality and are willing to test for us <[email protected]>
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/posix_mq.rb', line 46 def notify(&block) block.arity == 1 or raise ArgumentError, "arity of notify block must be 1" r, w = IO.pipe self.notify_thread = Thread.new(r, w, self) do |r, w, mq| begin begin r.read(1) or raise Errno::EINTR rescue Errno::EINTR, Errno::EAGAIN retry end block.call(mq) ensure mq.notify_thread = nil r.close rescue nil w.close rescue nil end end self.notify = w nil end |
#notify=(arg) ⇒ Object
mq.notify = signal => signal
Registers the notification request to deliver a given signal
to the current process when message is received. If signal
is nil
, it will unregister and disable the notification request to allow other processes to register a request. If signal
is false
, it will register a no-op notification request which will prevent other processes from registering a notification. If signal
is an IO
object, it will spawn a thread upon the arrival of the next message and write one “\0” byte to the file descriptor belonging to that IO object. Only one process may have a notification request for a queue at a time, Errno::EBUSY will be raised if there is already a notification request registration for the queue.
Notifications are only fired once and processes must reregister for subsequent notifications.
For readers of the mq_notify(3) manpage, passing false
is equivalent to SIGEV_NONE, and passing nil
is equivalent of passing a NULL notification pointer to mq_notify(3).
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 |
# File 'ext/posix_mq/posix_mq.c', line 785
static VALUE setnotify(VALUE self, VALUE arg)
{
struct posix_mq *mq = get(self, 1);
struct sigevent not;
struct sigevent * notification = ¬
VALUE rv = arg;
if (!NIL_P(mq->thread)) {
rb_funcall(mq->thread, id_kill, 0, 0);
mq->thread = Qnil;
}
not.sigev_notify = SIGEV_SIGNAL;
switch (TYPE(arg)) {
case T_FALSE:
not.sigev_notify = SIGEV_NONE;
break;
case T_NIL:
notification = NULL;
break;
case T_FIXNUM:
not.sigev_signo = NUM2INT(arg);
break;
case T_SYMBOL:
case T_STRING:
not.sigev_signo = lookup_sig(arg);
rv = INT2NUM(not.sigev_signo);
break;
case T_FILE:
setup_notify_io(¬, arg);
break;
default:
/* maybe support Proc+thread via sigev_notify_function.. */
rb_raise(rb_eArgError, "must be a signal or nil");
}
if (mq_notify(mq->des, notification) < 0)
rb_sys_fail("mq_notify");
return rv;
}
|
#notify_thread=(thread) ⇒ Object
:nodoc:
870 871 872 873 874 875 876 |
# File 'ext/posix_mq/posix_mq.c', line 870
static VALUE setnotifythread(VALUE self, VALUE thread)
{
struct posix_mq *mq = get(self, 1);
mq->thread = thread;
return thread;
}
|
#receive(*args) ⇒ Object
mq.receive([buffer, [timeout]]) => [ message, priority ]
Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.
If the optional buffer
is present, then it must be a String which will receive the data.
If the optional timeout
is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout
has elapsed and there are no messages in the queue.
531 532 533 534 |
# File 'ext/posix_mq/posix_mq.c', line 531
static VALUE receive(int argc, VALUE *argv, VALUE self)
{
return _receive(1, argc, argv, self);
}
|
#send(*args) ⇒ Object
mq.send(string [,priority[, timeout]]) => nil
Inserts the given string
into the message queue with an optional, unsigned integer priority
. If the optional timeout
is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout
seconds has elapsed. Without timeout
, this method may block until the queue is writable.
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 |
# File 'ext/posix_mq/posix_mq.c', line 437
static VALUE _send(int argc, VALUE *argv, VALUE self)
{
struct posix_mq *mq = get(self, 1);
struct rw_args x;
VALUE buffer, prio, timeout;
mqd_t rv;
struct timespec expire;
rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
setup_send_buffer(&x, buffer);
x.des = mq->des;
x.timeout = convert_timeout(&expire, timeout);
x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
if (mq->attr.mq_flags & O_NONBLOCK)
rv = (mqd_t)xsend(&x);
else
rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
RUBY_UBF_IO, 0);
if (rv == MQD_INVALID)
rb_sys_fail("mq_send");
return Qnil;
}
|
#shift(*args) ⇒ Object
mq.shift([buffer, [timeout]]) => message
Takes the highest priority message off the queue and returns the message as a String.
If the optional buffer
is present, then it must be a String which will receive the data.
If the optional timeout
is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout
has elapsed and there are no messages in the queue.
551 552 553 554 |
# File 'ext/posix_mq/posix_mq.c', line 551
static VALUE shift(int argc, VALUE *argv, VALUE self)
{
return _receive(0, argc, argv, self);
}
|
#to_io ⇒ Object
mq.to_io => IO
Returns an IO.select-able IO
object. This method is only available under Linux and FreeBSD and is not intended to be portable.
501 502 503 504 505 506 507 508 509 510 |
# File 'ext/posix_mq/posix_mq.c', line 501
static VALUE to_io(VALUE self)
{
struct posix_mq *mq = get(self, 1);
int fd = MQD_TO_FD(mq->des);
if (NIL_P(mq->io))
mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
return mq->io;
}
|
#unlink ⇒ Object
mq.unlink => mq
Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.
406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'ext/posix_mq/posix_mq.c', line 406
static VALUE _unlink(VALUE self)
{
struct posix_mq *mq = get(self, 0);
mqd_t rv;
assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
rv = mq_unlink(RSTRING_PTR(mq->name));
if (rv == MQD_INVALID)
rb_sys_fail("mq_unlink");
return self;
}
|