Class: ZMQ::Loop
- Inherits:
-
Object
- Object
- ZMQ::Loop
- Extended by:
- Forwardable
- Defined in:
- lib/zmq/loop.rb,
ext/rbczmq/loop.c
Class Method Summary collapse
-
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
-
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
-
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration.
-
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
-
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket connect and loop registration.
-
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
-
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
-
.run(proc = nil, &blk) ⇒ Object
Start the reactor.
Instance Method Summary collapse
-
#cancel_timer(timer) ⇒ true
Cancels a ZMQ::Timer instance previously registered with the reactor.
-
#destroy ⇒ nil
Explicitly destroys a reactor instance.
-
#register(item) ⇒ ZMQ::Pollitem
Registers a poll item with the reactor.
-
#register_timer(timer) ⇒ true
Registers a ZMQ::Timer instance with the reactor.
-
#remove(item) ⇒ nil
Removes a previously registered poll item from the reactor loop.
-
#running? ⇒ Boolean
Predicate that returns true if the reactor is currently running.
-
#start ⇒ Fixnum
Starts the reactor and blocks the caller until the process is interrupted, the context terminates or the loop is explicitly stopped via a callback.
-
#stop ⇒ nil
Stops the reactor loop.
-
#verbose=(true) ⇒ nil
Logs reactor activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
Class Method Details
.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object
Registers a oneshot timer with the event loop.
ZMQ::Loop.run do
ZL.add_oneshot_timer(0.2){ :work } # Fires once after 0.2s
end
87 88 89 |
# File 'lib/zmq/loop.rb', line 87 def self.add_oneshot_timer(delay, p = nil, &blk) add_timer(delay, 1, p, &blk) end |
.add_periodic_timer(delay, p = nil, &blk) ⇒ Object
Registers a periodic timer with the event loop.
ZMQ::Loop.run do
ZL.add_periodic_timer(0.2){ :work } # Fires every 0.2s
end
97 98 99 |
# File 'lib/zmq/loop.rb', line 97 def self.add_periodic_timer(delay, p = nil, &blk) add_timer(delay, 0, p, &blk) end |
.add_timer(delay, times, p = nil, &blk) ⇒ Object
Lower level interface for timer registration
ZMQ::Loop.run do
timer = ZL.add_timer(0.1, 5){ :work } # Fires 5 times at 0.1s intervals
end
107 108 109 110 111 |
# File 'lib/zmq/loop.rb', line 107 def self.add_timer(delay, times, p = nil, &blk) timer = ZMQ::Timer.new(delay, times, p, &blk) instance.register_timer(timer) timer end |
.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket bind and loop registration.
ZMQ::Loop.run do
ZL.bind(pub, "inproc://fanout", Producer)
end
42 43 44 |
# File 'lib/zmq/loop.rb', line 42 def self.bind(socket, address, handler = ZMQ::DefaultHandler, *args) attach(socket, :bind, address, handler, *args) end |
.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object
A higher level API for ZMQ socket connect and loop registration.
ZMQ::Loop.run do
ZL.bind(pub, "inproc://fanout", Producer)
ZL.connect(sub, "inproc://fanout", Consumer)
end
53 54 55 |
# File 'lib/zmq/loop.rb', line 53 def self.connect(socket, address, handler = ZMQ::DefaultHandler, *args) attach(socket, :connect, address, handler, *args) end |
.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for readable events notification.
ZMQ::Loop.run do
ZL.register_readable(sub, Consumer)
end
63 64 65 66 67 |
# File 'lib/zmq/loop.rb', line 63 def self.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN) pollitem.handler = handler.new(pollitem, *args) if handler instance.register(pollitem) end |
.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object
Registers a given ZMQ::Socket or IO instance for writable events notification.
ZMQ::Loop.run do
ZL.register_writable(pub, Producer)
end
75 76 77 78 79 |
# File 'lib/zmq/loop.rb', line 75 def self.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT) pollitem.handler = handler.new(pollitem, *args) if handler instance.register(pollitem) end |
.run(proc = nil, &blk) ⇒ Object
Starts the reactor. Takes control of the current thread and returns when:
-
the 0MQ context is terminated or
-
the process is interrupted or
-
any event handler raises an error
-
any event handler returns false
ZMQ::Loop.run do # or ZMQ::Loop.run(&proc)
ZL.add_oneshot_timer(0.2){ ZL.stop }
end
30 31 32 33 34 |
# File 'lib/zmq/loop.rb', line 30 def self.run(proc = nil, &blk) self.instance = ZMQ::Loop.new (proc || blk).call instance.start end |
Instance Method Details
#cancel_timer(timer) ⇒ true
Cancels a ZMQ::Timer instance previously registered with the reactor.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
timer = ZMQ::Timer.new(1, 2){ :fired }
loop.register_timer(timer) => true
loop.cancel_timer(timer) => true
494 495 496 497 498 499 500 501 502 503 504 |
# File 'ext/rbczmq/loop.c', line 494 static VALUE rb_czmq_loop_cancel_timer(VALUE obj, VALUE tm) { int rc; errno = 0; ZmqGetLoop(obj); ZmqGetTimer(tm); rc = zloop_timer_end(loop->loop, (void *)tm); rb_ary_delete(loop->items, tm); ZmqAssert(rc); return Qtrue; } |
#destroy ⇒ nil
Explicitly destroys a reactor instance. Useful for manual memory management; otherwise the GC will take the same action on the next GC cycle once the loop object is no longer reachable. This is a lower level API.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
loop.destroy => nil
373 374 375 376 377 378 |
# File 'ext/rbczmq/loop.c', line 373 static VALUE rb_czmq_loop_destroy(VALUE obj) { ZmqGetLoop(obj); rb_czmq_free_loop(loop); return Qnil; } |
#register(item) ⇒ ZMQ::Pollitem
Registers a poll item with the reactor. Only ZMQ::POLLIN and ZMQ::POLLOUT events are supported.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
item = ZMQ::Pollitem.new(sock, ZMQ::POLLIN) => ZMQ::Pollitem
loop.register(item) => ZMQ::Pollitem (coerced)
415 416 417 418 419 420 421 422 423 424 425 426 427 428 |
# File 'ext/rbczmq/loop.c', line 415 static VALUE rb_czmq_loop_register(VALUE obj, VALUE pollable) { int rc; errno = 0; ZmqGetLoop(obj); pollable = rb_czmq_pollitem_coerce(pollable); ZmqGetPollitem(pollable); rb_ary_push(loop->items, pollable); rc = zloop_poller(loop->loop, pollitem->item, rb_czmq_loop_pollitem_callback, (void *)pollitem); ZmqAssert(rc); /* Let pollable be verbose if loop is verbose */ if (loop->verbose == true) rb_czmq_pollitem_set_verbose(pollable, Qtrue); return pollable; } |
#register_timer(timer) ⇒ true
Registers a ZMQ::Timer instance with the reactor.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
timer = ZMQ::Timer.new(1, 2){ :fired }
loop.register_timer(timer) => true
468 469 470 471 472 473 474 475 476 477 478 |
# File 'ext/rbczmq/loop.c', line 468 static VALUE rb_czmq_loop_register_timer(VALUE obj, VALUE tm) { int rc; errno = 0; ZmqGetLoop(obj); ZmqGetTimer(tm); rc = zloop_timer(loop->loop, timer->delay, timer->times, rb_czmq_loop_timer_callback, (void *)tm); rb_ary_push(loop->items, tm); ZmqAssert(rc); return Qtrue; } |
#remove(item) ⇒ nil
Removes a previously registered poll item from the reactor loop.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
item = ZMQ::Pollitem.new(sock, ZMQ::POLLIN) => ZMQ::Pollitem
loop.register(item) => ZMQ::Pollitem (coerced)
loop.remove(item) => nil
444 445 446 447 448 449 450 451 452 453 |
# File 'ext/rbczmq/loop.c', line 444 static VALUE rb_czmq_loop_remove(VALUE obj, VALUE pollable) { errno = 0; ZmqGetLoop(obj); pollable = rb_czmq_pollitem_coerce(pollable); ZmqGetPollitem(pollable); zloop_poller_end(loop->loop, pollitem->item); rb_ary_delete(loop->items, pollable); return Qnil; } |
#running? ⇒ Boolean
Predicate that returns true if the reactor is currently running.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
loop.running? => false
322 323 324 325 326 |
# File 'ext/rbczmq/loop.c', line 322 static VALUE rb_czmq_loop_running_p(VALUE obj) { ZmqGetLoop(obj); return (loop->running == true) ? Qtrue : Qfalse; } |
#start ⇒ Fixnum
Starts the reactor and blocks the caller until the process is interrupted, the context terminates or the loop is explicitly stopped via a callback. Returns 0 if interrupted and -1 when stopped via a handler.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
loop.start => Fixnum
296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'ext/rbczmq/loop.c', line 296 static VALUE rb_czmq_loop_start(VALUE obj) { int rc; errno = 0; ZmqGetLoop(obj); rb_thread_schedule(); zloop_timer(loop->loop, 1, 1, rb_czmq_loop_started_callback, loop); rc = (int)rb_thread_call_without_gvl(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop); if (rc > 0) rb_raise(rb_eZmqError, "internal event loop error!"); return INT2NUM(rc); } |
#stop ⇒ nil
Stops the reactor loop. ZMQ::Loop#start will return a -1 status code as this can only be called via a handler.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
loop.register_timer(ZMQ::Timer.new(1, 1){ loop.stop })
loop.start => -1
351 352 353 354 355 356 357 |
# File 'ext/rbczmq/loop.c', line 351 static VALUE rb_czmq_loop_stop(VALUE obj) { ZmqGetLoop(obj); if (loop->running == false) rb_raise(rb_eZmqError, "event loop not running!"); rb_czmq_loop_stop0(loop); return Qnil; } |
#verbose=(true) ⇒ nil
Logs reactor activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
Examples
loop = ZMQ::Loop.new => ZMQ::Loop
loop.verbose = true => nil
392 393 394 395 396 397 398 399 400 |
# File 'ext/rbczmq/loop.c', line 392 static VALUE rb_czmq_loop_set_verbose(VALUE obj, VALUE level) { bool vlevel; ZmqGetLoop(obj); vlevel = (level == Qtrue) ? true : false; zloop_set_verbose(loop->loop, vlevel); loop->verbose = vlevel; return Qnil; } |