Class: ZMQ::Context
- Inherits:
-
Object
- Object
- ZMQ::Context
- Defined in:
- lib/zmq/context.rb,
ext/rbczmq/context.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Returns a handle to a new ZMQ context.
Instance Method Summary collapse
-
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint.
-
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint.
-
#destroy ⇒ nil
Destroy a ZMQ context and all sockets in it.
-
#initialize(*args) ⇒ Context
constructor
Overload the libczmq handler installed for SIGINT and SIGTERM on context init.
-
#iothreads=(2) ⇒ nil
Raises default I/O threads from 1 - there should be no need to fiddle with this.
-
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them.
-
#socket(type) ⇒ Object
Creates a socket within this ZMQ context.
Constructor Details
#initialize(*args) ⇒ Context
Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fallback to the default Ruby signal handlers which is least likely to violate the principle of least surprise. As an alternative fallback, we expose ZMQ.interrupted! which reverts back to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior :
def initialize(*args)
super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }
end
17 18 19 20 21 |
# File 'lib/zmq/context.rb', line 17 def initialize(*args) super trap(:INT, "DEFAULT") trap(:TERM, "DEFAULT") end |
Class Method Details
.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context
Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.
Examples
ZMQ::Context.new => ZMQ::Context
ZMQ::Context.new(1) => ZMQ::Context
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'ext/rbczmq/context.c', line 179
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
VALUE process_ctx;
VALUE ctx_map;
VALUE io_threads;
zmq_ctx_wrapper *ctx = NULL;
rb_scan_args(argc, argv, "01", &io_threads);
ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
process_ctx = rb_hash_aref(ctx_map, get_pid());
if (!NIL_P(process_ctx)){
Data_Get_Struct(process_ctx, zmq_ctx_wrapper, ctx);
rb_raise(rb_eZmqError, "single ZMQ context per process allowed (previous context created at %s:%d)", ctx->file, ctx->line);
}
context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, rb_czmq_mark_ctx_gc, rb_czmq_free_ctx_gc, ctx);
ctx->ctx = (zctx_t*)rb_thread_call_without_gvl(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(ctx->ctx, ctx);
ctx->flags = 0;
ctx->pid = getpid();
ctx->pidValue = get_pid();
ctx->sockets = zlist_new();
ctx->file = rb_sourcefile();
ctx->line = rb_sourceline();
rb_obj_call_init(context, 0, NULL);
rb_hash_aset(ctx_map, ctx->pidValue, context);
if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
return context;
}
|
Instance Method Details
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint
ctx.bind(:PUB, “tcp://127.0.0.1:5000”)
35 36 37 38 39 |
# File 'lib/zmq/context.rb', line 35 def bind(sock_type, endpoint) s = socket(sock_type) s.bind(endpoint) s end |
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint
ctx.connect(:SUB, “tcp://127.0.0.1:5000”)
45 46 47 48 49 |
# File 'lib/zmq/context.rb', line 45 def connect(sock_type, endpoint) s = socket(sock_type) s.connect(endpoint) s end |
#destroy ⇒ nil
221 222 223 224 225 226 227 |
# File 'ext/rbczmq/context.c', line 221
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
ZmqGetContext(obj);
ZmqAssertContextPidMatches(ctx);
rb_czmq_free_ctx(ctx);
return Qnil;
}
|
#iothreads=(2) ⇒ nil
241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'ext/rbczmq/context.c', line 241
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
int iothreads;
errno = 0;
ZmqGetContext(obj);
ZmqAssertContextPidMatches(ctx);
Check_Type(threads, T_FIXNUM);
iothreads = FIX2INT(threads);
if (iothreads < 0) rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
zctx_set_iothreads(ctx->ctx, iothreads);
if (zmq_errno() == EINVAL) ZmqRaiseSysError();
return Qnil;
}
|
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.
Examples
ctx = ZMQ::Context.new
ctx.linger = 100 => nil
269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'ext/rbczmq/context.c', line 269
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
errno = 0;
int msecs;
ZmqGetContext(obj);
ZmqAssertContextPidMatches(ctx);
Check_Type(linger, T_FIXNUM);
msecs = FIX2INT(linger);
if (msecs < 0) rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
zctx_set_linger(ctx->ctx, msecs);
return Qnil;
}
|
#socket(: PUSH) ⇒ ZMQ::Socket #socket(ZMQ: :PUSH) ⇒ ZMQ::Socket
Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.
Examples
ctx = ZMQ::Context.new
ctx.socket(:PUSH) => ZMQ::Socket
ctx.socket(ZMQ::PUSH) => ZMQ::Socket
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 |
# File 'ext/rbczmq/context.c', line 377
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
int socket_type;
struct nogvl_socket_args args;
errno = 0;
void *socket;
ZmqGetContext(obj);
ZmqAssertContextPidMatches(ctx);
if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
socket_type = FIX2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);
args.ctx = ctx->ctx;
args.type = socket_type;
socket = rb_thread_call_without_gvl(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0);
VALUE socket_object = rb_czmq_socket_alloc(obj, ctx->ctx, socket);
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(socket_object);
zlist_push(ctx->sockets, sock);
return socket_object;
}
|