Module: Iodine

Defined in:
lib/iodine.rb,
lib/iodine/tls.rb,
lib/iodine/json.rb,
lib/iodine/pubsub.rb,
lib/iodine/version.rb,
lib/iodine/mustache.rb,
lib/iodine/connection.rb,
lib/iodine/rack_utils.rb,
lib/rack/handler/iodine.rb,
ext/iodine_ext/iodine.c

Overview

Iodine is an HTTP / WebSocket server as well as an Evented Network Tool Library. In essence, Iodine is a Ruby port of the facil.io C library.

Here is a simple telnet based echo server using Iodine (see the full list of callbacks at Connection):

  require 'iodine'
  # define the protocol for our service
  module EchoProtocol
    def on_open(client)
      # Set a connection timeout
      client.timeout = 10
      # Write a welcome message
      client.write "Echo server running on Iodine #{Iodine::VERSION}.\r\n"
    end
    # this is called for incoming data - note data might be fragmented.
    def on_message(client, data)
      # write the data we received
      client.write "echo: #{data}"
      # close the connection when the time comes
      client.close if data =~ /^bye[\n\r]/
    end
    # called if the connection is still open and the server is shutting down.
    def on_shutdown(client)
      # notify the client that the server is shutting down
      client.write "Server going away\r\n"
    end
    extend self
  end
  # create the service instance, the block returns a connection handler.
  Iodine.listen(port: "3000") { EchoProtocol }
  # start the service
  Iodine.threads = 1
  Iodine.start

Methods for setting up and starting Iodine include Iodine.start, Iodine.threads, Iodine.threads=, Iodine.workers and Iodine.workers=.

Methods for setting startup / operational callbacks include Iodine.on_idle, Iodine.on_state.

Methods for asynchronous execution include Iodine.run (same as Iodine.defer), Iodine.run_after and Iodine.run_every.

Methods for application wide pub/sub include Iodine.subscribe, Iodine.unsubscribe and Iodine.publish. Connection specific pub/sub methods are documented in the Connection class.

Methods for TCP/IP, Unix Sockets and HTTP connections include Iodine.listen and Iodine.connect.

Note that the HTTP server supports both TCP/IP and Unix Sockets as well as SSE / WebSockets extensions.

Iodine doesn't call Iodine.patch_rack automatically, but doing so will improve Rack's performance.

Please read the README file for an introduction to Iodine.

Defined Under Namespace

Modules: Base, JSON, PubSub, Rack

Classes: Connection, Mustache, TLS

Constant Summary

VERSION = '0.8.3'.freeze

DEFAULT_SETTINGS = {}

The default connection settings used by listen and connect.

It's a Hash object that allows Iodine default values to be manipulated. For example:

  DEFAULT_SETTINGS[:port] = "8080" # replaces the default port, which is `ENV["port"] || "3000"`.

DEFAULT_HTTP_ARGS = DEFAULT_SETTINGS

Deprecated.

The default connection settings used by listen and connect.

Class Method Details

.after_fork(&block) ⇒ Object

Deprecated. Use on_state instead.

Sets a block of code to run after a new worker process is forked (cluster mode only).

Code runs in both the parent and the child.



# File 'lib/iodine.rb', line 100

def self.after_fork(&block)
  warn "Iodine.after_fork is deprecated, use Iodine.on_state(:after_fork)."
  Iodine.on_state(:after_fork, &block)
end

.after_fork_in_master(&block) ⇒ Object

Deprecated. Use on_state instead.

Sets a block of code to run in the master / root process, after a new worker process is forked (cluster mode only).



# File 'lib/iodine.rb', line 114

def self.after_fork_in_master(&block)
  warn "Iodine.after_fork_in_master is deprecated, use Iodine.on_state(:enter_master)."
  Iodine.on_state(:enter_master, &block)
end

.after_fork_in_worker(&block) ⇒ Object

Deprecated. Use on_state instead.

Sets a block of code to run in the worker process, after a new worker process is forked (cluster mode only).



# File 'lib/iodine.rb', line 107

def self.after_fork_in_worker(&block)
  warn "Iodine.after_fork_in_worker is deprecated, use Iodine.on_state(:enter_child)."
  Iodine.on_state(:enter_child, &block)
end

.attach_fd(fd, handler) ⇒ Object

The attach_fd method instructs iodine to attach a socket to the server using its numerical file descriptor.

This is faster than attaching a Ruby IO object since it allows iodine to directly call the system's read/write methods. However, this doesn't support TLS/SSL connections.

This method requires two objects, a file descriptor (fd) and a callback object.

See listen for details about the callback object.

Returns the callback object (handler) used.
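A minimal sketch of handing an already open IO object to iodine. The `attach_socket` helper and the `ByteCounter` handler are hypothetical names used for illustration; only `attach_fd(fd, handler)` and the callback names come from this page. The `server` parameter defaults to `Iodine` and exists only so the helper can be exercised without a running reactor.

```ruby
# Hypothetical callback object; the callback names follow the list documented for `listen`.
module ByteCounter
  def self.on_message(client, data)
    client.write "got #{data.bytesize} bytes\r\n"
  end

  def self.on_close(_client)
    puts "attached socket closed"
  end
end

# Hypothetical helper: passes the numeric descriptor of `io` to `attach_fd`.
# `server` is expected to be the Iodine module.
def attach_socket(io, handler, server = Iodine)
  server.attach_fd(io.fileno, handler)
end

# With iodine loaded and a connected socket `sock`:
#   attach_socket(sock, ByteCounter)
```

The source listed for this method calls `dup` on the descriptor, so iodine and the Ruby IO object end up holding separate descriptors for the same socket.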



# File 'ext/iodine_ext/iodine_tcp.c', line 273

static VALUE iodine_tcp_attach_fd(VALUE self, VALUE fd, VALUE handler) {
  // clang-format on
  Check_Type(fd, T_FIXNUM);
  if (handler == Qnil || handler == Qfalse || handler == Qtrue) {
    rb_raise(rb_eArgError, "A callback object must be provided.");
  }
  IodineStore.add(handler);
  int other = dup(NUM2INT(fd));
  if (other == -1) {
    rb_raise(rb_eIOError, "invalid fd.");
  }
  intptr_t uuid = fio_fd2uuid(other);
  iodine_tcp_attch_uuid(uuid, handler);
  IodineStore.remove(handler);
  return handler;
  (void)self;
}

.before_fork(&block) ⇒ Object

Deprecated. Use on_state instead.

Sets a block of code to run before a new worker process is forked (cluster mode only).



# File 'lib/iodine.rb', line 91

def self.before_fork(&block)
  warn "Iodine.before_fork is deprecated, use Iodine.on_state(:before_fork)."
  Iodine.on_state(:before_fork, &block)
end

.connect(args) ⇒ Object

The connect method instructs iodine to connect to a server using either TCP/IP or Unix sockets.

 Iodine.connect(settings)

Supported Settings:

:url URL indicating service type, host name, port and optional path.
:handler see details below.
:address an IP address or a unix socket address. Only relevant if :url is missing.
:body (HTTP client) the body to be sent.
:cookies (HTTP/WebSocket client) cookie data.
:headers (HTTP/WebSocket client) custom headers.
:log (HTTP only) request logging.
:max_body (HTTP only) limits HTTP body in the response, see listen.
:max_headers (HTTP only) limits the header length in the response, see listen.
:max_msg (WebSockets only) maximum incoming message size per message (in Kb).
:method (HTTP client) a String such as "GET" or "POST".
:path (HTTP/WebSocket client) the HTTP path to be used.
:ping ping interval (in seconds). Up to 255 seconds.
:port port number to connect to (either a String or a Number).
:public (public folder, HTTP server only)
:service (:raw / :tls / :ws / :wss )
:timeout (HTTP only) keep-alive timeout in seconds. Up to 255 seconds.
:tls a TLS context object for encrypted connections.

Some connection settings are only valid for HTTP / WebSocket connections.

If :url is provided, it will overwrite the :address, :port and :path settings (if provided).

Unlike listen, a block can't be used and a :handler object must be provided.

If the connection fails, only the on_close callback will be called (with a nil client).

Here's an example TCP/IP client that sends a simple HTTP GET request:

  # use a secure connection?
  USE_TLS = false

  # remote server details
  $port = USE_TLS ? 443 : 80
  $address = "google.com"


  # require iodine
  require 'iodine'

  # Iodine runtime settings
  Iodine.threads = 1
  Iodine.workers = 1
  Iodine.verbosity = 3 # warnings only


  # a client callback handler
  module Client

    def self.on_open(connection)
      # Set a connection timeout
      connection.timeout = 10
      # send a simple HTTP request.
      puts "* Sending request..."
      connection.write "GET / HTTP/1.1\r\nHost: #{$address}\r\n\r\n"
    end

    def self.on_message(connection, data)
      # print the data we received
      STDOUT.write data
      # close the connection after a second... we're not really parsing anything, so it's a guess.
      Iodine.run_after(1000) { connection.close }
    end

    def self.on_close(connection)
      # stop iodine
      Iodine.stop
      puts "Done."
    end

    # returns the callback object (self).
    def self.call
      self
    end
  end



  if(USE_TLS)
    tls = Iodine::TLS.new
    # ALPN blocks should return a valid callback object
    tls.on_protocol("http/1.1") { Client }
  end

  Iodine.connect(address: $address, port: $port, handler: Client, tls: tls)

  # start the iodine reactor
  Iodine.start

Iodine also supports WebSocket client connections, using either the url property or the ws and wss service names.

The following example establishes a secure (TLS) connection to the WebSocket echo testing server at wss://echo.websocket.org:

  # require iodine
  require 'iodine'

  # The client class
  class EchoClient

    def on_open(connection)
      @messages = [ "Hello World!",
        "I'm alive and sending messages",
        "I also receive messages",
        "now that we all know this...",
        "I can stop.",
        "Goodbye." ]
      send_one_message(connection)
    end

    def on_message(connection, message)
      puts "Received: #{message}"
      send_one_message(connection)
    end

    def on_close(connection)
      # in this example, we stop iodine once the client is closed
      puts "* Client closed."
      Iodine.stop
    end

    # We use this method to pop messages from the queue and send them
    #
    # When the queue is empty, we disconnect the client.
    def send_one_message(connection)
      msg = @messages.shift
      if(msg)
        connection.write msg
      else
        connection.close
      end
    end
  end

  Iodine.threads = 1
  Iodine.connect url: "wss://echo.websocket.org", handler: EchoClient.new, ping: 40
  Iodine.start

Note: the on_close callback is always called, even if a connection couldn't be established.

Returns the handler object used.
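Because on_close may receive a nil client when the connection was never established, a handler may want to treat that case separately. The sketch below shows one way to do so; `GuardedClient` and its `status` reader are hypothetical names used for illustration, and the address and port in the usage comment are placeholders.

```ruby
# Hypothetical handler that distinguishes a failed connection attempt
# (on_close receives nil) from a normal close.
class GuardedClient
  attr_reader :status

  def initialize
    @status = :pending
  end

  def on_open(connection)
    @status = :open
    connection.write "hello\r\n"
  end

  def on_close(connection)
    # Per the text above, `connection` is nil when the connection failed.
    @status = connection.nil? ? :failed : :closed
    # A real client might call Iodine.stop here, as the examples above do.
  end
end

# With iodine loaded:
#   Iodine.connect(address: "localhost", port: 3000, handler: GuardedClient.new)
#   Iodine.start
```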



# File 'ext/iodine_ext/iodine.c', line 1299

static VALUE iodine_connect(VALUE self, VALUE args) {
  // clang-format on
  iodine_connection_args_s s = iodine_connect_args(args, 0);
  intptr_t uuid = -1;
  switch (s.service) {
  case IODINE_SERVICE_RAW:
    uuid = iodine_tcp_connect(s);
    break;
  case IODINE_SERVICE_HTTP:
    iodine_connect_args_cleanup(&s);
    rb_raise(rb_eRuntimeError, "HTTP client connections aren't supported yet.");
    return Qnil;
    break;
  case IODINE_SERVICE_WS:
    uuid = iodine_ws_connect(s);
    break;
  }
  iodine_connect_args_cleanup(&s);
  if (uuid == -1)
    rb_raise(rb_eRuntimeError, "Couldn't open client socket.");
  return self;
}

.defer ⇒ Object

Runs a block of code asynchronously (adds the code to the event queue).

Always returns the block of code to be executed (Proc object).

Code will be executed only while Iodine is running (after start).

Code blocks that were scheduled to run before Iodine enters cluster mode will run on all child processes.



# File 'ext/iodine_ext/iodine_defer.c', line 202

static VALUE iodine_defer_run(VALUE self) {
  rb_need_block();
  VALUE block = IodineStore.add(rb_block_proc());
  fio_defer(iodine_defer_performe_once, (void *)block, NULL);
  return block;
  (void)self;
}

.listen(args) ⇒ Object

listen can be used to listen to any incoming connections, including HTTP and raw (tcp/ip and unix sockets) connections.

 Iodine.listen(settings)

Supported Settings:

:url URL indicating service type, host name and port. Path will be parsed as a Unix socket.
:handler (deprecated: :app) see details below.
:address an IP address or a unix socket address. Only relevant if :url is missing.
:log (HTTP only) request logging. For global verbosity see verbosity.
:max_body (HTTP only) maximum upload size allowed per request before disconnection (in Mb).
:max_headers (HTTP only) maximum total header length allowed per request (in Kb).
:max_msg (WebSockets only) maximum message size per message (in Kb).
:ping (:raw clients and WebSockets only) ping interval (in seconds). Up to 255 seconds.
:port port number to listen to (either a String or a Number).
:public (HTTP server only) public folder for static file service.
:service (:raw / :tls / :ws / :wss / :http / :https ) a supported service this socket will listen to.
:timeout (HTTP only) keep-alive timeout in seconds. Up to 255 seconds.
:tls a TLS context object for encrypted connections.

Some connection settings are only valid when listening to HTTP / WebSocket connections.

If :url is provided, it will overwrite the :address and :port settings (if provided).

For HTTP connections, the :handler must be a valid Rack application object (answers .call(env)).

Here's an example for an HTTP hello world application:

  require 'iodine'
  # a handler can be a block
  Iodine.listen(service: :http, port: "3000") {|env| [200, {"Content-Length" => "12"}, ["Hello World!"]] }
  # start the service
  Iodine.threads = 1
  Iodine.start

Here's another example, using a Unix Socket instead of a TCP/IP socket for an HTTP hello world application.

This example shows how the :url option can be used, but the :address settings could have been used for the same effect (with port: 0).

  require 'iodine'
  # note that unix sockets in URL form use an absolute path.
  Iodine.listen(url: "http://:0/tmp/sock.sock") {|env| [200, {"Content-Length" => "12"}, ["Hello World!"]] }
  # start the service
  Iodine.threads = 1
  Iodine.start

For raw connections, the :handler object should be an object that answers .call and returns a valid callback object that supports the following callbacks (see also Connection):

on_open(client) called after a connection was established
on_message(client,data) called when incoming data is available. Data may be fragmented.
on_drained(client) called after pending client.write events have been processed (see Iodine::Connection#pending).
ping(client) called whenever a timeout has occurred (see Iodine::Connection#timeout=).
on_shutdown(client) called if the server is shutting down. This is called before the connection is closed.
on_close(client) called when the connection with the client was closed.

The client argument passed to the :handler callbacks is a Connection instance that represents the connection / the client.
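Since on_message may deliver fragmented data, a line-oriented protocol usually needs a small buffer. The following is a minimal sketch rather than anything shipped with iodine: `LineEcho` is a hypothetical name, and returning a new instance from `call` assumes that `call` is invoked once per connection, so that each connection gets its own buffer.

```ruby
# Hypothetical handler: buffers incoming fragments and echoes complete lines.
class LineEcho
  # `listen` expects the :handler to answer `call` and return the callback object.
  # Returning a fresh instance assumes `call` runs once per connection.
  def self.call
    new
  end

  def initialize
    @buffer = String.new
  end

  def on_message(client, data)
    @buffer << data
    # slice! removes and returns the next complete line (including its "\n"),
    # or returns nil when only a partial line is buffered.
    while (line = @buffer.slice!(/\A[^\n]*\n/))
      client.write "echo: #{line}"
    end
  end
end

# With iodine loaded:
#   Iodine.listen(service: :raw, port: "3000", handler: LineEcho)
#   Iodine.start
```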

Here's an example of a telnet based chat-room:

  require 'iodine'
  # define the protocol for our service
  module ChatHandler
    def self.on_open(client)
      # Set a connection timeout
      client.timeout = 10
      # subscribe to the chat channel.
      client.subscribe :chat
      # Write a welcome message
      client.publish :chat, "new member entered the chat\r\n"
    end
    # this is called for incoming data - note data might be fragmented.
    def self.on_message(client, data)
      # publish the data we received
      client.publish :chat, data
      # close the connection when the time comes
      client.close if data =~ /^bye[\n\r]/
    end
    # called whenever timeout occurs.
    def self.ping(client)
      client.write "System: quiet, isn't it...?\r\n"
    end
    # called if the connection is still open and the server is shutting down.
    def self.on_shutdown(client)
      # notify the client that the server is shutting down
      client.write "Chat server going away. Try again later.\r\n"
    end
    # returns the callback object (self).
    def self.call
      self
    end
  end
  # we can use either the `handler` keyword or a block, anything that answers #call.
  Iodine.listen(service: :raw, port: "3000", handler: ChatHandler)
  # we can listen to more than a single socket at a time.
  Iodine.listen(url: "raw://:3030", handler: ChatHandler)
  # start the service
  Iodine.threads = 1
  Iodine.start

Returns the handler object used.



# File 'ext/iodine_ext/iodine.c', line 1122

static VALUE iodine_listen(VALUE self, VALUE args) {
  // clang-format on
  iodine_connection_args_s s = iodine_connect_args(args, 1);
  intptr_t uuid = -1;
  switch (s.service) {
  case IODINE_SERVICE_RAW:
    uuid = iodine_tcp_listen(s);
    break;
  case IODINE_SERVICE_HTTP: /* overflow */
  case IODINE_SERVICE_WS:
    uuid = iodine_http_listen(s);
    break;
  }
  iodine_connect_args_cleanup(&s);
  if (uuid == -1)
    rb_raise(rb_eRuntimeError, "Couldn't open listening socket.");
  return s.handler;
  (void)self;
}

.listen2http(args, &block) ⇒ Object

Deprecated. Use listen with service: :http instead.

Listens for incoming HTTP connections. Equivalent to calling listen after setting service: :http in the args Hash.



# File 'lib/iodine.rb', line 82

def self.listen2http(args, &block)
  warn "Iodine.listen2http is deprecated, use Iodine.listen(service: :http)."
  args[:service] = :http;
  Iodine.listen(args, &block)
end

.master? ⇒ Boolean

Returns true if this process is the master / root process, false otherwise.

Note that the master process might be a worker process as well, when running in single process mode (see workers).

Returns:

  • (Boolean)


# File 'ext/iodine_ext/iodine.c', line 306

static VALUE iodine_master_is(VALUE self) {
  return fio_is_master() ? Qtrue : Qfalse;
}

.on_idle ⇒ Object

Schedules a one-time event for the next idle cycle.

To schedule a recurring event, reschedule the event at the end of its run.

For example:

  IDLE_PROC = Proc.new { puts "idle"; Iodine.on_idle &IDLE_PROC }
  Iodine.on_idle &IDLE_PROC


# File 'ext/iodine_ext/iodine.c', line 83

static VALUE iodine_sched_on_idle(VALUE self) {
  // clang-format on
  rb_need_block();
  VALUE block = rb_block_proc();
  IodineStore.add(block);
  fio_state_callback_add(FIO_CALL_ON_IDLE, iodine_perform_on_idle_callback,
                         (void *)block);
  return block;
  (void)self;
}

.on_shutdown(&block) ⇒ Object

Deprecated. Use on_state instead.

Sets a block of code to run once a Worker process shuts down (both in single process mode and cluster mode).



# File 'lib/iodine.rb', line 121

def self.on_shutdown(&block)
  warn "Iodine.on_shutdown is deprecated, use Iodine.on_state(:on_finish)."
  Iodine.on_state(:on_finish, &block)
end

.on_state(event) ⇒ Object

Sets a block of code to run when Iodine's core state is updated.

The state event Symbol can be any of the following:

:pre_start the block will be called once before starting up the IO reactor.
:before_fork the block will be called before each time the IO reactor forks a new worker.
:after_fork the block will be called after each fork (both in parent and workers).
:enter_child the block will be called by a worker process right after forking.
:enter_master the block will be called by the master process after spawning a worker (after forking).
:on_start the block will be called every time a worker process starts. In single process mode, the master process is also a worker.
:on_parent_crush the block will be called by each worker the moment it detects the master process crashed.
:on_child_crush the block will be called by the parent (master) after a worker process crashed.
:start_shutdown the block will be called before starting the shutdown sequence.
:on_finish the block will be called just before finishing up (both on child and parent processes).

Code runs in both the parent and the child.
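The deprecated helpers on this page each forward to on_state with a specific event. The sketch below collects that mapping, as given by the deprecation warnings, and registers one callback the new way. `ON_STATE_REPLACEMENTS` is a hypothetical constant introduced for illustration, and the registration only runs when the iodine gem can be loaded.

```ruby
begin
  require 'iodine'
rescue LoadError
  # iodine is not installed; the mapping below is still plain Ruby.
end

# Deprecated helper => on_state event, taken from the deprecation warnings on this page.
ON_STATE_REPLACEMENTS = {
  before_fork:          :before_fork,
  after_fork:           :after_fork,
  after_fork_in_worker: :enter_child,
  after_fork_in_master: :enter_master,
  on_shutdown:          :on_finish
}.freeze

if defined?(Iodine) && Iodine.respond_to?(:on_state)
  # Instead of Iodine.after_fork_in_worker { ... }
  Iodine.on_state(ON_STATE_REPLACEMENTS[:after_fork_in_worker]) do
    puts "worker #{Process.pid} entered"
  end
end
```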

Parameters:

  • event (Symbol)

    the state event for which the block should run (see list).

Since:

  • 0.7.9



# File 'ext/iodine_ext/iodine_defer.c', line 326

static VALUE iodine_on_state(VALUE self, VALUE event) {
  // clang-format on
  rb_need_block();
  Check_Type(event, T_SYMBOL);
  VALUE block = rb_block_proc();
  IodineStore.add(block);
  ID state = rb_sym2id(event);

  if (state == STATE_PRE_START) {
    fio_state_callback_add(FIO_CALL_PRE_START,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_BEFORE_FORK) {
    fio_state_callback_add(FIO_CALL_BEFORE_FORK,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_AFTER_FORK) {
    fio_state_callback_add(FIO_CALL_AFTER_FORK,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ENTER_CHILD) {
    fio_state_callback_add(FIO_CALL_IN_CHILD,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ENTER_MASTER) {
    fio_state_callback_add(FIO_CALL_IN_MASTER,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ON_START) {
    fio_state_callback_add(FIO_CALL_ON_START,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ON_PARENT_CRUSH) {
    fio_state_callback_add(FIO_CALL_ON_PARENT_CRUSH,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ON_CHILD_CRUSH) {
    fio_state_callback_add(FIO_CALL_ON_CHILD_CRUSH,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_START_SHUTDOWN) {
    fio_state_callback_add(FIO_CALL_ON_SHUTDOWN,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else if (state == STATE_ON_FINISH) {
    fio_state_callback_add(FIO_CALL_ON_FINISH,
                           iodine_perform_state_callback_persist,
                           (void *)block);
  } else {
    IodineStore.remove(block);
    rb_raise(rb_eTypeError, "unknown event in Iodine.on_state");
  }
  return block;
  (void)self;
}

.patch_rack ⇒ Object

Will monkey patch some Rack methods to increase their performance.

This is recommended, see Iodine::Rack::Utils for details.



# File 'lib/iodine.rb', line 65

def self.patch_rack
  begin
    require 'rack'
  rescue LoadError
  end
  ::Rack::Utils.class_eval do
    Iodine::Base::MonkeyPatch::RackUtils.methods(false).each do |m|
      ::Rack::Utils.define_singleton_method(m,
            Iodine::Base::MonkeyPatch::RackUtils.instance_method(m) )
    end
  end
end

.publish(*args) ⇒ Object

Publishes a message to a channel.

Can be called with two Strings:

  publish(to, message)

The method accepts an optional engine argument:

  publish(to, message, my_pubsub_engine)
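The source listed for this method checks that the message is a String, so structured data needs to be serialized first. A minimal sketch, assuming JSON is an acceptable wire format for the application; `publish_event` is a hypothetical helper, and its `bus` parameter defaults to `Iodine` only so that the helper can be exercised without a running reactor.

```ruby
require 'json'

# Hypothetical helper: serializes non-String payloads before publishing.
# `bus` is expected to be the Iodine module.
def publish_event(channel, payload, bus = Iodine)
  message = payload.is_a?(String) ? payload : JSON.generate(payload)
  bus.publish(channel, message)
end

# With iodine loaded:
#   publish_event("chat", { "user" => "ann", "text" => "hi" })
```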


# File 'ext/iodine_ext/iodine_connection.c', line 704

static VALUE iodine_pubsub_publish(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  VALUE rb_ch, rb_msg, rb_engine = Qnil;
  const fio_pubsub_engine_s *engine = NULL;
  switch (argc) {
  case 3:
    /* fallthrough */
    rb_engine = argv[2];
  case 2:
    rb_ch = argv[0];
    rb_msg = argv[1];
    break;
  case 1: {
    /* single argument must be a Hash */
    Check_Type(argv[0], T_HASH);
    rb_ch = rb_hash_aref(argv[0], to_id);
    if (rb_ch == Qnil || rb_ch == Qfalse) {
      rb_ch = rb_hash_aref(argv[0], channel_id);
    }
    rb_msg = rb_hash_aref(argv[0], message_id);
    rb_engine = rb_hash_aref(argv[0], engine_id);
  } break;
  default:
    rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
  }

  if (rb_msg == Qnil || rb_msg == Qfalse) {
    rb_raise(rb_eArgError, "message is required.");
  }
  Check_Type(rb_msg, T_STRING);

  if (rb_ch == Qnil || rb_ch == Qfalse)
    rb_raise(rb_eArgError, "target / channel is required .");
  if (TYPE(rb_ch) == T_SYMBOL)
    rb_ch = rb_sym2str(rb_ch);
  Check_Type(rb_ch, T_STRING);

  if (rb_engine == Qfalse) {
    engine = FIO_PUBSUB_PROCESS;
  } else if (rb_engine != Qnil) {
    // collect engine object
    iodine_pubsub_s *e = iodine_pubsub_CData(rb_engine);
    if (e) {
      engine = e->engine;
    }
  }

  fio_publish(.engine = engine, .channel = IODINE_RSTRINFO(rb_ch),
              .message = IODINE_RSTRINFO(rb_msg));
  return Qtrue;
  (void)self;
}

.run ⇒ Object

Runs a block of code asynchronously (adds the code to the event queue).

Always returns the block of code to be executed (Proc object).

Code will be executed only while Iodine is running (after start).

Code blocks that were scheduled to run before Iodine enters cluster mode will run on all child processes.



# File 'ext/iodine_ext/iodine_defer.c', line 202

static VALUE iodine_defer_run(VALUE self) {
  rb_need_block();
  VALUE block = IodineStore.add(rb_block_proc());
  fio_defer(iodine_defer_performe_once, (void *)block, NULL);
  return block;
  (void)self;
}

.run_after(milliseconds) ⇒ Object

Runs the required block after the specified number of milliseconds have passed. Time is counted only once Iodine has started running (using start).

Tasks scheduled before calling start will run once for every process.

Always returns a copy of the block object.



# File 'ext/iodine_ext/iodine_defer.c', line 218

static VALUE iodine_defer_run_after(VALUE self, VALUE milliseconds) {
  (void)(self);
  if (milliseconds == Qnil) {
    return iodine_defer_run(self);
  }
  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number");
    return Qnil;
  }
  size_t milli = FIX2UINT(milliseconds);
  if (milli == 0) {
    return iodine_defer_run(self);
  }
  // requires a block to be passed
  rb_need_block();
  VALUE block = rb_block_proc();
  if (block == Qnil)
    return Qfalse;
  IodineStore.add(block);
  if (fio_run_every(milli, 1, iodine_defer_run_timer, (void *)block,
                    (void (*)(void *))IodineStore.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.run_every(*args) ⇒ Object

Runs the required block repeatedly, waiting the specified number of milliseconds between repetitions. Time is counted only once Iodine has started running (using start).

Accepts:

:milliseconds the number of milliseconds between event repetitions.
:repetitions the number of event repetitions. Defaults to 0 (never ending).
:block (required) a block is required, as otherwise there is nothing to perform.

The event will repeat itself until the number of repetitions has been depleted.

Always returns a copy of the block object.
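Judging by the source listed for this method, milliseconds and repetitions are passed as positional arguments, followed by the block. The sketch below combines run_every with run_after; `schedule_heartbeat` is a hypothetical helper, and its `reactor` parameter defaults to `Iodine` only so that the helper can be exercised without a running reactor.

```ruby
# Hypothetical helper: schedules `beats` heartbeat ticks, `interval_ms` apart,
# followed by a one-off task. `reactor` is expected to be the Iodine module.
def schedule_heartbeat(interval_ms, beats, reactor = Iodine)
  count = 0
  # run_every(milliseconds, repetitions) { ... }; 0 repetitions means "never ending".
  reactor.run_every(interval_ms, beats) do
    count += 1
    puts "heartbeat #{count}/#{beats}"
  end
  # run_after(milliseconds) { ... } fires once.
  reactor.run_after(interval_ms * (beats + 1)) { puts "heartbeats done" }
end

# With iodine loaded:
#   schedule_heartbeat(500, 3)
#   Iodine.start
```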



# File 'ext/iodine_ext/iodine_defer.c', line 263

static VALUE iodine_defer_run_every(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  (void)(self);
  VALUE milliseconds, repetitions, block;

  rb_scan_args(argc, argv, "11&", &milliseconds, &repetitions, &block);

  if (TYPE(milliseconds) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "milliseconds must be a number.");
    return Qnil;
  }
  if (repetitions != Qnil && TYPE(repetitions) != T_FIXNUM) {
    rb_raise(rb_eTypeError, "repetitions must be a number or `nil`.");
    return Qnil;
  }

  size_t milli = FIX2UINT(milliseconds);
  size_t repeat = (repetitions == Qnil) ? 0 : FIX2UINT(repetitions);
  // requires a block to be passed
  rb_need_block();
  IodineStore.add(block);
  if (fio_run_every(milli, repeat, iodine_defer_run_timer, (void *)block,
                    (void (*)(void *))IodineStore.remove) == -1) {
    perror("ERROR: Iodine couldn't initialize timer");
    return Qnil;
  }
  return block;
}

.running? ⇒ Boolean

Returns true if Iodine is currently running a server.

Returns:

  • (Boolean)


# File 'ext/iodine_ext/iodine.c', line 321

static VALUE iodine_running(VALUE self) {
  if (fio_is_running()) {
    return Qtrue;
  } else {
    return Qfalse;
  }
}

.start ⇒ Object

This will block the calling (main) thread and start the Iodine reactor.

When using cluster mode (2 or more worker processes), it is important that no other threads are active.

For many reasons, fork should NOT be called while multi-threading, so cluster mode must always be initiated from the main thread in a single thread environment.

For information about why forking in multi-threaded environments should be avoided, see (for example): http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
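One way to guard against forking with live background threads is a small pre-flight check before calling start in cluster mode. This is only a sketch: `safe_to_fork?` is a hypothetical helper, not part of iodine, and it does not verify that the caller is the main thread.

```ruby
# Hypothetical pre-flight check: cluster mode forks, so verify that at most
# one thread (the caller) is alive before starting more than one worker.
def safe_to_fork?(threads = Thread.list)
  threads.count(&:alive?) <= 1
end

# With iodine loaded:
#   Iodine.workers = 2
#   raise "stop background threads before forking" unless safe_to_fork?
#   Iodine.start
```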



# File 'ext/iodine_ext/iodine.c', line 272

static VALUE iodine_start(VALUE self) {
  if (fio_is_running()) {
    rb_raise(rb_eRuntimeError, "Iodine already running!");
  }
  IodineCaller.set_GVL(1);
  VALUE threads_rb = iodine_threads_get(self);
  VALUE workers_rb = iodine_workers_get(self);
  iodine_start_params_s params = {
      .threads = NUM2SHORT(threads_rb),
      .workers = NUM2SHORT(workers_rb),
  };
  iodine_print_startup_message(params);
  IodineCaller.leaveGVL(iodine_run_outside_GVL, &params);
  return self;
}

.stop ⇒ Object

This will stop the iodine server, shutting it down.

If called within a worker process (rather than the root/master process), this will cause a hot-restart for the worker.



# File 'ext/iodine_ext/iodine.c', line 294

static VALUE iodine_stop(VALUE self) {
  fio_stop();
  return self;
}

.subscribe(*args) ⇒ Object

Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.

The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:

  subscribe("my_stream") {|source, msg| p msg }
  subscribe("my_strea*", match: :redis) {|source, msg| p msg }
  subscribe(to: "my_stream")  {|source, msg| p msg }
  # or use any object that answers `#call(source, msg)`
  MyProc = Proc.new {|source, msg| p msg }
  subscribe to: "my_stream", match: :redis, handler: MyProc

The first argument must be either a String or a Hash.

The second, optional, argument must be a Hash (if given).

The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):

  • :match - The channel / subject name matching type to be used. Valid value is: :redis. Future versions hope to support :nats and :rabbit pattern matching as well.
  • :to - The channel / subject to subscribe to.
  • :as - (only for WebSocket connections) accepts the optional value :binary. default is :text. Note that binary transmissions are illegal for some connections (such as SSE) and an attempted binary subscription will fail for these connections.
  • :handler - Any object that answers .call(source, msg) where source is the stream / channel name.

Note: if an existing subscription with the same name exists, it will be replaced by this new subscription.

Returns the name of the subscription, which matches the name to be used in unsubscribe (or nil on failure).
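The :handler option accepts any object that answers call(source, msg), not only a Proc. A minimal sketch of such an object follows; `ChannelLog` is a hypothetical class used for illustration.

```ruby
# Hypothetical handler object: anything answering `call(source, msg)` qualifies.
class ChannelLog
  attr_reader :entries

  def initialize
    @entries = []
  end

  # `source` is the stream / channel name, `msg` the published message.
  def call(source, msg)
    @entries << "[#{source}] #{msg}"
  end
end

# With iodine loaded:
#   log = ChannelLog.new
#   Iodine.subscribe(to: "chat", handler: log)
#   Iodine.publish("chat", "hello")
```

Keeping the handler as a named object, rather than an anonymous block, makes it possible to inspect or unit test the subscription logic without starting the reactor.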



# File 'ext/iodine_ext/iodine_connection.c', line 603

static VALUE iodine_pubsub_subscribe(int argc, VALUE *argv, VALUE self) {
  // clang-format on
  iodine_sub_args_s args = iodine_subscribe_args(argc, argv);
  if (args.channel == Qnil) {
    return Qnil;
  }
  iodine_connection_data_s *c = NULL;
  if (TYPE(self) == T_MODULE) {
    if (!args.block) {
      rb_raise(rb_eArgError,
               "block or :handler required for local subscriptions.");
    }
  } else {
    c = iodine_connection_validate_data(self);
    if (!c || (c->info.type == IODINE_CONNECTION_SSE && args.binary)) {
      if (args.block) {
        IodineStore.remove(args.block);
      }
      return Qnil; /* cannot subscribe a closed / invalid connection. */
    }
    if (args.block == Qnil) {
      if (c->info.type == IODINE_CONNECTION_WEBSOCKET)
        websocket_optimize4broadcasts((args.binary
                                           ? WEBSOCKET_OPTIMIZE_PUBSUB_BINARY
                                           : WEBSOCKET_OPTIMIZE_PUBSUB),
                                      1);
      if (args.binary) {
        args.block = Qtrue;
      }
    }
    fio_atomic_add(&c->ref, 1);
  }

  subscription_s *sub =
      fio_subscribe(.channel = IODINE_RSTRINFO(args.channel),
                    .on_message = iodine_on_pubsub,
                    .on_unsubscribe = iodine_on_unsubscribe, .udata1 = c,
                    .udata2 = (void *)args.block, .match = args.pattern);
  if (c) {
    fio_lock(&c->lock);
    if (c->info.uuid == -1) {
      fio_unsubscribe(sub);
      fio_unlock(&c->lock);
      return Qnil;
    }
    iodine_sub_add(&c->subscriptions, sub);
    fio_unlock(&c->lock);
  } else {
    fio_lock(&sub_lock);
    iodine_sub_add(&sub_global, sub);
    fio_unlock(&sub_lock);
  }
  return args.channel;
}

.threads ⇒ FixNum

Returns the number of worker threads that will be used when start is called.

Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.

Zero values promise nothing (iodine will decide what to do with them).

Returns:

  • (FixNum)

    Thread Count
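
The negative-value rule can be pictured with a small sketch. The helper resolve_count below is hypothetical and not part of Iodine; the real translation happens inside the C library, so the integer division and the floor of 1 used here are assumptions made for illustration only.

```ruby
require 'etc'

# Hypothetical helper (not an Iodine API): models how a configured count
# might be resolved against the number of detected CPU cores.
def resolve_count(requested, cores = Etc.nprocessors)
  return requested if requested > 0 # positive values are used as given
  return nil if requested.zero?     # zero promises nothing; iodine decides

  # Assumption: -N means "cores / N", rounded down, but never below 1.
  [cores / -requested, 1].max
end

p resolve_count(-2, 8) # half of 8 cores
p resolve_count(4, 8)  # explicit count
p resolve_count(0, 8)  # left to iodine
```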



# File 'ext/iodine_ext/iodine.c', line 124

static VALUE iodine_threads_get(VALUE self) {
  VALUE i = rb_ivar_get(self, rb_intern2("@threads", 8));
  if (i == Qnil)
    i = INT2NUM(0);
  return i;
}

.threads=(val) ⇒ Object

Sets the number of worker threads that will be used when start is called.

Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.

Zero values promise nothing (iodine will decide what to do with them).

Parameters:

  • thread_count (FixNum)

    The number of worker threads to use
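
The C source listed for this method checks the argument type and rejects values at or above 1 << 12 (4096). A rough Ruby equivalent of those checks might look like the sketch below. The names validate_thread_count and MAX_THREAD_COUNT are hypothetical, and the Integer test only approximates the C-level FixNum check.

```ruby
# Bound taken from the C source for this setter: 1 << 12 == 4096.
MAX_THREAD_COUNT = 1 << 12

# Hypothetical helper (not an Iodine API) mirroring the setter's checks.
def validate_thread_count(val)
  # Approximates Check_Type(val, T_FIXNUM) in the C code.
  unless val.is_a?(Integer)
    raise TypeError, "wrong argument type #{val.class} (expected Integer)"
  end
  if val >= MAX_THREAD_COUNT
    raise RangeError, "requested thread count is out of range."
  end

  val
end

p validate_thread_count(8)
p validate_thread_count(-2) # negative values pass; they are resolved later
```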



# File 'ext/iodine_ext/iodine.c', line 142

static VALUE iodine_threads_set(VALUE self, VALUE val) {
  Check_Type(val, T_FIXNUM);
  if (NUM2SSIZET(val) >= (1 << 12)) {
    rb_raise(rb_eRangeError, "requsted thread count is out of range.");
  }
  rb_ivar_set(self, rb_intern2("@threads", 8), val);
  return val;
}

.unsubscribe(name) ⇒ Object

Unsubscribes from a Pub/Sub stream / channel.

The method accepts a single argument, the name used for the subscription. i.e.:

  subscribe("my_stream") {|source, msg| p msg }
  unsubscribe("my_stream")

Returns true if the subscription was found.

Returns false if the subscription didn't exist.
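
The name-keyed behaviour of subscribe and unsubscribe can be modelled with a toy registry. This is only an illustration under stated assumptions: Iodine keeps its subscriptions in C, and the SubscriptionRegistry class below is hypothetical.

```ruby
# Toy model only (hypothetical class, not Iodine's implementation).
class SubscriptionRegistry
  def initialize
    @subs = {}
  end

  # Stores the handler under the channel name, replacing any earlier one,
  # and returns the name (the value later passed to unsubscribe).
  def subscribe(name, &handler)
    @subs[name] = handler
    name
  end

  # Returns true if a subscription was removed, false if none existed.
  def unsubscribe(name)
    !@subs.delete(name).nil?
  end
end

registry = SubscriptionRegistry.new
name = registry.subscribe("my_stream") { |source, msg| p msg }
first = registry.unsubscribe(name)  # the subscription existed
second = registry.unsubscribe(name) # it was already removed
p [name, first, second]
```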



# File 'ext/iodine_ext/iodine_connection.c', line 671

static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE name) {
  // clang-format on
  iodine_connection_data_s *c = NULL;
  fio_lock_i *s_lock = &sub_lock;
  fio_subhash_s *subs = &sub_global;
  VALUE ret;
  if (TYPE(self) != T_MODULE) {
    c = iodine_connection_validate_data(self);
    if (!c || c->info.uuid == -1) {
      return Qnil; /* cannot unsubscribe a closed connection. */
    }
    s_lock = &c->lock;
    subs = &c->subscriptions;
  }
  fio_lock(s_lock);
  ret = iodine_sub_unsubscribe(subs, IODINE_RSTRINFO(name));
  fio_unlock(s_lock);
  return ret;
}

.verbosity ⇒ FixNum

Gets the logging level used for Iodine messages.

Levels range from 0-5, where:

  • 0 == Quiet (no messages).
  • 1 == Fatal errors only.
  • 2 == Errors only (including fatal errors).
  • 3 == Warnings and errors only.
  • 4 == Informational messages, warnings and errors (default).
  • 5 == Everything, including debug information.

Logging is always performed to the process's STDERR and can be piped away.

NOTE: this does NOT affect HTTP logging.

Returns:

  • (FixNum)

    Logging Level
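
The level scheme described above can be read as a simple threshold. The sketch below is an assumption-based illustration, not Iodine's logging code: the LOG_LEVELS constant and the emitted? helper are hypothetical names, and the rule "a message is emitted when its severity is at or below the configured verbosity" is inferred from the level descriptions.

```ruby
# Level names follow the documented 0-5 scale; the constant is hypothetical.
LOG_LEVELS = {
  0 => "quiet",
  1 => "fatal",
  2 => "error",
  3 => "warning",
  4 => "info",
  5 => "debug"
}.freeze

# Assumption: a message of a given severity (1-5) is written only when the
# configured verbosity is at least that severity.
def emitted?(severity, verbosity)
  severity.between?(1, 5) && severity <= verbosity
end

p emitted?(3, 4) # warning at the default level
p emitted?(5, 4) # debug at the default level
p emitted?(1, 0) # fatal error in quiet mode
```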



# File 'ext/iodine_ext/iodine.c', line 169

static VALUE iodine_logging_get(VALUE self) {
  return INT2FIX(FIO_LOG_LEVEL);
  (void)self;
}

.verbosity=(val) ⇒ Object

Sets the logging level used for Iodine messages.

Levels range from 0-5, where:

  • 0 == Quiet (no messages).
  • 1 == Fatal errors only.
  • 2 == Errors only (including fatal errors).
  • 3 == Warnings and errors only.
  • 4 == Informational messages, warnings and errors (default).
  • 5 == Everything, including debug information.

Logging is always performed to the process's STDERR and can be piped away.

NOTE: this does NOT affect HTTP logging.

Parameters:

  • log_level (FixNum)

    The logging level to set



# File 'ext/iodine_ext/iodine.c', line 192

static VALUE iodine_logging_set(VALUE self, VALUE val) {
  Check_Type(val, T_FIXNUM);
  FIO_LOG_LEVEL = FIX2INT(val);
  return self;
}

.worker? ⇒ Boolean

Returns true if this process is a worker process or if iodine is running in a single process mode (the master is also a worker), false otherwise.

Returns:

  • (Boolean)


# File 'ext/iodine_ext/iodine.c', line 314

static VALUE iodine_worker_is(VALUE self) {
  return fio_is_master() ? Qtrue : Qfalse;
}

.workers ⇒ FixNum

Returns the number of worker processes that will be used when start is called.

Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.

Zero values promise nothing (iodine will decide what to do with them).

1 == single process mode, the master process acts as a worker process.

Returns:

  • (FixNum)

    Worker Count



# File 'ext/iodine_ext/iodine.c', line 211

static VALUE iodine_workers_get(VALUE self) {
  VALUE i = rb_ivar_get(self, rb_intern2("@workers", 8));
  if (i == Qnil)
    i = INT2NUM(0);
  return i;
}

.workers=(val) ⇒ Object

Sets the number of worker processes that will be used when start is called.

Negative numbers are translated as fractions of the number of CPU cores. i.e., -2 == half the number of detected CPU cores.

Zero values promise nothing (iodine will decide what to do with them).

1 == single process mode, the master process acts as a worker process.

Parameters:

  • worker_count (FixNum)

    Number of worker processes



# File 'ext/iodine_ext/iodine.c', line 231

static VALUE iodine_workers_set(VALUE self, VALUE val) {
  Check_Type(val, T_FIXNUM);
  if (NUM2SSIZET(val) >= (1 << 9)) {
    rb_raise(rb_eRangeError, "requsted worker process count is out of range.");
  }
  rb_ivar_set(self, rb_intern2("@workers", 8), val);
  return val;
}