Module: Iodine::PubSub
- Defined in:
- lib/iodine/pubsub.rb,
lib/iodine.rb,
ext/iodine_ext/iodine_pubsub.c
Overview
From Wikipedia: publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead characterize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
Iodine is equiped with an internal pub/sub service that allows improved resource management from a deployment perspective.
The common paradigm, which is implemented by pub/sub services like Redis, is for a "client" to "subscribe" to one or more "channels". Messages are streamed to these "channels" by different "publishers" (the application / other clients) and are broadcasted to the "clients" through their "subscription".
Iodine's approach it to offload pub/sub resource costs from the pub/sub service (which is usually expensive to scale) onto the application layer.
For example, the default (nil
) pub/sub Engine implements
an internal pub/sub service that manages subscriptions (clients and channels) throughout an Iodine process cluster without any need to connect to an external pub/sub service.
If Iodine was runninng with 8 processes and 16 threads per process, a publishing in process A will be delivered to subscribers in process B.
In addition, by inheriting the Engine class, it's easy to create pub/sub engines that connect to this
underlying pub/sub service. This means that Iodine will call the engine's subscribe
method only once per
channel and once messages arrive, Iodine will distribute the messages to all the subscribed clients.
Defined Under Namespace
Constant Summary collapse
- CLUSTER =
CLUSTER publishes data to all the subscribers in the process cluster.
iodine_pubsub_make_C_engine(FIO_PUBSUB_CLUSTER)
- PROCESS =
PROCESS publishes data to all the subscribers in a single process.
iodine_pubsub_make_C_engine(FIO_PUBSUB_PROCESS)
- SIBLINGS =
SIBLINGS publishes data to all the subscribers in the other processes process.
iodine_pubsub_make_C_engine(FIO_PUBSUB_SIBLINGS)
- PUBLISH2ROOT =
PUBLISH2ROOT publishes data only to the root / master process.
iodine_pubsub_make_C_engine(FIO_PUBSUB_ROOT)
Class Method Summary collapse
-
.attach(engine) ⇒ Object
Attaches an Engine to the pub/sub system (more than a single engine can be attached at the same time).
-
.default ⇒ Object
Returns the default Engine for pub/sub methods.
-
.default=(engine) ⇒ Object
Sets the default Engine for pub/sub methods.
-
.detach(engine) ⇒ Object
Removes an Engine from the pub/sub system.
-
.dettach(engine) ⇒ Object
deprecated
Deprecated.
use PubSub.detach.
-
.reset(engine) ⇒ Object
Forces Iodine to call the Engine#subscribe callback for all existing subscriptions (i.e., when reconnecting to a Pub/Sub backend such as Redis).
Class Method Details
.attach(engine) ⇒ Object
Attaches an Engine to the pub/sub system (more than a single engine can be attached at the same time).
After an engine was attached, it's callbacks will be called (Iodine::PubSub::Engine#subscribe and Iodine::PubSub::Engine#unsubscribe) in response to Pub/Sub events.
280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'ext/iodine_ext/iodine_pubsub.c', line 280
static VALUE iodine_pubsub_attach(VALUE self, VALUE engine) {
iodine_pubsub_s *e = iodine_pubsub_CData(engine);
if (!e) {
rb_raise(rb_eTypeError, "not a valid engine");
return Qnil;
}
if (e->handler == Qnil) {
e->handler = engine;
}
IodineStore.add(engine);
fio_pubsub_attach(e->engine);
return engine;
(void)self;
}
|
.default ⇒ Object
Returns the default Engine for pub/sub methods.
263 264 265 266 267 268 269 270 |
# File 'ext/iodine_ext/iodine_pubsub.c', line 263
static VALUE iodine_pubsub_default_get(VALUE self) {
VALUE def = rb_ivar_get(self, rb_intern2("default_engine", 14));
if (def == Qnil) {
def = rb_const_get(self, rb_intern2("CLUSTER", 7));
iodine_pubsub_default_set(self, def);
}
return def;
}
|
.default=(engine) ⇒ Object
Sets the default Engine for pub/sub methods.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'ext/iodine_ext/iodine_pubsub.c', line 245
static VALUE iodine_pubsub_default_set(VALUE self, VALUE engine) {
if (engine == Qnil) {
engine = rb_const_get(self, rb_intern2("CLUSTER", 7));
}
iodine_pubsub_s *e = iodine_pubsub_CData(engine);
if (!e) {
rb_raise(rb_eTypeError, "not a valid engine");
return Qnil;
}
if (e->handler == Qnil) {
e->handler = engine;
}
FIO_PUBSUB_DEFAULT = e->engine;
rb_ivar_set(self, rb_intern2("default_engine", 14), engine);
return engine;
}
|
.detach(engine) ⇒ Object
Removes an Engine from the pub/sub system.
After an Engine was detached, Iodine will no longer call the Engine's callbacks (Iodine::PubSub::Engine#subscribe and Iodine::PubSub::Engine#unsubscribe)
302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'ext/iodine_ext/iodine_pubsub.c', line 302
static VALUE iodine_pubsub_detach(VALUE self, VALUE engine) {
iodine_pubsub_s *e = iodine_pubsub_CData(engine);
if (!e) {
rb_raise(rb_eTypeError, "not a valid engine");
return Qnil;
}
if (e->handler == Qnil) {
e->handler = engine;
}
IodineStore.remove(engine);
fio_pubsub_detach(e->engine);
return engine;
(void)self;
}
|
.dettach(engine) ⇒ Object
use detach.
128 129 130 131 |
# File 'lib/iodine.rb', line 128 def self.dettach(engine) warn "Iodine::PubSub.dettach is deprecated (was a typo), use Iodine::PubSub.detach(engine)." Iodine::PubSub.detach(engine) end |
.reset(engine) ⇒ Object
Forces Iodine to call the Iodine::PubSub::Engine#subscribe callback for all existing subscriptions (i.e., when reconnecting to a Pub/Sub backend such as Redis).
322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'ext/iodine_ext/iodine_pubsub.c', line 322
static VALUE iodine_pubsub_reset(VALUE self, VALUE engine) {
iodine_pubsub_s *e = iodine_pubsub_CData(engine);
if (!e) {
rb_raise(rb_eTypeError, "not a valid engine");
return Qnil;
}
if (e->handler == Qnil) {
e->handler = engine;
}
fio_pubsub_reattach(e->engine);
return engine;
(void)self;
}
|