Class: ZMQ::Loop
- Inherits:
-
Object
- Object
- ZMQ::Loop
- Extended by:
- Forwardable
- Defined in:
- lib/zmq/loop.rb,
ext/rbczmq/loop.c
Class Method Summary collapse
-
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
-
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
-
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration.
-
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
-
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket connect and loop registration.
-
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
-
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
-
.run(proc = nil, &blk) ⇒ Object
Start the reactor.
Instance Method Summary collapse
-
#cancel_timer(timer) ⇒ true
Cancels a ZMQ::Timer instance previously registered with the reactor.
-
#destroy ⇒ nil
Explicitly destroys a reactor instance.
-
#register(item) ⇒ true
Registers a poll item with the reactor.
-
#register_timer(timer) ⇒ true
Registers a ZMQ::Timer instance with the reactor.
-
#remove(item) ⇒ nil
Removes a previously registered poll item from the reactor loop.
-
#running? ⇒ Boolean
Predicate that returns true if the reactor is currently running.
-
#start ⇒ Fixnum
Creates a new reactor instance and blocks the caller until the process is interrupted, the context terminates, or the loop is explicitly stopped via a callback.
-
#stop ⇒ nil
Stops the reactor loop.
-
#verbose=(true) ⇒ nil
Logs reactor activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
Class Method Details
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
ZMQ::Loop.run do
ZL.add_oneshot_timer(0.2){ :work } # Fires once after 0.2s
end
87 88 89 |
# File 'lib/zmq/loop.rb', line 87
#
# Registers a one-shot timer by delegating to add_timer with a repeat
# count of 1.
#
# delay - forwarded unchanged to add_timer (the usage example passes 0.2
#         for a timer that fires once after 0.2s).
# p     - optional value forwarded unchanged to add_timer (default: nil).
# blk   - optional block forwarded unchanged to add_timer.
#
# Returns whatever add_timer returns.
def self.add_oneshot_timer(delay, p = nil, &blk)
  add_timer(delay, 1, p, &blk)
end
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
ZMQ::Loop.run do
ZL.add_periodic_timer(0.2){ :work } # Fires every 0.2s
end
97 98 99 |
# File 'lib/zmq/loop.rb', line 97
#
# Registers a periodic timer by delegating to add_timer with a repeat
# count of 0.
#
# delay - forwarded unchanged to add_timer (the usage example passes 0.2
#         for a timer that fires every 0.2s).
# p     - optional value forwarded unchanged to add_timer (default: nil).
# blk   - optional block forwarded unchanged to add_timer.
#
# Returns whatever add_timer returns.
def self.add_periodic_timer(delay, p = nil, &blk)
  add_timer(delay, 0, p, &blk)
end
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration
ZMQ::Loop.run do
timer = ZL.add_timer(0.1, 5){ :work } # Fires 5 times at 0.1s intervals
end
107 108 109 110 111 |
# File 'lib/zmq/loop.rb', line 107
#
# Lower level interface for timer registration: builds a ZMQ::Timer and
# registers it with the loop returned by `instance`.
#
# delay - forwarded to ZMQ::Timer.new.
# times - forwarded to ZMQ::Timer.new (the usage example passes 5 for a
#         timer that fires 5 times).
# p     - optional value forwarded to ZMQ::Timer.new (default: nil).
# blk   - optional block forwarded to ZMQ::Timer.new.
#
# Returns the newly created ZMQ::Timer, so the caller can keep a handle on it
# (the result of register_timer is discarded).
def self.add_timer(delay, times, p = nil, &blk)
  timer = ZMQ::Timer.new(delay, times, p, &blk)
  instance.register_timer(timer)
  timer
end
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
ZMQ::Loop.run do
ZL.bind(pub, "inproc://fanout", Producer)
end
42 43 44 |
# File 'lib/zmq/loop.rb', line 42
#
# Higher level API for ZMQ socket bind and loop registration. Delegates to
# `attach` with the :bind action.
#
# socket  - forwarded to attach.
# address - forwarded to attach (the usage example passes "inproc://fanout").
# handler - forwarded to attach (default: ZMQ::DefaultHandler).
# args    - extra arguments forwarded to attach after the handler.
#
# Returns whatever attach returns.
def self.bind(socket, address, handler = ZMQ::DefaultHandler, *args)
  attach(socket, :bind, address, handler, *args)
end
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
53 54 55 |
# File 'lib/zmq/loop.rb', line 53
#
# Higher level API for ZMQ socket connect and loop registration. Delegates to
# `attach` with the :connect action.
#
# socket  - forwarded to attach.
# address - forwarded to attach.
# handler - forwarded to attach (default: ZMQ::DefaultHandler).
# args    - extra arguments forwarded to attach after the handler.
#
# Returns whatever attach returns.
def self.connect(socket, address, handler = ZMQ::DefaultHandler, *args)
  attach(socket, :connect, address, handler, *args)
end
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
ZMQ::Loop.run do
ZL.register_readable(sub, Consumer)
end
63 64 65 66 67 |
# File 'lib/zmq/loop.rb', line 63
#
# Registers a pollable for readable-event notification: wraps it in a
# ZMQ::Pollitem created with ZMQ::POLLIN and registers that item with the
# loop returned by `instance`.
#
# pollable - object handed to ZMQ::Pollitem.new (per the class summary, a
#            ZMQ::Socket or IO instance).
# handler  - class instantiated as handler.new(pollitem, *args) and assigned
#            to pollitem.handler (default: ZMQ::DefaultHandler). Pass nil or
#            false to register the item without assigning a handler.
# args     - extra arguments forwarded to handler.new after the pollitem.
#
# Returns whatever instance.register returns.
def self.register_readable(pollable, handler = ZMQ::DefaultHandler, *args)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN)
  pollitem.handler = handler.new(pollitem, *args) if handler
  instance.register(pollitem)
end
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
ZMQ::Loop.run do
ZL.register_writable(pub, Producer)
end
75 76 77 78 79 |
# File 'lib/zmq/loop.rb', line 75
#
# Registers a pollable for writable-event notification: wraps it in a
# ZMQ::Pollitem created with ZMQ::POLLOUT and registers that item with the
# loop returned by `instance`.
#
# pollable - object handed to ZMQ::Pollitem.new (per the class summary, a
#            ZMQ::Socket or IO instance).
# handler  - class instantiated as handler.new(pollitem, *args) and assigned
#            to pollitem.handler (default: ZMQ::DefaultHandler). Pass nil or
#            false to register the item without assigning a handler.
# args     - extra arguments forwarded to handler.new after the pollitem.
#
# Returns whatever instance.register returns.
def self.register_writable(pollable, handler = ZMQ::DefaultHandler, *args)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT)
  pollitem.handler = handler.new(pollitem, *args) if handler
  instance.register(pollitem)
end
Instance Method Details
#cancel_timer(timer) ⇒ true
478 479 480 481 482 483 484 485 486 487 |
# File 'ext/rbczmq/loop.c', line 478
/*
 * Cancels a timer previously registered with this loop.
 *
 * obj - the loop receiver.
 * tm  - the timer object to cancel.
 *
 * Calls zloop_timer_end() with the timer VALUE itself as the lookup argument
 * (the same pointer rb_czmq_loop_register_timer hands to zloop_timer()), runs
 * ZmqAssert on the result code and returns Qtrue.
 *
 * NOTE(review): ZmqGetLoop, ZmqGetTimer and ZmqAssert are macros defined
 * outside this listing; presumably they introduce the `loop` local used
 * below, validate `tm`, and raise when rc signals failure - confirm against
 * their definitions.
 */
static VALUE rb_czmq_loop_cancel_timer(VALUE obj, VALUE tm)
{
int rc;
errno = 0; /* reset errno before the zloop call */
ZmqGetLoop(obj);
ZmqGetTimer(tm);
rc = zloop_timer_end(loop->loop, (void *)tm);
ZmqAssert(rc);
return Qtrue;
}
|
#destroy ⇒ nil
360 361 362 363 364 365 |
# File 'ext/rbczmq/loop.c', line 360
/*
 * Explicitly destroys a reactor instance.
 *
 * obj - the loop receiver.
 *
 * Hands the loop obtained via ZmqGetLoop to rb_czmq_free_loop() and returns
 * Qnil.
 *
 * NOTE(review): ZmqGetLoop and rb_czmq_free_loop are defined outside this
 * listing; what exactly is released, and whether calling destroy twice is
 * safe, cannot be determined from here - confirm.
 */
static VALUE rb_czmq_loop_destroy(VALUE obj)
{
ZmqGetLoop(obj);
rb_czmq_free_loop(loop);
return Qnil;
}
|
#register(item) ⇒ true
402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'ext/rbczmq/loop.c', line 402
/*
 * Registers a poll item with this loop.
 *
 * obj      - the loop receiver.
 * pollable - the object to register; it is first passed through
 *            rb_czmq_pollitem_coerce() and the coerced value is what gets
 *            registered.
 *
 * Installs rb_czmq_loop_pollitem_callback as the zloop_poller() handler with
 * the pollitem as its callback argument, runs ZmqAssert on the result code
 * and returns Qtrue. When the loop's verbose flag is set, the pollable is
 * switched to verbose as well.
 *
 * NOTE(review): ZmqGetLoop, ZmqGetPollitem and ZmqAssert are macros defined
 * outside this listing; presumably they introduce the `loop` / `pollitem`
 * locals used below and raise when rc signals failure - confirm.
 */
static VALUE rb_czmq_loop_register(VALUE obj, VALUE pollable)
{
int rc;
errno = 0; /* reset errno before the zloop call */
ZmqGetLoop(obj);
pollable = rb_czmq_pollitem_coerce(pollable);
ZmqGetPollitem(pollable);
rc = zloop_poller(loop->loop, pollitem->item, rb_czmq_loop_pollitem_callback, (void *)pollitem);
ZmqAssert(rc);
/* Let pollable be verbose if loop is verbose */
if (loop->verbose == true) rb_czmq_pollitem_set_verbose(pollable, Qtrue);
return Qtrue;
}
|
#register_timer(timer) ⇒ true
453 454 455 456 457 458 459 460 461 462 |
# File 'ext/rbczmq/loop.c', line 453
/*
 * Registers a timer object with this loop.
 *
 * obj - the loop receiver.
 * tm  - the timer object to register.
 *
 * Calls zloop_timer() with the timer's `delay` and `times` fields,
 * rb_czmq_loop_timer_callback as the handler and the timer VALUE itself as
 * the callback argument, runs ZmqAssert on the result code and returns Qtrue.
 *
 * NOTE(review): ZmqGetLoop, ZmqGetTimer and ZmqAssert are macros defined
 * outside this listing; presumably they introduce the `loop` / `timer`
 * locals used below and raise when rc signals failure - confirm.
 */
static VALUE rb_czmq_loop_register_timer(VALUE obj, VALUE tm)
{
int rc;
errno = 0; /* reset errno before the zloop call */
ZmqGetLoop(obj);
ZmqGetTimer(tm);
rc = zloop_timer(loop->loop, timer->delay, timer->times, rb_czmq_loop_timer_callback, (void *)tm);
ZmqAssert(rc);
return Qtrue;
}
|
#remove(item) ⇒ nil
430 431 432 433 434 435 436 437 438 |
# File 'ext/rbczmq/loop.c', line 430
/*
 * Removes a previously registered poll item from this loop.
 *
 * obj      - the loop receiver.
 * pollable - the object to remove; it is first passed through
 *            rb_czmq_pollitem_coerce(), mirroring rb_czmq_loop_register.
 *
 * Calls zloop_poller_end() for the pollitem's item and returns Qnil. No
 * result code is inspected here.
 *
 * NOTE(review): ZmqGetLoop and ZmqGetPollitem are macros defined outside this
 * listing; presumably they introduce the `loop` / `pollitem` locals used
 * below - confirm.
 */
static VALUE rb_czmq_loop_remove(VALUE obj, VALUE pollable)
{
errno = 0; /* reset errno before the zloop call */
ZmqGetLoop(obj);
pollable = rb_czmq_pollitem_coerce(pollable);
ZmqGetPollitem(pollable);
zloop_poller_end(loop->loop, pollitem->item);
return Qnil;
}
|
#running? ⇒ Boolean
309 310 311 312 313 |
# File 'ext/rbczmq/loop.c', line 309
/*
 * Predicate: reports whether this loop is currently running.
 *
 * obj - the loop receiver.
 *
 * Returns Qtrue when the loop's `running` flag equals true, Qfalse otherwise.
 *
 * NOTE(review): ZmqGetLoop is a macro defined outside this listing;
 * presumably it introduces the `loop` local used below - confirm.
 */
static VALUE rb_czmq_loop_running_p(VALUE obj)
{
ZmqGetLoop(obj);
return (loop->running == true) ? Qtrue : Qfalse;
}
|
#start ⇒ Fixnum
283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'ext/rbczmq/loop.c', line 283
/*
 * Starts the event loop and blocks the caller until it returns.
 *
 * obj - the loop receiver.
 *
 * Schedules rb_czmq_loop_started_callback through a zloop timer (delay 1,
 * fired 1 time, with the loop as its argument), then runs
 * rb_czmq_loop_start_nogvl via rb_thread_call_without_gvl(), with
 * rb_czmq_loop_start_ubf supplied as the unblocking function.
 *
 * Raises rb_eZmqError ("internal event loop error!") when the result code is
 * greater than zero; otherwise returns the result code as a Ruby Integer.
 *
 * NOTE(review): the return value of the zloop_timer() call is not checked,
 * and the void * result of rb_thread_call_without_gvl() is cast directly to
 * int - confirm both are intentional.
 * NOTE(review): ZmqGetLoop and THREAD_PASS are macros defined outside this
 * listing; their exact effect cannot be determined from here.
 */
static VALUE rb_czmq_loop_start(VALUE obj)
{
int rc;
errno = 0; /* reset errno before the zloop calls */
ZmqGetLoop(obj);
THREAD_PASS;
zloop_timer(loop->loop, 1, 1, rb_czmq_loop_started_callback, loop);
rc = (int)rb_thread_call_without_gvl(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop);
if (rc > 0) rb_raise(rb_eZmqError, "internal event loop error!");
return INT2NUM(rc);
}
|
#stop ⇒ nil
338 339 340 341 342 343 344 |
# File 'ext/rbczmq/loop.c', line 338
/*
 * Stops the event loop.
 *
 * obj - the loop receiver.
 *
 * Raises rb_eZmqError ("event loop not running!") when the loop's `running`
 * flag is false; otherwise delegates to rb_czmq_loop_stop0() and returns Qnil.
 *
 * NOTE(review): ZmqGetLoop and rb_czmq_loop_stop0 are defined outside this
 * listing; how the stop is actually signalled cannot be determined from here.
 */
static VALUE rb_czmq_loop_stop(VALUE obj)
{
ZmqGetLoop(obj);
if (loop->running == false) rb_raise(rb_eZmqError, "event loop not running!");
rb_czmq_loop_stop0(loop);
return Qnil;
}
|
#verbose=(true) ⇒ nil
379 380 381 382 383 384 385 386 387 |
# File 'ext/rbczmq/loop.c', line 379
/*
 * Enables or disables verbose logging for this loop.
 *
 * obj   - the loop receiver.
 * level - verbosity is enabled only when this is exactly Qtrue; every other
 *         value (including truthy non-boolean objects) disables it.
 *
 * Passes the flag to zloop_set_verbose() and also stores it in the loop's
 * `verbose` field, which rb_czmq_loop_register consults when registering
 * poll items. Returns Qnil.
 *
 * NOTE(review): ZmqGetLoop is a macro defined outside this listing;
 * presumably it introduces the `loop` local used below - confirm.
 */
static VALUE rb_czmq_loop_set_verbose(VALUE obj, VALUE level)
{
bool vlevel;
ZmqGetLoop(obj);
vlevel = (level == Qtrue) ? true : false; /* strict comparison against Qtrue, not a truthiness test */
zloop_set_verbose(loop->loop, vlevel);
loop->verbose = vlevel;
return Qnil;
}
|