Class: Libev::Scheduler
- Inherits:
-
Object
- Object
- Libev::Scheduler
- Defined in:
- lib/libev_scheduler.rb,
ext/libev_scheduler/scheduler.c
Instance Method Summary collapse
- #block(*args) ⇒ Object
-
#close ⇒ Object
fiber scheduler interface.
- #fiber(&block) ⇒ Object
-
#initialize ⇒ Object
constructor
The break_async watcher started here is used to break out of a blocking event loop (waking it up) in a thread-safe, signal-safe manner.
- #io_wait(io, events, timeout) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
- #pending_count ⇒ Object
- #process_wait(pid, flags) ⇒ Object
- #run ⇒ Object
- #unblock(blocker, fiber) ⇒ Object
Constructor Details
#initialize ⇒ Object
The break_async watcher started here is used to break out of a blocking event loop (waking it up) in a thread-safe, signal-safe manner.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'ext/libev_scheduler/scheduler.c', line 65
// Constructor: binds the scheduler to an event loop and resets its state.
//
// A scheduler created on the main thread uses libev's default loop; on any
// other thread a new loop is created with EVFLAG_NOSIGMASK. The break_async
// watcher is started on that loop so that a blocking poll can be woken up
// (Scheduler_unblock sends it while a poll is in progress).
//
// Always returns nil.
// NOTE(review): the result of ev_loop_new() is used unchecked - confirm
// whether a failed loop creation needs to be handled here.
static VALUE Scheduler_initialize(VALUE self) {
  Scheduler_t *scheduler;
  GetScheduler(self, scheduler);

  if (rb_thread_current() == rb_thread_main()) {
    scheduler->ev_loop = EV_DEFAULT;
  } else {
    scheduler->ev_loop = ev_loop_new(EVFLAG_NOSIGMASK);
  }

  // Register the wake-up watcher, then drop the reference it took so the
  // loop does not count it as outstanding work.
  ev_async_init(&scheduler->break_async, break_async_callback);
  ev_async_start(scheduler->ev_loop, &scheduler->break_async);
  ev_unref(scheduler->ev_loop);

  scheduler->pending_count = 0;
  scheduler->currently_polling = 0;
  scheduler->ready = rb_ary_new();

  return Qnil;
}
|
Instance Method Details
#block(*args) ⇒ Object
150 151 152 153 154 155 156 157 158 159 |
# File 'ext/libev_scheduler/scheduler.c', line 150
// Fiber scheduler interface: blocks the current fiber.
//
// argv[1] is treated as the timeout, but only when exactly two arguments are
// given. With a non-nil timeout the call is forwarded to Scheduler_sleep;
// in every other case it is forwarded to Scheduler_pause.
//
// Always returns true.
VALUE Scheduler_block(int argc, VALUE *argv, VALUE self) {
  if (argc == 2 && argv[1] != Qnil) {
    Scheduler_sleep(self, argv[1]);
  } else {
    Scheduler_pause(self);
  }
  return Qtrue;
}
|
#close ⇒ Object
fiber scheduler interface
97 98 99 100 101 102 103 104 105 106 |
# File 'ext/libev_scheduler/scheduler.c', line 97
// Fiber scheduler interface: shuts the scheduler down.
//
// Runs the scheduler until no pending operations or ready fibers remain, then
// stops the break_async watcher and, unless the scheduler is bound to the
// default loop, destroys its event loop.
//
// Returns self.
VALUE Scheduler_close(VALUE self) {
  Scheduler_t *scheduler;
  GetScheduler(self, scheduler);

  Scheduler_run(self);

  // Scheduler_initialize called ev_unref() right after starting break_async
  // so the watcher would not keep the loop alive. Stopping an active watcher
  // drops a loop reference again, so the reference has to be taken back
  // first; otherwise the loop's reference count ends up one too low. That
  // matters for the shared default loop, which outlives this scheduler.
  // The ev_is_active() guard keeps a repeated close from over-referencing.
  if (ev_is_active(&scheduler->break_async)) {
    ev_ref(scheduler->ev_loop);
    ev_async_stop(scheduler->ev_loop, &scheduler->break_async);
  }

  if (!ev_is_default_loop(scheduler->ev_loop)) ev_loop_destroy(scheduler->ev_loop);
  return self;
}
|
#fiber(&block) ⇒ Object
5 6 7 8 9 10 |
# File 'lib/libev_scheduler.rb', line 5
# Fiber scheduler hook: wraps the given block in a non-blocking fiber and
# hands it to #unblock rather than resuming it directly. Returns the new fiber.
def fiber(&block)
  Fiber.new(blocking: false, &block).tap do |new_fiber|
    unblock(nil, new_fiber)
  end
end
#io_wait(io, events, timeout) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'ext/libev_scheduler/scheduler.c', line 193
// Fiber scheduler interface: registers an IO watcher (and optionally a
// timeout timer) for the current fiber, then yields the fiber.
//
// io      - the IO object to wait on; if its ID_ivar_io instance variable is
//           non-nil, that value is used as the IO instead.
// events  - event specification, converted to a watcher mask by
//           io_event_mask().
// timeout - nil for no timeout; otherwise converted with NUM2DBL() and used
//           as the timer's initial delay (presumably seconds - confirm).
//
// Always returns self; the return value does not indicate whether the IO
// became ready or the timeout elapsed.
//
// NOTE(review): both watchers live on this function's stack and are only
// stopped after rb_fiber_yield() returns normally - confirm what happens if
// the fiber is resumed with an exception instead.
VALUE Scheduler_io_wait(VALUE self, VALUE io, VALUE events, VALUE timeout) {
Scheduler_t *scheduler;
struct libev_io io_watcher;
struct libev_timer timeout_watcher;
GetScheduler(self, scheduler);
rb_io_t *fptr;
// Prefer the object stored in the ID_ivar_io instance variable, when set.
VALUE underlying_io = rb_ivar_get(io, ID_ivar_io);
if (underlying_io != Qnil) io = underlying_io;
// Obtain the rb_io_t for the IO; its fd is what the watcher is set up on.
GetOpenFile(io, fptr);
// Record the scheduler and the waiting fiber on the watcher, presumably for
// use by Scheduler_io_callback (defined elsewhere).
io_watcher.scheduler = scheduler;
io_watcher.fiber = rb_fiber_current();
ev_io_init(&io_watcher.io, Scheduler_io_callback, fptr->fd, io_event_mask(events));
int use_timeout = timeout != Qnil;
if (use_timeout) {
// Non-repeating timer (repeat = 0.) tied to the same fiber.
timeout_watcher.scheduler = scheduler;
timeout_watcher.fiber = rb_fiber_current();
ev_timer_init(&timeout_watcher.timer, Scheduler_timer_callback, NUM2DBL(timeout), 0.);
ev_timer_start(scheduler->ev_loop, &timeout_watcher.timer);
}
ev_io_start(scheduler->ev_loop, &io_watcher.io);
VALUE nil = Qnil;
// Counted as pending while suspended, so Scheduler_run keeps polling.
scheduler->pending_count++;
rb_fiber_yield(1, &nil);
scheduler->pending_count--;
// Back in this fiber: unregister the watchers before their stack storage
// goes out of scope.
ev_io_stop(scheduler->ev_loop, &io_watcher.io);
if (use_timeout)
ev_timer_stop(scheduler->ev_loop, &timeout_watcher.timer);
return self;
}
|
#kernel_sleep(duration = nil) ⇒ Object
12 13 14 |
# File 'lib/libev_scheduler.rb', line 12
# Fiber scheduler hook for sleeping: delegates to #block with :sleep as the
# blocker and the optional duration as the timeout.
def kernel_sleep(duration = nil)
  block(:sleep, duration)
end
#pending_count ⇒ Object
298 299 300 301 302 303 |
# File 'ext/libev_scheduler/scheduler.c', line 298
// Returns the scheduler's pending_count field as a Ruby Integer.
//
// The counter is raised while a fiber is suspended in Scheduler_io_wait, and
// Scheduler_run keeps polling for as long as it is positive.
VALUE Scheduler_pending_count(VALUE self) {
  Scheduler_t *sched;

  GetScheduler(self, sched);
  return INT2NUM(sched->pending_count);
}
|
#process_wait(pid, flags) ⇒ Object
244 245 246 247 248 249 |
# File 'ext/libev_scheduler/scheduler.c', line 244
# Fiber scheduler hook for waiting on a process. Performs the wait with
# Process::Status.wait on a separate thread and returns that thread's value,
# which is a very simple way to implement a non-blocking wait.
def process_wait(pid, flags)
  waiter = Thread.new { Process::Status.wait(pid, flags) }
  waiter.value
end
#run ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'ext/libev_scheduler/scheduler.c', line 86
// Drives the scheduler: calls Scheduler_poll repeatedly until there are no
// pending operations and the ready queue is empty. Returns self.
VALUE Scheduler_run(VALUE self) {
  Scheduler_t *sched;
  GetScheduler(self, sched);

  for (;;) {
    // Finished once nothing is pending and no fiber is waiting to run.
    if (sched->pending_count <= 0 && RARRAY_LEN(sched->ready) <= 0) break;
    Scheduler_poll(self);
  }

  return self;
}
|
#unblock(blocker, fiber) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 |
# File 'ext/libev_scheduler/scheduler.c', line 161
// Fiber scheduler interface: marks a fiber as ready to run.
//
// Appends the fiber to the scheduler's ready array. If a poll is currently in
// progress, break_async is signalled on the event loop so the poll is woken
// up. The blocker argument is not used.
//
// Returns self.
VALUE Scheduler_unblock(VALUE self, VALUE blocker, VALUE fiber) {
  Scheduler_t *sched;
  GetScheduler(self, sched);

  rb_ary_push(sched->ready, fiber);

  // Nothing to wake up unless a poll is in progress.
  if (!sched->currently_polling) return self;

  ev_async_send(sched->ev_loop, &sched->break_async);
  return self;
}
|