Class: ZMQ::Loop

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/zmq/loop.rb,
ext/rbczmq/loop.c

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.add_oneshot_timer(delay, p = nil, &blk) ⇒ Object

Registers a oneshot timer with the event loop.

ZMQ::Loop.run do

ZL.add_oneshot_timer(0.2){ :work } # Fires once after 0.2s

end



87
88
89
# File 'lib/zmq/loop.rb', line 87

def self.add_oneshot_timer(delay, p = nil, &blk)
  add_timer(delay, 1, p, &blk)
end

.add_periodic_timer(delay, p = nil, &blk) ⇒ Object

Registers a periodic timer with the event loop.

ZMQ::Loop.run do

ZL.add_oneshot_timer(0.2){ :work } # Fires every 0.2s

end



97
98
99
# File 'lib/zmq/loop.rb', line 97

def self.add_periodic_timer(delay, p = nil, &blk)
  add_timer(delay, 0, p, &blk)
end

.add_timer(delay, times, p = nil, &blk) ⇒ Object

Lower level interface for timer registration

ZMQ::Loop.run do

timer = ZL.add_timer(0.1, 5){ :work } # Fires 5 times at 0.1s intervals

end



107
108
109
110
111
# File 'lib/zmq/loop.rb', line 107

def self.add_timer(delay, times, p = nil, &blk)
  timer = ZMQ::Timer.new(delay, times, p, &blk)
  instance.register_timer(timer)
  timer
end

.bind(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object

A higher level API for ZMQ socket bind and loop registration.

ZMQ::Loop.run do

ZL.bind(pub, "inproc://fanout", Producer)

end



42
43
44
# File 'lib/zmq/loop.rb', line 42

def self.bind(socket, address, handler = ZMQ::DefaultHandler, *args)
  attach(socket, :bind, address, handler, *args)
end

.connect(socket, address, handler = ZMQ::DefaultHandler, *args) ⇒ Object

A higher level API for ZMQ socket bind and loop registration.

ZMQ::Loop.run do

ZL.bind(pub, "inproc://fanout", Producer)
ZL.connect(sub, "inproc://fanout", Consumer)

end



53
54
55
# File 'lib/zmq/loop.rb', line 53

def self.connect(socket, address, handler = ZMQ::DefaultHandler, *args)
  attach(socket, :connect, address, handler, *args)
end

.register_readable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object

Registers a given ZMQ::Socket or IO instance for readable events notification.

ZMQ::Loop.run do

ZL.register_readable(sub, "inproc://fanout", Consumer)

end



63
64
65
66
67
# File 'lib/zmq/loop.rb', line 63

def self.register_readable(pollable, handler = ZMQ::DefaultHandler, *args)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN)
  pollitem.handler = handler.new(pollitem, *args) if handler
  instance.register(pollitem)
end

.register_writable(pollable, handler = ZMQ::DefaultHandler, *args) ⇒ Object

Registers a given ZMQ::Socket or IO instance for writable events notification.

ZMQ::Loop.run do

ZL.register_writable(pub, "inproc://fanout", Producer)

end



75
76
77
78
79
# File 'lib/zmq/loop.rb', line 75

def self.register_writable(pollable, handler = ZMQ::DefaultHandler, *args)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT)
  pollitem.handler = handler.new(pollitem, *args) if handler
  instance.register(pollitem)
end

.run(proc = nil, &blk) ⇒ Object

Start the reactor. Takes control of the current thread and returns when :

  • the 0MQ context is terminated or

  • the process is interrupted or

  • any event handler raises an error

  • any event handler returns false

ZMQ::Loop.run do # or ZMQ::Loop.run(&proc)

ZL.add_oneshot_timer(0.2){ ZL.stop }

end



30
31
32
33
34
# File 'lib/zmq/loop.rb', line 30

def self.run(proc = nil, &blk)
  self.instance = ZMQ::Loop.new
  (proc || blk).call
  instance.start
end

Instance Method Details

#cancel_timer(timer) ⇒ nil

Cancels a ZMQ::Timer instance previously registered with the reactor.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
timer = ZMQ::Timer.new(1, 2){ :fired }
loop.register_timer(timer)   =>   true
loop.cancel_timer(timer)   =>   nil

Returns:

  • (nil)


494
495
496
497
498
499
500
501
502
503
504
# File 'ext/rbczmq/loop.c', line 494

static VALUE rb_czmq_loop_cancel_timer(VALUE obj, VALUE tm)
{
    int rc;
    errno = 0;
    ZmqGetLoop(obj);
    ZmqGetTimer(tm);
    rc = zloop_timer_end(loop->loop, (void *)tm);
    rb_ary_delete(loop->items, tm);
    ZmqAssert(rc);
    return Qtrue;
}

#destroynil

Explicitly destroys a reactor instance. Useful for manual memory management, otherwise the GC will take the same action if a message object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
loop.destroy   =>    nil

Returns:

  • (nil)


373
374
375
376
377
378
# File 'ext/rbczmq/loop.c', line 373

static VALUE rb_czmq_loop_destroy(VALUE obj)
{
    ZmqGetLoop(obj);
    rb_czmq_free_loop(loop);
    return Qnil;
}

#register(item) ⇒ true

Registers a poll item with the reactor. Only ZMQ::POLLIN and ZMQ::POLLOUT events are supported.

Examples

loop = ZMQ::Loop.new                        =>   ZMQ::Loop
item = ZMQ::Pollitem.new(sock, ZMQ::POLLIN) =>   ZMQ::Pollitem
loop.register(item)                         =>   ZMQ::Pollitem (coerced)

Returns:

  • (true)


415
416
417
418
419
420
421
422
423
424
425
426
427
428
# File 'ext/rbczmq/loop.c', line 415

static VALUE rb_czmq_loop_register(VALUE obj, VALUE pollable)
{
    int rc;
    errno = 0;
    ZmqGetLoop(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);
    rb_ary_push(loop->items, pollable);
    rc = zloop_poller(loop->loop, pollitem->item, rb_czmq_loop_pollitem_callback, (void *)pollitem);
    ZmqAssert(rc);
    /* Let pollable be verbose if loop is verbose */
    if (loop->verbose == true) rb_czmq_pollitem_set_verbose(pollable, Qtrue);
    return pollable;
}

#register_timer(timer) ⇒ true

Registers a ZMQ::Timer instance with the reactor.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
timer = ZMQ::Timer.new(1, 2){ :fired }
loop.register_timer(timer)   =>   true

Returns:

  • (true)


468
469
470
471
472
473
474
475
476
477
478
# File 'ext/rbczmq/loop.c', line 468

static VALUE rb_czmq_loop_register_timer(VALUE obj, VALUE tm)
{
    int rc;
    errno = 0;
    ZmqGetLoop(obj);
    ZmqGetTimer(tm);
    rc = zloop_timer(loop->loop, timer->delay, timer->times, rb_czmq_loop_timer_callback, (void *)tm);
    rb_ary_push(loop->items, tm);
    ZmqAssert(rc);
    return Qtrue;
}

#remove(item) ⇒ nil

Removes a previously registered poll item from the reactor loop.

Examples

loop = ZMQ::Loop.new                        =>   ZMQ::Loop
item = ZMQ::Pollitem.new(sock, ZMQ::POLLIN) =>   ZMQ::Pollitem
loop.register(item)                         =>   true
loop.remove(item)                           =>   nil

Returns:

  • (nil)


444
445
446
447
448
449
450
451
452
453
# File 'ext/rbczmq/loop.c', line 444

static VALUE rb_czmq_loop_remove(VALUE obj, VALUE pollable)
{
    errno = 0;
    ZmqGetLoop(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);
    zloop_poller_end(loop->loop, pollitem->item);
    rb_ary_delete(loop->items, pollable);
    return Qnil;
}

#running?Boolean

Predicate that returns true if the reactor is currently running.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
loop.running?    =>   false

Returns:

  • (Boolean)


322
323
324
325
326
# File 'ext/rbczmq/loop.c', line 322

static VALUE rb_czmq_loop_running_p(VALUE obj)
{
    ZmqGetLoop(obj);
    return (loop->running == true) ? Qtrue : Qfalse;
}

#startFixnum

Creates a new reactor instance and blocks the caller until the process is interrupted, the context terminates or the loop’s explicitly stopped via callback. Returns 0 if interrupted and -1 when stopped via a handler.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
loop.start    =>   Fixnum

Returns:

  • (Fixnum)


296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'ext/rbczmq/loop.c', line 296

static VALUE rb_czmq_loop_start(VALUE obj)
{
    int rc;
    errno = 0;
    ZmqGetLoop(obj);
    rb_thread_schedule();
    zloop_timer(loop->loop, 1, 1, rb_czmq_loop_started_callback, loop);

    rc = (int)rb_thread_call_without_gvl(rb_czmq_loop_start_nogvl, (void *)loop, rb_czmq_loop_start_ubf, (void*)loop);
  
    if (rc > 0) rb_raise(rb_eZmqError, "internal event loop error!");
    return INT2NUM(rc);
}

#stopnil

Stops the reactor loop. ZMQ::Loop#start will return a -1 status code as this can only be called via a handler.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
loop.add_timer(1){ loop.stop }
loop.start    =>   -1

Returns:

  • (nil)


351
352
353
354
355
356
357
# File 'ext/rbczmq/loop.c', line 351

static VALUE rb_czmq_loop_stop(VALUE obj)
{
    ZmqGetLoop(obj);
    if (loop->running == false) rb_raise(rb_eZmqError, "event loop not running!");
    rb_czmq_loop_stop0(loop);
    return Qnil;
}

#verbose=(true) ⇒ nil

Logs reactor activity to stdout - useful for debugging, but can be quite noisy with lots of activity.

Examples

loop = ZMQ::Loop.new    =>   ZMQ::Loop
loop.verbose = true   =>    nil

Returns:

  • (nil)


392
393
394
395
396
397
398
399
400
# File 'ext/rbczmq/loop.c', line 392

static VALUE rb_czmq_loop_set_verbose(VALUE obj, VALUE level)
{
    bool vlevel;
    ZmqGetLoop(obj);
    vlevel = (level == Qtrue) ? true : false;
    zloop_set_verbose(loop->loop, vlevel);
    loop->verbose = vlevel;
    return Qnil;
}