Class: Libev::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/libev_scheduler.rb,
ext/libev_scheduler/scheduler.c

Instance Method Summary collapse

Constructor Details

#initializeObject

Initializes the scheduler. The break_async watcher it starts is used solely for breaking out of a blocking event loop (waking it up) in a thread-safe, signal-safe manner.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'ext/libev_scheduler/scheduler.c', line 65

/*
 * Scheduler#initialize: prepares the native scheduler state attached to self.
 *
 * On the main thread the libev default loop is used; on any other thread a
 * new loop is created with EVFLAG_NOSIGMASK. The break_async watcher is then
 * started on that loop and the bookkeeping fields are reset.
 *
 * Returns nil.
 */
static VALUE Scheduler_initialize(VALUE self) {
  Scheduler_t *sched;

  GetScheduler(self, sched);

  if (rb_thread_current() == rb_thread_main())
    sched->ev_loop = EV_DEFAULT;
  else
    sched->ev_loop = ev_loop_new(EVFLAG_NOSIGMASK);

  /* Start the async watcher, then release the loop reference it took so the
   * break_async watcher is not counted as activity on the loop. */
  ev_async_init(&sched->break_async, break_async_callback);
  ev_async_start(sched->ev_loop, &sched->break_async);
  ev_unref(sched->ev_loop);

  /* Nothing is pending, no poll is in progress, and the ready list is empty. */
  sched->pending_count = 0;
  sched->currently_polling = 0;
  sched->ready = rb_ary_new();

  return Qnil;
}

Instance Method Details

#block(*args) ⇒ Object



150
151
152
153
154
155
156
157
158
159
# File 'ext/libev_scheduler/scheduler.c', line 150

/*
 * Scheduler#block(*args): suspends the calling fiber.
 *
 * When exactly two arguments are given and the second one (the timeout) is
 * not nil, the call is delegated to Scheduler_sleep with that timeout;
 * in every other case it is delegated to Scheduler_pause.
 *
 * Always returns true.
 */
VALUE Scheduler_block(int argc, VALUE *argv, VALUE self) {
  if (argc == 2 && argv[1] != Qnil) {
    Scheduler_sleep(self, argv[1]);
  }
  else {
    Scheduler_pause(self);
  }

  return Qtrue;
}

#closeObject

fiber scheduler interface



97
98
99
100
101
102
103
104
105
106
# File 'ext/libev_scheduler/scheduler.c', line 97

/*
 * Scheduler#close: part of the fiber scheduler interface.
 *
 * Runs the scheduler until no work remains (see Scheduler_run), then stops
 * the break_async watcher and, unless the scheduler is using libev's default
 * loop, destroys the event loop.
 *
 * Returns self.
 */
VALUE Scheduler_close(VALUE self) {
  Scheduler_t *scheduler;
  GetScheduler(self, scheduler);

  Scheduler_run(self);

  /* Scheduler_initialize called ev_unref() after starting break_async so the
   * watcher would not be counted by the loop. libev requires the matching
   * ev_ref() before such a watcher is stopped; otherwise stopping it drops
   * the loop's reference count below zero, which would persist on the shared
   * default loop (it is never destroyed here). */
  ev_ref(scheduler->ev_loop);
  ev_async_stop(scheduler->ev_loop, &scheduler->break_async);

  /* The default loop is shared process-wide and must not be destroyed. */
  if (!ev_is_default_loop(scheduler->ev_loop)) ev_loop_destroy(scheduler->ev_loop);
  return self;
}

#fiber(&block) ⇒ Object



5
6
7
8
9
10
# File 'lib/libev_scheduler.rb', line 5

# Creates a non-blocking fiber for the given block and hands it to #unblock
# (with a nil blocker) instead of resuming it directly.
#
# @yield the body the new fiber will execute
# @return [Fiber] the newly created fiber
def fiber(&block)
  Fiber.new(blocking: false, &block).tap { |new_fiber| unblock(nil, new_fiber) }
end

#io_wait(io, events, timeout) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'ext/libev_scheduler/scheduler.c', line 193

/*
 * Scheduler#io_wait(io, events, timeout)
 *
 * Suspends the current fiber while watching io's file descriptor for the
 * given events. If timeout is not nil, a one-shot timer of that many seconds
 * is armed as well. Once the fiber is resumed, both watchers are stopped.
 *
 * Returns self regardless of which watcher (if any) caused the resumption.
 * NOTE(review): the fiber scheduler interface presumably expects the ready
 * events (or a falsy value on timeout) here - confirm against callers.
 */
VALUE Scheduler_io_wait(VALUE self, VALUE io, VALUE events, VALUE timeout) {
  Scheduler_t *scheduler;
  // Both watchers are local variables of this call and are handed to the
  // event loop by address below.
  struct libev_io io_watcher;
  struct libev_timer timeout_watcher;
  GetScheduler(self, scheduler);

  rb_io_t *fptr;
  // If io carries a non-nil value in the ivar named by ID_ivar_io, wait on
  // that object instead (presumably a wrapper around a real IO - confirm).
  VALUE underlying_io = rb_ivar_get(io, ID_ivar_io);
  if (underlying_io != Qnil) io = underlying_io;
  GetOpenFile(io, fptr);

  // Record the scheduler and the waiting fiber on the watcher, then set it up
  // for the descriptor with the mask derived from events.
  io_watcher.scheduler = scheduler;
  io_watcher.fiber = rb_fiber_current();
  ev_io_init(&io_watcher.io, Scheduler_io_callback, fptr->fd, io_event_mask(events));

  int use_timeout = timeout != Qnil;
  if (use_timeout) {
    timeout_watcher.scheduler = scheduler;
    timeout_watcher.fiber = rb_fiber_current();
    // Repeat interval of 0. makes this a one-shot timer.
    ev_timer_init(&timeout_watcher.timer, Scheduler_timer_callback, NUM2DBL(timeout), 0.);
    ev_timer_start(scheduler->ev_loop, &timeout_watcher.timer);
  }

  ev_io_start(scheduler->ev_loop, &io_watcher.io);
  VALUE nil = Qnil;
  // The fiber counts as pending for as long as it is suspended in the yield.
  scheduler->pending_count++;
  rb_fiber_yield(1, &nil);
  scheduler->pending_count--;
  // NOTE(review): if rb_fiber_yield does not return normally (e.g. an
  // exception is raised into the fiber), the cleanup below appears to be
  // skipped while the loop still references the local watchers - confirm.
  ev_io_stop(scheduler->ev_loop, &io_watcher.io);
  if (use_timeout)
    ev_timer_stop(scheduler->ev_loop, &timeout_watcher.timer);
  
  return self;
}

#kernel_sleep(duration = nil) ⇒ Object



12
13
14
# File 'lib/libev_scheduler.rb', line 12

# Fiber scheduler hook for sleeping: forwards to #block with the :sleep
# blocker and the requested duration.
#
# @param duration [Object, nil] value passed through to #block as the timeout;
#   nil when no duration was given
# @return [Object] whatever #block returns
def kernel_sleep(duration = nil) = block(:sleep, duration)

#pending_countObject



298
299
300
301
302
303
# File 'ext/libev_scheduler/scheduler.c', line 298

/*
 * Scheduler#pending_count: returns the scheduler's current pending_count
 * field converted to a Ruby Integer.
 */
VALUE Scheduler_pending_count(VALUE self) {
  Scheduler_t *sched;

  GetScheduler(self, sched);
  return INT2NUM(sched->pending_count);
}

#process_wait(pid, flags) ⇒ Object



244
245
246
247
248
249
# File 'ext/libev_scheduler/scheduler.c', line 244

# Waits for a child process without blocking the calling thread's event loop
# by performing the wait on a separate thread and returning that thread's
# value.
#
# @param pid [Integer] process id handed to Process::Status.wait
# @param flags [Integer] wait flags handed to Process::Status.wait
# @return [Process::Status] the result of Process::Status.wait
def process_wait(pid, flags)
  waiter = Thread.new(pid, flags) do |child_pid, wait_flags|
    Process::Status.wait(child_pid, wait_flags)
  end
  waiter.value
end

#runObject



86
87
88
89
90
91
92
93
94
95
# File 'ext/libev_scheduler/scheduler.c', line 86

/*
 * Scheduler#run: polls repeatedly (via Scheduler_poll) for as long as there
 * are pending operations or fibers in the ready list.
 *
 * Returns self.
 */
VALUE Scheduler_run(VALUE self) {
  Scheduler_t *sched;

  GetScheduler(self, sched);

  for (;;) {
    /* Stop once nothing is pending and the ready list is empty. */
    int has_work = sched->pending_count > 0 || RARRAY_LEN(sched->ready) > 0;
    if (!has_work) break;

    Scheduler_poll(self);
  }

  return self;
}

#unblock(blocker, fiber) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
# File 'ext/libev_scheduler/scheduler.c', line 161

/*
 * Scheduler#unblock(blocker, fiber): appends fiber to the scheduler's ready
 * list. The blocker argument is not used here.
 *
 * If the scheduler is marked as currently polling, the break_async watcher
 * is signalled on its event loop.
 *
 * Returns self.
 */
VALUE Scheduler_unblock(VALUE self, VALUE blocker, VALUE fiber) {
  Scheduler_t *sched;

  GetScheduler(self, sched);
  rb_ary_push(sched->ready, fiber);

  if (sched->currently_polling) {
    /* Signal the async watcher so a poll in progress gets woken up. */
    ev_async_send(sched->ev_loop, &sched->break_async);
  }

  return self;
}