Class: ZeevexConcurrency::EventLoop
- Inherits: Object
  - Object
    - ZeevexConcurrency::EventLoop
- Defined in: lib/zeevex_concurrency/event_loop.rb
Direct Known Subclasses
Defined Under Namespace
Instance Method Summary
- #<<(callable) ⇒ Object
- #backlog ⇒ Object
- #enqueue(callable = nil, &block) ⇒ Object
  Enqueues any callable object (including a Promise, Future, or other Delayed instance) on the event loop and returns a Delayed object that can be used to fetch the return value.
- #flush ⇒ Object
- #in_event_loop? ⇒ Boolean
  Returns true if the method was called from code executing on the event loop’s thread.
- #initialize(options = {}) ⇒ EventLoop (constructor)
  Returns a new instance of EventLoop.
- #on_event_loop(runnable = nil, &block) ⇒ Object
  Runs a computation on the event loop.
- #reset ⇒ Object
- #run_and_wait(runnable = nil, &block) ⇒ Object
  Returns the value from the computation rather than a Promise.
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ EventLoop
Returns a new instance of EventLoop.
# File 'lib/zeevex_concurrency/event_loop.rb', line 6
def initialize(options = {})
  @options = options
  @mutex   = options.delete(:mutex) || Mutex.new
  @queue   = options.delete(:queue) || Queue.new
  @state   = :stopped
end
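The constructor pulls `:mutex` and `:queue` out of the options hash with `Hash#delete`, so a caller can inject its own queue, and the hash it passed in loses those keys. A minimal sketch of that idiom in plain Ruby follows; `extract_queue` and the `:label` key are hypothetical names used only for illustration and are not part of ZeevexConcurrency.

```ruby
# Hypothetical helper mirroring the `options.delete(:queue) || Queue.new` idiom.
def extract_queue(options = {})
  options.delete(:queue) || Queue.new
end

injected       = Queue.new
caller_options = { queue: injected, label: "demo" } # :label is an illustrative key

chosen   = extract_queue(caller_options) # the injected queue is used as-is
fallback = extract_queue                 # no :queue given, so a fresh Queue is built

# Hash#delete mutated the caller's hash: :queue is gone, other keys remain.
remaining_keys = caller_options.keys
```

If the options hash is reused elsewhere, passing a copy (`options.dup`) avoids the mutation.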
Instance Method Details
#<<(callable) ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 54
def <<(callable)
  enqueue(callable)
end
#backlog ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 62
def backlog
  @queue.size
end
#enqueue(callable = nil, &block) ⇒ Object
Enqueues any callable object (including a Promise, Future, or other Delayed instance) on the event loop and returns a Delayed object that can be used to fetch the return value.
Strictly preserves enqueue order.
# File 'lib/zeevex_concurrency/event_loop.rb', line 45
def enqueue(callable = nil, &block)
  to_run = callable || block
  raise ArgumentError, "Must provide proc or block arg" unless to_run

  to_run = ZeevexConcurrency::Promise.new(to_run) unless to_run.is_a?(ZeevexConcurrency::Delayed)
  @queue << to_run
  to_run
end
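To illustrate the "wrap the callable, queue it, hand back a handle" shape of `enqueue`, here is a minimal sketch with one worker thread and a FIFO `Queue`. `SketchPromise` is a hypothetical stand-in for the library's Promise, and the worker loop is an assumption, since the loop's processing method is not shown on this page.

```ruby
# Hypothetical stand-in for a Delayed/Promise object; not the library's class.
class SketchPromise
  def initialize(callable)
    @callable = callable
    @mailbox  = Queue.new # one-slot hand-off for the result
  end

  # Runs the wrapped callable and publishes its result.
  def call
    @mailbox << @callable.call
  end

  # Blocks until #call has run, then returns the cached result.
  def value
    @value = @mailbox.pop unless defined?(@value)
    @value
  end
end

queue = Queue.new
order = []

# Assumed worker loop: pop jobs in FIFO order; nil is this sketch's stop signal.
worker = Thread.new do
  while (job = queue.pop)
    job.call
  end
end

# Mirrors enqueue: wrap the callable, push it, keep the handle.
promises = (1..3).map do |n|
  promise = SketchPromise.new(-> { order << n; n * 10 })
  queue << promise
  promise
end

values = promises.map(&:value) # blocks until each job has run
queue << nil
worker.join
```

Because a single thread drains a FIFO queue, the jobs run in the order they were pushed, which is the ordering guarantee described above.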
#flush ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 58
def flush
  @queue.clear
end
#in_event_loop? ⇒ Boolean
Returns true if the method was called from code executing on the event loop’s thread.
# File 'lib/zeevex_concurrency/event_loop.rb', line 75
def in_event_loop?
  Thread.current.object_id == @thread.object_id
end
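The thread-identity check can be tried in isolation. The sketch below is an illustration only: `worker`, `tasks`, and `in_loop` are hypothetical names, and the worker loop stands in for the event loop's thread.

```ruby
tasks = Queue.new

# Stand-in for the event loop thread; nil is this sketch's stop signal.
worker = Thread.new do
  while (task = tasks.pop)
    task.call
  end
end

# Same comparison as in_event_loop?, against the worker thread.
in_loop = -> { Thread.current.object_id == worker.object_id }

seen = Queue.new
tasks << -> { seen << in_loop.call } # evaluated on the worker thread

on_worker = seen.pop     # result computed inside the loop thread
on_caller = in_loop.call # result computed on the calling thread

tasks << nil
worker.join
```

Before `start` and after `stop`, `@thread` is nil, so the comparison in `in_event_loop?` is made against `nil.object_id` and returns false.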
#on_event_loop(runnable = nil, &block) ⇒ Object
Runs a computation on the event loop. If called from the event loop's thread, it does not deadlock, but it does not preserve ordering either: the computation runs immediately, ahead of any events already in the queue.
# File 'lib/zeevex_concurrency/event_loop.rb', line 84
def on_event_loop(runnable = nil, &block)
  return unless runnable || block_given?
  promise = (runnable && runnable.is_a?(ZeevexConcurrency::Delayed)) ?
             runnable :
             ZeevexConcurrency::Promise.create(runnable, &block)
  if in_event_loop?
    promise.call
    promise
  else
    enqueue promise, &block
  end
end
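The ordering caveat is easiest to see with a job that, while running on the loop, both enqueues work and calls `on_event_loop`. The following is a minimal sketch under stated assumptions: `SketchLoop` is a hypothetical stand-in with a simple FIFO worker, not the library's implementation, and it omits promises entirely.

```ruby
# Hypothetical miniature loop; only the branching of on_event_loop is modeled.
class SketchLoop
  def initialize
    @queue  = Queue.new
    @thread = Thread.new do
      while (job = @queue.pop) # nil is this sketch's stop signal
        job.call
      end
    end
  end

  def enqueue(&block)
    @queue << block
  end

  # On the loop thread: run now. Elsewhere: queue behind existing work.
  def on_event_loop(&block)
    if Thread.current.object_id == @thread.object_id
      block.call
    else
      enqueue(&block)
    end
  end

  def shutdown
    @queue << nil
    @thread.join
  end
end

log  = []
done = Queue.new
el   = SketchLoop.new

el.enqueue do
  el.enqueue { log << :queued_first; done << true } # waits behind the current job
  el.on_event_loop { log << :ran_immediately }      # already on the loop: runs now
end

done.pop # wait until the queued job has run before shutting down
el.shutdown
```

The block passed to `on_event_loop` finishes before the job that was enqueued earlier, which is the loss of ordering the description warns about.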
#reset ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 66
def reset
  stop
  flush
  start
end
#run_and_wait(runnable = nil, &block) ⇒ Object
Returns the value from the computation rather than a Promise. Has similar semantics to `on_event_loop`: if called from the event loop, it executes the computation synchronously, ahead of any other queued computations.
# File 'lib/zeevex_concurrency/event_loop.rb', line 102
def run_and_wait(runnable = nil, &block)
  promise = on_event_loop(runnable, &block)
  promise.value
end
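A nested call shows why the synchronous path matters: if the inner call were queued, the loop thread would wait on a job that only it can run. This is a sketch under assumptions, with `MiniPromise` and `MiniLoop` as hypothetical stand-ins rather than the library's classes.

```ruby
# Hypothetical promise: runs a block once and hands the result to a waiter.
class MiniPromise
  def initialize(&block)
    @block   = block
    @mailbox = Queue.new
  end

  def call
    @mailbox << @block.call
  end

  # Blocks until #call has published a result.
  def value
    @value = @mailbox.pop unless defined?(@value)
    @value
  end
end

# Hypothetical loop modeling only run_and_wait.
class MiniLoop
  def initialize
    @queue  = Queue.new
    @thread = Thread.new do
      while (job = @queue.pop) # nil is this sketch's stop signal
        job.call
      end
    end
  end

  def run_and_wait(&block)
    promise = MiniPromise.new(&block)
    if Thread.current.object_id == @thread.object_id
      promise.call      # on the loop thread: execute synchronously
    else
      @queue << promise # elsewhere: queue it and block on the value below
    end
    promise.value
  end

  def shutdown
    @queue << nil
    @thread.join
  end
end

el = MiniLoop.new
outer = el.run_and_wait do
  inner = el.run_and_wait { 20 } # nested call made from the loop thread
  inner + 1
end
el.shutdown
```

Here `outer` ends up as 21: the outer block runs on the loop thread, and the nested call completes inline instead of waiting in the queue.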
#running? ⇒ Boolean
# File 'lib/zeevex_concurrency/event_loop.rb', line 13
def running?
  @state == :started
end
#start ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 17
def start
  return unless @state == :stopped
  @stop_requested = false
  @thread = Thread.new do
    process
  end

  @state = :started
end
#stop ⇒ Object
# File 'lib/zeevex_concurrency/event_loop.rb', line 27
def stop
  return unless @state == :started
  enqueue { @stop_requested = true }
  unless @thread.join(1)
    @thread.kill
    @thread.join(0)
  end

  @thread = nil
  @state = :stopped
end
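`stop` requests shutdown by enqueueing a job that sets a flag, waits up to one second for the thread to exit, and kills it otherwise. The sketch below illustrates that pattern in plain Ruby. The worker loop that checks the flag after each job is an assumption, because the loop's `process` method is not shown on this page, and all names are hypothetical.

```ruby
queue          = Queue.new
stop_requested = false
processed      = []

# Assumed processing loop: run jobs until one of them sets the stop flag.
worker = Thread.new do
  until stop_requested
    job = queue.pop
    job.call
  end
end

queue << -> { processed << :a }
queue << -> { processed << :b }
queue << -> { stop_requested = true }   # the stop request waits its turn
queue << -> { processed << :after_stop } # queued behind the stop request

# Thread#join(limit) returns nil on timeout, so fall back to kill.
unless worker.join(1)
  worker.kill
  worker.join(0)
end

leftover = queue.size
```

Jobs queued before the stop request still run, while anything queued after it stays in the queue, which is why `reset` calls `flush` between `stop` and `start`.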