Class: NewRelic::Agent::EventLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/new_relic/agent/event_loop.rb

Defined Under Namespace

Classes: Timer

Instance Method Summary collapse

Constructor Details

#initializeEventLoop

Returns a new instance of EventLoop.



58
59
60
61
62
63
64
65
66
67
# File 'lib/new_relic/agent/event_loop.rb', line 58

def initialize
  @self_pipe_rd, @self_pipe_wr = IO.pipe
  @event_queue = Queue.new
  @stopped = false
  @timers = {}

  @subscriptions = Hash.new { |h, k| h[k] = [] }
  @subscriptions[:__add_timer] << proc { |t| set_timer(t) }
  @subscriptions[:__add_event] << proc { |e, blk| @subscriptions[e] << blk }
end

Instance Method Details

#dispatch_event(event, args) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/new_relic/agent/event_loop.rb', line 144

def dispatch_event(event, args)
  NewRelic::Agent.logger.debug("EventLoop: Dispatching event '#{event}' with #{@subscriptions[event].size} callback(s).")

  errors = []
  @subscriptions[event].each do |s|
    begin
      s.call(*args)
    rescue NewRelic::Agent::ForceRestartException, NewRelic::Agent::ForceDisconnectException
      raise
    rescue => e
      errors << e
    end
  end

  if !errors.empty?
    ::NewRelic::Agent.logger.error("#{errors.size} error(s) running task for event '#{event}' in agent event loop:", *errors)
  end
end

#fire(event, *args) ⇒ Object



171
172
173
174
# File 'lib/new_relic/agent/event_loop.rb', line 171

def fire(event, *args)
  @event_queue << [event, args]
  wakeup
end

#fire_after(interval, event) ⇒ Object



181
182
183
184
# File 'lib/new_relic/agent/event_loop.rb', line 181

def fire_after(interval, event)
  ::NewRelic::Agent.logger.debug("Firing event #{event} after #{interval} seconds.")
  fire(:__add_timer, Timer.new(interval, event, false))
end

#fire_every(interval, event) ⇒ Object



176
177
178
179
# File 'lib/new_relic/agent/event_loop.rb', line 176

def fire_every(interval, event)
  ::NewRelic::Agent.logger.debug("Firing event #{event} every #{interval} seconds.")
  fire(:__add_timer, Timer.new(interval, event, true))
end

#fire_timer(timer) ⇒ Object



133
134
135
136
137
138
# File 'lib/new_relic/agent/event_loop.rb', line 133

def fire_timer(timer)
  if timer.due?
    @event_queue << [timer.event]
    timer.set_fired_time
  end
end

#fire_timersObject



127
128
129
130
131
# File 'lib/new_relic/agent/event_loop.rb', line 127

def fire_timers
  @timers.each do |event, timer|
    fire_timer(timer)
  end
end

#next_timeoutObject



82
83
84
85
86
87
# File 'lib/new_relic/agent/event_loop.rb', line 82

def next_timeout
  return nil if @timers.empty?

  timeout = @timers.values.map(&:next_fire_time).min - Process.clock_gettime(Process::CLOCK_REALTIME)
  [timeout, 0].max
end

#on(event, &blk) ⇒ Object



167
168
169
# File 'lib/new_relic/agent/event_loop.rb', line 167

def on(event, &blk)
  fire(:__add_event, event, blk)
end

#prune_timersObject



140
141
142
# File 'lib/new_relic/agent/event_loop.rb', line 140

def prune_timers
  @timers.delete_if { |e, t| t.finished? }
end

#reschedule_timer_for_event(e) ⇒ Object



163
164
165
# File 'lib/new_relic/agent/event_loop.rb', line 163

def reschedule_timer_for_event(e)
  @timers[e]&.reschedule
end

#runObject



98
99
100
101
102
103
# File 'lib/new_relic/agent/event_loop.rb', line 98

def run
  ::NewRelic::Agent.logger.debug('Running event loop')
  while !stopped?
    run_once
  end
end

#run_once(nonblock = false) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/new_relic/agent/event_loop.rb', line 105

def run_once(nonblock = false)
  wait_to_run(nonblock)

  prune_timers
  fire_timers

  until @event_queue.empty?
    evt, args = @event_queue.pop
    dispatch_event(evt, args)
    reschedule_timer_for_event(evt)
  end
end

#set_timer(timer) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/new_relic/agent/event_loop.rb', line 69

def set_timer(timer)
  existing_timer = @timers[timer.event]

  if existing_timer
    elapsed_interval = Process.clock_gettime(Process::CLOCK_REALTIME) - existing_timer.last_interval_start
    timer.advance(elapsed_interval)
  end

  @timers[timer.event] = timer

  fire_timer(timer)
end

#stopObject



93
94
95
96
# File 'lib/new_relic/agent/event_loop.rb', line 93

def stop
  @stopped = true
  wakeup
end

#stopped?Boolean

Returns:

  • (Boolean)


89
90
91
# File 'lib/new_relic/agent/event_loop.rb', line 89

def stopped?
  @stopped
end

#wait_to_run(nonblock) ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/new_relic/agent/event_loop.rb', line 118

def wait_to_run(nonblock)
  timeout = nonblock ? 0 : next_timeout
  ready = IO.select([@self_pipe_rd], nil, nil, timeout)

  if ready && ready[0] && ready[0][0] && ready[0][0] == @self_pipe_rd
    @self_pipe_rd.read(1)
  end
end

#wakeupObject



186
187
188
189
190
191
192
# File 'lib/new_relic/agent/event_loop.rb', line 186

def wakeup
  begin
    @self_pipe_wr.write_nonblock('.')
  rescue Errno::EAGAIN
    ::NewRelic::Agent.logger.debug('Failed to wakeup event loop')
  end
end