Class: Concurrently::Proc
- Inherits:
- Proc
- Object > Proc > Concurrently::Proc
- Includes:
- CallbacksAttachable
- Defined in:
- lib/all/concurrently/proc.rb
Overview
Concurrent procs are thread safe.
A Concurrently::Proc is like a regular Proc except its block of code is
evaluated concurrently. Its evaluation can wait for other events
(e.g. the result of other evaluations or the readiness of an IO) without
blocking the execution of its thread.
Errors raised inside concurrent evaluations are re-raised when getting
their result with Evaluation#await_result. They are also logged to
stderr by default. This behavior can be controlled by Proc.error_log_output=.
In addition, errors can be watched by registering callbacks for the :error
event. This provides a central access point to errors happening inside
concurrent evaluations, for example for recovery purposes. Concurrent procs
evaluated with #call_and_forget run in the background with no access to
their evaluation, and if they fail they do so silently; the callbacks are
the only way to gain access to their errors.
The callbacks can be registered for all procs or only for one specific proc.
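The two registration scopes described above can be sketched with a small stand-in class. This is a hedged illustration only: `SketchProc` and everything inside it is hypothetical and is not the gem's implementation. With the gem itself, the corresponding calls would be `on(:error)` on the Concurrently::Proc class (as used in the source of .error_log_output= further down) and `on(:error)` on a single instance.

```ruby
# Hypothetical stand-in for illustration only; NOT part of the concurrently gem.
class SketchProc
  @global_handlers = []

  class << self
    attr_reader :global_handlers

    # Register a handler that sees errors from every instance.
    def on(event, &handler)
      raise ArgumentError, "only :error is modelled" unless event == :error
      global_handlers << handler
    end
  end

  def initialize(&block)
    @block = block
    @handlers = []
  end

  # Register a handler that sees errors from this instance only.
  def on(event, &handler)
    raise ArgumentError, "only :error is modelled" unless event == :error
    @handlers << handler
  end

  # Run the block and swallow failures, notifying the handlers instead.
  # This mimics a fire-and-forget evaluation whose errors are only
  # reachable through callbacks.
  def call_and_forget(*args)
    @block.call(*args)
    nil
  rescue StandardError => error
    (self.class.global_handlers + @handlers).each { |handler| handler.call(error) }
    nil
  end
end

seen_by_all = []
seen_by_one = []

# Handler for all procs.
SketchProc.on(:error) { |error| seen_by_all << error.message }

# Handler for one specific proc.
failing = SketchProc.new { raise "boom" }
failing.on(:error) { |error| seen_by_one << error.message }

other = SketchProc.new { raise "bang" }

failing.call_and_forget
other.call_and_forget
```

After both calls, the class-level handler has seen both errors, while the instance-level handler has only seen the error of the proc it was attached to.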
Defined Under Namespace
Classes: Evaluation
Class Method Summary
- .error_log_output=(output) ⇒ Object
  Sets the output to which errors in concurrent procs are written.
Instance Method Summary
- #call(*args) ⇒ Object (also: #[])
  Evaluates the concurrent proc in a blocking manner.
- #call_and_forget(*args) ⇒ nil
  Fire and forget variation of #call_detached.
- #call_detached(*args) ⇒ Evaluation
  Evaluates the concurrent proc detached from the current execution thread.
- #call_nonblock(*args) ⇒ Object, Evaluation
  Evaluates the concurrent proc in a non-blocking manner.
- #initialize(evaluation_class = Evaluation) ⇒ Proc (constructor)
  A new instance of Proc.
Constructor Details
#initialize(evaluation_class = Evaluation) ⇒ Proc
A new instance of Concurrently::Proc
    # File 'lib/all/concurrently/proc.rb', line 124
    def initialize(evaluation_class = Evaluation)
      @evaluation_class = evaluation_class
    end
Class Method Details
.error_log_output=(output) ⇒ Object
Sets the output to which errors in concurrent procs are written.
By default, errors are written to stderr. To disable logging of errors,
set the output to nil or false.
    # File 'lib/all/concurrently/proc.rb', line 102
    def self.error_log_output=(output)
      if Object.const_defined? :Logger and Logger === output
        @error_handler.cancel if @error_handler
        @error_handler = on(:error){ |error| output.error Proc. self, error }
      elsif IO === output
        @error_handler.cancel if @error_handler
        @error_handler = on(:error){ |error| output.puts Proc. self, error }
      elsif !output
        remove_instance_variable(:@error_handler).cancel if @error_handler
      else
        raise Error, "output no logger or IO"
      end
    end
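The branching in the setter above decides which kinds of output are accepted. The following sketch reproduces only that type check with plain Ruby, so the outcome for a given object can be tried without the gem. The helper `classify_output` and its return symbols are hypothetical names introduced for this illustration.

```ruby
require 'stringio'

# Hypothetical helper mirroring the type checks of the setter shown above.
# It only reports which branch would be taken; it registers no handlers.
def classify_output(output)
  if Object.const_defined?(:Logger) && Logger === output
    :logger    # errors would be passed to output.error
  elsif IO === output
    :io        # errors would be written with output.puts
  elsif !output
    :disabled  # nil or false switches logging off
  else
    :rejected  # the setter raises an error for anything else
  end
end

classify_output($stderr)      # an IO
classify_output(nil)          # logging disabled
classify_output(false)        # logging disabled
classify_output(StringIO.new) # not an IO subclass, so it is rejected
```

One consequence worth noting: a StringIO does not inherit from IO, so under this check it falls through to the rejecting branch, just like a String holding a file path would.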
Instance Method Details
#call(*args) ⇒ Object Also known as: []
Evaluates the concurrent proc in a blocking manner.
Evaluating the proc this way executes its block of code immediately and blocks the current thread of execution until the result is available.
    # File 'lib/all/concurrently/proc.rb', line 170
    def call(*args)
      case immediate_result = call_nonblock(*args)
      when Evaluation
        immediate_result.await_result
      else
        immediate_result
      end
    end
#call_and_forget(*args) ⇒ nil
Fire and forget variation of #call_detached.
Once called, there is no way to control the evaluation anymore. However, because it avoids creating an Evaluation instance up front, it is slightly faster than #call_detached.
    # File 'lib/all/concurrently/proc.rb', line 291
    def call_and_forget(*args)
      event_loop = EventLoop.current
      # run without creating an Evaluation object at first. It will be created
      # if the proc needs to wait for something.
      event_loop.run_queue.schedule_immediately event_loop.proc_fiber_pool.take_fiber, [self, args], false

      nil
    end
#call_detached(*args) ⇒ Evaluation
Evaluates the concurrent proc detached from the current execution thread.
Evaluating the proc this way detaches the evaluation from the current thread of execution and schedules its start during the next iteration of the event loop.
To execute code this way you can also use the shortcut Kernel#concurrently.
    # File 'lib/all/concurrently/proc.rb', line 268
    def call_detached(*args)
      event_loop = EventLoop.current
      evaluation = @evaluation_class.new(event_loop.proc_fiber_pool.take_fiber)
      event_loop.run_queue.schedule_immediately evaluation, [self, args, [evaluation]]
      evaluation
    end
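The effect of detaching, namely that the caller gets a handle back immediately and the block only starts during the next loop iteration, can be sketched with a toy run queue. `SketchLoop`, `Handle` and `run_iteration` are hypothetical names for this illustration and do not correspond to the gem's EventLoop or Evaluation classes.

```ruby
# Hypothetical toy event loop for illustration only; NOT the gem's EventLoop.
class SketchLoop
  # Minimal stand-in for an evaluation handle.
  Handle = Struct.new(:done, :result)

  def initialize
    @run_queue = []
  end

  # Queue the block and return a handle right away; nothing runs yet.
  def call_detached(*args, &block)
    handle = Handle.new(false, nil)
    @run_queue << [block, args, handle]
    handle
  end

  # One loop iteration: run everything that was queued so far.
  def run_iteration
    batch, @run_queue = @run_queue, []
    batch.each do |block, args, handle|
      handle.result = block.call(*args)
      handle.done = true
    end
  end
end

log = []
event_loop = SketchLoop.new

handle = event_loop.call_detached(5, 8) do |a, b|
  log << :evaluated
  a + b
end

log << :caller_continues  # runs before the detached block
event_loop.run_iteration  # the block is evaluated here
```

In this sketch the caller's line is logged first and the block's line second, and the handle only carries a result after the iteration has run.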
#call_nonblock(*args) ⇒ Object, Evaluation
Evaluates the concurrent proc in a non-blocking manner.
Evaluating the proc this way executes its block of code immediately until the result is available or the evaluation needs to wait.
Dealing with this method is similar to dealing with IO#*_nonblock.
    # File 'lib/all/concurrently/proc.rb', line 219
    def call_nonblock(*args)
      event_loop = EventLoop.current
      run_queue = event_loop.run_queue
      evaluation_bucket = []

      result = begin
        fiber = event_loop.proc_fiber_pool.take_fiber
        # ProcFiberPool#take_fiber might have accessed the current evaluation
        # if it needs to wait for the next iteration to get a fiber. Reset the
        # current evaluation afterwards!
        previous_evaluation = run_queue.current_evaluation
        run_queue.current_evaluation = nil
        run_queue.evaluation_class = @evaluation_class
        fiber.resume [self, args, evaluation_bucket]
      ensure
        run_queue.current_evaluation = previous_evaluation
        run_queue.evaluation_class = nil
      end

      case result
      when Evaluation
        # The proc fiber returns an Evaluation if the proc cannot be evaluated
        # without waiting. Inject the evaluation into the bucket so it can be
        # concluded later.
        evaluation_bucket << result
        result
      when Exception
        raise result
      else
        result
      end
    end
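The pattern of handling either an immediate result or a pending evaluation, and the way a blocking call can be layered on top of it, can be sketched with Ruby's built-in Fiber. This is a simplified model under stated assumptions, not the gem's code: `Pending`, `run_nonblock` and `run_blocking` are hypothetical names, and a bare `Fiber.yield` stands in for "the evaluation needs to wait".

```ruby
# Hypothetical marker returned when the block could not finish immediately.
Pending = Struct.new(:fiber) do
  # Keep resuming the fiber until its block has finished, then return
  # the block's value. A stand-in for awaiting an evaluation's result.
  def await_result
    result = nil
    result = fiber.resume while fiber.alive?
    result
  end
end

# Run the block until it either finishes or yields for the first time.
def run_nonblock(&block)
  fiber = Fiber.new(&block)
  result = fiber.resume
  fiber.alive? ? Pending.new(fiber) : result
end

# Blocking variant built on the non-blocking one, analogous to how
# #call is layered on #call_nonblock in the source above.
def run_blocking(&block)
  case immediate_result = run_nonblock(&block)
  when Pending
    immediate_result.await_result
  else
    immediate_result
  end
end

quick = run_nonblock { 5 + 8 }             # finishes without waiting
slow  = run_nonblock { Fiber.yield; :done } # has to wait once
```

Here `quick` is the plain result, while `slow` is a `Pending` object whose value only becomes available through `await_result`. Callers of a non-blocking evaluation therefore need a `case` on the return value, much as with the IO#*_nonblock family.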