Module: Iodine

Defined in:
lib/iodine.rb,
lib/iodine/cli.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/protocol.rb,
lib/iodine/websocket.rb,
lib/iodine/monkeypatch.rb,
lib/rack/handler/iodine.rb,
ext/iodine/iodine.c

Overview

Iodine is both a Rack server and a platform for writing evented network services in Ruby.

Here is a sample Echo server using Iodine:

require 'iodine'
# define the protocol for our service
class EchoProtocol
  @timeout = 10
  # this is just one possible callback with a recyclable buffer
  def on_message buffer
    # write the data we received
    write "echo: #{buffer}"
    # close the connection when the time comes
    close if buffer =~ /^bye[\n\r]/
  end
end
# create the service instance
Iodine.listen 3000, EchoProtocol
# start the service
Iodine.start

Please read the README file for an introduction to Iodine and an overview of its API.
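The `on_message` callback from the sample can be exercised without opening a socket. The sketch below repeats the `EchoProtocol` class and adds `RecordingEcho`, a hypothetical test double (not part of Iodine) whose `write` and `close` only record what was called. On a live connection those two methods are provided by Iodine.

```ruby
# The protocol from the sample above, with the same behavior.
class EchoProtocol
  def on_message(buffer)
    write "echo: #{buffer}"
    close if buffer =~ /^bye[\n\r]/
  end
end

# Hypothetical test double: records calls instead of touching a socket.
class RecordingEcho < EchoProtocol
  attr_reader :written, :closed

  def initialize
    @written = []
    @closed = false
  end

  def write(data)
    @written << data
  end

  def close
    @closed = true
  end
end

conn = RecordingEcho.new
conn.on_message("hello\n")
conn.on_message("bye\n")
p conn.written
p conn.closed
```

Note that `^` in a Ruby regular expression matches at the start of any line, so a buffer that contains `bye` at the start of a later line would also close the connection.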

The API

The main API methods for the top Iodine namespace are grouped here by subject.

Event Loop / Concurrency

Iodine manages an internal event-loop and reactor pattern. The following API manages Iodine’s behavior.

  • Iodine.threads, Iodine.threads= gets or sets the number of threads iodine will use in its working thread pool.

  • Iodine.processes, Iodine.processes= gets or sets the number of processes iodine will utilize (`fork`) to handle connections.

  • Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.

Event and Task Scheduling

  • Iodine.run schedules a block of code to run asynchronously.

  • Iodine.run_after, Iodine.run_every schedules a block of code to run (asynchronously) using a timer.

  • Iodine.start starts iodine’s event loop and reactor pattern. At this point, it’s impossible to change the number of threads or processes used.

In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation, which allows connection-bound tasks to be scheduled to run within the connection’s lock (for example, Websocket#defer and Websocket.each).

Connection Handling

Iodine handles connections using Protocol objects. The following API manages either built-in or custom Protocol objects (classes / instances) in relation to their network sockets.

  • Iodine.attach_fd, Iodine.attach_io allows Iodine to take control of an IO object (e.g., a TCP/IP Socket, a Unix Socket or a pipe).

  • Iodine.connect creates a new TCP/IP connection using the specified Protocol.

  • Iodine.listen listens to new TCP/IP connections using the specified Protocol.

  • Iodine.listen2http listens to new TCP/IP connections using the built-in HTTP / Websocket Protocol.

  • Iodine.warmup warms up any HTTP Rack applications.

  • Iodine.count counts the number of connections (including HTTP / Websocket connections).

  • Protocol.each runs a block of code for every connection sharing the process (except HTTP / Websocket connections).

  • Websocket.each runs a block of code for every existing websocket sharing the process.

In addition to the top level API, there’s also the connection class and connection instance API, as specified in the Protocol and Websocket documentation.

Pub/Sub

Iodine offers a native Pub/Sub engine (no database required) that can be easily extended by implementing a PubSub::Engine.

The methods that affect server-side Pub/Sub (Iodine.subscribe, Iodine.unsubscribe, Iodine.publish, Iodine.default_pubsub and Iodine.default_pubsub=) allow the server code to react to channel events.

Websocket objects have a separate Pub/Sub implementation that manages the subscription’s lifetime to match the connection’s lifetime and allows direct client Pub/Sub (forwards the message to the client directly without invoking the Ruby interpreter).

Patching Rack

Although Iodine offers Rack::Utils optimizations using monkey patching, Iodine does NOT monkey patch Rack automatically.

Choosing to monkey patch Rack::Utils could offer significant performance gains for some applications, e.g. (on my machine):

require 'benchmark'
require 'iodine'
require 'rack'
# a String in need of decoding
s = '%E3%83%AB%E3%83%93%E3%82%A4%E3%82%B9%E3%81%A8'
Benchmark.bm do |bm|
  # Pre-Patch
  bm.report("   Rack.unescape")    {1_000_000.times { Rack::Utils.unescape s } }
  bm.report("    Rack.rfc2822")    {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report("    Rack.rfc2109")    {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
  # Perform Patch
  Iodine.patch_rack
  puts "            --- Monkey Patching Rack ---"
  # Post Patch
  bm.report("Patched.unescape")    {1_000_000.times { Rack::Utils.unescape s } }
  bm.report(" Patched.rfc2822")    {1_000_000.times { Rack::Utils.rfc2822(Time.now) } }
  bm.report(" Patched.rfc2109")    {1_000_000.times { Rack::Utils.rfc2109(Time.now) } }
end && nil

Results:

                      user     system      total        real
   Rack.unescape  8.660000   0.010000   8.670000 (  8.687807)
    Rack.rfc2822  3.730000   0.000000   3.730000 (  3.727732)
    Rack.rfc2109  3.020000   0.010000   3.030000 (  3.031940)
            --- Monkey Patching Rack ---
Patched.unescape  0.340000   0.000000   0.340000 (  0.341506)
 Patched.rfc2822  0.740000   0.000000   0.740000 (  0.737796)
 Patched.rfc2109  0.690000   0.010000   0.700000 (  0.700155)
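To put the figures in proportion, the wall-clock ("real") column can be divided pairwise. The short calculation below uses only the numbers reported above and works out to roughly 25x for `unescape`, 5x for `rfc2822` and 4x for `rfc2109`. These are measurements from one machine, so the ratios should be read as indicative only.

```ruby
# Wall-clock ("real") seconds copied from the results above.
before = { unescape: 8.687807, rfc2822: 3.727732, rfc2109: 3.031940 }
after  = { unescape: 0.341506, rfc2822: 0.737796, rfc2109: 0.700155 }

# Ratio of unpatched time to patched time for each method.
speedup = before.map { |name, secs| [name, secs / after[name]] }.to_h

speedup.each do |name, ratio|
  puts format('%-10s %.1fx faster', name, ratio)
end
```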

At the moment, the extent of the monkey patching offered is very limited. As new optimizations are added, the policy regarding monkey patching (benefit vs. risks) might be re-evaluated.

Defined Under Namespace

Modules: Base, Protocol, PubSub, Rack, Websocket

Constant Summary

VERSION = '0.4.15'.freeze

Class Method Details

.attach_fd(rbfd, handler) ⇒ Object

Attaches an existing file descriptor (`fd`), e.g., a pipe or a Unix socket, as if it were a regular connection.

e.g.

Iodine.attach_fd my_io_obj.to_i, MyProtocolClass.new


# File 'ext/iodine/iodine_protocol.c', line 541

static VALUE iodine_attach_fd(VALUE self, VALUE rbfd, VALUE handler) {
  Check_Type(rbfd, T_FIXNUM);
  if (handler == Qnil || handler == Qfalse)
    return Qfalse;
  intptr_t uuid = FIX2INT(rbfd);
  if (!uuid || uuid == -1)
    return Qfalse;
  /* make sure the uuid is connected to the sock library */
  if (sock_fd2uuid(uuid) == -1)
    sock_open(uuid);
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  Registry.add(handler);
  on_open_dyn_protocol_instance(uuid, (void *)handler);
  return self;
}

.attach_io(io, handler) ⇒ Object

Attaches an existing IO object (e.g., a pipe or a Unix socket) as if it were a regular connection.

e.g.

Iodine.attach_io my_io_obj, MyProtocolClass.new


# File 'ext/iodine/iodine_protocol.c', line 574

static VALUE iodine_attach_io(VALUE self, VALUE io, VALUE handler) {
  return iodine_attach_fd(self, RubyCaller.call(io, iodine_to_i_func_id),
                          handler);
}
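As the listing shows, `attach_io` only calls `to_i` on the IO object and forwards the result to `attach_fd`. The sketch below uses a plain pipe to show what that value is. `attach_pipe` is a hypothetical helper that is defined but never called here, since it needs the gem and a protocol object.

```ruby
reader, writer = IO.pipe

# IO#to_i (an alias of IO#fileno) returns the integer file descriptor.
fd = reader.to_i
puts "reader descriptor: #{fd}"

# Hypothetical helper, not invoked in this sketch.
def attach_pipe(io, handler)
  require 'iodine'
  Iodine.attach_io io, handler
  # Per the C source, this is the same as: Iodine.attach_fd io.to_i, handler
end

writer.close
reader.close
```

One detail visible in the `attach_fd` source: a descriptor of 0 or -1 makes the method return `false` without attaching anything.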

.connect(address, port, handler) ⇒ Object

Connects (as a TCP/IP client) to a remote TCP/IP server.

e.g.

Iodine.connect "example.com", 5000, MyProtocolClass.new


# File 'ext/iodine/iodine_protocol.c', line 502

static VALUE iodine_connect(VALUE self, VALUE address, VALUE port,
                            VALUE handler) {
  if (TYPE(handler) == T_CLASS || TYPE(handler) == T_MODULE) {
    // include the Protocol module, preventing coder errors
    rb_include_module(handler, IodineProtocol);
    handler = RubyCaller.call(handler, iodine_new_func_id);
  } else {
    // include the Protocol module in the object's class
    VALUE p_class = rb_obj_class(handler);
    // include the Protocol module, preventing coder errors
    rb_include_module(p_class, IodineProtocol);
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  Registry.add(handler);
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  // connect
  intptr_t uuid = facil_connect(.port = StringValueCStr(port),
                                .address = StringValueCStr(address),
                                .udata = (void *)handler,
                                .on_connect = on_open_dyn_protocol_instance,
                                .on_fail = remove_from_registry);
  if (uuid == -1)
    return Qnil;
  iodine_set_fd(handler, uuid);
  return handler;
  (void)self;
}

.count ⇒ Object

Returns the number of total connections managed by Iodine.



# File 'ext/iodine/iodine.c', line 59

static VALUE iodine_count(VALUE self) {
  size_t count = facil_count(NULL);
  return ULL2NUM(count);
  (void)self;
}

.default_pubsub ⇒ Object

Returns the default Pub/Sub engine (if any).

See PubSub and Iodine::PubSub::Engine for more details.



# File 'ext/iodine/iodine_pubsub.c', line 442

static VALUE ips_get_default(VALUE self) {
  return rb_ivar_get(self, default_pubsubid);
}

.default_pubsub=(en) ⇒ Object

Sets the default Pub/Sub engine to be used.

See PubSub and Iodine::PubSub::Engine for more details.



# File 'ext/iodine/iodine_pubsub.c', line 425

static VALUE ips_set_default(VALUE self, VALUE en) {
  iodine_engine_s *e;
  Data_Get_Struct(en, iodine_engine_s, e);
  if (!e)
    rb_raise(rb_eArgError, "deafult engine must be an Iodine::PubSub::Engine.");
  if (!e->p)
    rb_raise(rb_eArgError, "This Iodine::PubSub::Engine is broken.");
  rb_ivar_set(self, default_pubsubid, en);
  PUBSUB_DEFAULT_ENGINE = e->p;
  return en;
}

.listen(port, handler) ⇒ Object

Sets up a listening socket. Connections received at the assigned port will be handled by the assigned handler.

Multiple services (listening sockets) can be registered before starting the Iodine event loop.



# File 'ext/iodine/iodine_protocol.c', line 436

static VALUE iodine_listen(VALUE self, VALUE port, VALUE handler) {
  // validate that the handler is a class and include the Iodine::Protocol
  if (TYPE(handler) == T_CLASS) {
    // include the Protocol module
    // // do we neet to check?
    // if (rb_mod_include_p(protocol, rDynProtocol) == Qfalse)
    rb_include_module(handler, IodineProtocol);
    rb_extend_object(handler, IodineProtocol);
  } else {
    rb_raise(rb_eTypeError, "The connection handler MUST be of type Class.");
    return Qnil;
  }
  if (TYPE(port) != T_FIXNUM && TYPE(port) != T_STRING)
    rb_raise(rb_eTypeError, "The port variable must be a Fixnum or a String.");
  if (TYPE(port) == T_FIXNUM)
    port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  rb_ivar_set(self, rb_intern("_port"), port);
  // listen
  if (facil_listen(.port = StringValueCStr(port), .udata = (void *)handler,
                   .on_open = on_open_dyn_protocol) == -1)
    return Qnil;
  return self;
}

.listen2http(opt) ⇒ Object

Listens to incoming HTTP connections and handles incoming requests using the Rack specification.

This is delegated to a lower level C HTTP and Websocket implementation; no Ruby object will be created except the `env` object required by the Rack specification.

Accepts a single Hash argument with the following properties:

  • `app`: the Rack application that handles incoming requests. Default: `nil`.

  • `port`: the port to listen on. Default: 3000.

  • `address`: the address to bind to. Default: binds to all possible addresses.

  • `log`: enable response logging (hijacked sockets aren’t logged). Default: off.

  • `public`: the root public folder for static file service. Default: none.

  • `timeout`: timeout for inactive HTTP/1.x connections. Default: 5 seconds.

  • `max_body`: the maximum body size for incoming HTTP messages. Default: ~50 MiB.

  • `max_msg`: the maximum Websocket message size allowed. Default: ~250 KiB.

  • `ping`: the Websocket `ping` interval. Default: 40 seconds.

Either the `app` or the `public` property is required. If neither exists, the function will fail. If both exist, Iodine will serve static files as well as dynamic requests.

When using the static file server, it’s possible to serve `gzip` versions of the static files by saving a compressed version with the `gz` extension (e.g., `styles.css.gz`).

`gzip` will only be served to clients that support the `gzip` transfer encoding.

Once HTTP/2 is supported (planned, but probably very far away), HTTP/2 timeouts will be dynamically managed by Iodine. The `timeout` option is only relevant to HTTP/1.x connections.
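A minimal sketch of how these options might be put together. The option names come from the list above; the Rack application, the chosen values and the `serve` helper are illustrative assumptions. `serve` is defined but not called here, because `Iodine.start` blocks the calling thread.

```ruby
# A minimal Rack application: any object that responds to #call(env).
app = lambda do |env|
  body = "Hello from #{env['PATH_INFO']}"
  headers = { 'Content-Type' => 'text/plain',
              'Content-Length' => body.bytesize.to_s }
  [200, headers, [body]]
end

# Option names as documented above; the values are only examples.
http_options = {
  app: app,
  port: 3000,
  log: true,
  timeout: 5,                  # seconds, HTTP/1.x only
  max_body: 50 * 1024 * 1024,  # bytes
  max_msg: 250 * 1024,         # bytes
  ping: 40                     # seconds
}

# Hypothetical helper, not invoked in this sketch.
def serve(options)
  require 'iodine'
  Iodine.listen2http(options)
  Iodine.start
end
```

Adding a `public:` entry with a folder path would, per the text above, serve static files alongside the application.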



# File 'ext/iodine/iodine_http.c', line 666

VALUE iodine_http_listen(VALUE self, VALUE opt) {
  static int called_once = 0;
  uint8_t log_http = 0;
  size_t ping = 0;
  size_t max_body = 0;
  size_t max_msg = 0;
  Check_Type(opt, T_HASH);
  VALUE app = rb_hash_aref(opt, ID2SYM(rb_intern("app")));
  VALUE www = rb_hash_aref(opt, ID2SYM(rb_intern("public")));
  VALUE port = rb_hash_aref(opt, ID2SYM(rb_intern("port")));
  VALUE address = rb_hash_aref(opt, ID2SYM(rb_intern("address")));
  VALUE tout = rb_hash_aref(opt, ID2SYM(rb_intern("timeout")));

  VALUE tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_msg")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_msg = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("max_body")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    max_body = FIX2ULONG(tmp);
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("ping")));
  if (tmp != Qnil && tmp != Qfalse) {
    Check_Type(tmp, T_FIXNUM);
    ping = FIX2ULONG(tmp);
  }
  if (ping > 255) {
    fprintf(stderr, "Iodine Warning: Websocket timeout value "
                    "is over 255 and is silently ignored.\n");
    ping = 0;
  }

  tmp = rb_hash_aref(opt, ID2SYM(rb_intern("log")));
  if (tmp != Qnil && tmp != Qfalse)
    log_http = 1;

  if ((app == Qnil || app == Qfalse) && (www == Qnil || www == Qfalse)) {
    fprintf(stderr, "Iodine Warning: HTTP without application or public folder "
                    "(is silently ignored).\n");
    return Qfalse;
  }

  if ((www != Qnil && www != Qfalse)) {
    Check_Type(www, T_STRING);
    Registry.add(www);
  } else
    www = 0;

  if ((address != Qnil && address != Qfalse))
    Check_Type(address, T_STRING);
  else
    address = 0;

  if ((tout != Qnil && tout != Qfalse)) {
    Check_Type(tout, T_FIXNUM);
    tout = FIX2ULONG(tout);
  } else
    tout = 0;
  if (tout > 255) {
    fprintf(stderr, "Iodine Warning: HTTP timeout value "
                    "is over 255 and is silently ignored.\n");
    tout = 0;
  }

  if (port != Qnil && port != Qfalse) {
    if (!RB_TYPE_P(port, T_STRING) && !RB_TYPE_P(port, T_FIXNUM))
      rb_raise(rb_eTypeError,
               "The `port` property MUST be either a String or a Number");
    if (RB_TYPE_P(port, T_FIXNUM))
      port = rb_funcall2(port, iodine_to_s_method_id, 0, NULL);
  } else
    port = rb_str_new("3000", 4);
  Registry.add(port);

  if ((app != Qnil && app != Qfalse))
    Registry.add(app);
  else
    app = 0;

  iodine_http_settings_s *set = malloc(sizeof(*set));
  *set = (iodine_http_settings_s){.app = app, .ping = ping, .max_msg = max_msg};

  init_env_template(set, (www ? 1 : 0));

  if (http_listen(StringValueCStr(port),
                  (address ? StringValueCStr(address) : NULL),
                  .on_request = on_rack_request, .udata = set,
                  .timeout = (tout ? FIX2INT(tout) : tout),
                  .on_finish = free_iodine_http, .log_static = log_http,
                  .max_body_size = max_body,
                  .public_folder = (www ? StringValueCStr(www) : NULL))) {
    fprintf(stderr,
            "ERROR: Failed to initialize a listening HTTP socket for port %s\n",
            port ? StringValueCStr(port) : "3000");
    return Qfalse;
  }

  if ((app == Qnil || app == Qfalse)) {
    fprintf(stderr,
            "* Iodine: (no app) the HTTP service on port %s will only serve "
            "static files.\n",
            (port ? StringValueCStr(port) : "3000"));
  }
  if (called_once)
    defer(iodine_print_http_msg2, (www ? (void *)www : NULL), (void *)port);
  else {
    called_once = 1;
    defer(iodine_print_http_msg1, (www ? (void *)www : NULL), (void *)port);
  }

  return Qtrue;
  (void)self;
}
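The C listing above rejects or silently resets some option values: a missing `app` and `public` makes the call return `false`, and `ping` or `timeout` values over 255 are dropped with a warning on stderr. The helper below is a Ruby approximation of those checks that can run before calling `listen2http`. `http_option_problems` is a hypothetical name and is not part of Iodine; it treats any Integer as acceptable where the C code expects a Fixnum.

```ruby
# Hypothetical pre-flight check approximating the validation in iodine_http_listen.
# Returns a list of human-readable problems (empty when nothing was found).
def http_option_problems(opt)
  problems = []
  problems << 'either :app or :public is required' unless opt[:app] || opt[:public]

  %i[ping timeout].each do |key|
    value = opt[key]
    next unless value
    if !value.is_a?(Integer)
      problems << "#{key} must be an Integer"
    elsif value > 255
      problems << "#{key} is over 255 and would be ignored"
    end
  end

  %i[max_body max_msg].each do |key|
    value = opt[key]
    problems << "#{key} must be an Integer" if value && !value.is_a?(Integer)
  end

  port = opt[:port]
  if port && !port.is_a?(String) && !port.is_a?(Integer)
    problems << ':port must be a String or an Integer'
  end

  problems
end

p http_option_problems({ ping: 300 })
```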

.on_idle ⇒ Object

Schedules a one-time event for the next idle cycle.

To schedule a recurring event, simply reschedule the event at the end of its run.

e.g.

IDLE_PROC = Proc.new { puts "idle"; Iodine.on_idle(&IDLE_PROC) }
Iodine.on_idle(&IDLE_PROC)
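The self-rescheduling pattern can be tried without the gem. `IdleQueue` below is a hypothetical stand-in for Iodine's idle list, written only for illustration: each block runs once and is then dropped, and a block queued during a cycle is assumed to wait for the next cycle. A counter stops the rescheduling after three runs.

```ruby
# Hypothetical stand-in for a one-shot idle callback list (not Iodine's code).
class IdleQueue
  def initialize
    @blocks = []
  end

  # Registers a block to run once, on the next idle cycle.
  def on_idle(&block)
    @blocks << block
    block
  end

  # Runs the blocks queued so far; blocks queued meanwhile wait for the next cycle.
  def idle_cycle
    pending = @blocks
    @blocks = []
    pending.each(&:call)
  end
end

queue = IdleQueue.new
ticks = 0
task = proc do
  ticks += 1
  queue.on_idle(&task) if ticks < 3 # reschedule to make the event recur
end

queue.on_idle(&task)
5.times { queue.idle_cycle }
puts ticks
```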


# File 'ext/iodine/iodine.c', line 175

VALUE iodine_sched_on_idle(VALUE self) {
  rb_need_block();
  iodine_idle_block_s *b = malloc(sizeof(*b));
  b->block = rb_block_proc();
  Registry.add(b->block);
  spn_lock(&iodine_on_idle_lock);
  fio_list_push(iodine_idle_block_s, node, iodine_on_idle_list, b);
  spn_unlock(&iodine_on_idle_lock);
  return b->block;
  (void)self;
}

.patch_rack ⇒ Object



# File 'lib/iodine.rb', line 173

def self.patch_rack
  ::Rack::Utils.class_eval do
    Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m|
      ::Rack::Utils.define_singleton_method(m,
            Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) )
    end
  end
end

.processes ⇒ Object

Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.



# File 'lib/iodine.rb', line 140

def self.processes
  @processes
end

.processes=(count) ⇒ Object

Get/Set the number of worker processes. A value greater than 1 will cause Iodine to “fork” any extra worker processes needed.



# File 'lib/iodine.rb', line 145

def self.processes=(count)
  @processes = count.to_i
end

.publish(args) ⇒ Object

Publishes a message to a channel.

Accepts a single Hash argument with the following possible options:

  • `:engine`: If provided, the engine to use for pub/sub. Otherwise the default engine is used.

  • `:channel`: Required (unless :pattern). The channel to publish to.

  • `:pattern`: An alternative to the required :channel, publishes to a pattern. This is NOT supported by Redis and it’s limited to the local process cluster.

  • `:message`: REQUIRED. The message to be published.
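A sketch of assembling the Hash before publishing. `publish_args` and `announce` are hypothetical helpers, not part of Iodine. The first approximates the argument checks described here so that mistakes surface early (it converts the channel with `to_s`, which is more permissive than the library). The second is defined but not called, since it needs the gem.

```ruby
# Hypothetical helper: builds the Hash for Iodine.publish and raises early.
def publish_args(message, channel: nil, pattern: nil, engine: nil)
  raise ArgumentError, 'either :channel or :pattern is required' unless channel || pattern
  raise TypeError, 'message must be a String' unless message.is_a?(String)

  args = { message: message }
  if channel
    args[:channel] = channel.to_s # Symbols become Strings
  else
    args[:pattern] = pattern.to_s
  end
  args[:engine] = engine if engine
  args
end

# Hypothetical helper, not invoked in this sketch.
def announce(text)
  require 'iodine'
  Iodine.publish(publish_args(text, channel: :announcements))
end

p publish_args('hi', channel: :chat)
```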



# File 'ext/iodine/iodine_pubsub.c', line 551

static VALUE iodine_publish(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  uint8_t use_pattern = 0;

  VALUE rb_ch = rb_hash_aref(args, channel_var_id);
  if (rb_ch == Qnil || rb_ch == Qfalse) {
    use_pattern = 1;
    rb_ch = rb_hash_aref(args, pattern_var_id);
    if (rb_ch == Qnil || rb_ch == Qfalse)
      rb_raise(rb_eArgError, "channel is required for pub/sub methods.");
  }
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  VALUE rb_msg = rb_hash_aref(args, message_var_id);
  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required for the :publish method.");
  }
  Check_Type(rb_msg, T_STRING);

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_varid));

  intptr_t ret =
      pubsub_publish(.engine = engine, .channel.name = (RSTRING_PTR(rb_ch)),
                     .channel.len = (RSTRING_LEN(rb_ch)),
                     .msg.data = (RSTRING_PTR(rb_msg)),
                     .msg.len = (RSTRING_LEN(rb_msg)),
                     .use_pattern = use_pattern);
  if (!ret)
    return Qfalse;
  return Qtrue;
  (void)self;
}

.run ⇒ Object



# File 'ext/iodine/iodine.c', line 138

static VALUE iodine_run(VALUE self) {
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  defer(iodine_perform_deferred, (void *)block, NULL);
  return block;
  (void)self;
}

.run_after(milliseconds) ⇒ Object

Runs the required block after the specified number of milliseconds has passed. Time is counted only once Iodine has started running (using start).

Tasks scheduled before calling start will run once for every process.

Always returns a copy of the block object.



# File 'ext/iodine/iodine.c', line 73

static VALUE iodine_run_after(VALUE self, VALUE milliseconds) {
  (void)(self);
  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number");
    return Qnil;
  }
  size_t milli = FIX2UINT(milliseconds);
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  Registry.add(block);
  if (facil_run_every(milli, 1, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.run_every(*args) ⇒ Object

Runs the required block repeatedly, each time after the specified number of milliseconds has passed. Time is counted only once Iodine has started running (using start).

Accepts:

  • milliseconds: the number of milliseconds between event repetitions.

  • repetitions: the number of event repetitions. Defaults to 0 (never ending).

  • block: (required) a block is required, as otherwise there is nothing to perform.

The event will repeat itself until the number of repetitions has been depleted.

Always returns a copy of the block object.
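Both timer methods expect the interval as a whole number of milliseconds; the C sources raise a `TypeError` when the value is not a Fixnum, so a Float such as `0.5 * 1000` would be rejected. The sketch below adds `to_ms`, a hypothetical conversion helper, and shows the three call shapes inside `schedule_examples`, which is defined but not called because it needs the gem and a running event loop.

```ruby
# Hypothetical helper: converts seconds to a whole number of milliseconds.
def to_ms(seconds)
  (seconds * 1000).round
end

# Hypothetical helper, not invoked in this sketch.
def schedule_examples
  require 'iodine'
  # Runs once, half a second after the event loop starts.
  Iodine.run_after(to_ms(0.5)) { puts 'once' }
  # Runs three times, one second apart.
  Iodine.run_every(to_ms(1), 3) { puts 'three times' }
  # No repetition count: repeats until shutdown.
  Iodine.run_every(to_ms(60)) { puts 'every minute' }
end

p to_ms(0.5)
```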



# File 'ext/iodine/iodine.c', line 110

static VALUE iodine_run_every(int argc, VALUE *argv, VALUE self) {
  (void)(self);
  VALUE milliseconds, repetitions, block;

  rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);

  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number.");
    return Qnil;
  }
  if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
    return Qnil;
  }

  size_t milli = FIX2UINT(milliseconds);
  size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
  // requires a block to be passed
  rb_need_block();
  Registry.add(block);
  if (facil_run_every(milli, repeat, iodine_run_task, (void *)block,
                      (void (*)(void *))Registry.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.start ⇒ Object

Starts the Iodine event loop. This will hang the thread until an interrupt (`^C`) signal is received.

Returns the Iodine module.



# File 'ext/iodine/iodine.c', line 310

static VALUE iodine_start(VALUE self) {
  /* for the special Iodine::Rack object and backwards compatibility */
  if (iodine_review_rack_app()) {
    fprintf(stderr, "ERROR: (iodine) cann't start Iodine::Rack.\n");
    return Qnil;
  }
  rb_thread_call_without_gvl2(srv_start_no_gvl, (void *)self, NULL, NULL);
  return self;
}

.subscribe(args) ⇒ Object

Subscribes the process to a channel belonging to a specific pub/sub service (using an Iodine::PubSub::Engine to connect Iodine to the service).

The function accepts a single Hash argument and a required block. The Hash supports the following options:

  • `:engine`: If provided, the engine to use for pub/sub. Otherwise the default engine is used.

  • `:channel`: Required (unless :pattern). The channel to subscribe to.

  • `:pattern`: An alternative to the required :channel, subscribes to a pattern.



# File 'ext/iodine/iodine_pubsub.c', line 488

static VALUE iodine_subscribe(VALUE self, VALUE args) {
  Check_Type(args, T_HASH);
  rb_need_block();

  uint8_t use_pattern = 0;

  VALUE rb_ch = rb_hash_aref(args, channel_var_id);
  if (rb_ch == Qnil || rb_ch == Qfalse) {
    use_pattern = 1;
    rb_ch = rb_hash_aref(args, pattern_var_id);
    if (rb_ch == Qnil || rb_ch == Qfalse)
      rb_raise(rb_eArgError, "a channel is required for pub/sub methods.");
  }
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  VALUE block = rb_block_proc();

  pubsub_engine_s *engine =
      iodine_engine_ruby2facil(rb_hash_aref(args, engine_varid));

  uintptr_t subid = (uintptr_t)
      pubsub_subscribe(.channel.name = RSTRING_PTR(rb_ch),
                       .channel.len = RSTRING_LEN(rb_ch), .engine = engine,
                       .use_pattern = use_pattern,
                       .on_message = (block ? on_pubsub_notificationin : NULL),
                       .on_unsubscribe = (block ? iodine_on_unsubscribe : NULL),
                       .udata1 = (void *)block);
  if (!subid)
    return Qnil;
  return ULL2NUM(subid);
  (void)self;
}

.threads ⇒ Object

Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).



# File 'lib/iodine.rb', line 130

def self.threads
  @threads
end

.threads=(count) ⇒ Object

Get/Set the number of threads used in the thread pool (a static thread pool). Can be 1 (single working thread, the main thread will sleep) and can be 0 (the main thread will be used as the only active thread).



# File 'lib/iodine.rb', line 135

def self.threads=(count)
  @threads = count.to_i
end
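Because the setter stores `count.to_i`, values are coerced rather than validated. The sketch below copies the two accessors into `SettingsSketch`, a stand-in module with a hypothetical name, so the coercion can be observed without loading the gem.

```ruby
# Stand-in module (hypothetical name) copying the accessors shown above.
module SettingsSketch
  def self.threads
    @threads
  end

  def self.threads=(count)
    @threads = count.to_i
  end
end

SettingsSketch.threads = '8'   # e.g. a value read from ENV
from_string = SettingsSketch.threads

SettingsSketch.threads = 2.9   # Floats are truncated
from_float = SettingsSketch.threads

SettingsSketch.threads = nil   # nil.to_i is 0
from_nil = SettingsSketch.threads

p [from_string, from_float, from_nil]
```

Since `nil` becomes 0, and 0 means the main thread is the only active thread, an unset configuration value passed straight to the setter will quietly select that mode.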

.unsubscribe(sub_id) ⇒ Object

Cancels the subscription matching `sub_id`.



# File 'ext/iodine/iodine_pubsub.c', line 526

static VALUE iodine_unsubscribe(VALUE self, VALUE sub_id) {
  if (sub_id == Qnil || sub_id == Qfalse)
    return Qnil;
  Check_Type(sub_id, T_FIXNUM);
  pubsub_unsubscribe((pubsub_sub_pt)NUM2LONG(sub_id));
  return Qnil;
  (void)self;
}

.warmup(app) ⇒ Object

Runs the warmup sequence. warmup attempts to get every “autoloaded” (lazy loaded) file to load now instead of waiting for “first access”. This allows multi-threaded safety and better memory utilization during forking.

However, `warmup` might cause undefined behavior and should be avoided when using gems that initiate network / database connections or gems that spawn threads (e.g., ActiveRecord / ActionCable).

Use warmup when either processes or threads are set to more than 1 and gems don’t spawn threads or initialize network connections.



# File 'lib/iodine.rb', line 155

def self.warmup app
  # load anything marked with `autoload`, since autoload isn't thread safe nor fork friendly.
  Iodine.run do
    Module.constants.each do |n|
      begin
        Object.const_get(n)
      rescue Exception => _e
      end
    end
    ::Rack::Builder.new(app) do |r|
      r.warmup do |a|
        client = ::Rack::MockRequest.new(a)
        client.get('/')
      end
    end
  end
end