Class: Utilrb::EventLoop

Inherits:
Object show all
Defined in:
lib/utilrb/event_loop.rb

Overview

Simple event loop which supports timers and defers blocking operations to a thread pool those results are queued and being processed by the event loop thread at the end of each step.

All events must be code blocks which will be executed at the end of each step. There is no support for filtering or event propagations.

For an easy integration of ruby classes into the event loop the Forwardable#def_event_loop_delegator can be used.

Examples:

Example for using the EventLoop

event_loop = EventLoop.new 
event_loop.once do 
  puts "called once"
end

event_loop.every(1.0) do 
  puts "called every second"
end

callback = Proc.new |result|
  puts result 
end
event_loop.defer callback do
  sleep 2
  "result from the worker thread #{Thread.current}"
end

event_loop.exec

Author:

Defined Under Namespace

Modules: Forwardable Classes: Event, Timer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeEventLoop

A new EventLoop



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/utilrb/event_loop.rb', line 185

def initialize
    @mutex = Mutex.new
    @events = Queue.new               # stores all events for the next step
    @timers = Set.new                 # stores all timers
    @every_cylce_events = Set.new     # stores all events which are added to @events each step
    @on_error = {}                    # stores on error callbacks
    @errors = Queue.new               # stores errors which will be re raised at the end of the step
    @number_of_events_to_process = 0  # number of events which are processed in the current step
    @thread_pool = ThreadPool.new
    @thread = Thread.current #the event loop thread
    @stop = nil
end

Instance Attribute Details

#thread_poolUtilrb::ThreadPool (readonly)

Underlying thread pool used to defer work.

Returns:



182
183
184
# File 'lib/utilrb/event_loop.rb', line 182

def thread_pool
  @thread_pool
end

Class Method Details

.cleanup_backtrace(&block) ⇒ Object



172
173
174
175
176
177
# File 'lib/utilrb/event_loop.rb', line 172

def self.cleanup_backtrace(&block)
    block.call
rescue
    $@.delete_if{|s| %r"#{Regexp.quote(__FILE__)}"o =~ s}
    ::Kernel::raise
end

Instance Method Details

#add_event(event, every_step = false) ⇒ Object

Adds an Event to the event loop

Parameters:

  • event (Event)

    The event

  • every_step (Boolean) (defaults to: false)

    Automatically added for every step



642
643
644
645
646
647
648
649
650
651
652
# File 'lib/utilrb/event_loop.rb', line 642

def add_event(event,every_step = false)
    raise ArgumentError "cannot add event which is ignored." if event.ignore?
    if every_step
        @mutex.synchronize do
            @every_cylce_events << event
        end
    else
        @events << event
    end
    event
end

#add_task(task) ⇒ Object

Adds a task to the thread pool

Parameters:



657
658
659
# File 'lib/utilrb/event_loop.rb', line 657

def add_task(task)
    thread_pool << task
end

#add_timer(timer) ⇒ Object

Adds a timer to the event loop

Parameters:

  • timer (Timer)

    The timer.



631
632
633
634
635
636
# File 'lib/utilrb/event_loop.rb', line 631

def add_timer(timer)
    @mutex.synchronize do
        raise "timer #{timer}:#{timer.doc} was already added!" if @timers.include?(timer)
        @timers << timer
    end
end

#async(work, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task

Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call

async method(:my_method) do |result,exception|
      if exception
              raise exception
      else
              puts result
      end
end

Parameters:

  • work (#call)

    The proc which will be deferred

Yields:

  • (result)

    The callback

  • (result, exception)

    The callback

Returns:



213
214
215
# File 'lib/utilrb/event_loop.rb', line 213

def async(work,*args,&callback)
    async_with_options(work,Hash.new,*args,&callback)
end

#async_every(work, options = Hash.new, *args, &callback) ⇒ EventLoop::Timer

Integrates a blocking operation call like #async but automatically re queues the call if period was passed and the task was finished by the worker thread. This means it will never re queue the call if the task blocks for ever and it will never simultaneously defer the call to more than one worker thread.

Parameters:

  • options (Hash) (defaults to: Hash.new)

    The options

  • work (#call)

    The proc which will be deferred

  • options (Hash) (defaults to: Hash.new)

    The options

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :period (Float)

    The period

  • :start (Boolean)

    Starts the timer right away (default = true)

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known erros which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occured

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error ocurred which was handled.

Returns:

Raises:

  • (ArgumentError)


252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/utilrb/event_loop.rb', line 252

def async_every(work,options=Hash.new,*args, &callback)
    options, async_opt = Kernel.filter_options(options,:period,:start => true)
    period = options[:period]
    raise ArgumentError,"No period given" unless period
    task = nil
    every period ,options[:start] do
        if !task
            task = async_with_options(work,async_opt,*args,&callback)
        elsif task.finished?
            add_task task
        end
        task
    end
end

#async_with_options(work, options = Hash.new, *args) {|result| ... } ⇒ Utilrb::ThreadPool::Task

Integrates a blocking operation call into the EventLoop like #defer but has a more suitable syntax for deferring a method call

async method(:my_method) do |result,exception|
      if exception
              raise exception
      else
              puts result
      end
end

Parameters:

  • work (#call)

    The proc which will be deferred

  • options (Hash) (defaults to: Hash.new)

    The options

Options Hash (options):

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known erros which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occured

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error ocurred which was handled.

Yields:

  • (result)

    The callback

  • (result, exception)

    The callback

Returns:



220
221
222
223
# File 'lib/utilrb/event_loop.rb', line 220

def async_with_options(work,options=Hash.new,*args,&callback)
    options[:callback] = callback
    defer(options,*args,&work)
end

#backlogFixnum

Number of tasks waiting for execution

Returns:

  • (Fixnum)

    the number of tasks



568
569
570
# File 'lib/utilrb/event_loop.rb', line 568

def backlog
    thread_pool.backlog
end

#call(&block) ⇒ Event, Object

Calls the give block in the event loop thread. If the current thread is the event loop thread it will execute it right a way and returns the result of the code block call. Otherwise, it returns an handler to the Event which was queued.

Returns:



375
376
377
378
379
380
381
# File 'lib/utilrb/event_loop.rb', line 375

def call(&block)
    if thread?
        block.call
    else
        once(&block)
    end
end

#cancel_timer(timer) ⇒ Object

Cancels the given timer if it is running otherwise it does nothing.

Parameters:

  • timer (Timer)

    The timer



492
493
494
495
496
# File 'lib/utilrb/event_loop.rb', line 492

def cancel_timer(timer)
    @mutex.synchronize do
        @timers.delete timer
    end
end

#clearObject

Clears all timers, events and errors



662
663
664
665
666
667
668
669
670
671
# File 'lib/utilrb/event_loop.rb', line 662

def clear
    thread_pool.clear

    @errors.clear
    @events.clear
    @mutex.synchronize do
        @every_cylce_events.clear
        @timers.clear
    end
end

#clear_errorsObject

Clears all errors which occurred during the last step and are not marked as known If the errors were not cleared they are re raised the next time step is called.



675
676
677
# File 'lib/utilrb/event_loop.rb', line 675

def clear_errors
    @errors.clear
end

#defer(options = Hash.new, *args, &block) ⇒ ThreadPool::Task

Integrates a blocking operation call into the EventLoop by executing it from a different thread. The given callback will be called from the EventLoop thread while processing its events after the call returned.

If the callback has an arity of 2 the exception will be passed to the callback as second parameter in an event of an error. The error is also passed to the error handlers of the even loop, but it will not be re raised if the error is marked as known

To overwrite an error the callback can return :ignore_error or a new instance of an error in an event of an error. In this case the error handlers of the event loop will not be called or called with the new error instance.

callback = Proc.new do |r,e|

   if e
      :ignore_error
   else
      puts r
   end
end

defer(=> callback) do

raise

end

Examples:

ignore a error

Parameters:

  • options (Hash) (defaults to: Hash.new)

    The options

  • options (Hash) (defaults to: Hash.new)

    The options of the task.

  • args (Array)

    The arguments for the code block

  • block (#call)

    The code block

Options Hash (options):

  • :callback (Proc)

    The callback

  • :known_errors (class)

    Known erros which will be rescued

  • :on_error (Proc)

    Callback which is called when an error occured

  • :sync_key (Object)

    The sync key

  • :callback (Proc)

    The callback

  • :default (Object)

    Default value returned when an error ocurred which was handled.

Returns:



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/utilrb/event_loop.rb', line 302

def defer(options=Hash.new,*args,&block)
    options, task_options = Kernel.filter_options(options,{:callback => nil,:known_errors => [],:on_error => nil})
    callback = options[:callback]
    error_callback = options[:on_error]
    known_errors = Array(options[:known_errors])

    task = Utilrb::ThreadPool::Task.new(task_options,*args,&block)
    # ensures that user callback is called from main thread and not from worker threads
    if callback
        task.callback do |result,exception|
            once do
                if callback.arity == 1
                    callback.call result if !exception
                else
                    e = callback.call result,exception
                    #check if the error was overwritten in the
                    #case of an error
                    exception = if exception
                                    if e.is_a?(Symbol) && e == :ignore_error
                                        nil
                                    elsif e.is_a? Exception
                                        e
                                    else
                                        exception
                                    end
                                end
                end
                if exception
                    error_callback.call(exception) if error_callback
                    raises = !known_errors.any? {|error| exception.is_a?(error)}
                    handle_error(exception,raises)
                end
            end
        end
    else
        task.callback do |result,exception|
            if exception
                raises = !known_errors.find {|error| exception.is_a?(error)}
                once do
                    error_callback.call(exception) if error_callback
                    handle_error(exception,raises)
                end
            end
        end
    end
    @mutex.synchronize do
        @thread_pool << task
    end
    task
end

#events?Boolean

Returns true if events are queued.

Returns:

  • (Boolean)


386
387
388
# File 'lib/utilrb/event_loop.rb', line 386

def events?
    !@events.empty? || !@errors.empty?
end

#every(period, start = true) { ... } ⇒ Utilrb::EventLoop::Timer

Adds a timer to the event loop which will execute the given code block with the given period from the event loop thread.

Parameters:

  • period (Float)

    The period of the timer in seconds

Yields:

  • The code block.

Returns:



398
399
400
401
402
# File 'lib/utilrb/event_loop.rb', line 398

def every(period,start=true,&block)
    timer = Timer.new(self,period,&block)
    timer.start if start # adds itself to the event loop
    timer
end

#every_step(&block) ⇒ Event

Executes the given block every step from the event loop thread.

Returns:



407
408
409
# File 'lib/utilrb/event_loop.rb', line 407

def every_step(&block)
    add_event Event.new(block),true
end

#exec(period = 0.05) { ... } ⇒ Object

Starts the event loop with the given period. If a code block is given it will be executed at the end of each step. This method will block until stop is called

Parameters:

  • period (Float) (defaults to: 0.05)

    The period

Yields:

  • The code block



516
517
518
519
520
521
522
523
524
525
# File 'lib/utilrb/event_loop.rb', line 516

def exec(period=0.05,&block)
    @stop = false
    reset_timers
    while !@stop
        last_step = Time.now
        step(last_step,&block)
        diff = (Time.now-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end
end

#handle_error(error, save = true) ⇒ Object



679
680
681
682
683
684
685
686
687
688
689
# File 'lib/utilrb/event_loop.rb', line 679

def handle_error(error,save = true)
    call do
        on_error = @mutex.synchronize do
            @on_error.find_all{|key,e| error.is_a? key}.map(&:last).flatten
        end
        on_error.each do |handler|
            handler.call error
        end
        @errors << error if save == true
    end
end

#on_error(error_class) {|exception| ... } ⇒ Object

Errors caught during event loop callbacks are forwarded to registered code blocks. The code block is called from the event loop thread.

Parameters:

  • @error_class

    The error class the block should be called for

Yields:

  • (exception)

    The code block



417
418
419
420
421
422
# File 'lib/utilrb/event_loop.rb', line 417

def on_error(error_class,&block)
    @mutex.synchronize do
        @on_error[error_class] ||= []
        @on_error[error_class]  << block
    end
end

#on_errors(*error_classes) {|exception| ... } ⇒ Object

Errors caught during event loop callbacks are forwarded to registered code blocks. The code blocks are called from the event loop thread.

Parameters:

  • @error_classes

    The error classes the block should be called for

Yields:

  • (exception)

    The code block



430
431
432
433
434
435
# File 'lib/utilrb/event_loop.rb', line 430

def on_errors(*error_classes,&block)
    error_classes.flatten!
    error_classes.each do |error_class|
        on_error(error_class,&block)
    end
end

#once(delay = nil) { ... } ⇒ Utilrb::EventLoop::Timer, Event

Executes the given block in the next step from the event loop thread. Returns a Timer object if a delay is set otherwise an handler to the Event which was queued.

Yields:

  • The code block.

Returns:



359
360
361
362
363
364
365
366
367
# File 'lib/utilrb/event_loop.rb', line 359

def once(delay=nil,&block)
    raise ArgumentError "no block given" unless block
    if delay && delay > 0
        timer = Timer.new(self,delay,true,&block)
        timer.start(timer.period, false)
    else
        add_event(Event.new(block))
    end
end

#pretty_print(pp) ⇒ Object

:nodoc:



235
236
237
# File 'lib/utilrb/event_loop.rb', line 235

def pretty_print(pp) # :nodoc:
    pp.text "EventLoop "
end

#reraise_error(error) ⇒ Object

Raises:

  • (error)


577
578
579
# File 'lib/utilrb/event_loop.rb', line 577

def reraise_error(error)
    raise error, error.message, (error.backtrace || []) + caller(1)
end

#reset_timers(time = Time.now) ⇒ Object

Resets all timers to fire not before their hole period is passed counting from the given point in time.

Parameters:

  • time (Time) (defaults to: Time.now)

    The time



502
503
504
505
506
507
508
# File 'lib/utilrb/event_loop.rb', line 502

def reset_timers(time = Time.now)
    @mutex.synchronize do 
        @timers.each do |timer|
            timer.reset time
        end
    end
end

#shutdownObject

Shuts down the thread pool



573
574
575
# File 'lib/utilrb/event_loop.rb', line 573

def shutdown()
    thread_pool.shutdown()
end

#step(time = Time.now) { ... } ⇒ Object

Handles all current events and timers. If a code block is given it will be executed at the end.

Parameters:

  • time (Time) (defaults to: Time.now)

    The time the step is executed for.

Yields:

  • The code block



586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
# File 'lib/utilrb/event_loop.rb', line 586

def step(time = Time.now,&block)
    validate_thread
    reraise_error(@errors.shift) if !@errors.empty?

    #copy all work otherwise it would not be allowed to 
    #call any event loop functions from a timer
    timers,call = @mutex.synchronize do
                            @every_cylce_events.delete_if(&:ignore?)
                            @every_cylce_events.each do |event|
                                add_event event
                            end

                            # check all timers
                            temp_timers = @timers.find_all do |timer|
                                timer.timeout?(time)
                            end
                            # delete single shot timer which elapsed
                            @timers -= temp_timers.find_all(&:single_shot?)
                            [temp_timers,block]
                        end

    # handle all current events but not the one which are added during processing.
    # Step is recursively be called if wait_for is used insight an event code block.
    # To make sure that all events and timer are processed in the right order
    # @number_of_events_to_process and a second timeout check is used.
    @number_of_events_to_process = [@events.size,@number_of_events_to_process].max
    while @number_of_events_to_process > 0
        event = @events.pop
        @number_of_events_to_process -= 1
        handle_errors{event.call} unless event.ignore?
    end
    timers.each do |timer|
        next if timer.stopped?
        handle_errors{timer.call(time)} if timer.timeout?(time)
    end
    handle_errors{call.call} if call
    reraise_error(@errors.shift) if !@errors.empty?
    
    #allow thread pool to take over
    Thread.pass
end

#steps(period = 0.05, max_time = 1.0, &block) ⇒ Object

Steps with the given period until all worker thread are waiting for work

Parameters:

  • period (Float) (defaults to: 0.05)

    Ther period

  • (@see #step)


555
556
557
558
559
560
561
562
563
564
565
# File 'lib/utilrb/event_loop.rb', line 555

def steps(period = 0.05,max_time=1.0,&block)
    start = Time.now
    begin
        last_step = Time.now
        step(last_step,&block)
        time = Time.now
        break if max_time && max_time <= (time-start).to_f
        diff = (time-last_step).to_f
        sleep(period-diff) if diff < period && !@stop
    end while (thread_pool.process? || events?)
end

#stopObject

Stops the EventLoop after [#exec] was called.



528
529
530
# File 'lib/utilrb/event_loop.rb', line 528

def stop
    @stop = true
end

#sync(sync_key, *args) {|*args| ... } ⇒ Object

Processes the given block from current thread but insures that during processing no worker thread is executing a task which has the same sync_key.

This is useful for instance member calls which are not thread safe.

Parameters:

  • sync_key (Object)

    The sync key

Yields:

  • (*args)

    the code block block

Returns:

  • (Object)

    The result of the code block



226
227
228
# File 'lib/utilrb/event_loop.rb', line 226

def sync(sync_key,*args,&block)
    thread_pool.sync(sync_key,*args,&block)
end

#sync_timeout(sync_key, timeout, *args) {|*args| ... } ⇒ Object

Same as sync but raises Timeout::Error if sync_key cannot be obtained after the given execution time.

Parameters:

  • sync_key (Object)

    The sync key

  • timeout (Float)

    The timeout

Yields:

  • (*args)

    the code block block

Returns:

  • (Object)

    The result of the code block



231
232
233
# File 'lib/utilrb/event_loop.rb', line 231

def sync_timeout(sync_key,timeout,*args,&block)
    thread_pool.sync_timeout(sync_key,timeout,*args,&block)
end

#thread=(thread) ⇒ Object

Sets the event loop thread. By default it is set to the one the EventLoop was started from.

@param thread The thread



463
464
465
466
467
# File 'lib/utilrb/event_loop.rb', line 463

def thread=(thread)
    @mutex.synchronize do
        @thread = thread
    end
end

#thread?Boolean

Returns true if the current thread is the event loop thread.

Returns:

  • (Boolean)


449
450
451
452
453
454
455
456
457
# File 'lib/utilrb/event_loop.rb', line 449

def thread?
    @mutex.synchronize do
        if Thread.current == @thread
            true
        else
            false
        end
    end
end

#timer?(timer) ⇒ Boolean

Returns true if the given timer is running.

Parameters:

  • timer (Timer)

    The timer.

Returns:

  • (Boolean)


473
474
475
476
477
# File 'lib/utilrb/event_loop.rb', line 473

def timer?(timer)
    @mutex.synchronize do
        @timers.include? timer
    end
end

#timersObject

Returns all currently running timers.



482
483
484
485
486
# File 'lib/utilrb/event_loop.rb', line 482

def timers
    @mutex.synchronize do
        @timers.dup
    end
end

#validate_threadObject

Raises if the current thread is not the event loop thread (by default the one the event loop was started from).

Raises:

  • (RuntimeError)


441
442
443
# File 'lib/utilrb/event_loop.rb', line 441

def validate_thread
    raise "current thread is not the event loop thread" if !thread?
end

#wait_for(period = 0.05, timeout = nil, &block) ⇒ Object

Steps with the given period until the given block returns true.

Parameters:

  • period (Float) (defaults to: 0.05)

    The period

  • timeout (Float) (defaults to: nil)

    The timeout in seconds

Yield Returns:

  • (Boolean)


538
539
540
541
542
543
544
545
546
547
548
# File 'lib/utilrb/event_loop.rb', line 538

def wait_for(period=0.05,timeout=nil,&block)
    start = Time.now
    old_stop = @stop
    exec period do
        stop if block.call
        if timeout && timeout <= (Time.now-start).to_f
            raise RuntimeError,"Timeout during wait_for"
        end
    end
    @stop = old_stop
end