Class: POSIX_MQ

Inherits:
Object
  • Object
show all
Defined in:
lib/posix_mq.rb,
ext/posix_mq/posix_mq.c

Defined Under Namespace

Classes: Attr

Constant Summary collapse

VERSION =

version of POSIX_MQ, currently 0.5.0

'0.5.0'
OPEN_MAX =

The maximum number of open message descriptors supported by the system. This may be -1, in which case it is dynamically set at runtime. Consult your operating system documentation for system-specific information about this.

LONG2NUM(sysconf(_SC_MQ_OPEN_MAX))
PRIO_MAX =

The maximum priority that may be specified for POSIX_MQ#send. On POSIX-compliant systems, this is at least 31, but some systems allow higher limits. The minimum priority is always zero.

LONG2NUM(sysconf(_SC_MQ_PRIO_MAX))

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Object

POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq

Opens a POSIX message queue given by name. name should start with a slash (“/”) for portable applications.

If a Symbol is given in place of integer flags, then:

  • :r is equivalent to IO::RDONLY

  • :w is equivalent to IO::CREAT|IO::WRONLY

  • :rw is equivalent to IO::CREAT|IO::RDWR

mode is an integer and only used when IO::CREAT is used. mq_attr is a POSIX_MQ::Attr and only used if IO::CREAT is used. If mq_attr is not specified when creating a queue, then the system defaults will be used.

See the manpage for mq_open(3) for more details on this function.



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'ext/posix_mq/posix_mq.c', line 292

/*
 * Backs POSIX_MQ#initialize.  Accepts (name [, flags [, mode [, attr]]]),
 * packs the validated arguments into a struct open_args and opens the queue
 * through xopen().
 *
 * flags may be nil (read-only), one of the symbols :r, :w or :rw, or an
 * integer passed through as the open flags.  mode defaults to 0666 when
 * O_CREAT is requested and no mode is given.  attr, when present, must be a
 * Struct and is converted by attr_from_struct().
 *
 * Raises ArgumentError on badly typed arguments and a system call error
 * (via rb_sys_fail) when xopen() yields MQD_INVALID.  Returns self.
 */
static VALUE init(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *mq = get(self, 0);
	struct open_args x;
	VALUE name, oflags, mode, attr;

	rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "name must be a string");

	switch (TYPE(oflags)) {
	case T_NIL:
		x.oflags = O_RDONLY;
		break;
	case T_SYMBOL:
		if (oflags == sym_r)
			x.oflags = O_RDONLY;
		else if (oflags == sym_w)
			x.oflags = O_CREAT|O_WRONLY;
		else if (oflags == sym_rw)
			x.oflags = O_CREAT|O_RDWR;
		else {
			/*
			 * A Symbol is not a String, so RSTRING_PTR() must
			 * not be applied to it directly.  Format its inspect
			 * string instead, the same way the attr error path
			 * below does.
			 */
			RB_GC_GUARD(oflags) = rb_inspect(oflags);
			rb_raise(rb_eArgError,
			         "symbol must be :r, :w, or :rw: %s",
				 RSTRING_PTR(oflags));
		}
		break;
	case T_BIGNUM:
	case T_FIXNUM:
		x.oflags = NUM2INT(oflags);
		break;
	default:
		/* the accepted symbols are :r, :w and :rw (not :wr) */
		rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :rw");
	}

	x.name = RSTRING_PTR(name);
	x.argc = 2;	/* name + flags; raised below when mode/attr apply */

	switch (TYPE(mode)) {
	case T_FIXNUM:
		x.argc = 3;
		x.mode = NUM2UINT(mode);
		break;
	case T_NIL:
		/* a mode is only supplied implicitly when creating */
		if (x.oflags & O_CREAT) {
			x.argc = 3;
			x.mode = 0666;
		}
		break;
	default:
		rb_raise(rb_eArgError, "mode not an integer");
	}

	switch (TYPE(attr)) {
	case T_STRUCT:
		x.argc = 4;
		attr_from_struct(&x.attr, attr, 1);

		/* principle of least surprise */
		if (x.attr.mq_flags & O_NONBLOCK)
			x.oflags |= O_NONBLOCK;
		break;
	case T_NIL:
		break;
	default:
		RB_GC_GUARD(attr) = rb_inspect(attr);
		rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
		         RSTRING_PTR(attr));
	}

	mq->des = (mqd_t)xopen(&x);
	if (mq->des == MQD_INVALID)
		rb_sys_fail("mq_open");

	mq->name = rb_str_dup(name);

	/* remember the non-blocking state in the cached attributes */
	if (x.oflags & O_NONBLOCK)
		mq->attr.mq_flags = O_NONBLOCK;

	return self;
}

Class Method Details

.open(*args) ⇒ Object

Opens a POSIX message queue and performs operations on the given block, closing the message queue at exit. All arguments are passed to POSIX_MQ.new.



20
21
22
23
24
25
26
27
28
# File 'lib/posix_mq.rb', line 20

# Builds a queue with +new+, forwarding every argument.  Without a block the
# queue itself is returned.  With a block the queue is yielded, the block's
# value is returned, and the queue is closed afterwards unless the block
# already closed it.
def open(*args)
  queue = new(*args)
  return queue unless block_given?

  begin
    yield queue
  ensure
    queue.close unless queue.closed?
  end
end

POSIX_MQ.unlink(name) => 1

Unlinks the message queue given by name. The queue will be destroyed when the last process with the queue open closes its queue descriptors.



382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'ext/posix_mq/posix_mq.c', line 382

/*
 * Backs POSIX_MQ.unlink(name): removes the named queue with mq_unlink().
 *
 * Raises ArgumentError unless name is a String, and a system call error
 * (via rb_sys_fail) when mq_unlink() fails.  Returns the Integer 1.
 */
static VALUE s_unlink(VALUE self, VALUE name)
{
	/*
	 * mq_unlink() returns an int status (negative on failure), not a
	 * message queue descriptor, so it is neither stored in an mqd_t nor
	 * compared against MQD_INVALID.
	 */
	int rv;

	if (TYPE(name) != T_STRING)
		rb_raise(rb_eArgError, "argument must be a string");

	rv = mq_unlink(RSTRING_PTR(name));
	if (rv < 0)
		rb_sys_fail("mq_unlink");

	return INT2NUM(1);
}

Instance Method Details

#<<(buffer) ⇒ Object

mq << string => mq

Inserts the given string into the message queue with a default priority of 0 and no timeout.



470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# File 'ext/posix_mq/posix_mq.c', line 470

/*
 * Backs POSIX_MQ#<<: sends buffer with priority 0 and no timeout.
 *
 * Raises a system call error (via rb_sys_fail) when the send result equals
 * MQD_INVALID.  Returns self.
 */
static VALUE send0(VALUE self, VALUE buffer)
{
	struct posix_mq *mq = get(self, 1);
	struct rw_args args;
	mqd_t status;

	setup_send_buffer(&args, buffer);
	args.des = mq->des;
	args.timeout = NULL;
	args.msg_prio = 0;

	/*
	 * When the cached flags lack O_NONBLOCK, xsend is run through
	 * rb_thread_blocking_region with RUBY_UBF_IO as the unblock
	 * function; otherwise it is called directly.
	 */
	if ((mq->attr.mq_flags & O_NONBLOCK) == 0)
		status = (mqd_t)rb_thread_blocking_region(xsend, &args,
		                                          RUBY_UBF_IO, 0);
	else
		status = (mqd_t)xsend(&args);

	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return self;
}

#attr ⇒ Object

mq.attr => mq_attr

Returns a POSIX_MQ::Attr struct containing the attributes of the message queue. See the mq_getattr(3) manpage for more details.



608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
# File 'ext/posix_mq/posix_mq.c', line 608

/*
 * Backs POSIX_MQ#attr: refreshes mq->attr with mq_getattr() and returns a
 * new cAttr struct holding, in order, mq_flags, mq_maxmsg, mq_msgsize and
 * mq_curmsgs.
 *
 * Raises a system call error (via rb_sys_fail) when mq_getattr() fails.
 */
static VALUE getattr(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	VALUE result;
	VALUE *member;

	if (mq_getattr(mq->des, &mq->attr) < 0)
		rb_sys_fail("mq_getattr");

	result = rb_struct_alloc_noinit(cAttr);

	/* fill the four struct members in declaration order */
	member = RSTRUCT_PTR(result);
	*member++ = LONG2NUM(mq->attr.mq_flags);
	*member++ = LONG2NUM(mq->attr.mq_maxmsg);
	*member++ = LONG2NUM(mq->attr.mq_msgsize);
	*member = LONG2NUM(mq->attr.mq_curmsgs);

	return result;
}

#attr=(astruct) ⇒ Object

mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr

Only the IO::NONBLOCK flag may be set or unset (zero) in this manner. See the mq_setattr(3) manpage for more details.

Consider using the POSIX_MQ#nonblock= method as it is easier and more natural to use.



637
638
639
640
641
642
643
644
645
646
647
648
# File 'ext/posix_mq/posix_mq.c', line 637

/*
 * Backs POSIX_MQ#attr=: converts astruct with attr_from_struct() and applies
 * it to the descriptor with mq_setattr().
 *
 * Raises a system call error (via rb_sys_fail) when mq_setattr() fails.
 * Returns astruct.
 */
static VALUE setattr(VALUE self, VALUE astruct)
{
	struct posix_mq *mq = get(self, 1);
	struct mq_attr newattr;

	attr_from_struct(&newattr, astruct, 0);

	if (mq_setattr(mq->des, &newattr, NULL) < 0)
		rb_sys_fail("mq_setattr");

	/*
	 * Keep the cached flags in step with the descriptor, as setnonblock
	 * does.  getnonblock and the send paths read mq->attr.mq_flags, so a
	 * stale cache would make them act on the old blocking mode.
	 * NOTE(review): assumes attr_from_struct() fills newattr.mq_flags
	 * when called with 0 as its last argument -- confirm.
	 */
	mq->attr.mq_flags = newattr.mq_flags;

	return astruct;
}

#close ⇒ Object

mq.close => nil

Closes the underlying message queue descriptor. If this descriptor had a registered notification request, the request will be removed so another descriptor or process may register a notification request. Message queue descriptors are automatically closed by garbage collection.



660
661
662
663
664
665
666
667
668
669
670
671
# File 'ext/posix_mq/posix_mq.c', line 660

/*
 * Backs POSIX_MQ#close: closes the descriptor with mq_close(), then marks
 * the object closed by storing MQD_INVALID and resetting the IO slot to nil
 * through MQ_IO_SET.
 *
 * Raises a system call error (via rb_sys_fail) when mq_close() fails.
 * Returns nil.
 */
static VALUE _close(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	int rc = mq_close(mq->des);

	if (rc < 0)
		rb_sys_fail("mq_close");

	mq->des = MQD_INVALID;
	MQ_IO_SET(mq, Qnil);

	return Qnil;
}

#closed? ⇒ Boolean

mq.closed? => true or false

Returns true if the message queue descriptor is closed and therefore unusable, otherwise false

Returns:

  • (Boolean)


680
681
682
683
684
685
# File 'ext/posix_mq/posix_mq.c', line 680

/*
 * Backs POSIX_MQ#closed?: true when the stored descriptor is MQD_INVALID,
 * false otherwise.
 */
static VALUE closed(VALUE self)
{
	struct posix_mq *mq = get(self, 0);

	if (mq->des == MQD_INVALID)
		return Qtrue;

	return Qfalse;
}

#dup ⇒ Object Also known as: clone

There’s no point in ever duping a POSIX_MQ object. All send/receive operations are atomic and only one native thread may be notified at a time



71
72
73
# File 'lib/posix_mq.rb', line 71

# Duplication is a no-op: the receiver itself is handed back instead of a
# copy.
def dup
  return self
end

#name ⇒ Object

mq.name => string

Returns the string name of message queue associated with mq



693
694
695
696
697
698
# File 'ext/posix_mq/posix_mq.c', line 693

/*
 * Backs POSIX_MQ#name: returns a duplicate (rb_str_dup) of the stored queue
 * name rather than the stored String itself.
 */
static VALUE name(VALUE self)
{
	struct posix_mq *mq = get(self, 0);
	VALUE copy = rb_str_dup(mq->name);

	return copy;
}

#nonblock=(nb) ⇒ Object

mq.nonblock = boolean => boolean

Enables or disables non-blocking operation for the message queue descriptor. Errno::EAGAIN will be raised in situations where the queue would block. This is not compatible with timeout arguments to POSIX_MQ#send and POSIX_MQ#receive.



849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
# File 'ext/posix_mq/posix_mq.c', line 849

/*
 * Backs POSIX_MQ#nonblock=: turns O_NONBLOCK on (true) or off (false) for
 * the descriptor via mq_setattr() and records the new flags in mq->attr.
 *
 * Raises ArgumentError unless nb is exactly true or false, and a system
 * call error (via rb_sys_fail) when mq_setattr() fails.  Returns nb.
 */
static VALUE setnonblock(VALUE self, VALUE nb)
{
	struct mq_attr wanted;
	struct posix_mq *mq = get(self, 1);

	if (nb != Qtrue && nb != Qfalse)
		rb_raise(rb_eArgError, "must be true or false");

	wanted.mq_flags = (nb == Qtrue) ? O_NONBLOCK : 0;

	/* mq_setattr() stores the previous attributes into mq->attr ... */
	if (mq_setattr(mq->des, &wanted, &mq->attr) < 0)
		rb_sys_fail("mq_setattr");

	/* ... so the flags just requested are written over them here */
	mq->attr.mq_flags = wanted.mq_flags;

	return nb;
}

#nonblock? ⇒ Boolean

mq.nonblock? => true or false

Returns the current non-blocking state of the message queue descriptor.

Returns:

  • (Boolean)


833
834
835
836
837
838
# File 'ext/posix_mq/posix_mq.c', line 833

/*
 * Backs POSIX_MQ#nonblock?: reports whether O_NONBLOCK is set in the cached
 * mq->attr.mq_flags (no system call is made here).
 */
static VALUE getnonblock(VALUE self)
{
	struct posix_mq *mq = get(self, 1);

	if (mq->attr.mq_flags & O_NONBLOCK)
		return Qtrue;

	return Qfalse;
}

#notify(&block) ⇒ Object

Executes the given block upon reception of the next message in an empty queue. If the message queue is not empty, then this block will only be fired after the queue is emptied and repopulated with one message.

This block will only be executed upon the arrival of the first message and must be reset/reenabled for subsequent notifications. This block will execute in a separate Ruby Thread (and thus will safely have the GVL by default).

This method is only supported on platforms that implement SIGEV_THREAD functionality in mq_notify(3). So far we only know of glibc + Linux supporting this. Please let us know if your platform can support this functionality and you are willing to test for us <[email protected]>



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/posix_mq.rb', line 46

# Registers +block+ to be called (with the queue as its only argument) from a
# dedicated Thread once a byte arrives on an internal pipe whose write end is
# handed to +notify=+.
#
# Raises ArgumentError when no block is given or when the block does not
# take exactly one argument.  Errors raised by +notify=+ propagate after the
# waiting thread has been killed.  Returns nil.
def notify(&block)
  block or raise ArgumentError, "notify requires a block"
  block.arity == 1 or
    raise ArgumentError, "arity of notify block must be 1"
  r, w = IO.pipe
  # distinct block parameter names avoid shadowing the outer r and w
  thr = Thread.new(r, w, self) do |rd, wr, mq|
    begin
      begin
        rd.read(1) or raise Errno::EINTR
      rescue Errno::EINTR, Errno::EAGAIN
        retry
      end
      block.call(mq)
    ensure
      mq.notify_thread = nil
      rd.close rescue nil
      wr.close rescue nil
    end
  end
  begin
    # notify= (setnotify in the C extension) kills whatever thread is
    # currently recorded on the queue, so it must run BEFORE the new thread
    # is recorded.  Otherwise the thread created above would be killed right
    # away and a previously registered thread would be orphaned.
    self.notify = w
  rescue
    # registration failed: do not leave the waiter blocked on the pipe
    thr.kill
    raise
  end
  self.notify_thread = thr
  nil
end

#notify=(arg) ⇒ Object

mq.notify = signal => signal

Registers the notification request to deliver a given signal to the current process when a message is received. If signal is nil, it will unregister and disable the notification request to allow other processes to register a request. If signal is false, it will register a no-op notification request which will prevent other processes from registering a notification. If signal is an IO object, it will spawn a thread upon the arrival of the next message and write one “\0” byte to the file descriptor belonging to that IO object. Only one process may have a notification request for a queue at a time; Errno::EBUSY will be raised if there is already a notification request registration for the queue.

Notifications are only fired once and processes must reregister for subsequent notifications.

For readers of the mq_notify(3) manpage, passing false is equivalent to SIGEV_NONE, and passing nil is equivalent to passing a NULL notification pointer to mq_notify(3).



785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
# File 'ext/posix_mq/posix_mq.c', line 785

/*
 * Backs POSIX_MQ#notify=: translates arg into a struct sigevent (or a NULL
 * pointer) and registers it with mq_notify().
 *
 *   nil            -> NULL notification pointer
 *   false          -> SIGEV_NONE
 *   Integer        -> SIGEV_SIGNAL with that signal number
 *   Symbol/String  -> SIGEV_SIGNAL with the number from lookup_sig()
 *   IO (T_FILE)    -> configured by setup_notify_io()
 *
 * Any thread recorded in mq->thread is killed first.  Raises ArgumentError
 * for other argument types and a system call error (via rb_sys_fail) when
 * mq_notify() fails.  Returns arg, except for Symbol/String where the
 * resolved signal number is returned.
 */
static VALUE setnotify(VALUE self, VALUE arg)
{
	struct posix_mq *mq = get(self, 1);
	struct sigevent ev;
	struct sigevent *evp = &ev;
	VALUE result = arg;

	/* drop the thread left over from an earlier registration, if any */
	if (!NIL_P(mq->thread)) {
		rb_funcall(mq->thread, id_kill, 0, 0);
		mq->thread = Qnil;
	}

	/* signal delivery is the default; the cases below may override it */
	ev.sigev_notify = SIGEV_SIGNAL;

	switch (TYPE(arg)) {
	case T_NIL:
		evp = NULL;
		break;
	case T_FALSE:
		ev.sigev_notify = SIGEV_NONE;
		break;
	case T_FILE:
		setup_notify_io(&ev, arg);
		break;
	case T_FIXNUM:
		ev.sigev_signo = NUM2INT(arg);
		break;
	case T_SYMBOL:
	case T_STRING:
		ev.sigev_signo = lookup_sig(arg);
		result = INT2NUM(ev.sigev_signo);
		break;
	default:
		/* maybe support Proc+thread via sigev_notify_function.. */
		rb_raise(rb_eArgError, "must be a signal or nil");
	}

	if (mq_notify(mq->des, evp) < 0)
		rb_sys_fail("mq_notify");

	return result;
}

#notify_thread=(thread) ⇒ Object

:nodoc:



870
871
872
873
874
875
876
# File 'ext/posix_mq/posix_mq.c', line 870

/*
 * Backs POSIX_MQ#notify_thread=: stores thread in mq->thread and returns
 * the same value.
 */
static VALUE setnotifythread(VALUE self, VALUE thread)
{
	struct posix_mq *queue = get(self, 1);

	queue->thread = thread;

	return thread;
}

#receive(*args) ⇒ Object

mq.receive([buffer, [timeout]]) => [ message, priority ]

Takes the highest priority message off the queue and returns an array containing the message as a String and the Integer priority of the message.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.



531
532
533
534
# File 'ext/posix_mq/posix_mq.c', line 531

/*
 * Backs POSIX_MQ#receive by delegating to _receive() with 1 as its first
 * argument (shift passes 0 instead).
 * NOTE(review): the 1 presumably selects the [message, priority] return
 * form -- confirm against _receive().
 */
static VALUE receive(int argc, VALUE *argv, VALUE self)
{
	VALUE result = _receive(1, argc, argv, self);

	return result;
}

#send(*args) ⇒ Object

mq.send(string [,priority[, timeout]]) => nil

Inserts the given string into the message queue with an optional, unsigned integer priority. If the optional timeout is specified, then Errno::ETIMEDOUT will be raised if the operation cannot complete before timeout seconds have elapsed. Without timeout, this method may block until the queue is writable.



437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'ext/posix_mq/posix_mq.c', line 437

/*
 * Backs POSIX_MQ#send: accepts (buffer [, prio [, timeout]]).  A nil
 * priority is sent as 0; timeout is converted by convert_timeout().
 *
 * Raises a system call error (via rb_sys_fail) when the send result equals
 * MQD_INVALID.  Returns nil.
 */
static VALUE _send(int argc, VALUE *argv, VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	struct rw_args args;
	struct timespec deadline;
	VALUE buffer, prio, timeout;
	mqd_t status;

	rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);

	setup_send_buffer(&args, buffer);
	args.des = mq->des;
	args.timeout = convert_timeout(&deadline, timeout);
	if (NIL_P(prio))
		args.msg_prio = 0;
	else
		args.msg_prio = NUM2UINT(prio);

	/*
	 * When the cached flags lack O_NONBLOCK, xsend is run through
	 * rb_thread_blocking_region with RUBY_UBF_IO as the unblock
	 * function; otherwise it is called directly.
	 */
	if ((mq->attr.mq_flags & O_NONBLOCK) == 0)
		status = (mqd_t)rb_thread_blocking_region(xsend, &args,
		                                          RUBY_UBF_IO, 0);
	else
		status = (mqd_t)xsend(&args);

	if (status == MQD_INVALID)
		rb_sys_fail("mq_send");

	return Qnil;
}

#shift(*args) ⇒ Object

mq.shift([buffer, [timeout]]) => message

Takes the highest priority message off the queue and returns the message as a String.

If the optional buffer is present, then it must be a String which will receive the data.

If the optional timeout is present, then it may be a Float or Integer specifying the timeout in seconds. Errno::ETIMEDOUT will be raised if timeout has elapsed and there are no messages in the queue.



551
552
553
554
# File 'ext/posix_mq/posix_mq.c', line 551

/*
 * Backs POSIX_MQ#shift by delegating to _receive() with 0 as its first
 * argument (receive passes 1 instead).
 * NOTE(review): the 0 presumably selects the message-only return form --
 * confirm against _receive().
 */
static VALUE shift(int argc, VALUE *argv, VALUE self)
{
	VALUE message = _receive(0, argc, argv, self);

	return message;
}

#to_io ⇒ Object

mq.to_io => IO

Returns an IO.select-able IO object. This method is only available under Linux and FreeBSD and is not intended to be portable.



501
502
503
504
505
506
507
508
509
510
# File 'ext/posix_mq/posix_mq.c', line 501

/*
 * Backs POSIX_MQ#to_io: returns the IO object stored in mq->io, creating it
 * on first use by calling IO.new with the file descriptor obtained from
 * MQD_TO_FD(mq->des).
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	int fd = MQD_TO_FD(mq->des);

	/* reuse the IO created by an earlier call */
	if (!NIL_P(mq->io))
		return mq->io;

	mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	return mq->io;
}

mq.unlink => mq

Unlinks the message queue to prevent other processes from accessing it. All existing queue descriptors to this queue including those opened by other processes are unaffected. The queue will only be destroyed when the last process with open descriptors to this queue closes the descriptors.



406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'ext/posix_mq/posix_mq.c', line 406

/*
 * Backs POSIX_MQ#unlink: removes the queue name stored in mq->name with
 * mq_unlink().
 *
 * Raises a system call error (via rb_sys_fail) when mq_unlink() fails.
 * Returns self.
 */
static VALUE _unlink(VALUE self)
{
	struct posix_mq *mq = get(self, 0);
	/*
	 * mq_unlink() returns an int status (negative on failure), not a
	 * message queue descriptor, so it is neither stored in an mqd_t nor
	 * compared against MQD_INVALID.
	 */
	int rv;

	assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");

	rv = mq_unlink(RSTRING_PTR(mq->name));
	if (rv < 0)
		rb_sys_fail("mq_unlink");

	return self;
}