Class: TingYun::Agent::Event::EventLoop
- Inherits:
-
Object
- Object
- TingYun::Agent::Event::EventLoop
- Defined in:
- lib/ting_yun/agent/event/event_loop.rb
Defined Under Namespace
Classes: Timer
Instance Method Summary collapse
- #dispatch_event(event, args) ⇒ Object
- #fire(event, *args) ⇒ Object
- #fire_after(interval, event) ⇒ Object
- #fire_every(interval, event) ⇒ Object
- #fire_timer(timer) ⇒ Object
- #fire_timers ⇒ Object
-
#initialize ⇒ EventLoop
constructor
A new instance of EventLoop.
- #next_timeout ⇒ Object
- #on(event, &blk) ⇒ Object
- #prune_timers ⇒ Object
- #reschedule_timer_for_event(e) ⇒ Object
- #run ⇒ Object
- #run_once(nonblock = false) ⇒ Object
- #set_timer(timer) ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #wait_to_run(nonblock) ⇒ Object
- #wakeup ⇒ Object
Constructor Details
#initialize ⇒ EventLoop
Returns a new instance of EventLoop.
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 57 def initialize @self_pipe_rd, @self_pipe_wr = IO.pipe @event_queue = Queue.new @stopped = false @timers = {} @subscriptions = Hash.new { |h,k| h[k] = [] } @subscriptions[:__add_timer] << Proc.new { |t| set_timer(t) } @subscriptions[:__add_event] << Proc.new { |e, blk| @subscriptions[e] << blk } end |
Instance Method Details
#dispatch_event(event, args) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 142 def dispatch_event(event, args) TingYun::Agent.logger.debug("EventLoop: Dispatching event '#{event}' with #{@subscriptions[event].size} callback(s).") errors = [] @subscriptions[event].each do |s| begin s.call(*args) rescue => e errors << e raise end end if !errors.empty? ::TingYun::Agent.logger.error "#{errors.size} error(s) running task for event '#{event}' in Agent Event Loop:", *errors end end |
#fire(event, *args) ⇒ Object
168 169 170 171 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 168 def fire(event, *args) @event_queue << [event, args] wakeup end |
#fire_after(interval, event) ⇒ Object
178 179 180 181 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 178 def fire_after(interval, event) ::TingYun::Agent.logger.debug "Firing event #{event} after #{interval} seconds." fire(:__add_timer, Timer.new(interval, event, false)) end |
#fire_every(interval, event) ⇒ Object
173 174 175 176 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 173 def fire_every(interval, event) ::TingYun::Agent.logger.debug "Firing event #{event} every #{interval} seconds." fire(:__add_timer, Timer.new(interval, event, true)) end |
#fire_timer(timer) ⇒ Object
131 132 133 134 135 136 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 131 def fire_timer(timer) if timer.due? @event_queue << [timer.event] timer.set_fired_time end end |
#fire_timers ⇒ Object
125 126 127 128 129 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 125 def fire_timers @timers.each do |event, timer| fire_timer(timer) end end |
#next_timeout ⇒ Object
81 82 83 84 85 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 81 def next_timeout return nil if @timers.empty? timeout = @timers.values.map(&:next_fire_time).min - Time.now timeout < 0 ? 0 : timeout end |
#on(event, &blk) ⇒ Object
164 165 166 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 164 def on(event, &blk) fire(:__add_event, event, blk) end |
#prune_timers ⇒ Object
138 139 140 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 138 def prune_timers @timers.delete_if { |e, t| t.finished? } end |
#reschedule_timer_for_event(e) ⇒ Object
160 161 162 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 160 def reschedule_timer_for_event(e) @timers[e].reschedule if @timers[e] end |
#run ⇒ Object
96 97 98 99 100 101 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 96 def run ::TingYun::Agent.logger.debug "Running event loop" while !stopped? run_once end end |
#run_once(nonblock = false) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 103 def run_once(nonblock=false) wait_to_run(nonblock) prune_timers fire_timers until @event_queue.empty? evt, args = @event_queue.pop dispatch_event(evt, args) reschedule_timer_for_event(evt) end end |
#set_timer(timer) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 68 def set_timer(timer) existing_timer = @timers[timer.event] if existing_timer elapsed_interval = Time.now - existing_timer.last_interval_start timer.advance(elapsed_interval) end @timers[timer.event] = timer fire_timer(timer) end |
#stop ⇒ Object
91 92 93 94 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 91 def stop @stopped = true wakeup end |
#stopped? ⇒ Boolean
87 88 89 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 87 def stopped? @stopped end |
#wait_to_run(nonblock) ⇒ Object
116 117 118 119 120 121 122 123 |
# File 'lib/ting_yun/agent/event/event_loop.rb', line 116 def wait_to_run(nonblock) timeout = nonblock ? 0 : next_timeout ready = IO.select([@self_pipe_rd], nil, nil, timeout) if ready && ready[0] && ready[0][0] && ready[0][0] == @self_pipe_rd @self_pipe_rd.read(1) end end |