Class: NewRelic::Agent::PipeChannelManager::Listener
- Inherits:
-
Object
- Object
- NewRelic::Agent::PipeChannelManager::Listener
- Defined in:
- lib/new_relic/agent/pipe_channel_manager.rb
Instance Attribute Summary collapse
-
#pipes ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
-
#select_timeout ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#timeout ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
Instance Method Summary collapse
- #close_all_pipes ⇒ Object
-
#initialize ⇒ Listener
constructor
A new instance of Listener.
- #register_pipe(id) ⇒ Object
- #start ⇒ Object
- #started? ⇒ Boolean
- #stop ⇒ Object
- #stop_listener_thread ⇒ Object
- #wake ⇒ Object
- #wakeup ⇒ Object
Constructor Details
#initialize ⇒ Listener
Returns a new instance of Listener.
140 141 142 143 144 145 146 147 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 140 def initialize @started = nil @pipes = {} @pipes_lock = Mutex.new @timeout = 360 @select_timeout = 60 end |
Instance Attribute Details
#pipes ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
138 139 140 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138 def pipes @pipes end |
#select_timeout ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
138 139 140 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138 def select_timeout @select_timeout end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
130 131 132 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 130 def thread @thread end |
#timeout ⇒ Object
This attr_accessor intentionally provides unsynchronized access to the within the Resque child process, and must be unsynchronized in order to avoid a potential deadlock in which the PipeChannelManager::Listener thread in the parent process is holding the @pipes_lock at the time of the fork.
138 139 140 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 138 def timeout @timeout end |
Instance Method Details
#close_all_pipes ⇒ Object
218 219 220 221 222 223 224 225 226 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 218 def close_all_pipes @pipes_lock.synchronize do @pipes.each do |id, pipe| # Needs else branch coverage pipe.close if pipe # rubocop:disable Style/SafeNavigation end @pipes = {} end end |
#register_pipe(id) ⇒ Object
153 154 155 156 157 158 159 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 153 def register_pipe(id) @pipes_lock.synchronize do @pipes[id] = Pipe.new end wakeup end |
#start ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 161 def start return if @started == true @started = true @thread = NewRelic::Agent::Threading::AgentThread.create('Pipe Channel Manager') do now = nil loop do clean_up_pipes pipes_to_listen_to = @pipes_lock.synchronize do @pipes.values.map { |pipe| pipe.out } + [wake.out] end if now NewRelic::Agent.record_metric( 'Supportability/Listeners', Process.clock_gettime(Process::CLOCK_REALTIME) - now ) end if ready = IO.select(pipes_to_listen_to, [], [], @select_timeout) now = Process.clock_gettime(Process::CLOCK_REALTIME) ready_pipes = ready[0] ready_pipes.each do |pipe| merge_data_from_pipe(pipe) unless pipe == wake.out end begin wake.out.read_nonblock(1) if ready_pipes.include?(wake.out) rescue IO::WaitReadable NewRelic::Agent.logger.error('Issue while reading from the ready pipe') NewRelic::Agent.logger.error("Ready pipes: #{ready_pipes.map(&:to_s)}, wake.out pipe: #{wake.out}") end end break unless should_keep_listening? end end sleep(0.001) # give time for the thread to spawn end |
#started? ⇒ Boolean
232 233 234 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 232 def started? @started end |
#stop ⇒ Object
209 210 211 212 213 214 215 216 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 209 def stop return unless @started == true stop_listener_thread close_all_pipes @wake.close @wake = nil end |
#stop_listener_thread ⇒ Object
203 204 205 206 207 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 203 def stop_listener_thread @started = false wakeup @thread.join end |
#wake ⇒ Object
228 229 230 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 228 def wake @wake ||= Pipe.new end |
#wakeup ⇒ Object
149 150 151 |
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 149 def wakeup wake.in << '.' end |