Module: Iodine
- Defined in:
- lib/iodine.rb,
lib/iodine/cli.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/protocol.rb,
lib/iodine/websocket.rb,
lib/iodine/monkeypatch.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c
Overview
Iodine is both a Rack server and a platform for writing evented network services on Ruby.
Here is a sample Echo server using Iodine:
# define the protocol for our service
class EchoProtocol
@timeout = 10
# this is just one possible callback with a recyclable buffer
def on_message buffer
# write the data we received
write "echo: #{buffer}"
# close the connection when the time comes
close if buffer =~ /^bye[\n\r]/
end
end
# create the service instance
Iodine.listen 3000, EchoProtocol
# start the service
Iodine.start
Please read the README file for an introduction to Iodine and an overview of its API.
The API
The main API methods for the top Iodine namespace are grouped here by subject.
Event Loop / Concurrency
Iodine manages an internal event-loop and reactor pattern. The following API manages Iodine’s behavior.
-
Iodine.threads, Iodine.threads= gets or sets the number of threads iodine will use in its working thread pool.
-
Iodine.processes, Iodine.processes= gets or sets the number of processes iodine will utilize (`fork`) to handle connections.
-
Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.
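A minimal sketch of setting the pool sizes before the reactor starts. The helper name `apply_concurrency` and the values shown are illustrative assumptions, not part of Iodine; only `threads=`, `processes=` and `start` come from the API listed above.

```ruby
# Hypothetical helper (not part of Iodine): applies pool sizes to any object
# that responds to `threads=` and `processes=`, such as the Iodine module.
def apply_concurrency(server, threads:, processes:)
  server.threads = threads      # size of the working thread pool
  server.processes = processes  # values above 1 make Iodine fork extra workers
  server
end

# Usage, assuming the iodine gem is installed:
#   require 'iodine'
#   apply_concurrency(Iodine, threads: 16, processes: 2)
#   Iodine.start # from here on the values can no longer be changed
```

Passing the server object in as an argument keeps the helper easy to exercise with a stand-in object when the gem isn't loaded.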
Event and Task Scheduling
-
Iodine.run schedules a block of code to run asynchronously.
-
Iodine.run_after, Iodine.run_every schedules a block of code to run (asynchronously) using a timer.
-
Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.
In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation, which allows connection-bound tasks to be scheduled to run within the connection’s lock (for example, Websocket#defer and Websocket.each).
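The top level scheduling calls can be sketched roughly as follows. The helper name `schedule_startup_tasks`, the log messages and the 2000 millisecond delay are assumptions made for illustration; only `run` and `run_after(milliseconds)` are taken from the list above.

```ruby
# Hypothetical helper (not part of Iodine): queues one immediate task and one
# delayed task on an object that responds to `run` and `run_after`.
def schedule_startup_tasks(server, log)
  # scheduled to run asynchronously
  server.run { log << "warm cache" }
  # scheduled to run once, after 2000 milliseconds have passed
  server.run_after(2000) { log << "delayed check" }
  log
end

# Usage, assuming the iodine gem is installed:
#   require 'iodine'
#   schedule_startup_tasks(Iodine, [])
#   Iodine.start
```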
Connection Handling
Iodine handles connections using Protocol objects. The following API manages either built-in or custom Protocol objects (classes / instances) in relation to their network sockets.
-
Iodine.attach_fd, Iodine.attach_io allows Iodine to take control of an IO object (i.e., a TCP/IP Socket, a Unix Socket or a pipe).
-
Iodine.connect creates a new TCP/IP connection using the specified Protocol.
-
Iodine.listen listens to new TCP/IP connections using the specified Protocol.
-
Iodine.listen2http listens to new TCP/IP connections using the built-in HTTP / Websocket Protocol.
-
Iodine.warmup warms up any HTTP Rack applications.
-
Iodine.count counts the number of connections (including HTTP / Websocket connections).
-
Protocol.each runs a block of code for every connection sharing the process (except HTTP / Websocket connections).
-
Websocket.each runs a block of code for every existing websocket sharing the process.
In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation.
Pub/Sub
Iodine offers a native Pub/Sub engine (no database required) that can be easily extended by implementing a custom PubSub::Engine.
The following methods affect server-side Pub/Sub, which allows the server code to react to channel events.
-
Iodine.subscribe, Iodine.unsubscribe manages a process’s subscription to a channel (which is different from a connection’s subscription, such as the one employed by Websocket).
-
Iodine.publish publishes a message to a Pub/Sub channel. The message will be sent to all subscribers - connections, other processes in the cluster and even other machines (when using the PubSub::RedisEngine).
-
Iodine.default_pubsub=, Iodine.default_pubsub sets or gets the default Pub/Sub PubSub::Engine. For example, when set to a new PubSub::RedisEngine instance, all Pub/Sub method calls will use the Redis engine (unless a different engine is explicitly requested).
Websocket objects have a separate Pub/Sub implementation that manages the subscription’s lifetime to match the connection’s lifetime and allows direct client Pub/Sub (forwards the message to the client directly without invoking the Ruby interpreter).
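For the process-level methods (not the Websocket implementation), a rough sketch might look like the following. The helper name `announce` is hypothetical, and the block arguments (channel name, then message) are an assumption rather than something this page states; the Hash keys `:channel` and `:message` are the ones documented for subscribe and publish.

```ruby
# Hypothetical helper (not part of Iodine): subscribes the process to a channel
# and publishes one message, using only the documented Hash keys.
def announce(server, channel, text)
  # ASSUMPTION: the block receives the channel name and the message
  sub_id = server.subscribe(channel: channel) do |ch, msg|
    puts "#{ch}: #{msg}"
  end
  server.publish(channel: channel, message: text)
  sub_id # pass this value to `unsubscribe` to cancel the subscription
end

# Usage, assuming the iodine gem is installed:
#   require 'iodine'
#   Iodine.run { announce(Iodine, "chat", "hello") }
#   Iodine.start
```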
Patching Rack
Although Iodine offers Rack::Utils optimizations using monkey patching, Iodine does NOT monkey patch Rack automatically.
Choosing to monkey patch Rack::Utils could offer significant performance gains for some applications, e.g. (on my machine):
require 'benchmark'
require 'iodine'
require 'rack'
# a String in need of decoding
s = '%E3%83%AB%E3%83%93%E3%82%A4%E3%82%B9%E3%81%A8'
Benchmark.bm do |bm|
# Pre-Patch
bm.report(" Rack.unescape") {1_000_000.times { Rack::Utils.unescape s } }
bm.report(" Rack.rfc2822") {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
bm.report(" Rack.rfc2109") {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
# Perform Patch
Iodine.patch_rack
puts " --- Monkey Patching Rack ---"
# Post Patch
bm.report("Patched.unescape") {1_000_000.times { Rack::Utils.unescape s } }
bm.report(" Patched.rfc2822") {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
bm.report(" Patched.rfc2109") {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
end && nil
Results:
                      user     system      total        real
   Rack.unescape  8.660000   0.010000   8.670000 (  8.687807)
    Rack.rfc2822  3.730000   0.000000   3.730000 (  3.727732)
    Rack.rfc2109  3.020000   0.010000   3.030000 (  3.031940)
 --- Monkey Patching Rack ---
Patched.unescape  0.340000   0.000000   0.340000 (  0.341506)
 Patched.rfc2822  0.740000   0.000000   0.740000 (  0.737796)
 Patched.rfc2109  0.690000   0.010000   0.700000 (  0.700155)
At the moment, the extent of the monkey patching offered is very limited. As new optimizations are added, the policy regarding monkey patching (benefit vs. risks) might be re-evaluated.
Defined Under Namespace
Modules: Base, Protocol, PubSub, Rack, Websocket
Constant Summary
- VERSION =
'0.4.15'.freeze
Class Method Summary
-
.attach_fd(rbfd, handler) ⇒ Object
Attaches an existing file descriptor (`fd`) (i.e., a pipe, a unix socket, etc.) as if it were a regular connection.
-
.attach_io(io, handler) ⇒ Object
Attaches an existing IO object (i.e., a pipe, a unix socket, etc.) as if it were a regular connection.
-
.connect(address, port, handler) ⇒ Object
Connects (as a TCP/IP client) to a remote TCP/IP server.
-
.count ⇒ Object
Returns the number of total connections managed by Iodine.
-
.default_pubsub ⇒ Object
Returns the default Pub/Sub engine (if any).
-
.default_pubsub=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
-
.listen(port, handler) ⇒ Object
Sets up a listening socket.
-
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
-
.on_idle ⇒ Object
Schedules a single occurring event for the next idle cycle.
- .patch_rack ⇒ Object
-
.processes ⇒ Object
Get/Set the number of worker processes.
-
.processes=(count) ⇒ Object
Get/Set the number of worker processes.
-
.publish(args) ⇒ Object
Publishes a message to a channel.
- .run ⇒ Object
-
.run_after(milliseconds) ⇒ Object
Runs the required block after the specified number of milliseconds have passed.
-
.run_every(*args) ⇒ Object
Runs the required block repeatedly, waiting the specified number of milliseconds between repetitions.
-
.start ⇒ Object
Starts the Iodine event loop.
-
.subscribe(args) ⇒ Object
Subscribes the process to a channel belonging to a specific pub/sub service (using a PubSub::Engine to connect Iodine to the service).
-
.threads ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool).
-
.threads=(count) ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool).
-
.unsubscribe(sub_id) ⇒ Object
Cancels the subscription matching `sub_id`.
-
.warmup(app) ⇒ Object
Runs the warmup sequence.
Class Method Details
.attach_fd(rbfd, handler) ⇒ Object
Attaches an existing file descriptor (`fd`) (i.e., a pipe, a unix socket, etc.) as if it were a regular connection.
i.e.
Iodine.attach_fd my_io_obj.to_i, MyProtocolClass.new
# File 'ext/iodine/iodine_protocol.c', line 541
static VALUE iodine_attach_fd(VALUE self, VALUE rbfd, VALUE handler) {
Check_Type(rbfd, T_FIXNUM);
if (handler == Qnil || handler == Qfalse)
return Qfalse;
intptr_t uuid = FIX2INT(rbfd);
if (!uuid || uuid == -1)
return Qfalse;
/* make sure the uuid is connected to the sock library */
if (sock_fd2uuid(uuid) == -1)
sock_open(uuid);
if (TYPE(handler) == T_CLASS) {
// include the Protocol module, preventing coder errors
rb_include_module(handler, IodineProtocol);
handler = RubyCaller.call(handler, iodine_new_func_id);
} else {
// include the Protocol module in the object's class
VALUE p_class = rb_obj_class(handler);
// include the Protocol module, preventing coder errors
rb_include_module(p_class, IodineProtocol);
}
Registry.add(handler);
on_open_dyn_protocol_instance(uuid, (void *)handler);
return self;
}
.attach_io(io, handler) ⇒ Object
Attaches an existing IO object (i.e., a pipe, a unix socket, etc.) as if it were a regular connection.
i.e.
Iodine.attach_io my_io_obj, MyProtocolClass.new
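As a rough sketch of handing an existing IO object to Iodine, the following creates a pipe and attaches its read end. The helper name `attach_pipe` is hypothetical, and the handler is whatever Protocol object the caller supplies; only the `attach_io(io, handler)` signature comes from this page.

```ruby
# Hypothetical helper (not part of Iodine): creates a pipe and hands its read
# end to an object that responds to `attach_io(io, handler)`.
def attach_pipe(server, handler)
  reader, writer = IO.pipe
  server.attach_io(reader, handler)
  # return both ends so the caller keeps them referenced and can write
  [reader, writer]
end

# Usage, assuming the iodine gem is installed and MyProtocolClass is defined:
#   require 'iodine'
#   _reader, writer = attach_pipe(Iodine, MyProtocolClass.new)
#   writer.write "hello\n"
```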
# File 'ext/iodine/iodine_protocol.c', line 574
static VALUE iodine_attach_io(VALUE self, VALUE io, VALUE handler) {
return iodine_attach_fd(self, RubyCaller.call(io, iodine_to_i_func_id),
handler);
}
.connect(address, port, handler) ⇒ Object
Connects (as a TCP/IP client) to a remote TCP/IP server.
i.e.
Iodine.connect "example.com", 5000, MyProtocolClass.new
# File 'ext/iodine/iodine_protocol.c', line 502
static VALUE iodine_connect(VALUE self, VALUE address, VALUE port,
VALUE handler) {
if (TYPE(handler) == T_CLASS || TYPE(handler) == T_MODULE) {
// include the Protocol module, preventing coder errors
rb_include_module(handler, IodineProtocol);
handler = RubyCaller.call(handler, iodine_new_func_id);
} else {
// include the Protocol module in the object's class
VALUE p_class = rb_obj_class(handler);
// include the Protocol module, preventing coder errors
rb_include_module(p_class, IodineProtocol);
}
if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
Registry.add(handler);
if (TYPE(port) == T_FIXNUM)
port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
// connect
intptr_t uuid = facil_connect(.port = StringValueCStr(port),
.address = StringValueCStr(address),
.udata = (void *)handler,
.on_connect = on_open_dyn_protocol_instance,
.on_fail = remove_from_registry);
if (uuid == -1)
return Qnil;
iodine_set_fd(handler, uuid);
return handler;
(void)self;
}
.count ⇒ Object
Returns the number of total connections managed by Iodine.
# File 'ext/iodine/iodine.c', line 59
static VALUE iodine_count(VALUE self) {
size_t count = facil_count(NULL);
return ULL2NUM(count);
(void)self;
}
.default_pubsub ⇒ Object
Returns the default Pub/Sub engine (if any).
See PubSub and Iodine::PubSub::Engine for more details.
# File 'ext/iodine/iodine_pubsub.c', line 442
static VALUE ips_get_default(VALUE self) {
  return rb_ivar_get(self, default_pubsubid);
}
.default_pubsub=(en) ⇒ Object
Sets the default Pub/Sub engine to be used.
See PubSub and Iodine::PubSub::Engine for more details.
# File 'ext/iodine/iodine_pubsub.c', line 425
static VALUE ips_set_default(VALUE self, VALUE en) {
iodine_engine_s *e;
Data_Get_Struct(en, iodine_engine_s, e);
if (!e)
rb_raise(rb_eArgError, "deafult engine must be an Iodine::PubSub::Engine.");
if (!e->p)
rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
rb_ivar_set(self, default_pubsubid, en);
PUBSUB_DEFAULT_ENGINE = e->p;
return en;
}
.listen(port, handler) ⇒ Object
Sets up a listening socket. Connections received at the assigned port will be handled by the assigned handler.
Multiple services (listening sockets) can be registered before starting the Iodine event loop.
# File 'ext/iodine/iodine_protocol.c', line 436
static VALUE iodine_listen(VALUE self, VALUE port, VALUE handler) {
// validate that the handler is a class and include the Iodine::Protocol
if (TYPE(handler) == T_CLASS) {
// include the Protocol module
// // do we neet to check?
// if (rb_mod_include_p(protocol, rDynProtocol) == Qfalse)
rb_include_module(handler, IodineProtocol);
rb_extend_object(handler, IodineProtocol);
} else {
rb_raise(rb_eTypeError, "The connection handler MUST be of type Class.");
return Qnil;
}
if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
if (TYPE(port) == T_FIXNUM)
port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
rb_ivar_set(self, rb_intern("_port"), port);
// listen
if (facil_listen(.port = StringValueCStr(port), .udata = (void *)handler,
.on_open = on_open_dyn_protocol) == -1)
return Qnil;
return self;
}
.listen2http(opt) ⇒ Object
Listens to incoming HTTP connections and handles incoming requests using the Rack specification.
This is delegated to a lower level C HTTP and Websocket implementation; no Ruby object will be created except the `env` object required by the Rack specification.
Accepts a single Hash argument with the following properties:
- app
-
the Rack application that handles incoming requests. Default: `nil`.
- port
-
the port to listen to. Default: 3000.
- address
-
the address to bind to. Default: binds to all possible addresses.
- log
-
enable response logging (Hijacked sockets aren’t logged). Default: off.
- public
-
The root public folder for static file service. Default: none.
- timeout
-
Timeout for inactive HTTP/1.x connections. Default: 5 seconds.
- max_body
-
The maximum body size for incoming HTTP messages. Default: ~50MiB.
- max_msg
-
The maximum Websocket message size allowed. Default: ~250KiB.
- ping
-
The Websocket `ping` interval. Default: 40 sec.
Either the `app` or the `public` property is required. If neither exists, the function will fail. If both exist, Iodine will serve static files as well as dynamic requests.
When using the static file server, it’s possible to serve `gzip` versions of the static files by saving a compressed version with the `gz` extension (i.e. `styles.css.gz`).
`gzip` will only be served to clients that support the `gzip` transfer encoding.
Once HTTP/2 is supported (planned, but probably very far away), HTTP/2 timeouts will be dynamically managed by Iodine. The `timeout` option is only relevant to HTTP/1.x connections.
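The options above could be assembled as in the following sketch. The method name `hello_http_options`, the tiny Rack application and the chosen values are illustrative assumptions; the Hash keys are the documented ones.

```ruby
# Hypothetical helper (not part of Iodine): builds an options Hash for
# listen2http around a minimal Rack application.
def hello_http_options
  # A Rack application is any object that responds to `call(env)`.
  app = lambda do |env|
    body = "Hello from #{env['PATH_INFO'] || '/'}"
    headers = { 'Content-Type' => 'text/plain',
                'Content-Length' => body.bytesize.to_s }
    [200, headers, [body]]
  end
  {
    app: app,
    port: 3000,  # the documented default
    timeout: 5,  # seconds; the C source ignores values above 255
    ping: 40,    # seconds; the C source ignores values above 255
    log: true    # enable response logging
  }
end

# Usage, assuming the iodine gem is installed:
#   require 'iodine'
#   Iodine.listen2http(hello_http_options)
#   Iodine.start
```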
# File 'ext/iodine/iodine_http.c', line 666
VALUE iodine_http_listen(VALUE self, VALUE opt) {
static int called_once = 0;
uint8_t log_http = 0;
size_t ping = 0;
size_t max_body = 0;
size_t max_msg = 0;
Check_Type(opt, T_HASH);
VALUE app = rb_hash_aref(opt, ID2SYM(rb_intern("app")));
VALUE www = rb_hash_aref(opt, ID2SYM(rb_intern("public")));
VALUE port = rb_hash_aref(opt, ID2SYM(rb_intern("port")));
VALUE address = rb_hash_aref(opt, ID2SYM(rb_intern("address")));
VALUE tout = rb_hash_aref(opt, ID2SYM(rb_intern("timeout")));
VALUE tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_msg")));
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
max_msg = FIX2ULONG(tmp);
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_body")));
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
max_body = FIX2ULONG(tmp);
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("ping")));
if (tmp != Qnil && tmp != Qfalse) {
Check_Type(tmp, T_FIXNUM);
ping = FIX2ULONG(tmp);
}
if (ping > 255) {
fprintf(stderr, "Iodine Warning: Websocket timeout value "
"is over 255 and is silently ignored.\n");
ping = 0;
}
tmp = rb_hash_aref(opt, ID2SYM(rb_intern("log")));
if (tmp != Qnil && tmp != Qfalse)
log_http = 1;
if ((app == Qnil || app == Qfalse) && (www == Qnil || www == Qfalse)) {
fprintf(stderr, "Iodine Warning: HTTP without application or public folder "
"(is silently ignored).\n");
return Qfalse;
}
if ((www != Qnil && www != Qfalse)) {
Check_Type(www, T_STRING);
Registry.add(www);
} else
www = 0;
if ((address != Qnil && address != Qfalse))
Check_Type(address, T_STRING);
else
address = 0;
if ((tout != Qnil && tout != Qfalse)) {
Check_Type(tout, T_FIXNUM);
tout = FIX2ULONG(tout);
} else
tout = 0;
if (tout > 255) {
fprintf(stderr, "Iodine Warning: HTTP timeout value "
"is over 255 and is silently ignored.\n");
tout = 0;
}
if (port != Qnil && port != Qfalse) {
if (!RB_TYPE_P(port, T_STRING) && !RB_TYPE_P(port, T_FIXNUM))
rb_raise(rb_eTypeError,
"The `port` property MUST be either a String or a Number");
if (RB_TYPE_P(port, T_FIXNUM))
port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
} else
port = rb_str_new("3000", 4);
Registry.add(port);
if ((app != Qnil && app != Qfalse))
Registry.add(app);
else
app = 0;
iodine_http_settings_s *set = malloc(sizeof(*set));
*set = (iodine_http_settings_s){.app = app, .ping = ping, .max_msg = max_msg};
init_env_template(set, (www ? 1 : 0));
if (http_listen(StringValueCStr(port),
(address ? StringValueCStr(address) : NULL),
.on_request = on_rack_request, .udata = set,
.timeout = (tout ? FIX2INT(tout) : tout),
.on_finish = free_iodine_http, .log_static = log_http,
.max_body_size = max_body,
.public_folder = (www ? StringValueCStr(www) : NULL))) {
fprintf(stderr,
"ERROR: Failed to initialize a listening HTTP socket for port %s\n",
port ? StringValueCStr(port) : "3000");
return Qfalse;
}
if ((app == Qnil || app == Qfalse)) {
fprintf(stderr,
"* Iodine: (no app) the HTTP service on port %s will only serve "
"static files.\n",
(port ? StringValueCStr(port) : "3000"));
}
if (called_once)
defer(iodine_print_http_msg2, (www ? (void *)www : NULL), (void *)port);
else {
called_once = 1;
defer(iodine_print_http_msg1, (www ? (void *)www : NULL), (void *)port);
}
return Qtrue;
(void)self;
}
.on_idle ⇒ Object
# File 'ext/iodine/iodine.c', line 175
VALUE iodine_sched_on_idle(VALUE self) {
rb_need_block();
iodine_idle_block_s *b = malloc(sizeof(*b));
b->block = rb_block_proc();
Registry.add(b->block);
spn_lock(&iodine_on_idle_lock);
fio_list_push(iodine_idle_block_s, node, iodine_on_idle_list, b);
spn_unlock(&iodine_on_idle_lock);
return b->block;
(void)self;
}
.patch_rack ⇒ Object
# File 'lib/iodine.rb', line 173
def self.patch_rack
  ::Rack::Utils.class_eval do
    Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m|
      ::Rack::Utils.define_singleton_method(m,
        Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) )
    end
  end
end
.processes ⇒ Object
Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.
# File 'lib/iodine.rb', line 140
def self.processes
  @processes
end
.processes=(count) ⇒ Object
Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.
# File 'lib/iodine.rb', line 145
def self.processes=(count)
  @processes = count.to_i
end
.publish(args) ⇒ Object
Publishes a message to a channel.
Accepts a single Hash argument with the following possible options:
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default engine is used.
- :channel
-
Required (unless :pattern). The channel to publish to.
- :pattern
-
An alternative to the required :channel, publishes to a pattern.
This is NOT supported by Redis and it’s limited to the local process cluster.
- :message
-
REQUIRED. The message to be published.
# File 'ext/iodine/iodine_pubsub.c', line 551
static VALUE iodine_publish(VALUE self, VALUE args) {
Check_Type(args, T_HASH);
uint8_t use_pattern = 0;
VALUE rb_ch = rb_hash_aref(args, channel_var_id);
if (rb_ch == Qnil || rb_ch == Qfalse) {
use_pattern = 1;
rb_ch = rb_hash_aref(args, pattern_var_id);
if (rb_ch == Qnil || rb_ch == Qfalse)
rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
}
if (TYPE(rb_ch) == T_SYMBOL)
rb_ch = rb_sym2str(rb_ch);
Check_Type(rb_ch, T_STRING);
VALUE rb_msg = rb_hash_aref(args, message_var_id);
if (rb_msg == Qnil || rb_msg == Qfalse) {
rb_raise(rb_eArgError, "message is required for the :publish method.");
}
Check_Type(rb_msg, T_STRING);
pubsub_engine_s *engine =
iodine_engine_ruby2facil(rb_hash_aref(args, engine_varid));
intptr_t ret =
pubsub_publish(.engine = engine, .channel.name = (RSTRING_PTR(rb_ch)),
.channel.len = (RSTRING_LEN(rb_ch)),
.msg.data = (RSTRING_PTR(rb_msg)),
.msg.len = (RSTRING_LEN(rb_msg)),
.use_pattern = use_pattern);
if (!ret)
return Qfalse;
return Qtrue;
(void)self;
}
.run ⇒ Object
# File 'ext/iodine/iodine.c', line 138
static VALUE iodine_run(VALUE self) {
rb_need_block();
VALUE block = rb_block_proc();
if (block == Qnil)
return Qfalse;
Registry.add(block);
defer(iodine_perform_deferred, (void *)block, NULL);
return block;
(void)self;
}
.run_after(milliseconds) ⇒ Object
# File 'ext/iodine/iodine.c', line 73
static VALUE iodine_run_after(VALUE self, VALUE milliseconds) {
(void)(self);
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
// requires a block to be passed
rb_need_block();
VALUE block = rb_block_proc();
if (block == Qnil)
return Qfalse;
Registry.add(block);
if (facil_run_every(milli, 1, iodine_run_task, (void *)block,
(void (*)(void *))Registry.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
.run_every(*args) ⇒ Object
Runs the required block repeatedly, waiting the specified number of milliseconds between repetitions. Time is counted only once Iodine has started running (using start).
Accepts:
- milliseconds
-
the number of milliseconds between event repetitions.
- repetitions
-
the number of event repetitions. Defaults to 0 (never ending).
- block
-
(required) a block is required, as otherwise there is nothing to perform.
The event will repeat itself until the number of repetitions has been depleted.
Always returns a copy of the block object.
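A short sketch of a bounded timer using the arguments listed above. The helper name `schedule_countdown` and the 1000 millisecond interval are assumptions for illustration; `run_every(milliseconds, repetitions)` with a block follows the documented signature.

```ruby
# Hypothetical helper (not part of Iodine): schedules a countdown on an object
# that responds to `run_every(milliseconds, repetitions)`.
def schedule_countdown(server, ticks, out = [])
  remaining = ticks
  # fire every 1000 milliseconds, `ticks` times in total
  server.run_every(1000, ticks) do
    remaining -= 1
    out << remaining
  end
  out
end

# Usage, assuming the iodine gem is installed:
#   require 'iodine'
#   schedule_countdown(Iodine, 3)
#   Iodine.start # the timer only starts counting once Iodine is running
```

Omitting the repetitions argument (or passing 0) would leave the timer running indefinitely, per the default described above.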
# File 'ext/iodine/iodine.c', line 110
static VALUE iodine_run_every(int argc, VALUE *argv, VALUE self) {
(void)(self);
VALUE milliseconds, repetitions, block;
rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);
if (TYPE(milliseconds) != T_FIXNUM) {
rb_raise(rb_eTypeError, "milliseconds must be a number.");
return Qnil;
}
if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
return Qnil;
}
size_t milli = FIX2UINT(milliseconds);
size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
// requires a block to be passed
rb_need_block();
Registry.add(block);
if (facil_run_every(milli, repeat, iodine_run_task, (void *)block,
(void (*)(void *))Registry.remove) == -1) {
perror("ERROR: Iodine couldn't initialize timer");
return Qnil;
}
return block;
}
.start ⇒ Object
Starts the Iodine event loop. This will hang the thread until an interrupt (`^C`) signal is received.
Returns the Iodine module.
# File 'ext/iodine/iodine.c', line 310
static VALUE iodine_start(VALUE self) {
/* for the special Iodine::Rack object and backwards compatibility */
if (iodine_review_rack_app()) {
fprintf(stderr, "ERROR: (iodine) cann't start Iodine::Rack.\n");
return Qnil;
}
rb_thread_call_without_gvl2(srv_start_no_gvl, (void *)self, NULL, NULL);
return self;
}
.subscribe(args) ⇒ Object
Subscribes the process to a channel belonging to a specific pub/sub service (using an Iodine::PubSub::Engine to connect Iodine to the service).
The function accepts a single argument (a Hash) and a required block.
Accepts a single Hash argument with the following possible options:
- :engine
-
If provided, the engine to use for pub/sub. Otherwise the default engine is used.
- :channel
-
Required (unless :pattern). The channel to subscribe to.
- :pattern
-
An alternative to the required :channel, subscribes to a pattern.
# File 'ext/iodine/iodine_pubsub.c', line 488
static VALUE iodine_subscribe(VALUE self, VALUE args) {
Check_Type(args, T_HASH);
rb_need_block();
uint8_t use_pattern = 0;
VALUE rb_ch = rb_hash_aref(args, channel_var_id);
if (rb_ch == Qnil || rb_ch == Qfalse) {
use_pattern = 1;
rb_ch = rb_hash_aref(args, pattern_var_id);
if (rb_ch == Qnil || rb_ch == Qfalse)
rb_raise(rb_eArgError, "a channel is required for pub/sub methods.");
}
if (TYPE(rb_ch) == T_SYMBOL)
rb_ch = rb_sym2str(rb_ch);
Check_Type(rb_ch, T_STRING);
VALUE block = rb_block_proc();
pubsub_engine_s *engine =
iodine_engine_ruby2facil(rb_hash_aref(args, engine_varid));
uintptr_t subid = (uintptr_t)
pubsub_subscribe(.channel.name = RSTRING_PTR(rb_ch),
.channel.len = RSTRING_LEN(rb_ch), .engine = engine,
.use_pattern = use_pattern,
.on_message = (block ? on_pubsub_notificationin : NULL),
.on_unsubscribe = (block ? iodine_on_unsubscribe : NULL),
.udata1 = (void *)block);
if (!subid)
return Qnil;
return ULL2NUM(subid);
(void)self;
}
.threads ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).
# File 'lib/iodine.rb', line 130
def self.threads
  @threads
end
.threads=(count) ⇒ Object
Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).
# File 'lib/iodine.rb', line 135
def self.threads=(count)
  @threads = count.to_i
end
.unsubscribe(sub_id) ⇒ Object
Cancels the subscription matching `sub_id`.
# File 'ext/iodine/iodine_pubsub.c', line 526
static VALUE iodine_unsubscribe(VALUE self, VALUE sub_id) {
if (sub_id == Qnil || sub_id == Qfalse)
return Qnil;
Check_Type(sub_id, T_FIXNUM);
pubsub_unsubscribe((pubsub_sub_pt)NUM2LONG(sub_id));
return Qnil;
(void)self;
}
.warmup(app) ⇒ Object
Runs the warmup sequence. warmup attempts to get every “autoloaded” (lazy loaded) file to load now instead of waiting for “first access”. This allows multi-threaded safety and better memory utilization during forking.
However, `warmup` might cause undefined behavior and should be avoided when using gems that initiate network / database connections or gems that spawn threads (i.e., ActiveRecord / ActionCable).
Use warmup when either processes or threads are set to more than 1 and gems don’t spawn threads or initialize network connections.
# File 'lib/iodine.rb', line 155
def self.warmup app
  # load anything marked with `autoload`, since autoload isn't thread safe nor fork friendly.
  Iodine.run do
    Module.constants.each do |n|
      begin
        Object.const_get(n)
      rescue Exception => _e
      end
    end
    ::Rack::Builder.new(app) do |r|
      r.warmup do |a|
        client = ::Rack::MockRequest.new(a)
        client.get('/')
      end
    end
  end
end