Module: ZMQ
- Defined in:
- lib/zmq.rb,
lib/zmq/logger.rb,
lib/zmq/version.rb,
ext/rbczmq/rbczmq_ext.c
Defined Under Namespace
Modules: DownstreamSocket, UpstreamSocket Classes: Beacon, Context, DefaultHandler, Error, Frame, Handler, Logger, Loop, Message, Monitor, Poller, Pollitem, Socket, Timer
Constant Summary collapse
- VERSION =
"1.7.9"
- POLLIN =
INT2NUM(ZMQ_POLLIN)
- POLLOUT =
INT2NUM(ZMQ_POLLOUT)
- POLLERR =
INT2NUM(ZMQ_POLLERR)
- PAIR =
INT2NUM(ZMQ_PAIR)
- SUB =
INT2NUM(ZMQ_SUB)
- PUB =
INT2NUM(ZMQ_PUB)
- REQ =
INT2NUM(ZMQ_REQ)
- REP =
INT2NUM(ZMQ_REP)
- DEALER =
INT2NUM(ZMQ_DEALER)
- ROUTER =
INT2NUM(ZMQ_ROUTER)
- PUSH =
INT2NUM(ZMQ_PUSH)
- PULL =
INT2NUM(ZMQ_PULL)
- XSUB =
INT2NUM(ZMQ_XSUB)
- XPUB =
INT2NUM(ZMQ_XPUB)
- STREAM =
INT2NUM(ZMQ_STREAM)
- EFSM =
INT2NUM(EFSM)
- ENOCOMPATPROTO =
INT2NUM(ENOCOMPATPROTO)
- ETERM =
INT2NUM(ETERM)
- EMTHREAD =
INT2NUM(EMTHREAD)
- EVENT_CONNECTED =
INT2NUM(ZMQ_EVENT_CONNECTED)
- EVENT_CONNECT_DELAYED =
INT2NUM(ZMQ_EVENT_CONNECT_DELAYED)
- EVENT_CONNECT_RETRIED =
INT2NUM(ZMQ_EVENT_CONNECT_RETRIED)
- EVENT_LISTENING =
INT2NUM(ZMQ_EVENT_LISTENING)
- EVENT_BIND_FAILED =
INT2NUM(ZMQ_EVENT_BIND_FAILED)
- EVENT_ACCEPTED =
INT2NUM(ZMQ_EVENT_ACCEPTED)
- EVENT_ACCEPT_FAILED =
INT2NUM(ZMQ_EVENT_ACCEPT_FAILED)
- EVENT_CLOSED =
INT2NUM(ZMQ_EVENT_CLOSED)
- EVENT_CLOSE_FAILED =
INT2NUM(ZMQ_EVENT_CLOSE_FAILED)
- EVENT_DISCONNECTED =
INT2NUM(ZMQ_EVENT_DISCONNECTED)
- EVENT_ALL =
INT2NUM(ZMQ_EVENT_ALL)
Class Method Summary collapse
-
.context ⇒ Object
Returns the ZMQ context for this process, if any.
-
.czmq_version ⇒ Array
Returns the czmq version linked against.
-
.errno ⇒ Fixnum
Returns the last known ZMQ errno (if any) as a Fixnum.
-
.error ⇒ ZMQ::Error
Returns the last known ZMQ error (if any) as a ZMQ::Error instance.
-
.Frame(data = nil) ⇒ Object
Sugaring for creating new ZMQ frames.
-
.interrupted! ⇒ nil
Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.
-
.interrupted? ⇒ Boolean
Returns true if the process was interrupted by signal.
-
.log("msg") ⇒ nil
Logs a timestamped message to stdout.
-
.loop ⇒ Object
Higher level loop API.
-
.Message(*parts) ⇒ Object
Sugaring for creating new ZMQ messages.
-
.now ⇒ Fixnum
Returns a current timestamp as a Fixnum.
-
.Pollitem(pollable, events = nil) ⇒ Object
Sugaring for creating new poll items.
-
.proxy(frontend, backend, capture = nil) ⇒ nil
Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided.
- .resolver ⇒ Object
-
.select(read = [], write = [], error = [], timeout = nil) ⇒ Object
API sugaring: IO.select compatible API, but for ZMQ sockets.
-
.version ⇒ Array
Returns the libzmq version linked against.
Class Method Details
.context ⇒ Object
Returns the ZMQ context for this process, if any
44 45 46 |
# File 'lib/zmq.rb', line 44 def self.context @__zmq_ctx_process[Process.pid] end |
.czmq_version ⇒ Array
81 82 83 84 |
# File 'ext/rbczmq/rbczmq_ext.c', line 81
static VALUE rb_czmq_m_czmq_version(ZMQ_UNUSED VALUE obj)
{
return rb_ary_new3(3, INT2NUM(CZMQ_VERSION_MAJOR), INT2NUM(CZMQ_VERSION_MINOR), INT2NUM(CZMQ_VERSION_PATCH));
}
|
.errno ⇒ Fixnum
150 151 152 153 |
# File 'ext/rbczmq/rbczmq_ext.c', line 150
static VALUE rb_czmq_m_errno(ZMQ_UNUSED VALUE obj)
{
return INT2NUM(zmq_errno());
}
|
.error ⇒ ZMQ::Error
131 132 133 134 135 136 137 |
# File 'ext/rbczmq/rbczmq_ext.c', line 131
static VALUE rb_czmq_m_error(ZMQ_UNUSED VALUE obj)
{
int err;
err = zmq_errno();
if (err == 0) return Qnil;
return rb_exc_new2(rb_eZmqError, zmq_strerror(zmq_errno()));
}
|
.Frame(data = nil) ⇒ Object
Sugaring for creating new ZMQ frames
ZMQ::Frame(“frame”) => ZMQ::Frame
20 21 22 |
# File 'lib/zmq.rb', line 20 def self.Frame(data = nil) ZMQ::Frame.new(data) end |
.interrupted! ⇒ nil
Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.
Examples
ZMQ.interrupted! => nil
166 167 168 169 170 |
# File 'ext/rbczmq/rbczmq_ext.c', line 166
static VALUE rb_czmq_m_interrupted_bang(ZMQ_UNUSED VALUE obj)
{
zctx_interrupted = 1;
return Qnil;
}
|
.interrupted? ⇒ Boolean
47 48 49 50 |
# File 'ext/rbczmq/rbczmq_ext.c', line 47
static VALUE rb_czmq_m_interrupted_p(ZMQ_UNUSED VALUE obj)
{
return (zctx_interrupted == true) ? Qtrue : Qfalse;
}
|
.log("msg") ⇒ nil
113 114 115 116 117 118 |
# File 'ext/rbczmq/rbczmq_ext.c', line 113
static VALUE rb_czmq_m_log(ZMQ_UNUSED VALUE obj, VALUE msg)
{
Check_Type(msg, T_STRING);
zclock_log(StringValueCStr(msg));
return Qnil;
}
|
.loop ⇒ Object
Higher level loop API.
XXX: Handle cases where context is nil
52 53 54 |
# File 'lib/zmq.rb', line 52 def self.loop @loop ||= ZMQ::Loop.new(context) end |
.Message(*parts) ⇒ Object
Sugaring for creating new ZMQ messages
ZMQ::Message(“one”, “two”, “three”) => ZMQ::Message
28 29 30 31 32 |
# File 'lib/zmq.rb', line 28 def self.Message(*parts) m = ZMQ::Message.new parts.each{|p| m.addstr(p) } m end |
.now ⇒ Fixnum
97 98 99 100 |
# File 'ext/rbczmq/rbczmq_ext.c', line 97
static VALUE rb_czmq_m_now(ZMQ_UNUSED VALUE obj)
{
return INT2NUM(zclock_time());
}
|
.Pollitem(pollable, events = nil) ⇒ Object
Sugaring for creating new poll items
ZMQ::Pollitem(STDIN, ZMQ::POLLIN) => ZMQ::Pollitem
38 39 40 |
# File 'lib/zmq.rb', line 38 def self.Pollitem(pollable, events = nil) ZMQ::Pollitem.new(pollable, events) end |
.proxy(frontend, backend, capture = nil) ⇒ nil
Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided. This method does not return unless the application is interrupted with a signal.
Examples
context = ZMQ::Context.new
frontend = context.socket(ZMQ::ROUTER)
frontend.bind("tcp://127.0.0.1:5555")
backend = context.socket(ZMQ::DEALER)
backend.bind("tcp://127.0.0.1:5556")
ZMQ.proxy(frontend, backend) => -1 when interrupted
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'ext/rbczmq/rbczmq_ext.c', line 201
static VALUE rb_czmq_m_proxy(int argc, VALUE *argv, ZMQ_UNUSED VALUE klass)
{
zmq_sock_wrapper *sock = NULL;
VALUE frontend, backend, capture;
void *sockets[3];
int rc;
rb_scan_args(argc, argv, "21", &frontend, &backend, &capture);
GetZmqSocket(frontend);
sockets[0] = sock->socket;
GetZmqSocket(backend);
sockets[1] = sock->socket;
if (!NIL_P(capture))
{
GetZmqSocket(capture);
sockets[2] = sock->socket;
}
else
{
sockets[2] = NULL;
}
rc = (int)rb_thread_call_without_gvl(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);
// int result = zmq_proxy(frontend_socket, backend_socket, capture_socket);
return INT2NUM(rc);
}
|
.resolver ⇒ Object
66 67 68 69 70 71 |
# File 'lib/zmq.rb', line 66 def self.resolver @resolver ||= begin require 'resolv' Resolv::DNS.new end end |
.select(read = [], write = [], error = [], timeout = nil) ⇒ Object
API sugaring: IO.select compatible API, but for ZMQ sockets.
58 59 60 61 62 63 64 |
# File 'lib/zmq.rb', line 58 def self.select(read = [], write = [], error = [], timeout = nil) poller = ZMQ::Poller.new read.each{|s| poller.register_readable(s) } if read write.each{|s| poller.register_writable(s) } if write ready = poller.poll(timeout) [poller.readables, poller.writables, []] if ready end |
.version ⇒ Array
63 64 65 66 67 68 |
# File 'ext/rbczmq/rbczmq_ext.c', line 63
static VALUE rb_czmq_m_version(ZMQ_UNUSED VALUE obj)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
return rb_ary_new3(3, INT2NUM(major), INT2NUM(minor), INT2NUM(patch));
}
|