Class: Utilrb::EventLoop
Overview
Simple event loop which supports timers and defers blocking operations to a thread pool. The results are queued and processed by the event loop thread at the end of each step.
All events must be code blocks, which are executed at the end of each step. There is no support for filtering or event propagation.
To integrate Ruby classes into the event loop easily, Forwardable#def_event_loop_delegator can be used.
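The hand-off described above (blocking work on a worker thread, result processed by the loop thread during a step) can be sketched with the standard library alone. `MiniLoop` and `join_workers` are hypothetical names used only for illustration; this is not the Utilrb implementation, and it uses one thread per deferred call instead of a thread pool.

```ruby
# Hypothetical stand-in for illustration only; not Utilrb::EventLoop.
class MiniLoop
  def initialize
    @events = Queue.new   # blocks to run on the loop thread in a later step
    @workers = []
  end

  # Queue a block for the next step.
  def once(&block)
    @events << block
  end

  # Run the blocking work on another thread, then hand the result back to
  # the loop thread by queueing the callback as an ordinary event.
  def defer(callback, &work)
    @workers << Thread.new do
      result = work.call
      once { callback.call(result) }
    end
  end

  # Process the events that are queued when the step starts.
  def step
    @events.size.times { @events.pop.call }
  end

  # Helper for the example: wait until all worker threads have finished.
  def join_workers
    @workers.each(&:join)
  end
end

mini = MiniLoop.new
mini.defer(->(result) { puts "result: #{result}" }) { 6 * 7 }
mini.join_workers   # the worker has queued the callback but not run it
mini.step           # the callback runs here, on the calling thread
```

The callback never runs on the worker thread, so it can touch state owned by the loop thread without extra locking.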
Defined Under Namespace
Modules: Forwardable
Classes: Event, Timer
Instance Attribute Summary
-
#thread_pool ⇒ Utilrb::ThreadPool
readonly
Underlying thread pool used to defer work.
Instance Method Summary
-
#add_event(event, every_step = false) ⇒ Object
Adds an Event to the event loop.
-
#add_task(task) ⇒ Object
Adds a task to the thread pool.
-
#add_timer(timer) ⇒ Object
Adds a timer to the event loop.
-
#async(work, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task
Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call.
-
#async_every(work, options = Hash.new, *args, &callback) ⇒ EventLoop::Timer
Integrates a blocking operation call like #async but automatically re-queues the call once the period has elapsed and the worker thread has finished the task.
-
#async_with_options(work, options = Hash.new, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task
Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call.
-
#backlog ⇒ Fixnum
Number of tasks waiting for execution.
-
#call(&block) ⇒ Event, Object
Calls the given block in the event loop thread.
-
#cancel_timer(timer) ⇒ Object
Cancels the given timer if it is running; otherwise it does nothing.
-
#clear ⇒ Object
Clears all timers, events and errors.
-
#clear_errors ⇒ Object
Clears all errors which occurred during the last step and are not marked as known. If the errors are not cleared, they are re-raised the next time step is called.
-
#defer(options = Hash.new, *args, &block) ⇒ ThreadPool::Task
Integrates a blocking operation call into the EventLoop by executing it from a different thread.
-
#events? ⇒ Boolean
Returns true if events are queued.
-
#every(period, start = true) { ... } ⇒ Utilrb::EventLoop::Timer
Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.
-
#every_step(&block) ⇒ Event
Executes the given block every step from the event loop thread.
-
#exec(period = 0.05) { ... } ⇒ Object
Starts the event loop with the given period.
- #handle_error(error, save = true) ⇒ Object
-
#initialize ⇒ EventLoop
constructor
A new EventLoop.
-
#on_error(error_class) {|exception| ... } ⇒ Object
Errors caught during event loop callbacks are forwarded to registered code blocks.
-
#on_errors(*error_classes) {|exception| ... } ⇒ Object
Errors caught during event loop callbacks are forwarded to registered code blocks.
-
#once(delay = nil) { ... } ⇒ Utilrb::EventLoop::Timer, Event
Executes the given block in the next step from the event loop thread.
-
#pretty_print(pp) ⇒ Object
:nodoc:.
- #reraise_error(error) ⇒ Object
-
#reset_timers(time = Time.now) ⇒ Object
Resets all timers so that they do not fire before their whole period has passed, counting from the given point in time.
-
#shutdown ⇒ Object
Shuts down the thread pool.
-
#step(time = Time.now) { ... } ⇒ Object
Handles all current events and timers.
-
#steps(period = 0.05, max_time = 1.0, &block) ⇒ Object
Steps with the given period until all worker threads are waiting for work.
-
#stop ⇒ Object
Stops the EventLoop after #exec was called.
-
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from the current thread but ensures that during processing no worker thread is executing a task which has the same sync_key.
-
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.
-
#thread=(thread) ⇒ Object
Sets the event loop thread.
-
#thread? ⇒ Boolean
Returns true if the current thread is the event loop thread.
-
#timer?(timer) ⇒ Boolean
Returns true if the given timer is running.
-
#timers ⇒ Object
Returns all currently running timers.
-
#validate_thread ⇒ Object
Raises if the current thread is not the event loop thread (by default the one the event loop was started from).
-
#wait_for(period = 0.05, timeout = nil, &block) ⇒ Object
Steps with the given period until the given block returns true.
Constructor Details
#initialize ⇒ EventLoop
A new EventLoop
# File 'lib/utilrb/event_loop.rb', line 185
def initialize
  @mutex = Mutex.new
  @events = Queue.new              # stores all events for the next step
  @timers = Set.new                # stores all timers
  @every_cylce_events = Set.new    # stores all events which are added to @events each step
  @on_error = {}                   # stores on error callbacks
  @errors = Queue.new              # stores errors which will be re-raised at the end of the step
  @number_of_events_to_process = 0 # number of events which are processed in the current step
  @thread_pool = ThreadPool.new
  @thread = Thread.current         # the event loop thread
  @stop = nil
end
Instance Attribute Details
#thread_pool ⇒ Utilrb::ThreadPool (readonly)
Underlying thread pool used to defer work.
# File 'lib/utilrb/event_loop.rb', line 182
def thread_pool
  @thread_pool
end
Instance Method Details
#add_event(event, every_step = false) ⇒ Object
Adds an Event to the event loop
# File 'lib/utilrb/event_loop.rb', line 642
def add_event(event,every_step = false)
  raise ArgumentError, "cannot add event which is ignored." if event.ignore?
  if every_step
    @mutex.synchronize do
      @every_cylce_events << event
    end
  else
    @events << event
  end
  event
end
#add_task(task) ⇒ Object
Adds a task to the thread pool
# File 'lib/utilrb/event_loop.rb', line 657
def add_task(task)
  thread_pool << task
end
#add_timer(timer) ⇒ Object
Adds a timer to the event loop
# File 'lib/utilrb/event_loop.rb', line 631
def add_timer(timer)
  @mutex.synchronize do
    raise "timer #{timer}:#{timer.doc} was already added!" if @timers.include?(timer)
    @timers << timer
  end
end
#async(work, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task
Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call
async method(:my_method) do |result,exception|
  if exception
    raise exception
  else
    puts result
  end
end
# File 'lib/utilrb/event_loop.rb', line 213
def async(work,*args,&callback)
  async_with_options(work,Hash.new,*args,&callback)
end
#async_every(work, options = Hash.new, *args, &callback) ⇒ EventLoop::Timer
Integrates a blocking operation call like #async but automatically re-queues the call once the period has elapsed and the worker thread has finished the task. This means it never re-queues the call if the task blocks forever, and it never defers the call to more than one worker thread at a time.
# File 'lib/utilrb/event_loop.rb', line 252
def async_every(work,options=Hash.new,*args, &callback)
  options, async_opt = Kernel.filter_options(options,:period,:start => true)
  period = options[:period]
  raise ArgumentError,"No period given" unless period
  task = nil
  every period ,options[:start] do
    if !task
      task = async_with_options(work,async_opt,*args,&callback)
    elsif task.finished?
      add_task task
    end
    task
  end
end
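The re-queue rule can be isolated from timers and threads. The sketch below is an assumption-laden illustration: `FakeTask` and `PeriodicDeferral` are hypothetical names, and `runs` simply counts how often the task would have been handed to a worker. Each call to `tick` stands for one timer expiry.

```ruby
# Hypothetical stand-in for a thread pool task.
FakeTask = Struct.new(:finished, :runs) do
  def finished?
    finished
  end
end

# Hypothetical sketch of the rule: create the task on the first tick and
# re-queue it on later ticks only if the previous run has finished.
class PeriodicDeferral
  attr_reader :task

  def initialize
    @task = nil
  end

  def tick
    if @task.nil?
      @task = FakeTask.new(false, 1)   # first expiry: defer the work
    elsif @task.finished?
      @task.finished = false           # re-queue the same task
      @task.runs += 1
    end
    @task                              # a still-running task is left alone
  end
end

ticker = PeriodicDeferral.new
ticker.tick                  # starts the first run
ticker.tick                  # previous run still active: nothing happens
ticker.task.finished = true  # pretend the worker thread completed the run
ticker.tick                  # now the task is queued again
```

Because a running task is skipped rather than duplicated, a slow operation lowers the effective rate instead of piling up work.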
#async_with_options(work, options = Hash.new, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task
Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call
async method(:my_method) do |result,exception|
  if exception
    raise exception
  else
    puts result
  end
end
# File 'lib/utilrb/event_loop.rb', line 220
def async_with_options(work,options=Hash.new,*args,&callback)
  options[:callback] = callback
  defer(options,*args,&work)
end
#backlog ⇒ Fixnum
Number of tasks waiting for execution
# File 'lib/utilrb/event_loop.rb', line 568
def backlog
  thread_pool.backlog
end
#call(&block) ⇒ Event, Object
Calls the given block in the event loop thread. If the current thread is the event loop thread, the block is executed right away and the result of the block is returned. Otherwise, a handle to the queued Event is returned.
# File 'lib/utilrb/event_loop.rb', line 375
def call(&block)
  if thread?
    block.call
  else
    once(&block)
  end
end
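The two return paths of this method can be illustrated with a small stand-in. `LoopThreadGate` and `pending` are hypothetical names; the sketch assumes only that the loop thread is the one that created the object and that queued blocks are stored in a `Queue`.

```ruby
# Hypothetical sketch: run inline on the loop thread, queue otherwise.
class LoopThreadGate
  def initialize
    @thread = Thread.current   # treated as the loop thread
    @events = Queue.new
  end

  def thread?
    Thread.current == @thread
  end

  # Returns the block's result on the loop thread; on any other thread the
  # block is queued and the block object itself is returned as a handle.
  def call(&block)
    if thread?
      block.call
    else
      @events << block
      block
    end
  end

  def pending
    @events.size
  end
end

gate = LoopThreadGate.new
gate.call { 1 + 1 }                        # executed immediately
Thread.new { gate.call { :later } }.join   # queued for the loop thread
```

Callers that need the result should therefore check which thread they are on, since a queued call returns a handle rather than the block's value.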
#cancel_timer(timer) ⇒ Object
Cancels the given timer if it is running; otherwise it does nothing.
# File 'lib/utilrb/event_loop.rb', line 492
def cancel_timer(timer)
  @mutex.synchronize do
    @timers.delete timer
  end
end
#clear ⇒ Object
Clears all timers, events and errors
# File 'lib/utilrb/event_loop.rb', line 662
def clear
  thread_pool.clear

  @errors.clear
  @events.clear
  @mutex.synchronize do
    @every_cylce_events.clear
    @timers.clear
  end
end
#clear_errors ⇒ Object
Clears all errors which occurred during the last step and are not marked as known. If the errors are not cleared, they are re-raised the next time step is called.
# File 'lib/utilrb/event_loop.rb', line 675
def clear_errors
  @errors.clear
end
#defer(options = Hash.new, *args, &block) ⇒ ThreadPool::Task
Integrates a blocking operation call into the EventLoop by executing it from a different thread. After the call has returned, the given callback is called from the EventLoop thread while it processes its events.
If the callback has an arity of 2, the exception is passed to the callback as the second parameter in the event of an error. The error is also passed to the error handlers of the event loop, but it is not re-raised if the error is marked as known.
To override an error, the callback can return :ignore_error or a new error instance in the event of an error. In that case the error handlers of the event loop are either not called or called with the new error instance.
callback = Proc.new do |r,e|
  if e
    :ignore_error
  else
    puts r
  end
end
defer(:callback => callback) do
  raise
end
# File 'lib/utilrb/event_loop.rb', line 302
def defer(options=Hash.new,*args,&block)
  options, task_options = Kernel.filter_options(options,{:callback => nil,:known_errors => [],:on_error => nil})
  callback = options[:callback]
  error_callback = options[:on_error]
  known_errors = Array(options[:known_errors])

  task = Utilrb::ThreadPool::Task.new(task_options,*args,&block)
  # ensures that user callback is called from main thread and not from worker threads
  if callback
    task.callback do |result,exception|
      once do
        if callback.arity == 1
          callback.call result if !exception
        else
          e = callback.call result,exception
          # check if the error was overwritten in the case of an error
          exception = if exception
                        if e.is_a?(Symbol) && e == :ignore_error
                          nil
                        elsif e.is_a? Exception
                          e
                        else
                          exception
                        end
                      end
        end
        if exception
          error_callback.call(exception) if error_callback
          raises = !known_errors.any? {|error| exception.is_a?(error)}
          handle_error(exception,raises)
        end
      end
    end
  else
    task.callback do |result,exception|
      if exception
        raises = !known_errors.find {|error| exception.is_a?(error)}
        once do
          error_callback.call(exception) if error_callback
          handle_error(exception,raises)
        end
      end
    end
  end
  @mutex.synchronize do
    @thread_pool << task
  end
  task
end
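The callback rules above (arity 1 versus 2, :ignore_error, replacement errors) can be checked in isolation. `resolve_exception` is a hypothetical helper written for this illustration; it mirrors only the decision about which exception, if any, is forwarded to the error handlers, and involves no threads.

```ruby
# Hypothetical helper mirroring the callback rules described above.
# Returns the exception that would reach the error handlers, or nil.
def resolve_exception(callback, result, exception)
  if callback.arity == 1
    # One-argument callbacks only see successful results; an error is
    # forwarded unchanged.
    callback.call(result) unless exception
    exception
  else
    returned = callback.call(result, exception)
    return nil unless exception
    if returned == :ignore_error
      nil                      # the callback suppressed the error
    elsif returned.is_a?(Exception)
      returned                 # the callback replaced the error
    else
      exception                # any other return value keeps the original
    end
  end
end

boom = RuntimeError.new("boom")
ignoring  = Proc.new { |r, e| e ? :ignore_error : r }
replacing = Proc.new { |r, e| ArgumentError.new("wrapped") if e }

resolve_exception(ignoring, nil, boom)    # nil: handlers are skipped
resolve_exception(replacing, nil, boom)   # the ArgumentError replaces boom
```

Note that a one-argument callback cannot suppress or replace an error, because it is not called at all when the task fails.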
#events? ⇒ Boolean
Returns true if events are queued.
# File 'lib/utilrb/event_loop.rb', line 386
def events?
  !@events.empty? || !@errors.empty?
end
#every(period, start = true) { ... } ⇒ Utilrb::EventLoop::Timer
Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.
# File 'lib/utilrb/event_loop.rb', line 398
def every(period,start=true,&block)
  timer = Timer.new(self,period,&block)
  timer.start if start # adds itself to the event loop
  timer
end
#every_step(&block) ⇒ Event
Executes the given block every step from the event loop thread.
# File 'lib/utilrb/event_loop.rb', line 407
def every_step(&block)
  add_event Event.new(block),true
end
#exec(period = 0.05) { ... } ⇒ Object
Starts the event loop with the given period. If a code block is given, it is executed at the end of each step. This method blocks until stop is called.
# File 'lib/utilrb/event_loop.rb', line 516
def exec(period=0.05,&block)
  @stop = false
  reset_timers
  while !@stop
    last_step = Time.now
    step(last_step,&block)
    diff = (Time.now-last_step).to_f
    sleep(period-diff) if diff < period && !@stop
  end
end
#handle_error(error, save = true) ⇒ Object
# File 'lib/utilrb/event_loop.rb', line 679
def handle_error(error,save = true)
  call do
    on_error = @mutex.synchronize do
      @on_error.find_all{|key,e| error.is_a? key}.map(&:last).flatten
    end
    on_error.each do |handler|
      handler.call error
    end
    @errors << error if save == true
  end
end
#on_error(error_class) {|exception| ... } ⇒ Object
Errors caught during event loop callbacks are forwarded to registered code blocks. The code block is called from the event loop thread.
# File 'lib/utilrb/event_loop.rb', line 417
def on_error(error_class,&block)
  @mutex.synchronize do
    @on_error[error_class] ||= []
    @on_error[error_class] << block
  end
end
#on_errors(*error_classes) {|exception| ... } ⇒ Object
Errors caught during event loop callbacks are forwarded to registered code blocks. The code blocks are called from the event loop thread.
# File 'lib/utilrb/event_loop.rb', line 430
def on_errors(*error_classes,&block)
  error_classes.flatten!
  error_classes.each do |error_class|
    on_error(error_class,&block)
  end
end
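Handlers are stored per error class and matched with is_a?, so a handler registered for a superclass also receives subclass errors. The following sketch shows that lookup on its own. `ErrorHandlers` and `dispatch` are hypothetical names, and the mutex and event loop thread are left out.

```ruby
# Hypothetical registry mirroring the handler lookup by error class.
class ErrorHandlers
  def initialize
    @on_error = {}   # error class => array of handler blocks
  end

  def on_error(error_class, &block)
    (@on_error[error_class] ||= []) << block
  end

  def on_errors(*error_classes, &block)
    error_classes.flatten.each { |klass| on_error(klass, &block) }
  end

  # Calls every handler whose registered class matches the error and
  # returns the number of handlers that were called.
  def dispatch(error)
    handlers = @on_error.select { |klass, _| error.is_a?(klass) }.values.flatten
    handlers.each { |handler| handler.call(error) }
    handlers.size
  end
end

handlers = ErrorHandlers.new
handlers.on_error(StandardError) { |e| warn "generic: #{e.message}" }
handlers.on_errors(ArgumentError, IOError) { |e| warn "specific: #{e.message}" }
handlers.dispatch(ArgumentError.new("bad value"))   # both handlers are called
```

Registering a handler for a broad class such as StandardError is therefore a simple way to observe every ordinary error.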
#once(delay = nil) { ... } ⇒ Utilrb::EventLoop::Timer, Event
Executes the given block in the next step from the event loop thread. Returns a Timer object if a delay is set; otherwise returns a handle to the queued Event.
# File 'lib/utilrb/event_loop.rb', line 359
def once(delay=nil,&block)
  raise ArgumentError, "no block given" unless block
  if delay && delay > 0
    timer = Timer.new(self,delay,true,&block)
    timer.start(timer.period, false)
  else
    add_event(Event.new(block))
  end
end
#pretty_print(pp) ⇒ Object
:nodoc:
# File 'lib/utilrb/event_loop.rb', line 235
def pretty_print(pp) # :nodoc:
  pp.text "EventLoop "
end
#reraise_error(error) ⇒ Object
# File 'lib/utilrb/event_loop.rb', line 577
def reraise_error(error)
  raise error, error.message, (error.backtrace || []) + caller(1)
end
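The three-argument form of raise used here keeps the error class and message and sets an explicit backtrace. The sketch below shows the effect with a stored exception. `reraise_with_context` is a hypothetical name for the same idea, not part of Utilrb.

```ruby
# Hypothetical helper: re-raise a stored error with the current call stack
# appended to its original backtrace.
def reraise_with_context(error)
  raise error, error.message, (error.backtrace || []) + caller(1)
end

# Capture an error now and raise it again later, as an event loop does with
# errors collected during a step.
stored = begin
  raise ArgumentError, "bad input"
rescue ArgumentError => e
  e
end

begin
  reraise_with_context(stored)
rescue ArgumentError => again
  # The backtrace now shows where the error originated and, after that,
  # where it was raised again.
  puts again.message
  puts again.backtrace.length >= stored.backtrace.length
end
```

The `|| []` guard matters for exceptions that were created but never raised, whose backtrace is nil.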
#reset_timers(time = Time.now) ⇒ Object
Resets all timers so that they do not fire before their whole period has passed, counting from the given point in time.
# File 'lib/utilrb/event_loop.rb', line 502
def reset_timers(time = Time.now)
  @mutex.synchronize do
    @timers.each do |timer|
      timer.reset time
    end
  end
end
#shutdown ⇒ Object
Shuts down the thread pool
# File 'lib/utilrb/event_loop.rb', line 573
def shutdown()
  thread_pool.shutdown()
end
#step(time = Time.now) { ... } ⇒ Object
Handles all current events and timers. If a code block is given it will be executed at the end.
# File 'lib/utilrb/event_loop.rb', line 586
def step(time = Time.now,&block)
  validate_thread
  reraise_error(@errors.shift) if !@errors.empty?

  # copy all work, otherwise it would not be allowed to
  # call any event loop functions from a timer
  timers,call = @mutex.synchronize do
    @every_cylce_events.delete_if(&:ignore?)
    @every_cylce_events.each do |event|
      add_event event
    end

    # check all timers
    temp_timers = @timers.find_all do |timer|
      timer.timeout?(time)
    end
    # delete single shot timers which elapsed
    @timers -= temp_timers.find_all(&:single_shot?)
    [temp_timers,block]
  end

  # handle all current events but not the ones which are added during processing.
  # Step is called recursively if wait_for is used inside an event code block.
  # To make sure that all events and timers are processed in the right order,
  # @number_of_events_to_process and a second timeout check are used.
  @number_of_events_to_process = [@events.size,@number_of_events_to_process].max
  while @number_of_events_to_process > 0
    event = @events.pop
    @number_of_events_to_process -= 1
    handle_errors{event.call} unless event.ignore?
  end
  timers.each do |timer|
    next if timer.stopped?
    handle_errors{timer.call(time)} if timer.timeout?(time)
  end
  handle_errors{call.call} if call
  reraise_error(@errors.shift) if !@errors.empty?

  # allow thread pool to take over
  Thread.pass
end
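A step handles the events that are already queued, not the ones added while it runs. That rule can be shown with a simplified sketch. `StepQueue` is a hypothetical name, and the sketch leaves out timers, error handling and the recursive case in which wait_for is called from inside an event.

```ruby
# Hypothetical sketch of the "process only what was queued" rule.
class StepQueue
  def initialize
    @events = Queue.new
  end

  def once(&block)
    @events << block
  end

  def pending
    @events.size
  end

  # The number of events is fixed before processing starts, so events added
  # by a running event are left for the following step.
  def step
    to_process = @events.size
    to_process.times { @events.pop.call }
    to_process
  end
end

order = []
queue = StepQueue.new
queue.once { order << :first; queue.once { order << :nested } }
queue.once { order << :second }
queue.step   # runs :first and :second, queues :nested
queue.step   # runs :nested
```

Fixing the count up front keeps a step bounded even if every event schedules a follow-up event.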
#steps(period = 0.05, max_time = 1.0, &block) ⇒ Object
Steps with the given period until all worker threads are waiting for work.
# File 'lib/utilrb/event_loop.rb', line 555
def steps(period = 0.05,max_time=1.0,&block)
  start = Time.now
  begin
    last_step = Time.now
    step(last_step,&block)
    time = Time.now
    break if max_time && max_time <= (time-start).to_f
    diff = (time-last_step).to_f
    sleep(period-diff) if diff < period && !@stop
  end while (thread_pool.process? || events?)
end
#stop ⇒ Object
Stops the EventLoop after #exec was called.
# File 'lib/utilrb/event_loop.rb', line 528
def stop
  @stop = true
end
#sync(sync_key, *args) {|*args| ... } ⇒ Object
Processes the given block from the current thread but ensures that during processing no worker thread is executing a task which has the same sync_key.
This is useful for calls to instance members which are not thread safe.
# File 'lib/utilrb/event_loop.rb', line 226
def sync(sync_key,*args,&block)
  thread_pool.sync(sync_key,*args,&block)
end
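The guarantee that no two pieces of work with the same sync_key run at the same time can be approximated with one mutex per key. This is only an assumption about how such a guarantee could be built; the page does not show how Utilrb::ThreadPool implements it, and `KeyedSync` is a hypothetical name.

```ruby
# Hypothetical sketch: serialize all blocks that share a sync key.
class KeyedSync
  def initialize
    @guard = Mutex.new   # protects the table of per-key locks
    @locks = {}
  end

  def sync(sync_key, *args)
    lock = @guard.synchronize { @locks[sync_key] ||= Mutex.new }
    lock.synchronize { yield(*args) }
  end
end

keyed = KeyedSync.new
counter = 0
threads = 4.times.map do
  Thread.new { 250.times { keyed.sync(:counter) { counter += 1 } } }
end
threads.each(&:join)
puts counter
```

Blocks that use different keys do not wait for each other, which is the point of keying the lock on the object or resource being protected.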
#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object
Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.
# File 'lib/utilrb/event_loop.rb', line 231
def sync_timeout(sync_key,timeout,*args,&block)
  thread_pool.sync_timeout(sync_key,timeout,*args,&block)
end
#thread=(thread) ⇒ Object
Sets the event loop thread. By default it is set to the one the EventLoop was started from.
@param thread The thread
# File 'lib/utilrb/event_loop.rb', line 463
def thread=(thread)
  @mutex.synchronize do
    @thread = thread
  end
end
#thread? ⇒ Boolean
Returns true if the current thread is the event loop thread.
# File 'lib/utilrb/event_loop.rb', line 449
def thread?
  @mutex.synchronize do
    if Thread.current == @thread
      true
    else
      false
    end
  end
end
#timer?(timer) ⇒ Boolean
Returns true if the given timer is running.
# File 'lib/utilrb/event_loop.rb', line 473
def timer?(timer)
  @mutex.synchronize do
    @timers.include? timer
  end
end
#timers ⇒ Object
Returns all currently running timers.
# File 'lib/utilrb/event_loop.rb', line 482
def timers
  @mutex.synchronize do
    @timers.dup
  end
end
#validate_thread ⇒ Object
Raises if the current thread is not the event loop thread (by default the one the event loop was started from).
# File 'lib/utilrb/event_loop.rb', line 441
def validate_thread
  raise "current thread is not the event loop thread" if !thread?
end
#wait_for(period = 0.05, timeout = nil, &block) ⇒ Object
Steps with the given period until the given block returns true.
# File 'lib/utilrb/event_loop.rb', line 538
def wait_for(period=0.05,timeout=nil,&block)
  start = Time.now
  old_stop = @stop
  exec period do
    stop if block.call
    if timeout && timeout <= (Time.now-start).to_f
      raise RuntimeError,"Timeout during wait_for"
    end
  end
  @stop = old_stop
end
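The waiting pattern (check a condition once per period and give up after a timeout) can be sketched without an event loop. `poll_until` is a hypothetical helper. Unlike wait_for, it only sleeps between checks and does not process events or timers.

```ruby
# Hypothetical helper: poll a condition with a fixed period and an optional
# timeout in seconds.
def poll_until(period = 0.05, timeout = nil)
  start = Time.now
  until yield
    if timeout && timeout <= (Time.now - start).to_f
      raise RuntimeError, "Timeout during wait_for"
    end
    sleep(period)
  end
  true
end

attempts = 0
poll_until(0.001) { (attempts += 1) >= 3 }   # returns after the third check

begin
  poll_until(0.001, 0.01) { false }          # the condition never holds
rescue RuntimeError => e
  puts e.message
end
```

As in wait_for, the timeout is only checked between polls, so a condition block that itself blocks delays the timeout by the same amount.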