Class: ZMQ::Loop
- Inherits:
-
Object
- Object
- ZMQ::Loop
- Extended by:
- Forwardable
- Defined in:
- lib/zmq/loop.rb,
ext/rbczmq/loop.c
Class Method Summary collapse
-
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
-
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
-
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration.
-
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
-
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
-
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
-
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
-
.run(proc = nil, &blk) ⇒ Object
Start the reactor.
Instance Method Summary collapse
-
#cancel_timer(timer) ⇒ nil
Cancels a ZMQ::Timer instance previously registered with the reactor.
-
#destroy ⇒ nil
Explicitly destroys a reactor instance.
-
#register(item) ⇒ true
Registers a poll item with the reactor.
-
#register_timer(timer) ⇒ true
Registers a ZMQ::Timer instance with the reactor.
-
#remove(item) ⇒ nil
Removes a previously registered poll item from the reactor loop.
-
#running? ⇒ Boolean
Predicate that returns true if the reactor is currently running.
-
#start ⇒ Fixnum
Creates a new reactor instance and blocks the caller until the process is interrupted, the context terminates or the loop’s explicitly stopped via callback.
-
#stop ⇒ nil
Stops the reactor loop.
-
#verbose=(true) ⇒ nil
Logs reactor activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
Class Method Details
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
ZMQ::Loop.run do
ZL.add_oneshot_timer(0.2){ :work } # Fires once after 0.2s
end
87 88 89 |
# File 'lib/zmq/loop.rb', line 87 def self.add_oneshot_timer(delay, p = nil, &blk) add_timer(delay, 1, p, &blk) end |
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
ZMQ::Loop.run do
ZL.add_oneshot_timer(0.2){ :work } # Fires every 0.2s
end
97 98 99 |
# File 'lib/zmq/loop.rb', line 97 def self.add_periodic_timer(delay, p = nil, &blk) add_timer(delay, 0, p, &blk) end |
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration
ZMQ::Loop.run do
timer = ZL.add_timer(0.1, 5){ :work } # Fires 5 times at 0.1s intervals
end
107 108 109 110 111 |
# File 'lib/zmq/loop.rb', line 107 def self.add_timer(delay, times, p = nil, &blk) timer = ZMQ::Timer.new(delay, times, p, &blk) instance.register_timer(timer) timer end |
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
ZMQ::Loop.run do
ZL.bind(pub, "inproc://fanout", Producer)
end
42 43 44 |
# File 'lib/zmq/loop.rb', line 42 def self.bind(socket, address, handler = ZMQ::DefaultHandler, *args) attach(socket, :bind, address, handler, *args) end |
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
53 54 55 |
# File 'lib/zmq/loop.rb', line 53 def self.connect(socket, address, handler = ZMQ::DefaultHandler, *args) attach(socket, :connect, address, handler, *args) end |
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
ZMQ::Loop.run do
ZL.register_readable(sub, "inproc://fanout", Consumer)
end
63 64 65 66 67 |
# File 'lib/zmq/loop.rb', line 63 def self.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN) pollitem.handler = handler.new(pollitem, *args) if handler instance.register(pollitem) end |
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
ZMQ::Loop.run do
ZL.register_writable(pub, "inproc://fanout", Producer)
end
75 76 77 78 79 |
# File 'lib/zmq/loop.rb', line 75 def self.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT) pollitem.handler = handler.new(pollitem, *args) if handler instance.register(pollitem) end |
Instance Method Details
#cancel_timer(timer) ⇒ nil
494 495 496 497 498 499 500 501 502 503 504 |
# File 'ext/rbczmq/loop.c', line 494
static VALUE rb_czmq_loop_cancel_timer(VALUE obj, VALUE tm)
{
int rc;
errno = 0;
ZmqGetLoop(obj);
ZmqGetTimer(tm);
rc = zloop_timer_end(loop->loop, (void *)tm);
rb_ary_delete(loop->items, tm);
ZmqAssert(rc);
return Qtrue;
}
|
#destroy ⇒ nil
373 374 375 376 377 378 |
# File 'ext/rbczmq/loop.c', line 373
static VALUE rb_czmq_loop_destroy(VALUE obj)
{
ZmqGetLoop(obj);
rb_czmq_free_loop(loop);
return Qnil;
}
|
#register(item) ⇒ true
Registers a poll item with the reactor. Only ZMQ::POLLIN and ZMQ::POLLOUT events are supported.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
item = ZMQ::Pollitem.new(sock, ZMQ::POLLIN) => ZMQ::Pollitem
loop.register(item) => ZMQ::Pollitem (coerced)
415 416 417 418 419 420 421 422 423 424 425 426 427 428 |
# File 'ext/rbczmq/loop.c', line 415
static VALUE rb_czmq_loop_register(VALUE obj, VALUE pollable)
{
int rc;
errno = 0;
ZmqGetLoop(obj);
pollable = rb_czmq_pollitem_coerce(pollable);
ZmqGetPollitem(pollable);
rb_ary_push(loop->items, pollable);
rc = zloop_poller(loop->loop, pollitem->item, rb_czmq_loop_pollitem_callback, (void *)pollitem);
ZmqAssert(rc);
/* Let pollable be verbose if loop is verbose */
if (loop->verbose == true) rb_czmq_pollitem_set_verbose(pollable, Qtrue);
return pollable;
}
|
#register_timer(timer) ⇒ true
468 469 470 471 472 473 474 475 476 477 478 |
# File 'ext/rbczmq/loop.c', line 468
static VALUE rb_czmq_loop_register_timer(VALUE obj, VALUE tm)
{
int rc;
errno = 0;
ZmqGetLoop(obj);
ZmqGetTimer(tm);
rc = zloop_timer(loop->loop, timer->delay, timer->times, rb_czmq_loop_timer_callback, (void *)tm);
rb_ary_push(loop->items, tm);
ZmqAssert(rc);
return Qtrue;
}
|
#remove(item) ⇒ nil
444 445 446 447 448 449 450 451 452 453 |
# File 'ext/rbczmq/loop.c', line 444
static VALUE rb_czmq_loop_remove(VALUE obj, VALUE pollable)
{
errno = 0;
ZmqGetLoop(obj);
pollable = rb_czmq_pollitem_coerce(pollable);
ZmqGetPollitem(pollable);
zloop_poller_end(loop->loop, pollitem->item);
rb_ary_delete(loop->items, pollable);
return Qnil;
}
|
#running? ⇒ Boolean
322 323 324 325 326 |
# File 'ext/rbczmq/loop.c', line 322
static VALUE rb_czmq_loop_running_p(VALUE obj)
{
ZmqGetLoop(obj);
return (loop->running == true) ? Qtrue : Qfalse;
}
|
#start ⇒ Fixnum
296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'ext/rbczmq/loop.c', line 296
static VALUE rb_czmq_loop_start(VALUE obj)
{
int rc;
errno = 0;
ZmqGetLoop(obj);
rb_thread_schedule();
zloop_timer(loop->loop, 1, 1, rb_czmq_loop_started_callback, loop);
rc = (int)rb_thread_call_without_gvl(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop);
if (rc > 0) rb_raise(rb_eZmqError, "internal event loop error!");
return INT2NUM(rc);
}
|
#stop ⇒ nil
351 352 353 354 355 356 357 |
# File 'ext/rbczmq/loop.c', line 351
static VALUE rb_czmq_loop_stop(VALUE obj)
{
ZmqGetLoop(obj);
if (loop->running == false) rb_raise(rb_eZmqError, "event loop not running!");
rb_czmq_loop_stop0(loop);
return Qnil;
}
|
#verbose=(true) ⇒ nil
392 393 394 395 396 397 398 399 400 |
# File 'ext/rbczmq/loop.c', line 392
static VALUE rb_czmq_loop_set_verbose(VALUE obj, VALUE level)
{
bool vlevel;
ZmqGetLoop(obj);
vlevel = (level == Qtrue) ? true : false;
zloop_set_verbose(loop->loop, vlevel);
loop->verbose = vlevel;
return Qnil;
}
|