Class: Agoo::Upgraded
- Inherits:
-
Object
- Object
- Agoo::Upgraded
- Defined in:
- ext/agoo/rupgraded.c
Instance Method Summary collapse
-
#close ⇒ Object
call-seq: close().
-
#env ⇒ Object
call-seq: env().
-
#open? ⇒ Boolean
call-seq: open?().
-
#pending ⇒ Object
call-seq: pending().
-
#protocol ⇒ Object
call-seq: protocol().
-
#publish ⇒ Object
Use the publish from the Agoo module.
-
#subscribe(subject) ⇒ Object
call-seq: subscribe(subject).
-
#unsubscribe(*args) ⇒ Object
call-seq: unsubscribe(subject=nil).
-
#write(msg) ⇒ Object
call-seq: write(msg).
Instance Method Details
#close ⇒ Object
call-seq: close()
Closes the connections associated with the handler.
147 148 149 150 151 152 153 154 155 |
# File 'ext/agoo/rupgraded.c', line 147
static VALUE
rup_close(VALUE self) {
agooUpgraded up = get_upgraded(self);
if (NULL != up) {
agoo_upgraded_close(up, false);
}
return Qnil;
}
|
#env ⇒ Object
call-seq: env()
Returns the environment passed to the call method that initiated the agooUpgraded Object creation.
270 271 272 273 274 275 276 277 278 |
# File 'ext/agoo/rupgraded.c', line 270
static VALUE
env(VALUE self) {
agooUpgraded up = get_upgraded(self);
if (NULL != up) {
return (VALUE)up->env;
}
return Qnil;
}
|
#open? ⇒ Boolean
call-seq: open?()
Returns true if the connection is open and false otherwise.
182 183 184 185 186 187 188 189 190 191 192 |
# File 'ext/agoo/rupgraded.c', line 182
static VALUE
rup_open(VALUE self) {
agooUpgraded up = get_upgraded(self);
int pending = -1;
if (NULL != up) {
pending = (int)(long)atomic_load(&up->pending);
atomic_fetch_sub(&up->ref_cnt, 1);
}
return 0 <= pending ? Qtrue : Qfalse;
}
|
#pending ⇒ Object
call-seq: pending()
Returns the number of pending WebSocket or SSE writes. If the connection is closed then -1 is returned.
164 165 166 167 168 169 170 171 172 173 174 |
# File 'ext/agoo/rupgraded.c', line 164
static VALUE
rup_pending(VALUE self) {
agooUpgraded up = get_upgraded(self);
int pending = -1;
if (NULL != up) {
pending = agoo_upgraded_pending(up);
atomic_fetch_sub(&up->ref_cnt, 1);
}
return INT2NUM(pending);
}
|
#protocol ⇒ Object
call-seq: protocol()
Returns the protocol of the upgraded connection as either :websocket or :sse. If no longer connected nil is returned.
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'ext/agoo/rupgraded.c', line 201
static VALUE
rup_protocol(VALUE self) {
VALUE pro = Qnil;
if (agoo_server.active) {
agooUpgraded up;
pthread_mutex_lock(&agoo_server.up_lock);
if (NULL != (up = DATA_PTR(self)) && NULL != up->con) {
switch (up->con->bind->kind) {
case AGOO_CON_WS:
pro = websocket_sym;
break;
case AGOO_CON_SSE:
pro = sse_sym;
break;
default:
break;
}
}
pthread_mutex_unlock(&agoo_server.up_lock);
}
return pro;
}
|
#publish ⇒ Object
Use the publish from the Agoo module.
261 |
# File 'ext/agoo/rupgraded.c', line 261
extern VALUE ragoo_publish(VALUE self, VALUE subject, VALUE message);
|
#subscribe(subject) ⇒ Object
call-seq: subscribe(subject)
Subscribes to messages published on the specified subject. The subject is a dot delimited string that can include a ‘*’ character as a wild card that matches any set of characters. The ‘>’ character matches all remaining characters. Examples: people.fred.log, people.*.log, people.fred.>
Symbols can also be used as can any other object that responds to #to_s.
105 106 107 108 109 110 111 112 113 114 115 |
# File 'ext/agoo/rupgraded.c', line 105
static VALUE
rup_subscribe(VALUE self, VALUE subject) {
agooUpgraded up;
int slen;
const char *subj = extract_subject(subject, &slen);
if (NULL != (up = get_upgraded(self))) {
agoo_upgraded_subscribe(up, subj, slen, false);
}
return Qnil;
}
|
#unsubscribe(*args) ⇒ Object
call-seq: unsubscribe(subject=nil)
Unsubscribes to messages on the provided subject. If the subject is nil then all subscriptions for the object are removed.
Symbols can also be used as can any other object that responds to #to_s.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'ext/agoo/rupgraded.c', line 126
static VALUE
rup_unsubscribe(int argc, VALUE *argv, VALUE self) {
agooUpgraded up;
const char *subject = NULL;
int slen = 0;
if (0 < argc) {
subject = extract_subject(argv[0], &slen);
}
if (NULL != (up = get_upgraded(self))) {
agoo_upgraded_unsubscribe(up, subject, slen, false);
}
return Qnil;
}
|
#write(msg) ⇒ Object
call-seq: write(msg)
Writes a message to the WebSocket or SSE connection. Returns true if the message has been queued and false otherwise. A closed connection or too many pending messages could cause a value of false to be returned.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'ext/agoo/rupgraded.c', line 69
static VALUE
rup_write(VALUE self, VALUE msg) {
agooUpgraded up = get_upgraded(self);
const char *message;
size_t mlen;
bool bin = false;
if (NULL == up) {
return Qfalse;
}
if (T_STRING == rb_type(msg)) {
message = StringValuePtr(msg);
mlen = RSTRING_LEN(msg);
if (RB_ENCODING_IS_ASCII8BIT(msg)) {
bin = true;
}
} else {
volatile VALUE rs = rb_funcall(msg, to_s_id, 0);
message = StringValuePtr(rs);
mlen = RSTRING_LEN(rs);
}
return agoo_upgraded_write(up, message, mlen, bin, false) ? Qtrue : Qfalse;
}
|