Class: Fiber
Overview
Fiber extensions
Instance Attribute Summary collapse
-
#oob ⇒ Object
Returns the value of attribute oob.
-
#parent ⇒ Object
Returns the value of attribute parent.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#tag ⇒ Object
Returns the value of attribute tag.
-
#thread ⇒ Object
Returns the value of attribute thread.
Class Method Summary collapse
-
.await(*fibers) ⇒ Object
(also: join)
Waits for all given fibers to terminate, then returns the respective return values for all terminated fibers.
-
.schedule_priority_oob_fiber(&block) ⇒ Object
Creates and schedules with priority an out-of-band fiber that runs the supplied block.
-
.select(*fibers) ⇒ Array
Waits for at least one of the given fibers to terminate, returning an array containing the first terminated fiber and its return value.
Instance Method Summary collapse
-
#<<(msg) ⇒ Fiber
Sends a message to the given fiber.
-
#add_child(child_fiber) ⇒ Fiber
Adds a child fiber reference.
-
#attach_all_children_to(parent) ⇒ Fiber
Attaches all child fibers to a new parent.
-
#attach_and_monitor(parent) ⇒ Fiber
Attaches the fiber to the new parent and monitors the new parent.
-
#attach_to(parent) ⇒ Fiber
Attaches the fiber to a new parent.
-
#await ⇒ Object
Waits for the fiber to terminate, and returns its return value (the result of its last statement).
-
#await_all_children ⇒ Array<any>
Block until all child fibers have terminated.
-
#caller ⇒ Array<String>
Returns the fiber's caller.
-
#cancel(exception = Polyphony::Cancel) ⇒ Fiber
Stops a fiber by raising a Polyphony::Cancel exception.
-
#children ⇒ Array<Fiber>
Returns the fiber's children.
-
#dead? ⇒ bool
Returns true if the fiber is dead.
-
#detach ⇒ Fiber
Detaches the fiber from its current parent.
-
#finalize(result, uncaught_exception: false) ⇒ false
Finalizes the fiber, handling its return value or any uncaught exception.
-
#finalize_children(result, uncaught_exception) ⇒ Array
Shuts down all children of the current fiber.
-
#graceful_shutdown=(graceful) ⇒ bool
Sets the graceful shutdown flag for the fiber.
-
#graceful_shutdown? ⇒ bool
Returns the graceful shutdown flag for the fiber.
-
#inform_monitors(result, uncaught_exception) ⇒ Fiber
Informs the fiber's monitors it is terminated.
-
#inspect ⇒ String
(also: #to_s)
Returns a string representation of the fiber for debugging.
-
#interject(&block) ⇒ Fiber
Adds an interjection to the fiber.
-
#interrupt(value = nil) ⇒ Fiber
(also: #stop, #kill, #move_on)
Stops the fiber by raising a
Polyphony::MoveOn
exception. -
#join ⇒ Object
Waits for the fiber to terminate, and returns its return value (the result of its last statement).
-
#location ⇒ String
Returns the source location for the fiber based on its caller.
-
#mailbox ⇒ Queue
Returns the fiber's mailbox.
-
#main? ⇒ bool
Returns true if the fiber is the main fiber for its thread.
-
#monitor(fiber) ⇒ Fiber
Adds a fiber to the list of monitoring fibers.
-
#monitor_mailbox ⇒ Polyphony::Queue
Returns the fiber's monitoring mailbox queue, used for receiving fiber monitoring messages.
-
#monitors ⇒ Array<Fiber>
Returns the list of monitoring fibers.
-
#prepare(tag, block, caller, parent) ⇒ Fiber
Prepares a fiber for running.
-
#raise(*args) ⇒ Object
Raises an exception in the context of the fiber.
-
#receive ⇒ any
Receives a message from the fiber's mailbox.
-
#receive_all_pending ⇒ Array
Receives all messages currently in the fiber's mailbox.
-
#receive_loop ⇒ Object
Receives messages from the fiber's mailbox in an infinite loop.
-
#remove_child(child_fiber) ⇒ Fiber
Removes a child fiber reference.
-
#restart(value = nil) ⇒ Fiber
(also: #reset)
Restarts the fiber, with the given value serving as the first value passed to the fiber's block.
-
#restart_self(first_value) ⇒ any
Resets the fiber's state and reruns the fiber.
-
#run(first_value) ⇒ any
Runs the fiber's block and handles uncaught exceptions.
-
#running? ⇒ bool
Returns true if fiber is running.
-
#schedule(*args) ⇒ Object
Adds the fiber to the runqueue with the given resume value.
-
#schedule_with_priority(*args) ⇒ Object
Adds the fiber to the head of the runqueue with the given resume value.
-
#send(msg) ⇒ Fiber
Sends a message to the given fiber.
-
#set_caller(caller) ⇒ Fiber
Sets the fiber's caller.
-
#setup_main_fiber ⇒ Fiber
Sets up the fiber as the main fiber for the current thread.
-
#setup_raw ⇒ Fiber
Performs setup for a "raw" Fiber created using Fiber.new.
-
#shutdown_all_children(graceful: false) ⇒ Fiber
Terminates and blocks until all child fibers have terminated.
-
#spin(tag = nil, orig_caller = Kernel.caller, &block) ⇒ Fiber
Creates a new child fiber.
-
#state ⇒ Symbol
Returns the current state for the fiber, one of the following:.
-
#supervise(*fibers, **opts, &block) ⇒ Object
Supervises the given fibers or all child fibers.
-
#terminate(graceful: false) ⇒ Fiber
Terminates the fiber, optionally setting the graceful shutdown flag.
-
#terminate_all_children(graceful: false) ⇒ Fiber
Terminates all child fibers.
-
#unmonitor(fiber) ⇒ Fiber
Removes a monitor fiber.
-
#value ⇒ Object
Waits for the fiber to terminate, and returns its return value (the result of its last statement).
Instance Attribute Details
#oob ⇒ Object
Returns the value of attribute oob.
7 8 9 |
# File 'lib/polyphony/extensions/fiber.rb', line 7 def oob @oob end |
#parent ⇒ Object
Returns the value of attribute parent.
7 8 9 |
# File 'lib/polyphony/extensions/fiber.rb', line 7 def parent @parent end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
8 9 10 |
# File 'lib/polyphony/extensions/fiber.rb', line 8 def result @result end |
#tag ⇒ Object
Returns the value of attribute tag.
7 8 9 |
# File 'lib/polyphony/extensions/fiber.rb', line 7 def tag @tag end |
#thread ⇒ Object
Returns the value of attribute thread.
7 8 9 |
# File 'lib/polyphony/extensions/fiber.rb', line 7 def thread @thread end |
Class Method Details
.Fiber.await(f1, f2, ...) ⇒ Array<any> .Fiber.await(fibers) ⇒ Array<any> Also known as: join
Waits for all given fibers to terminate, then returns the respective
return values for all terminated fibers. If any of the awaited fibers
terminates with an uncaught exception, Fiber.await
will await all the
other fibers to terminate, then reraise the exception.
This method can be called with multiple fibers as multiple arguments, or with a single array containing one or more fibers.
557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 |
# File 'lib/polyphony/extensions/fiber.rb', line 557 def await(*fibers) return [] if fibers.empty? if (first = fibers.first).is_a?(Array) fibers = first end current_fiber = Fiber.current mailbox = current_fiber.monitor_mailbox results = {} fibers.each do |f| results[f] = nil if f.dead? # fiber already terminated, so queue message mailbox << [f, f.result] else f.monitor(current_fiber) end end exception = nil while !fibers.empty? (fiber, result) = mailbox.shift next unless fibers.include?(fiber) fibers.delete(fiber) current_fiber.remove_child(fiber) if fiber.parent == current_fiber if result.is_a?(Exception) exception ||= result fibers.each(&:terminate) else results[fiber] = result end end raise exception if exception results.values end |
.schedule_priority_oob_fiber(&block) ⇒ Object
Creates and schedules with priority an out-of-band fiber that runs the
supplied block. If any uncaught exception is raised while the fiber is
running, it will bubble up to the main thread's main fiber, which will
also be scheduled with priority. This method is mainly used trapping
signals (see also the patched Kernel#trap
)
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 |
# File 'lib/polyphony/extensions/fiber.rb', line 631 def schedule_priority_oob_fiber(&block) oob_fiber = Fiber.new do Fiber.current.setup_raw Thread.backend.trace(:unblock, oob_fiber, nil, @caller) result = block.call rescue Exception => e Thread.current.schedule_and_wakeup(Thread.main.main_fiber, e) result = e ensure Thread.backend.trace(:terminate, Fiber.current, result) suspend end prepare_oob_fiber(oob_fiber, block) Thread.backend.trace(:spin, oob_fiber, caller) oob_fiber.schedule_with_priority(nil) end |
.select(*fibers) ⇒ Array
Waits for at least one of the given fibers to terminate, returning an array containing the first terminated fiber and its return value. If an exception occurs in one of the given fibers, it will be reraised.
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 |
# File 'lib/polyphony/extensions/fiber.rb', line 602 def select(*fibers) return nil if fibers.empty? current_fiber = Fiber.current mailbox = current_fiber.monitor_mailbox fibers.each do |f| if f.dead? result = f.result result.is_a?(Exception) ? (raise result) : (return [f, result]) end end fibers.each { |f| f.monitor(current_fiber) } while true (fiber, result) = mailbox.shift next unless fibers.include?(fiber) fibers.each { |f| f.unmonitor(current_fiber) } raise result if result.is_a?(Exception) return [fiber, result] end end |
Instance Method Details
#<<(msg) ⇒ Fiber
Sends a message to the given fiber. The message will be added to the fiber's mailbox.
122 123 124 125 126 127 128 129 130 |
# File 'ext/polyphony/fiber.c', line 122
VALUE Fiber_send(VALUE self, VALUE msg) {
VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
if (mailbox == Qnil) {
mailbox = rb_funcall(cQueue, ID_new, 0);
rb_ivar_set(self, ID_ivar_mailbox, mailbox);
}
Queue_push(mailbox, msg);
return self;
}
|
#add_child(child_fiber) ⇒ Fiber
Adds a child fiber reference. Used internally.
366 367 368 369 370 |
# File 'lib/polyphony/extensions/fiber.rb', line 366 def add_child(child_fiber) (@children ||= {})[child_fiber] = true child_fiber.monitor(self) if @supervise_mode self end |
#attach_all_children_to(parent) ⇒ Fiber
Attaches all child fibers to a new parent.
322 323 324 325 326 |
# File 'lib/polyphony/extensions/fiber.rb', line 322 def attach_all_children_to(parent) child_fibers = @children&.keys child_fibers&.each { |c| c.attach_to(parent) } self end |
#attach_and_monitor(parent) ⇒ Fiber
Attaches the fiber to the new parent and monitors the new parent.
354 355 356 357 358 359 360 |
# File 'lib/polyphony/extensions/fiber.rb', line 354 def attach_and_monitor(parent) @parent.remove_child(self) @parent = parent parent.add_child(self) monitor(parent) self end |
#attach_to(parent) ⇒ Fiber
Attaches the fiber to a new parent.
343 344 345 346 347 348 |
# File 'lib/polyphony/extensions/fiber.rb', line 343 def attach_to(parent) @parent.remove_child(self) @parent = parent parent.add_child(self) self end |
#await ⇒ any #join ⇒ any #value ⇒ any
Waits for the fiber to terminate, and returns its return value (the result of its last statement). If the fiber has terminated with an ancaught exception, the exception will be raised.
f = spin { :foo; :bar }
f.await #=> :bar
198 199 200 |
# File 'lib/polyphony/extensions/fiber.rb', line 198 def await Fiber.await(self).first end |
#await_all_children ⇒ Array<any>
Block until all child fibers have terminated. Returns the return values for all child fibers.
294 295 296 297 298 |
# File 'lib/polyphony/extensions/fiber.rb', line 294 def await_all_children return unless @children && !@children.empty? Fiber.await(@children.keys.reject(&:dead?)) end |
#caller ⇒ Array<String>
Returns the fiber's caller.
43 44 45 46 47 48 49 50 |
# File 'lib/polyphony/extensions/fiber.rb', line 43 def caller spin_caller = @caller || [] if @parent spin_caller + @parent.caller else spin_caller end end |
#cancel(exception = Polyphony::Cancel) ⇒ Fiber
Stops a fiber by raising a Polyphony::Cancel exception.
119 120 121 122 123 124 125 |
# File 'lib/polyphony/extensions/fiber.rb', line 119 def cancel(exception = Polyphony::Cancel) return if @running == false value = exception.is_a?(Class) ? exception.new : exception schedule value self end |
#children ⇒ Array<Fiber>
Returns the fiber's children.
255 256 257 |
# File 'lib/polyphony/extensions/fiber.rb', line 255 def children (@children ||= {}).keys end |
#dead? ⇒ bool
Returns true if the fiber is dead.
538 539 540 |
# File 'lib/polyphony/extensions/fiber.rb', line 538 def dead? state == :dead end |
#detach ⇒ Fiber
Detaches the fiber from its current parent. The fiber will be made a child of the main fiber (for the current thread.)
332 333 334 335 336 337 |
# File 'lib/polyphony/extensions/fiber.rb', line 332 def detach @parent.remove_child(self) @parent = @thread.main_fiber @parent.add_child(self) self end |
#finalize(result, uncaught_exception: false) ⇒ false
Finalizes the fiber, handling its return value or any uncaught exception.
462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/polyphony/extensions/fiber.rb', line 462 def finalize(result, uncaught_exception: false) result, uncaught_exception = finalize_children(result, uncaught_exception) Thread.backend.trace(:terminate, self, result) @result = result inform_monitors(result, uncaught_exception) @running = false ensure @parent&.remove_child(self) # Prevent fiber from being resumed after terminating @thread.fiber_unschedule(self) Thread.current.switch_fiber end |
#finalize_children(result, uncaught_exception) ⇒ Array
Shuts down all children of the current fiber. If any exception occurs while the children are shut down, it is returned along with the uncaught_exception flag set. Otherwise, it returns the given arguments.
483 484 485 486 487 488 |
# File 'lib/polyphony/extensions/fiber.rb', line 483 def finalize_children(result, uncaught_exception) shutdown_all_children(graceful: graceful_shutdown?) [result, uncaught_exception] rescue Exception => e [e, true] end |
#graceful_shutdown=(graceful) ⇒ bool
Sets the graceful shutdown flag for the fiber.
131 132 133 |
# File 'lib/polyphony/extensions/fiber.rb', line 131 def graceful_shutdown=(graceful) @graceful_shutdown = graceful end |
#graceful_shutdown? ⇒ bool
Returns the graceful shutdown flag for the fiber.
138 139 140 |
# File 'lib/polyphony/extensions/fiber.rb', line 138 def graceful_shutdown? @graceful_shutdown end |
#inform_monitors(result, uncaught_exception) ⇒ Fiber
Informs the fiber's monitors it is terminated.
495 496 497 498 499 500 501 502 503 504 505 506 507 |
# File 'lib/polyphony/extensions/fiber.rb', line 495 def inform_monitors(result, uncaught_exception) if @monitors msg = [self, result] @monitors.each_key { |f| f.monitor_mailbox << msg } end if uncaught_exception && @parent parent_is_monitor = @monitors&.key?(@parent) @parent.schedule_with_priority(result) unless parent_is_monitor end self end |
#inspect ⇒ String Also known as: to_s
Returns a string representation of the fiber for debugging.
20 21 22 23 24 25 26 |
# File 'lib/polyphony/extensions/fiber.rb', line 20 def inspect if @tag "#<Fiber #{tag}:#{object_id} #{location} (#{state})>" else "#<Fiber:#{object_id} #{location} (#{state})>" end end |
#interject(&block) ⇒ Fiber
Adds an interjection to the fiber. The current operation undertaken by the fiber will be interrupted, and the given block will be executed, and the operation will be resumed. This API is experimental and might be removed in the future.
181 182 183 |
# File 'lib/polyphony/extensions/fiber.rb', line 181 def interject(&block) raise Polyphony::Interjection.new(block) end |
#interrupt(value = nil) ⇒ Fiber Also known as: stop, kill, move_on
Stops the fiber by raising a Polyphony::MoveOn
exception. The given
value will become the fiber's return value.
85 86 87 88 89 90 |
# File 'lib/polyphony/extensions/fiber.rb', line 85 def interrupt(value = nil) return if @running == false schedule Polyphony::MoveOn.new(value) self end |
#await ⇒ any #join ⇒ any #value ⇒ any
Waits for the fiber to terminate, and returns its return value (the result of its last statement). If the fiber has terminated with an ancaught exception, the exception will be raised.
f = spin { :foo; :bar }
f.await #=> :bar
201 202 203 |
# File 'lib/polyphony/extensions/fiber.rb', line 201 def await Fiber.await(self).first end |
#location ⇒ String
Returns the source location for the fiber based on its caller.
32 33 34 35 36 37 38 |
# File 'lib/polyphony/extensions/fiber.rb', line 32 def location if @oob "#{@caller[0]} (oob)" else @caller ? @caller[0] : '(root)' end end |
#mailbox ⇒ Queue
Returns the fiber's mailbox.
170 171 172 173 174 175 176 177 |
# File 'ext/polyphony/fiber.c', line 170
VALUE Fiber_mailbox(VALUE self) {
VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
if (mailbox == Qnil) {
mailbox = rb_funcall(cQueue, ID_new, 0);
rb_ivar_set(self, ID_ivar_mailbox, mailbox);
}
return mailbox;
}
|
#main? ⇒ bool
Returns true if the fiber is the main fiber for its thread.
64 65 66 |
# File 'lib/polyphony/extensions/fiber.rb', line 64 def main? @main end |
#monitor(fiber) ⇒ Fiber
Adds a fiber to the list of monitoring fibers. Monitoring fibers will be notified on their monitor mailboxes when the fiber is terminated.
514 515 516 517 |
# File 'lib/polyphony/extensions/fiber.rb', line 514 def monitor(fiber) (@monitors ||= {})[fiber] = true self end |
#monitor_mailbox ⇒ Polyphony::Queue
Returns the fiber's monitoring mailbox queue, used for receiving fiber monitoring messages.
76 77 78 |
# File 'lib/polyphony/extensions/fiber.rb', line 76 def monitor_mailbox @monitor_mailbox ||= Polyphony::Queue.new end |
#monitors ⇒ Array<Fiber>
Returns the list of monitoring fibers.
531 532 533 |
# File 'lib/polyphony/extensions/fiber.rb', line 531 def monitors @monitors&.keys || [] end |
#prepare(tag, block, caller, parent) ⇒ Fiber
Prepares a fiber for running.
392 393 394 395 396 397 398 399 400 401 |
# File 'lib/polyphony/extensions/fiber.rb', line 392 def prepare(tag, block, caller, parent) @thread = Thread.current @tag = tag @parent = parent @caller = caller @block = block Thread.backend.trace(:spin, self, Kernel.caller[1..]) schedule self end |
#fiber.raise(message) ⇒ Fiber #fiber.raise(exception_class) ⇒ Fiber #fiber.raise(exception_class, exception_message) ⇒ Fiber #fiber.raise(exception) ⇒ Fiber
Raises an exception in the context of the fiber
169 170 171 172 173 |
# File 'lib/polyphony/extensions/fiber.rb', line 169 def raise(*args) error = Exception.instantiate(*args) schedule(error) self end |
#receive ⇒ any
Receives a message from the fiber's mailbox. If no message is available, waits for a message to be sent to it.
138 139 140 141 142 143 144 145 |
# File 'ext/polyphony/fiber.c', line 138
VALUE Fiber_receive(VALUE self) {
VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
if (mailbox == Qnil) {
mailbox = rb_funcall(cQueue, ID_new, 0);
rb_ivar_set(self, ID_ivar_mailbox, mailbox);
}
return Queue_shift(0, 0, mailbox);
}
|
#receive_all_pending ⇒ Array
Receives all messages currently in the fiber's mailbox.
184 185 186 187 |
# File 'ext/polyphony/fiber.c', line 184 VALUE Fiber_receive_all_pending(VALUE self) { VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox); return (mailbox == Qnil) ? rb_ary_new() : Queue_shift_all(mailbox); } |
#receive_loop ⇒ Object
Receives messages from the fiber's mailbox in an infinite loop.
151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'ext/polyphony/fiber.c', line 151
noreturn VALUE Fiber_receive_loop(VALUE self) {
VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
if (mailbox == Qnil) {
mailbox = rb_funcall(cQueue, ID_new, 0);
rb_ivar_set(self, ID_ivar_mailbox, mailbox);
}
while (1) {
VALUE msg = Queue_shift(0, 0,mailbox);
rb_yield(msg);
RB_GC_GUARD(msg);
}
}
|
#remove_child(child_fiber) ⇒ Fiber
Removes a child fiber reference. Used internally.
376 377 378 379 |
# File 'lib/polyphony/extensions/fiber.rb', line 376 def remove_child(child_fiber) @children&.delete(child_fiber) self end |
#restart(value = nil) ⇒ Fiber Also known as: reset
Restarts the fiber, with the given value serving as the first value passed to the fiber's block.
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/polyphony/extensions/fiber.rb', line 100 def restart(value = nil) raise "Can't restart main fiber" if @main if @running schedule Polyphony::Restart.new(value) return self end fiber = parent.spin(@tag, @caller, &@block) @monitors&.each_key { |f| fiber.monitor(f) } fiber.schedule(value) unless value.nil? fiber end |
#restart_self(first_value) ⇒ any
Resets the fiber's state and reruns the fiber.
452 453 454 455 |
# File 'lib/polyphony/extensions/fiber.rb', line 452 def restart_self(first_value) @mailbox = nil run(first_value) end |
#run(first_value) ⇒ any
Runs the fiber's block and handles uncaught exceptions.
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
# File 'lib/polyphony/extensions/fiber.rb', line 407 def run(first_value) Kernel.raise first_value if first_value.is_a?(Exception) @running = true Thread.backend.trace(:unblock, self, first_value, @caller) result = @block.(first_value) finalize(result) result rescue Polyphony::Restart => e restart_self(e.value) rescue Polyphony::MoveOn, Polyphony::Terminate => e finalize(e.value) rescue Exception => e e.source_fiber = self finalize(e, uncaught_exception: true) end |
#running? ⇒ bool
Returns true if fiber is running.
13 14 15 |
# File 'lib/polyphony/extensions/fiber.rb', line 13 def running? @running end |
#schedule(value) ⇒ Fiber #schedule ⇒ Fiber
Adds the fiber to the runqueue with the given resume value. If no resume
value is given, the fiber will be resumed with nil
.
73 74 75 76 77 |
# File 'ext/polyphony/fiber.c', line 73
static VALUE Fiber_schedule(int argc, VALUE *argv, VALUE self) {
VALUE value = (argc == 0) ? Qnil : argv[0];
Fiber_make_runnable(self, value);
return self;
}
|
#schedule_with_priority(value) ⇒ Fiber #schedule_with_priority ⇒ Fiber
Adds the fiber to the head of the runqueue with the given resume value. If no
resume value is given, the fiber will be resumed with nil
.
89 90 91 92 93 |
# File 'ext/polyphony/fiber.c', line 89
static VALUE Fiber_schedule_with_priority(int argc, VALUE *argv, VALUE self) {
VALUE value = (argc == 0) ? Qnil : argv[0];
Fiber_make_runnable_with_priority(self, value);
return self;
}
|
#send(msg) ⇒ Fiber
Sends a message to the given fiber. The message will be added to the fiber's mailbox.
122 123 124 125 126 127 128 129 130 |
# File 'ext/polyphony/fiber.c', line 122
VALUE Fiber_send(VALUE self, VALUE msg) {
VALUE mailbox = rb_ivar_get(self, ID_ivar_mailbox);
if (mailbox == Qnil) {
mailbox = rb_funcall(cQueue, ID_new, 0);
rb_ivar_set(self, ID_ivar_mailbox, mailbox);
}
Queue_push(mailbox, msg);
return self;
}
|
#set_caller(caller) ⇒ Fiber
Sets the fiber's caller.
56 57 58 59 |
# File 'lib/polyphony/extensions/fiber.rb', line 56 def set_caller(caller) @caller = caller self end |
#setup_main_fiber ⇒ Fiber
Sets up the fiber as the main fiber for the current thread.
439 440 441 442 443 444 445 446 |
# File 'lib/polyphony/extensions/fiber.rb', line 439 def setup_main_fiber @main = true @tag = :main @thread = Thread.current @running = true @children&.clear self end |
#setup_raw ⇒ Fiber
Performs setup for a "raw" Fiber created using Fiber.new. Note that this fiber is an orphan fiber (has no parent), since we cannot control how the fiber terminates after it has already been created. Calling #setup_raw allows the fiber to be scheduled and to receive messages.
430 431 432 433 434 |
# File 'lib/polyphony/extensions/fiber.rb', line 430 def setup_raw @thread = Thread.current @running = true self end |
#shutdown_all_children(graceful: false) ⇒ Fiber
Terminates and blocks until all child fibers have terminated.
303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/polyphony/extensions/fiber.rb', line 303 def shutdown_all_children(graceful: false) return self unless @children pending = [] child_fibers = @children.keys child_fibers.each do |c| next if c.dead? c.terminate(graceful:) pending << c end Fiber.await(pending) self end |
#spin(tag = nil, orig_caller = Kernel.caller, &block) ⇒ Fiber
Creates a new child fiber.
child = fiber.spin { sleep 10; fiber.stop }
266 267 268 269 270 271 272 |
# File 'lib/polyphony/extensions/fiber.rb', line 266 def spin(tag = nil, orig_caller = Kernel.caller, &block) f = Fiber.new { |v| f.run(v) } f.prepare(tag, block, orig_caller, self) (@children ||= {})[f] = true f.monitor(self) if @supervise_mode f end |
#state ⇒ Symbol
Returns the current state for the fiber, one of the following:
:running
- the fiber is currently running.:runnable
- the fiber is on the runqueue, scheduled to be resumed ("ran").:waiting
- the fiber is waiting on some blocking operation to complete, allowing other fibers to run.:dead
- the fiber has finished running.
106 107 108 109 110 111 112 113 |
# File 'ext/polyphony/fiber.c', line 106
static VALUE Fiber_state(VALUE self) {
if (!rb_fiber_alive_p(self) || (rb_ivar_get(self, ID_ivar_running) == Qfalse))
return SYM_dead;
if (rb_fiber_current() == self) return SYM_running;
if (rb_ivar_get(self, ID_ivar_runnable) != Qnil) return SYM_runnable;
return SYM_waiting;
}
|
#supervise(*fibers, **opts, &block) ⇒ Object
Supervises the given fibers or all child fibers. The fiber is put in
supervision mode, which means any child added after calling #supervise
will automatically be supervised. Depending on the given options, fibers
may be automatically restarted.
If a block is given, the block is called whenever a supervised fiber has
terminated. If the :on_done
option is given, that proc will be called
when a supervised fiber has terminated. If the :on_error
option is
given, that proc will be called when a supervised fiber has terminated
with an uncaught exception. If the :restart
option equals :always
,
fibers will always be restarted. If the :restart
option equals
:on_error
, fibers will be restarted only when terminated with an
uncaught exception.
This method blocks indefinitely.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/polyphony/extensions/fiber.rb', line 228 def supervise(*fibers, **opts, &block) block ||= supervise_opts_to_block(opts) @supervise_mode = true fibers = children if fibers.empty? fibers.each do |f| f.attach_to(self) unless f.parent == self f.monitor(self) end mailbox = monitor_mailbox while true (fiber, result) = mailbox.shift block&.call(fiber, result) end ensure @supervise_mode = false end |
#terminate(graceful: false) ⇒ Fiber
Terminates the fiber, optionally setting the graceful shutdown flag.
146 147 148 149 150 151 152 |
# File 'lib/polyphony/extensions/fiber.rb', line 146 def terminate(graceful: false) return if @running == false @graceful_shutdown = graceful schedule Polyphony::Terminate.new self end |
#terminate_all_children(graceful: false) ⇒ Fiber
Terminates all child fibers. This method will return before the fibers are actually terminated.
279 280 281 282 283 284 285 286 287 288 |
# File 'lib/polyphony/extensions/fiber.rb', line 279 def terminate_all_children(graceful: false) return self unless @children e = Polyphony::Terminate.new @children.each_key do |c| c.graceful_shutdown = true if graceful c.raise e end self end |
#unmonitor(fiber) ⇒ Fiber
Removes a monitor fiber.
523 524 525 526 |
# File 'lib/polyphony/extensions/fiber.rb', line 523 def unmonitor(fiber) (@monitors ||= []).delete(fiber) self end |
#await ⇒ any #join ⇒ any #value ⇒ any
Waits for the fiber to terminate, and returns its return value (the result of its last statement). If the fiber has terminated with an ancaught exception, the exception will be raised.
f = spin { :foo; :bar }
f.await #=> :bar
202 203 204 |
# File 'lib/polyphony/extensions/fiber.rb', line 202 def await Fiber.await(self).first end |