Module: Iodine::Protocol
- Defined in:
- lib/iodine/protocol.rb,
ext/iodine/iodine_core.c
Overview
The Protocol class is used only for documenting the Protocol API, it will not be included when requiring ‘iodine`.
A dynamic (stateful) protocol is defined as a Ruby class instance which is in control of one single connection.
It is called dynamic because it is dynamically allocated for each connection and then discarded; also, it sounded better than calling it “the stateful protocol”, even though that’s what it actually is.
It is (mostly) thread-safe as long as its operations are limited to the scope of the object.
The Callbacks
A protocol class MUST contain ONE of the following callbacks:
- on_data
-
called when there’s data available to be read, but no data was read just yet. ‘on_data` will not be called again until the entire existing network buffer has been read (edge triggered event).
- on_message(buffer)
-
the default ‘on_data` implementation creates a 1Kb buffer and reads data while recycling the same String memory space. The buffer is forwarded to the `on_message` callback before being recycled. The buffer object will be over-written once `on_message` returns, so creating a persistent copy requires `buffer.dup`.
A protocol class MAY contain any of the following optional callbacks:
- on_open
-
called after a new connection was accepted and the protocol was linked with Iodine’s Protocol API. Initialization should be performed here.
- ping
-
called whenever the timeout was reached. The default implementation will close the connection unless a protocol task (#defer, ‘on_data` or `on_message`) is busy in the background.
- on_shutdown
-
called if the connection is still open while the server is shutting down. This allows the protocol to send a “going away” frame before the connection is closed and ‘on_close` is called.
- on_close
-
called after a connection was closed, for any cleanup (if any).
WARNING: for thread safety and connection management, ‘on_open`, `on_shutdown`, `on_close` and `ping` will all be performed within the reactor’s main thread. Do not run long running tasks within these callbacks, or the server might block while you do. Use #defer to run protocol related tasks (this locks the connection, preventing it from running more than one task at a time and offering thread safety), or #run to run asynchronous tasks that aren’t protocol related.
The API:
After a new connection is accepted and a new protocol object is created, the protocol will be linked with Iodine’s Protocol API. Only the main protocol will be able to access the API within ‘initialize`, so it’s best to use ‘on_open` for any Initialization required.
Instance Method Summary collapse
-
#close ⇒ Object
Closes a connection.
-
#defer ⇒ Object
Runs the required block later (defers the block’s execution).
-
#each ⇒ Object
Runs the required block for each dynamic protocol connection except this one.
-
#on_close ⇒ Object
implement this callback to handle the event.
-
#on_data ⇒ Object
A default on_data implementation will read up to 1Kb into a reusable buffer from the socket and call the ‘on_message` callback.
-
#on_message(data) ⇒ Object
implement this callback to handle the event.
-
#on_open ⇒ Object
implement this callback to handle the event.
-
#on_ready ⇒ Object
implement this callback to handle the event.
-
#on_shutdown ⇒ Object
implement this callback to handle the event.
-
#ping ⇒ Object
Implement this callback to handle the event.
-
#read(*args) ⇒ Object
Reads ‘n` bytes from the network connection.
- #upgrade(handler) ⇒ Object
-
#write(data) ⇒ Object
Writes data to the connection.
-
#write_urgent(data) ⇒ Object
Writes data to the connection.
Instance Method Details
#close ⇒ Object
Closes a connection.
The connection will be closed only once all the data was sent.
Returns self.
144 145 146 147 148 |
# File 'ext/iodine/iodine_core.c', line 144
static VALUE dyn_close(VALUE self) {
  intptr_t fd = iodine_get_fd(self);
  sock_close(fd);
  return self;
}
|
#defer ⇒ Object
Runs the required block later (defers the block’s execution).
Unlike Iodine#run, the block will *not* run concurrently with any other callback for this object (except ‘ping` and `on_ready`).
Also, unlike Iodine#run, the block will not be called unless the connection remains open at the time its execution is scheduled.
Always returns ‘self`.
174 175 176 177 178 179 180 181 182 183 184 |
# File 'ext/iodine/iodine_core.c', line 174
/**
 * Schedules the block passed to the method as a task for this connection.
 *
 * The block's Proc is added to the Registry (presumably to keep it alive
 * until the task runs - confirm against the Registry implementation) and
 * then handed to `server_task` together with `dyn_perform_defer` and
 * `dyn_defer_fallback`.
 *
 * Returns `self`, or `false` if no Proc could be obtained for the block.
 */
static VALUE dyn_defer(VALUE self) {
  // the method cannot be called without a block
  rb_need_block();
  VALUE task = rb_block_proc();
  if (task != Qnil) {
    Registry.add(task);
    intptr_t uuid = iodine_get_fd(self);
    server_task(uuid, dyn_perform_defer, (void *)task, dyn_defer_fallback);
    return self;
  }
  return Qfalse;
}
|
#each ⇒ Object
Runs the required block for each dynamic protocol connection except this one.
Tasks will be performed within each connection’s lock, so no connection will have more than one task being performed at the same time (similar to #defer).
Also, unlike Iodine.run, the block will not be called unless the connection remains open at the time its execution is scheduled.
Always returns ‘self`.
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'ext/iodine/iodine_core.c', line 212
/**
 * Schedules the block passed to the method through `server_each`.
 *
 * The block's Proc is added to the Registry (presumably to keep it alive
 * while tasks are pending - confirm against the Registry implementation)
 * and then passed to `server_each` along with this connection's identifier,
 * `iodine_protocol_service`, `dyn_perform_each_task` and
 * `dyn_finish_each_task`.
 *
 * Returns `self`, or `false` if no Proc could be obtained for the block.
 */
static VALUE dyn_each(VALUE self) {
  // the method cannot be called without a block
  rb_need_block();
  VALUE task = rb_block_proc();
  if (task != Qnil) {
    Registry.add(task);
    intptr_t origin = iodine_get_fd(self);
    server_each(origin, iodine_protocol_service, dyn_perform_each_task,
                (void *)task, dyn_finish_each_task);
    return self;
  }
  return Qfalse;
}
|
#on_close ⇒ Object
implement this callback to handle the event.
266 |
# File 'ext/iodine/iodine_core.c', line 266
static VALUE not_implemented(VALUE self) { return Qnil; }
|
#on_data ⇒ Object
A default on_data implementation will read up to 1Kb into a reusable buffer from the socket and call the ‘on_message` callback.
It is recommended that you implement this callback if messages might require more than 1Kb of space.
277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'ext/iodine/iodine_core.c', line 277
/**
 * Default `on_data` callback: reads from the connection into a buffer String
 * stored on the object and forwards each chunk to `on_message`.
 *
 * The buffer is created on first use (1024 bytes) and kept in the instance
 * variable identified by `buff_var_id`. Reading repeats for as long as a read
 * fills the buffer to its full capacity; an empty read ends the callback
 * without calling `on_message`.
 *
 * Always returns `nil`.
 */
static VALUE default_on_data(VALUE self) {
  VALUE chunk = rb_ivar_get(self, buff_var_id);
  if (chunk == Qnil) {
    // first call for this object: allocate the buffer and remember it
    chunk = rb_str_buf_new(1024);
    rb_ivar_set(self, buff_var_id, chunk);
  }
  for (;;) {
    dyn_read(1, &chunk, self);
    if (RSTRING_LEN(chunk) == 0) {
      break; // nothing was read
    }
    rb_funcall(self, on_message_func_id, 1, chunk);
    if (RSTRING_LEN(chunk) != rb_str_capacity(chunk)) {
      break; // buffer is not full, so stop reading
    }
  }
  return Qnil;
}
|
#on_message(data) ⇒ Object
implement this callback to handle the event.
268 |
# File 'ext/iodine/iodine_core.c', line 268
/** Placeholder for one-argument callbacks: ignores both arguments and returns `nil`. */
static VALUE not_implemented2(VALUE self, VALUE data) {
  (void)self;
  (void)data;
  return Qnil;
}
|
#on_open ⇒ Object
implement this callback to handle the event.
266 |
# File 'ext/iodine/iodine_core.c', line 266
static VALUE not_implemented(VALUE self) { return Qnil; }
|
#on_ready ⇒ Object
implement this callback to handle the event.
266 |
# File 'ext/iodine/iodine_core.c', line 266
static VALUE not_implemented(VALUE self) { return Qnil; }
|
#on_shutdown ⇒ Object
implement this callback to handle the event.
266 |
# File 'ext/iodine/iodine_core.c', line 266
static VALUE not_implemented(VALUE self) { return Qnil; }
|
#ping ⇒ Object
Implement this callback to handle the event. The default implementation will close the connection.
261 262 263 264 |
# File 'ext/iodine/iodine_core.c', line 261
static VALUE not_implemented_ping(VALUE self) {
  sock_close(iodine_get_fd(self));
  return Qnil;
}
|
#read(*args) ⇒ Object
Reads ‘n` bytes from the network connection. The number of bytes to be read (n) is:
-
the number of bytes set in the optional ‘buffer_or_length` argument.
-
the String capacity (not length) of the String passed as the optional ‘buffer_or_length` argument.
-
1024 Bytes (1Kb) if the optional ‘buffer_or_length` is either missing or contains a String whose capacity is less than 1Kb.
Returns a String (either the same one used as the buffer or a new one) on a successful read. Returns ‘nil` if no data was available.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'ext/iodine/iodine_core.c', line 58
/**
 * Reads data from the connection into a Ruby String.
 *
 * Accepts at most one optional argument:
 * - a Fixnum: a new String with that capacity is created (1024 is used when
 *   the number is zero or negative);
 * - a String: it is made modifiable and used as the buffer; if its capacity
 *   is below 1024 it is resized to 1024;
 * - nothing / `nil`: a new String with a capacity of 1024 is created.
 *
 * Raises ArgumentError when more than one argument is given and TypeError
 * when the argument is neither `nil`, a Fixnum nor a String.
 *
 * The String is tagged with the binary encoding. Returns the String (its
 * length set to the number of bytes read) or `nil` when `sock_read` reported
 * no data; in that case the buffer's length is set to 0.
 */
static VALUE dyn_read(int argc, VALUE *argv, VALUE self) {
  if (argc > 1) {
    rb_raise(
        rb_eArgError,
        "read accepts only one argument - a Fixnum (buffer length) or a String "
        "(it's capacity - or 1Kb, whichever's the higher - will be used as "
        "buffer's length).");
  }
  VALUE request = Qnil;
  if (argc == 1) {
    request = argv[0];
  }
  int is_length = (request != Qnil && TYPE(request) == T_FIXNUM);
  int is_string = (request != Qnil && TYPE(request) == T_STRING);
  if (request != Qnil && !is_length && !is_string) {
    rb_raise(rb_eTypeError,
             "buffer should either be a length (a new string will be created) "
             "or a string (reading will be limited to the original string's "
             "capacity or 1Kb - whichever the larger).");
  }
  intptr_t uuid = iodine_get_fd(self);
  VALUE target;
  long capacity;
  if (is_string) {
    // reuse the caller's String: note its capacity, then make it writable
    capacity = rb_str_capacity(request);
    rb_str_modify(request);
    if (capacity < 1024) {
      capacity = 1024;
      rb_str_resize(request, capacity);
    }
    target = request;
  } else {
    // no argument or a Fixnum: allocate a fresh String buffer
    capacity = (request == Qnil) ? 1024 : FIX2LONG(request);
    if (capacity <= 0) {
      capacity = 1024;
    }
    target = rb_str_buf_new(capacity);
  }
  ssize_t received = sock_read(uuid, RSTRING_PTR(target), capacity);
  // the data is raw bytes, so mark the String as binary encoded
  rb_enc_associate_index(target, BinaryEncodingIndex);
  if (received <= 0) {
    rb_str_set_len(target, 0);
    return Qnil;
  }
  // publish the number of bytes actually read as the String's length
  rb_str_set_len(target, (long)received);
  return target;
}
|
#upgrade(handler) ⇒ Object
251 252 253 |
# File 'ext/iodine/iodine_core.c', line 251
/**
 * Passes this connection's identifier and `handler` to
 * `iodine_upgrade2basic` and returns whatever that call returns.
 */
VALUE dyn_upgrade(VALUE self, VALUE handler) {
  intptr_t uuid = iodine_get_fd(self);
  return iodine_upgrade2basic(uuid, handler);
}
|
#write(data) ⇒ Object
Writes data to the connection. Returns ‘false` on error and `self` on success.
116 117 118 119 120 121 |
# File 'ext/iodine/iodine_core.c', line 116
/**
 * Writes the bytes of `data` to the connection using `sock_write`.
 *
 * `data` is accessed with RSTRING_PTR / RSTRING_LEN without any type check,
 * so callers must pass a String.
 *
 * Returns `false` when `sock_write` reports a non-zero result, `self`
 * otherwise.
 */
static VALUE dyn_write(VALUE self, VALUE data) {
  intptr_t uuid = iodine_get_fd(self);
  return sock_write(uuid, RSTRING_PTR(data), RSTRING_LEN(data)) ? Qfalse
                                                                : self;
}
|
#write_urgent(data) ⇒ Object
Writes data to the connection. The data will be sent as soon as possible without fragmentation of previously scheduled data.
Returns ‘false` on error and `self` on success.
129 130 131 132 133 134 135 |
# File 'ext/iodine/iodine_core.c', line 129
/**
 * Writes the bytes of `data` to the connection using `sock_write2` with the
 * `urgent` flag set.
 *
 * `data` is accessed with RSTRING_PTR / RSTRING_LEN without any type check,
 * so callers must pass a String.
 *
 * Returns `false` when `sock_write2` reports a non-zero result, `self`
 * otherwise.
 */
static VALUE dyn_write_urgent(VALUE self, VALUE data) {
  intptr_t fd = iodine_get_fd(self);
  // The buffer must be the String's character data (RSTRING_PTR). RSTRING()
  // yields a pointer to the String's object header, which would put header
  // bytes on the wire instead of the payload.
  if (sock_write2(.fduuid = fd, .buffer = RSTRING_PTR(data),
                  .length = RSTRING_LEN(data), .urgent = 1))
    return Qfalse;
  return self;
}
|