Class: Iodine::Connection
- Inherits:
-
Object
- Object
- Iodine::Connection
- Defined in:
- lib/iodine/connection.rb,
ext/iodine/iodine_connection.c
Overview
Iodine's Connection class is the class that TCP/IP, WebSockets and SSE connections inherit from.
Instances of this class are passed to the callback objects. i.e.:
module MyConnectionCallbacks
# called when the callback object is linked with a new client
def on_open client
client.is_a?(Iodine::Connection) # => true
end
# called when data is available
def client, data
client.is_a?(Iodine::Connection) # => true
end
# called when the server is shutting down, before closing the client
# (it's still possible to send messages to the client)
def on_shutdown client
client.is_a?(Iodine::Connection) # => true
end
# called when the client is closed (no longer available)
def on_close client
client.is_a?(Iodine::Connection) # => true
end
# called when all the previous calls to `client.write` have completed
# (the local buffer was drained and is now empty)
def on_drained client
client.is_a?(Iodine::Connection) # => true
end
# called when timeout was reached, llowing a `ping` to be sent
def ping client
client.is_a?(Iodine::Connection) # => true
clint.close() # close connection on timeout is the default
end
# Allows the module to be used as a static callback object (avoiding object allocation)
extend self
end
All connection related actions can be performed using the methods provided through this class.
Instance Method Summary collapse
-
#close ⇒ Object
Schedules the connection to be closed.
-
#env ⇒ Object
Returns the connection's
env
(if it originated from an HTTP request). -
#handler ⇒ Object
Returns the client's current callback object.
-
#handler=(handler) ⇒ Object
Sets the client's callback object, so future events will use the new object's callbacks.
-
#open? ⇒ Boolean
Returns true if the connection appears to be open (no known issues).
-
#pending ⇒ Object
Returns the number of pending
write
operations that need to complete before the nexton_drained
callback is called. -
#protocol ⇒ Object
Returns the connection's protocol Symbol (
:sse
,:websocket
or:raw
). -
#publish(*args) ⇒ Object
Publishes a message to a channel.
-
#pubsub? ⇒ Boolean
Always returns true, since Iodine connections support the pub/sub extension.
-
#subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
-
#timeout ⇒ Object
Returns the timeout /
ping
interval for the connection. -
#timeout=(timeout) ⇒ Object
Sets the timeout /
ping
interval for the connection (up to 255 seconds). -
#unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
-
#write(data) ⇒ Object
Writes data to the connection asynchronously.
Instance Method Details
#close ⇒ Object
Schedules the connection to be closed.
The connection will be closed once all the scheduled write
operations have
been completed (or failed).
227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'ext/iodine/iodine_connection.c', line 227
static VALUE iodine_connection_close(VALUE self) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c && !fio_is_closed(c->info.uuid)) {
if (c->info.type == IODINE_CONNECTION_WEBSOCKET) {
websocket_close(c->info.arg); /* sends WebSocket close packet */
} else {
fio_close(c->info.uuid);
}
}
return Qnil;
}
|
#env ⇒ Object
Returns the connection's env
(if it originated from an HTTP request).
332 333 334 335 336 337 338 |
# File 'ext/iodine/iodine_connection.c', line 332
static VALUE iodine_connection_env(VALUE self) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c && c->info.env) {
return c->info.env;
}
return Qnil;
}
|
#handler ⇒ Object
Returns the client's current callback object.
343 344 345 346 347 348 349 350 351 352 |
# File 'ext/iodine/iodine_connection.c', line 343
static VALUE iodine_connection_handler_get(VALUE self) {
iodine_connection_data_s *data = iodine_connection_validate_data(self);
if (!data) {
FIO_LOG_DEBUG("(iodine) requested connection handler for "
"an invalid connection: %p",
(void *)self);
return Qnil;
}
return data->info.handler;
}
|
#handler=(handler) ⇒ Object
Sets the client's callback object, so future events will use the new object's callbacks.
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 |
# File 'ext/iodine/iodine_connection.c', line 360
static VALUE iodine_connection_handler_set(VALUE self, VALUE handler) {
// clang-format on
iodine_connection_data_s *data = iodine_connection_validate_data(self);
if (!data) {
FIO_LOG_DEBUG("(iodine) attempted to set a connection handler for "
"an invalid connection: %p",
(void *)self);
return Qnil;
}
if (handler == Qnil || handler == Qfalse) {
FIO_LOG_DEBUG(
"(iodine) called client.handler = nil, closing connection: %p",
(void *)self);
iodine_connection_close(self);
return Qnil;
}
if (data->info.handler != handler) {
uint8_t answers_on_open = (rb_respond_to(handler, on_open_id) != 0);
if (data->answers_on_close)
IodineCaller.call2(data->info.handler, on_close_id, 1, &self);
fio_lock(&data->lock);
data->info.handler = handler;
data->answers_on_open = answers_on_open,
data->answers_on_message = (rb_respond_to(handler, on_message_id) != 0),
data->answers_ping = (rb_respond_to(handler, ping_id) != 0),
data->answers_on_drained = (rb_respond_to(handler, on_drained_id) != 0),
data->answers_on_shutdown = (rb_respond_to(handler, on_shutdown_id) != 0),
data->answers_on_close = (rb_respond_to(handler, on_close_id) != 0),
fio_unlock(&data->lock);
if (answers_on_open) {
iodine_connection_fire_event(self, IODINE_CONNECTION_ON_OPEN, Qnil);
}
FIO_LOG_DEBUG("(iodine) switched handlers for connection: %p",
(void *)self);
}
return handler;
}
|
#open? ⇒ Boolean
Returns true if the connection appears to be open (no known issues).
240 241 242 243 244 245 246 |
# File 'ext/iodine/iodine_connection.c', line 240
static VALUE iodine_connection_is_open(VALUE self) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c && !fio_is_closed(c->info.uuid)) {
return Qtrue;
}
return Qfalse;
}
|
#pending ⇒ Object
Returns the number of pending write
operations that need to complete
before the next on_drained
callback is called.
Returns -1 if the connection is closed and 0 if on_drained
won't be
scheduled (no pending write
).
262 263 264 265 266 267 268 |
# File 'ext/iodine/iodine_connection.c', line 262
static VALUE iodine_connection_pending(VALUE self) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (!c || fio_is_closed(c->info.uuid)) {
return INT2NUM(-1);
}
return SIZET2NUM((fio_pending(c->info.uuid)));
}
|
#protocol ⇒ Object
Returns the connection's protocol Symbol (:sse
, :websocket
or :raw
).
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'ext/iodine/iodine_connection.c', line 276
static VALUE iodine_connection_protocol_name(VALUE self) {
// clang-format on
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c) {
switch (c->info.type) {
case IODINE_CONNECTION_WEBSOCKET:
return WebSocketSymbol;
break;
case IODINE_CONNECTION_SSE:
return SSESymbol;
break;
case IODINE_CONNECTION_RAW: /* fallthrough */
return RAWSymbol;
break;
}
}
return Qnil;
}
|
#publish(*args) ⇒ Object
Publishes a message to a channel.
Can be used using two Strings:
publish(to, )
The method accepts an optional engine
argument:
publish(to, , my_pubsub_engine)
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 |
# File 'ext/iodine/iodine_connection.c', line 708
static VALUE iodine_pubsub_publish(int argc, VALUE *argv, VALUE self) {
// clang-format on
VALUE rb_ch, rb_msg, rb_engine = Qnil;
const fio_pubsub_engine_s *engine = NULL;
switch (argc) {
case 3:
/* fallthrough */
rb_engine = argv[2];
case 2:
rb_ch = argv[0];
rb_msg = argv[1];
break;
case 1: {
/* single argument must be a Hash */
Check_Type(argv[0], T_HASH);
rb_ch = rb_hash_aref(argv[0], to_id);
if (rb_ch == Qnil || rb_ch == Qfalse) {
rb_ch = rb_hash_aref(argv[0], channel_id);
}
rb_msg = rb_hash_aref(argv[0], message_id);
rb_engine = rb_hash_aref(argv[0], engine_id);
} break;
default:
rb_raise(rb_eArgError, "method accepts 1-3 arguments.");
}
if (rb_msg == Qnil || rb_msg == Qfalse) {
rb_raise(rb_eArgError, "message is required.");
}
Check_Type(rb_msg, T_STRING);
if (rb_ch == Qnil || rb_ch == Qfalse)
rb_raise(rb_eArgError, "target / channel is required .");
if (TYPE(rb_ch) == T_SYMBOL)
rb_ch = rb_sym2str(rb_ch);
Check_Type(rb_ch, T_STRING);
if (rb_engine == Qfalse) {
engine = FIO_PUBSUB_PROCESS;
} else if (rb_engine != Qnil) {
// collect engine object
iodine_pubsub_s *e = iodine_pubsub_CData(rb_engine);
if (e) {
engine = e->engine;
}
}
fio_publish(.engine = engine, .channel = IODINE_RSTRINFO(rb_ch),
.message = IODINE_RSTRINFO(rb_msg));
return Qtrue;
(void)self;
}
|
#pubsub? ⇒ Boolean
Always returns true, since Iodine connections support the pub/sub extension.
251 252 253 254 |
# File 'ext/iodine/iodine_connection.c', line 251
static VALUE iodine_connection_is_pubsub(VALUE self) {
return INT2NUM(0);
(void)self;
}
|
#subscribe(*args) ⇒ Object
Subscribes to a Pub/Sub stream / channel or replaces an existing subscription.
The method accepts 1-2 arguments and an optional block. These are all valid ways to call the method:
subscribe("my_stream") {|source, msg| p msg }
subscribe("my_strea*", match: :redis) {|source, msg| p msg }
subscribe(to: "my_stream") {|source, msg| p msg }
# or use any object that answers `#call(source, msg)`
MyProc = Proc.new {|source, msg| p msg }
subscribe to: "my_stream", match: :redis, handler: MyProc
The first argument must be either a String or a Hash.
The second, optional, argument must be a Hash (if given).
The options Hash supports the following possible keys (other keys are ignored, all keys are Symbols):
:match
- The channel / subject name matching type to be used. Valid value is::redis
. Future versions hope to support:nats
and:rabbit
patern matching as well.:to
- The channel / subject to subscribe to.:as
- (only for WebSocket connections) accepts the optional value:binary
. default is:text
. Note that binary transmissions are illegal for some connections (such as SSE) and an attempted binary subscription will fail for these connections.:handler
- Any object that answers.call(source, msg)
where source is the stream / channel name.
Note: if an existing subscription with the same name exists, it will be replaced by this new subscription.
Returns the name of the subscription, which matches the name be used in #unsubscribe (or nil on failure).
604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 |
# File 'ext/iodine/iodine_connection.c', line 604
static VALUE iodine_pubsub_subscribe(int argc, VALUE *argv, VALUE self) {
// clang-format on
iodine_sub_args_s args = iodine_subscribe_args(argc, argv);
if (args.channel == Qnil) {
return Qnil;
}
iodine_connection_data_s *c = NULL;
if (TYPE(self) == T_MODULE) {
if (!args.block) {
rb_raise(rb_eArgError,
"block or :handler required for local subscriptions.");
}
} else {
c = iodine_connection_validate_data(self);
if (!c || (c->info.type == IODINE_CONNECTION_SSE && args.binary)) {
if (args.block) {
IodineStore.remove(args.block);
}
return Qnil; /* cannot subscribe a closed / invalid connection. */
}
if (args.block == Qnil) {
if (c->info.type == IODINE_CONNECTION_WEBSOCKET)
websocket_optimize4broadcasts((args.binary
? WEBSOCKET_OPTIMIZE_PUBSUB_BINARY
: WEBSOCKET_OPTIMIZE_PUBSUB),
1);
if (args.binary) {
args.block = Qtrue;
}
}
fio_atomic_add(&c->ref, 1);
}
subscription_s *sub =
fio_subscribe(.channel = IODINE_RSTRINFO(args.channel),
.on_message = iodine_on_pubsub,
.on_unsubscribe = iodine_on_unsubscribe, .udata1 = c,
.udata2 = (void *)args.block, .match = args.pattern);
if (c) {
fio_lock(&c->lock);
if (c->info.uuid == -1) {
fio_unsubscribe(sub);
fio_unlock(&c->lock);
return Qnil;
}
iodine_sub_add(&c->subscriptions, sub);
fio_unlock(&c->lock);
} else {
fio_lock(&sub_lock);
iodine_sub_add(&sub_global, sub);
fio_unlock(&sub_lock);
}
return args.channel_org;
}
|
#timeout ⇒ Object
Returns the timeout / ping
interval for the connection.
Returns nil on error.
300 301 302 303 304 305 306 307 |
# File 'ext/iodine/iodine_connection.c', line 300
static VALUE iodine_connection_timeout_get(VALUE self) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c && !fio_is_closed(c->info.uuid)) {
size_t tout = (size_t)fio_timeout_get(c->info.uuid);
return SIZET2NUM(tout);
}
return Qnil;
}
|
#timeout=(timeout) ⇒ Object
Sets the timeout / ping
interval for the connection (up to 255 seconds).
Returns nil on error.
314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'ext/iodine/iodine_connection.c', line 314
static VALUE iodine_connection_timeout_set(VALUE self, VALUE timeout) {
Check_Type(timeout, T_FIXNUM);
int tout = NUM2INT(timeout);
if (tout < 0 || tout > 255) {
rb_raise(rb_eRangeError, "timeout out of range.");
return Qnil;
}
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (c && !fio_is_closed(c->info.uuid)) {
fio_timeout_set(c->info.uuid, (uint8_t)tout);
return timeout;
}
return Qnil;
}
|
#unsubscribe(name) ⇒ Object
Unsubscribes from a Pub/Sub stream / channel.
The method accepts a single arguments, the name used for the subscription. i.e.:
subscribe("my_stream") {|source, msg| p msg }
unsubscribe("my_stream")
Returns true
if the subscription was found.
Returns false
if the subscription didn't exist.
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 |
# File 'ext/iodine/iodine_connection.c', line 672
static VALUE iodine_pubsub_unsubscribe(VALUE self, VALUE name) {
// clang-format on
iodine_connection_data_s *c = NULL;
fio_lock_i *s_lock = &sub_lock;
fio_subhash_s *subs = &sub_global;
VALUE ret;
if (TYPE(self) != T_MODULE) {
c = iodine_connection_validate_data(self);
if (!c || c->info.uuid == -1) {
return Qnil; /* cannot unsubscribe a closed connection. */
}
s_lock = &c->lock;
subs = &c->subscriptions;
}
if (TYPE(name) == T_SYMBOL)
name = rb_sym2str(name);
Check_Type(name, T_STRING);
fio_lock(s_lock);
ret = iodine_sub_unsubscribe(subs, IODINE_RSTRINFO(name));
fio_unlock(s_lock);
return ret;
}
|
#write(data) ⇒ Object
Writes data to the connection asynchronously. data
MUST be a String.
In effect, the write
call does nothing, it only schedules the data to be
sent and marks the data as pending.
Use #pending to test how many write
operations are pending completion
(on_drained(client)
will be called when they complete).
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'ext/iodine/iodine_connection.c', line 170
static VALUE iodine_connection_write(VALUE self, VALUE data) {
iodine_connection_data_s *c = iodine_connection_validate_data(self);
if (!c || fio_is_closed(c->info.uuid)) {
// don't throw exceptions - closed connections are unavoidable.
return Qnil;
// rb_raise(rb_eIOError, "Connection closed or invalid.");
}
if (!RB_TYPE_P(data, T_STRING)) {
VALUE tmp = data;
data = IodineCaller.call(data, iodine_to_s_id);
if (!RB_TYPE_P(data, T_STRING))
Check_Type(tmp, T_STRING);
rb_backtrace();
FIO_LOG_WARNING(
"`Iodine::Connection#write` was called with a non-String object.");
}
switch (c->info.type) {
case IODINE_CONNECTION_WEBSOCKET:
/* WebSockets*/
websocket_write(c->info.arg, IODINE_RSTRINFO(data),
rb_enc_get(data) == IodineUTF8Encoding);
return Qtrue;
break;
case IODINE_CONNECTION_SSE:
/* SSE */
#if 1
http_sse_write(c->info.arg, .data = IODINE_RSTRINFO(data));
return Qtrue;
#else
if (rb_enc_get(data) == IodineUTF8Encoding) {
http_sse_write(c->info.arg, .data = {.data = RSTRING_PTR(data),
.len = RSTRING_LEN(data)});
return Qtrue;
}
fprintf(stderr, "WARNING: ignoring an attept to write binary data to "
"non-binary protocol (SSE).\n");
return Qfalse;
// rb_raise(rb_eEncodingError,
// "This Connection type requires data to be UTF-8 encoded.");
#endif
break;
case IODINE_CONNECTION_RAW: /* fallthrough */
default: {
fio_write(c->info.uuid, RSTRING_PTR(data), RSTRING_LEN(data));
return Qtrue;
} break;
}
return Qnil;
}
|