Class: UringMachine::FiberScheduler

Inherits:
Object
Defined in:
lib/uringmachine/fiber_scheduler.rb

Overview

Implements the Fiber::Scheduler interface for creating fiber-based concurrent applications in Ruby, in tight integration with the standard Ruby I/O and locking APIs.

Constant Summary

DEFAULT_THREAD_POOL = BlockingOperationThreadPool.new

The blocking operation thread pool is shared by all fiber schedulers.

Constructor Details

#initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL) ⇒ void

Instantiates a scheduler with the given UringMachine instance. If no machine is given, a new one is created.

machine = UM.new
scheduler = UM::FiberScheduler.new(machine)
Fiber.set_scheduler(scheduler)


# File 'lib/uringmachine/fiber_scheduler.rb', line 98

def initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL)
  @machine = machine || UM.new
  @thread_pool = thread_pool
  @fiber_map = ObjectSpace::WeakMap.new
  @thread = Thread.current
end

Instance Attribute Details

#fiber_map ⇒ Object (readonly)

WeakMap holding references to scheduler fibers as keys.



# File 'lib/uringmachine/fiber_scheduler.rb', line 88

def fiber_map
  @fiber_map
end
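
Because the map is an ObjectSpace::WeakMap, an entry does not by itself keep a fiber alive. The following is a minimal sketch of that pattern in plain Ruby, independent of UringMachine; the `track_fibers` helper and the variable names are illustrative and not part of the library.

```ruby
# Build a weak registry of fibers, similar in spirit to the scheduler's fiber map.
def track_fibers(fibers)
  registry = ObjectSpace::WeakMap.new
  fibers.each { |fiber| registry[fiber] = true }
  registry
end

fibers = Array.new(3) { |i| Fiber.new { i * 2 } }
registry = track_fibers(fibers)

# The fibers are still strongly referenced by `fibers`, so all of them are present.
puts registry.keys.size          # 3
puts registry.key?(fibers[0])    # true

# Keys are looked up by object identity, so an unrelated fiber is not found.
puts registry.key?(Fiber.new {}) # false
```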

#machine ⇒ Object (readonly)

The UringMachine instance associated with the scheduler.



# File 'lib/uringmachine/fiber_scheduler.rb', line 85

def machine
  @machine
end

Instance Method Details

#address_resolve(hostname) ⇒ Array<String>

Resolves a hostname.



# File 'lib/uringmachine/fiber_scheduler.rb', line 379

def address_resolve(hostname)
  Resolv.getaddresses(hostname)
end
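
The hook delegates to the standard library's Resolv, which returns addresses as strings. As a rough illustration that needs no network access, the sketch below resolves a name against a temporary hosts file; `resolve_with_hosts` and the `app.test` entry are assumptions made up for this example.

```ruby
require 'resolv'
require 'tempfile'

# Resolve a hostname against a throwaway hosts file instead of the system resolver.
def resolve_with_hosts(hostname, hosts_content)
  Tempfile.create('hosts') do |file|
    file.write(hosts_content)
    file.flush
    resolver = Resolv.new([Resolv::Hosts.new(file.path)])
    resolver.getaddresses(hostname)
  end
end

addresses = resolve_with_hosts('app.test', "192.0.2.10 app.test\n")
p addresses # ["192.0.2.10"]
```

An unknown name yields an empty array rather than raising.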

#block(blocker, timeout = nil) ⇒ bool

Blocks the current fiber by yielding to the machine. This hook is called when a synchronization mechanism blocks, e.g. a mutex, a queue, etc. Returns true when the fiber is resumed, or false if the timeout expires first.



# File 'lib/uringmachine/fiber_scheduler.rb', line 162

def block(blocker, timeout = nil)
  if timeout
    @machine.timeout(timeout, Timeout::Error) { @machine.yield }
  else
    @machine.yield
  end
  true
rescue Timeout::Error
  false
end
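
The true/false return contract can be tried out without a machine. In this sketch the standard library's `Timeout.timeout` stands in for `@machine.timeout` and the caller's block stands in for `@machine.yield`; `wait_with_timeout` is a hypothetical name, not a library method.

```ruby
require 'timeout'

# Same control flow as #block: true if the wait finished, false if it timed out.
def wait_with_timeout(timeout = nil)
  if timeout
    Timeout.timeout(timeout, Timeout::Error) { yield }
  else
    yield
  end
  true
rescue Timeout::Error
  false
end

puts wait_with_timeout(1) { :woken_up }   # true
puts wait_with_timeout(0.05) { sleep 1 }  # false
```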

#blocking_operation_wait(op) ⇒ void

This method returns an undefined value.

Runs the given operation in a separate thread, so as not to block other fibers.



# File 'lib/uringmachine/fiber_scheduler.rb', line 152

def blocking_operation_wait(op)
  @thread_pool.process(@machine, op)
end

#fiber(&block) ⇒ Fiber

Creates a new fiber with the given block. The created fiber is added to the fiber map, scheduled on the scheduler machine, and started before this method returns (by calling snooze).



# File 'lib/uringmachine/fiber_scheduler.rb', line 116

def fiber(&block)
  fiber = Fiber.new(blocking: false) { @machine.run(fiber, &block) }

  @fiber_map[fiber] = true
  @machine.schedule(fiber, nil)
  @machine.snooze
  fiber
end
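
The fiber is created with `blocking: false`; non-blocking fibers are the ones whose blocking operations Ruby routes to the scheduler hooks. A small plain-Ruby sketch of the flag itself, with `blocking_flags` as an illustrative helper name:

```ruby
# Compare the blocking flag of a non-blocking and a blocking fiber.
def blocking_flags
  non_blocking = Fiber.new(blocking: false) { }
  blocking     = Fiber.new(blocking: true) { }
  [non_blocking.blocking?, blocking.blocking?]
end

p blocking_flags # [false, true]
```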

#fiber_interrupt(fiber, exception) ⇒ void

This method returns an undefined value.

Interrupts the given fiber with an exception.



# File 'lib/uringmachine/fiber_scheduler.rb', line 370

def fiber_interrupt(fiber, exception)
  @machine.schedule(fiber, exception)
  @machine.wakeup
end

#instance_variables_to_inspect ⇒ Object

:nodoc:



# File 'lib/uringmachine/fiber_scheduler.rb', line 106

def instance_variables_to_inspect
  [:@machine]
end

#io_close(fd) ⇒ Integer

Closes the given fd.



# File 'lib/uringmachine/fiber_scheduler.rb', line 346

def io_close(fd)
  @machine.close_async(fd)
rescue Errno => e
  -e.errno
end
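
The I/O hooks are written to report a failure as a negative errno value instead of raising. The sketch below illustrates that convention in plain Ruby. The helper names are hypothetical, and the sketch rescues SystemCallError, the common superclass of the Errno::* exception classes, which is an assumption of this example rather than a copy of the listing above.

```ruby
# Run a block and translate a system call failure into a negative errno.
def errno_result
  yield
rescue SystemCallError => e
  -e.errno
end

# Turn a negative result back into the matching Errno exception.
def raise_if_error(result)
  raise SystemCallError.new(nil, -result) if result.is_a?(Integer) && result < 0
  result
end

ok     = errno_result { 5 }
failed = errno_result { raise Errno::EBADF }

puts ok                               # 5
puts failed == -Errno::EBADF::Errno   # true
```

A caller that receives the negative value can pass it through `raise_if_error` to get the original exception class back.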

#io_pread(io, buffer, from, length, offset) ⇒ Integer

Reads from the given IO at the given file offset.



# File 'lib/uringmachine/fiber_scheduler.rb', line 271

def io_pread(io, buffer, from, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset, from)
  end
rescue Errno::EINTR
  retry
rescue Errno => e
  -e.errno
end

#io_pwrite(io, buffer, from, length, offset) ⇒ Integer

Writes to the given IO at the given file offset.



# File 'lib/uringmachine/fiber_scheduler.rb', line 323

def io_pwrite(io, buffer, from, length, offset)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length, from)
  end
rescue Errno::EINTR
  retry
rescue Errno => e
  -e.errno
end

#io_read(io, buffer, length, offset) ⇒ Integer

Reads from the given IO.



# File 'lib/uringmachine/fiber_scheduler.rb', line 245

def io_read(io, buffer, length, offset)
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset)
  end
rescue Errno::EINTR
  retry
rescue Errno => e
  -e.errno
end

#io_select(rios, wios, eios, timeout = nil) ⇒ Object

Selects the first ready IOs from the given sets of IOs.



# File 'lib/uringmachine/fiber_scheduler.rb', line 221

def io_select(rios, wios, eios, timeout = nil)
  map_r = map_fds(rios)
  map_w = map_fds(wios)
  map_e = map_fds(eios)

  r, w, e = nil
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
    }
  else
    r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
  end

  [unmap_fds(r, map_r), unmap_fds(w, map_w), unmap_fds(e, map_e)]
end
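
`map_fds` and `unmap_fds` are private helpers whose source is not shown on this page. The sketch below is only one plausible shape for them, inferred from how `#io_select` uses their results: the map's keys are file descriptors, and the ready descriptors are translated back to IO objects. The `_sketch` names and the nil handling are assumptions.

```ruby
# Hypothetical stand-in: index IO objects by their file descriptor.
def map_fds_sketch(ios)
  (ios || []).to_h { |io| [io.fileno, io] }
end

# Hypothetical stand-in: translate ready descriptors back to IO objects.
def unmap_fds_sketch(fds, map)
  (fds || []).map { |fd| map.fetch(fd) }
end

reader, writer = IO.pipe
map_r = map_fds_sketch([reader])
ready = unmap_fds_sketch([reader.fileno], map_r)
puts ready.first.equal?(reader) # true
reader.close
writer.close
```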

#io_wait(io, events, timeout = nil) ⇒ void

This method returns an undefined value.

Waits for the given io to become ready.



# File 'lib/uringmachine/fiber_scheduler.rb', line 204

def io_wait(io, events, timeout = nil)
  timeout ||= io.timeout
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      @machine.poll(io.fileno, events)
    }
  else
    @machine.poll(io.fileno, events)
  end
end
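
In the Fiber::Scheduler interface, `events` is a bit mask built from IO::READABLE, IO::PRIORITY and IO::WRITABLE. A short sketch of composing and decoding such a mask; `describe_events` is an illustrative helper, not part of the library.

```ruby
# Decode an events bit mask like the one passed to #io_wait.
def describe_events(events)
  names = []
  names << :readable if (events & IO::READABLE) != 0
  names << :priority if (events & IO::PRIORITY) != 0
  names << :writable if (events & IO::WRITABLE) != 0
  names
end

p describe_events(IO::READABLE)                 # [:readable]
p describe_events(IO::READABLE | IO::WRITABLE)  # [:readable, :writable]
```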

#io_write(io, buffer, length, offset) ⇒ Integer

Writes to the given IO.



# File 'lib/uringmachine/fiber_scheduler.rb', line 296

def io_write(io, buffer, length, offset)
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length)
  end
rescue Errno::EINTR
  retry
rescue Errno => e
  -e.errno
end

#join(*fibers) ⇒ void

This method returns an undefined value.

Waits for the given fibers to terminate. If no fibers are given, waits for all fibers to terminate.



# File 'lib/uringmachine/fiber_scheduler.rb', line 138

def join(*fibers)
  if fibers.empty?
    fibers = @fiber_map.keys
    @fiber_map = ObjectSpace::WeakMap.new
  end

  @machine.await_fibers(fibers)
end

#kernel_sleep(duration = nil) ⇒ void

This method returns an undefined value.

Sleeps for the given duration. If no duration is given, the fiber yields to the machine instead.



# File 'lib/uringmachine/fiber_scheduler.rb', line 189

def kernel_sleep(duration = nil)
  duration ? @machine.sleep(duration) : @machine.yield
end

#process_wait(pid, flags) ⇒ Process::Status

Waits for a process to terminate.



# File 'lib/uringmachine/fiber_scheduler.rb', line 359

def process_wait(pid, flags)
  flags = UM::WEXITED if flags == 0
  @machine.waitid_status(UM::P_PID, pid, flags)
end

#scheduler_close ⇒ void

This method returns an undefined value.

Waits for all fibers to terminate. Called upon thread termination or when the thread’s fiber scheduler is changed.



# File 'lib/uringmachine/fiber_scheduler.rb', line 129

def scheduler_close
  join()
end

#timeout_after(duration, exception, message, &block) ⇒ any

Runs the given block with a timeout. The message argument is not forwarded to the machine.



# File 'lib/uringmachine/fiber_scheduler.rb', line 390

def timeout_after(duration, exception, message, &block)
  @machine.timeout(duration, exception, &block)
end
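
This hook is the scheduler-side counterpart of `Timeout.timeout`, which takes the same duration, exception class and message. The sketch below shows the caller's side in plain Ruby with no scheduler installed; `SlowOperation` and `fetch_with_deadline` are names invented for the example.

```ruby
require 'timeout'

# Custom exception class raised when the deadline is exceeded.
class SlowOperation < StandardError; end

# Run the block under a deadline and report a timeout as a string.
def fetch_with_deadline(seconds)
  Timeout.timeout(seconds, SlowOperation, 'operation took too long') { yield }
rescue SlowOperation => e
  "timed out: #{e.message}"
end

puts fetch_with_deadline(1) { :done }        # done
puts fetch_with_deadline(0.05) { sleep 1 }   # timed out: operation took too long
```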

#unblock(blocker, fiber) ⇒ void

This method returns an undefined value.

Unblocks the given fiber by scheduling it. This hook is called when a synchronization mechanism unblocks, e.g. a mutex, a queue, etc.



# File 'lib/uringmachine/fiber_scheduler.rb', line 180

def unblock(blocker, fiber)
  @machine.schedule(fiber, nil)
  @machine.wakeup if Thread.current != @thread
end

#yield ⇒ Object

Yields to the next runnable fiber.



# File 'lib/uringmachine/fiber_scheduler.rb', line 194

def yield
  @machine.snooze
end