Class: Majordomo::Worker
- Inherits:
-
Object
- Object
- Majordomo::Worker
- Defined in:
- ext/majordomo/worker.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Creates a new Majordomo::Worker instance.
Instance Method Summary collapse
-
#broker ⇒ String
Returns the URI of the broker this worker is connected to.
-
#close ⇒ nil
Close the worker connection to the broker.
-
#heartbeat ⇒ Fixnum
Returns the worker heartbeat delay (in msecs).
-
#heartbeat=(val) ⇒ nil
Sets the worker heartbeat delay (in msecs).
-
#reconnect ⇒ Fixnum
Returns the worker reconnect delay (in msecs).
-
#reconnect=(100) ⇒ nil
Sets the worker reconnect delay (in msecs).
-
#recv ⇒ String?
Receives a client request from the broker.
-
#send(message, reply_to) ⇒ Boolean
Send a reply to a client request.
-
#service ⇒ String
Returns the service identifier this worker implements.
Class Method Details
.Majordomo::Worker.new("tcp://0.0.0.0:5555", "service") ⇒ Majordomo::Worker .Majordomo::Worker.new("tcp://0.0.0.0:5555", "service", true) ⇒ Majordomo::Worker
Creates a new Majordomo::Worker instance. A broker URI and a service identifier are required, and an optional verbose flag can be passed to the initializer.
Examples
wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service") => Majordomo::Worker
wk.broker => "tcp://0.0.0.0:5555"
wk.heartbeat => 2500
wk.recv => ["request", "reply"]
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'ext/majordomo/worker.c', line 77
/*
 * Majordomo::Worker.new(broker, service [, verbose])
 *
 * Allocates the wrapped worker struct and creates the native mdp worker.
 *
 * Arguments (parsed with the "21" scan spec: two required, one optional):
 *   broker  - String, checked with Check_Type; a TypeError is raised otherwise.
 *   service - String, checked with Check_Type; a TypeError is raised otherwise.
 *   verbose - optional; only the literal `true` turns verbose mode on, any
 *             other value (including nil / omitted) leaves it off.
 *
 * Returns the new Ruby object wrapping the worker.
 */
static VALUE rb_majordomo_worker_s_new(int argc, VALUE *argv, VALUE klass)
{
    rb_majordomo_worker_t *worker = NULL;
    struct nogvl_md_worker_new_args args;
    VALUE obj, broker, service, verbose;

    rb_scan_args(argc, argv, "21", &broker, &service, &verbose);
    if (verbose == Qnil)
        verbose = Qfalse;
    Check_Type(broker, T_STRING);
    Check_Type(service, T_STRING);

    obj = Data_Make_Struct(klass, rb_majordomo_worker_t, rb_mark_majordomo_worker, rb_free_majordomo_worker, worker);

    /*
     * Snapshot both strings as frozen copies *before* the blocking call and
     * hand the native constructor pointers into those copies. The call below
     * runs without the GVL, so another Ruby thread could otherwise mutate or
     * reallocate the caller's strings while the C side is still reading them.
     * The copies are stored in the wrapped struct, exactly as before.
     */
    worker->broker = rb_str_new4(broker);
    worker->service = rb_str_new4(service);
    args.broker = RSTRING_PTR(worker->broker);
    args.service = RSTRING_PTR(worker->service);
    args.verbose = (verbose == Qtrue ? 1 : 0);

    /* NOTE(review): the returned handle is stored unchecked; confirm whether
     * rb_nogvl_mdp_worker_new can return NULL on failure. */
    worker->worker = (mdp_worker_t *)rb_thread_blocking_region(rb_nogvl_mdp_worker_new, (void *)&args, RUBY_UBF_IO, 0);

    /* Ruby-side mirrors of the delays, seeded from the compile-time defaults. */
    worker->heartbeat = INT2NUM(MAJORDOMO_WORKER_HEARTBEAT);
    worker->reconnect = INT2NUM(MAJORDOMO_WORKER_RECONNECT);
#ifndef HAVE_RB_THREAD_BLOCKING_REGION
    /* Only allocated on builds without rb_thread_blocking_region. */
    worker->recv_buffer = zlist_new();
#endif
    rb_obj_call_init(obj, 0, NULL);
    return obj;
}
|
Instance Method Details
#broker ⇒ String
115 116 117 118 |
# File 'ext/majordomo/worker.c', line 115
/*
 * Majordomo::Worker#broker
 *
 * Returns the broker value stored on the wrapped worker struct.
 * GetMajordomoWorker is a macro defined elsewhere in the file; it brings the
 * `worker` pointer into scope for this object.
 */
static VALUE rb_majordomo_worker_broker(VALUE obj)
{
    VALUE uri;
    GetMajordomoWorker(obj);
    uri = worker->broker;
    return uri;
}
|
#close ⇒ nil
344 345 346 347 348 349 350 |
# File 'ext/majordomo/worker.c', line 344
/*
 * Majordomo::Worker#close
 *
 * Runs rb_nogvl_mdp_worker_close on the native worker handle inside a
 * blocking region, then clears the handle stored on the wrapped struct.
 * Returns whatever the close routine returned.
 */
static VALUE rb_majordomo_worker_close(VALUE obj)
{
    VALUE result;
    GetMajordomoWorker(obj);

    result = rb_thread_blocking_region(rb_nogvl_mdp_worker_close, (void *)worker->worker, RUBY_UBF_IO, 0);

    /* The native handle has been passed to the close routine; forget it. */
    worker->worker = NULL;

    return result;
}
|
#heartbeat ⇒ Fixnum
147 148 149 150 |
# File 'ext/majordomo/worker.c', line 147
/*
 * Majordomo::Worker#heartbeat
 *
 * Returns the heartbeat value cached on the wrapped worker struct (the
 * Ruby-side copy, not a value read back from the native worker).
 */
static VALUE rb_majordomo_worker_heartbeat(VALUE obj)
{
    VALUE delay;
    GetMajordomoWorker(obj);
    delay = worker->heartbeat;
    return delay;
}
|
#heartbeat=(val) ⇒ nil
180 181 182 183 184 185 186 |
# File 'ext/majordomo/worker.c', line 180
/*
 * Majordomo::Worker#heartbeat=
 *
 * Requires a Fixnum (TypeError otherwise), forwards it as a C int to
 * mdp_worker_set_heartbeat and caches the Ruby value on the wrapped struct.
 * Always returns nil.
 */
static VALUE rb_majordomo_worker_heartbeat_equals(VALUE obj, VALUE heartbeat)
{
    int msecs;
    GetMajordomoWorker(obj);

    Check_Type(heartbeat, T_FIXNUM);
    msecs = FIX2INT(heartbeat);

    mdp_worker_set_heartbeat(worker->worker, msecs);
    /* Keep the Ruby-side mirror in step with the native setting. */
    worker->heartbeat = heartbeat;

    return Qnil;
}
|
#reconnect ⇒ Fixnum
163 164 165 166 |
# File 'ext/majordomo/worker.c', line 163
/*
 * Majordomo::Worker#reconnect
 *
 * Returns the reconnect value cached on the wrapped worker struct (the
 * Ruby-side copy, not a value read back from the native worker).
 */
static VALUE rb_majordomo_worker_reconnect(VALUE obj)
{
    VALUE delay;
    GetMajordomoWorker(obj);
    delay = worker->reconnect;
    return delay;
}
|
#reconnect=(100) ⇒ nil
200 201 202 203 204 205 206 |
# File 'ext/majordomo/worker.c', line 200
/*
 * Majordomo::Worker#reconnect=
 *
 * Requires a Fixnum (TypeError otherwise), forwards it as a C int to
 * mdp_worker_set_reconnect and caches the Ruby value on the wrapped struct.
 * Always returns nil.
 */
static VALUE rb_majordomo_worker_reconnect_equals(VALUE obj, VALUE reconnect)
{
    int msecs;
    GetMajordomoWorker(obj);

    Check_Type(reconnect, T_FIXNUM);
    msecs = FIX2INT(reconnect);

    mdp_worker_set_reconnect(worker->worker, msecs);
    /* Keep the Ruby-side mirror in step with the native setting. */
    worker->reconnect = reconnect;

    return Qnil;
}
|
#recv ⇒ String?
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'ext/majordomo/worker.c', line 252
/*
 * Majordomo::Worker#recv
 *
 * Runs rb_nogvl_mdp_worker_recv inside a blocking region.
 *
 * Returns nil when no request message came back; otherwise a two-element
 * Array [request, reply_to], where `request` is the first frame of the
 * message as a String (passed through MajordomoEncode) and `reply_to` is a
 * String holding the bytes of the reply frame filled in by the receive call.
 *
 * Raises ArgumentError when the message has no frame to pop (the previous
 * code raised the same class from rb_str_new2(NULL), but leaked the native
 * message and reply frame while doing so).
 */
static VALUE rb_majordomo_worker_recv(VALUE obj){
    VALUE req, reply;
    char *body = NULL;
    zmsg_t *request = NULL;
    struct nogvl_md_worker_recv_args args;
    GetMajordomoWorker(obj);

    args.worker = worker;
    args.reply = NULL;
    request = (zmsg_t *)rb_thread_blocking_region(rb_nogvl_mdp_worker_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (!request)
        return Qnil;

    /* zmsg_popstr returns a heap string owned by the caller (NULL when the
     * message has no frames left), so keep the pointer in order to free it. */
    body = zmsg_popstr(request);
    zmsg_destroy(&request);
    if (!body) {
        /* Release the native reply frame before unwinding through rb_raise. */
        zframe_destroy(&args.reply);
        rb_raise(rb_eArgError, "received a request without a body frame");
    }
    req = MajordomoEncode(rb_str_new2(body));
    free(body); /* rb_str_new2 copied the bytes */

    /* NOTE(review): assumes args.reply is always set when a request is
     * returned — confirm against rb_nogvl_mdp_worker_recv. */
    reply = rb_str_new((const char *)zframe_data(args.reply), zframe_size(args.reply));
    zframe_destroy(&args.reply);

    return rb_ary_new3(2, req, reply);
}
|
#send(message, reply_to) ⇒ Boolean
Send a reply to a client request. Returns true if the send was successful.
Examples
wk = Majordomo::Worker.new("tcp://0.0.0.0:5555", "service") => Majordomo::Worker
req, reply_to = wk.recv => ["request", "reply"]
wk.send("reply", reply_to) => true
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'ext/majordomo/worker.c', line 312
/*
 * Majordomo::Worker#send(message, reply_to)
 *
 * Builds a native message from `message` and a native frame from
 * `reply_to`, then hands both to rb_nogvl_mdp_worker_send inside a blocking
 * region.
 *
 * Both arguments must be Strings; a TypeError is raised otherwise. Without
 * this check a non-String argument reached RSTRING_PTR / RSTRING_LEN, which
 * is undefined behaviour.
 *
 * Returns false when the native message or frame cannot be built, true once
 * the send routine has been invoked (its result is not inspected).
 */
static VALUE rb_majordomo_worker_send(VALUE obj, VALUE message, VALUE reply_to){
    struct nogvl_md_worker_send_args args;
    GetMajordomoWorker(obj);

    /* Validate before allocating anything so a raise cannot leak. */
    Check_Type(message, T_STRING);
    Check_Type(reply_to, T_STRING);

    args.worker = worker->worker;
    args.progress = zmsg_new();
    if (!args.progress)
        return Qfalse;
    if (zmsg_pushmem(args.progress, RSTRING_PTR(message), RSTRING_LEN(message)) == -1) {
        zmsg_destroy(&args.progress);
        return Qfalse;
    }
    args.reply_to = zframe_new(RSTRING_PTR(reply_to), RSTRING_LEN(reply_to));
    if (!args.reply_to) {
        zmsg_destroy(&args.progress);
        return Qfalse;
    }

    rb_thread_blocking_region(rb_nogvl_mdp_worker_send, (void *)&args, RUBY_UBF_IO, 0);

    /* NOTE(review): args.progress is not destroyed here; presumably the send
     * routine takes ownership of it — confirm. */
    zframe_destroy(&args.reply_to);
    return Qtrue;
}
|
#service ⇒ String
131 132 133 134 |
# File 'ext/majordomo/worker.c', line 131
/*
 * Majordomo::Worker#service
 *
 * Returns the service value stored on the wrapped worker struct.
 * GetMajordomoWorker is a macro defined elsewhere in the file; it brings the
 * `worker` pointer into scope for this object.
 */
static VALUE rb_majordomo_worker_service(VALUE obj)
{
    VALUE name;
    GetMajordomoWorker(obj);
    name = worker->service;
    return name;
}
|