Class: NewRelic::Agent::PipeChannelManager::Pipe
- Inherits: Object
- Hierarchy: Object → NewRelic::Agent::PipeChannelManager::Pipe
- Defined in: lib/new_relic/agent/pipe_channel_manager.rb
Overview
Expected initial sequence of events for Pipe usage:
1. Pipe is created in parent process (read and write ends open).
2. Parent process forks.
3. An after_fork hook is invoked in the child.
4. From the after_fork hook, the child closes the read end of the pipe and writes a ready marker on the pipe (after_fork_in_child).
5. The parent receives the ready marker and closes the write end of the pipe in response (after_fork_in_parent).
After this sequence of steps, any exit of the child, clean or not, marks the pipe readable again, and a read then returns the EOF marker (nil). Closing the unused ends of the pipe in both the parent and the child is essential for the EOF to be triggered: a reader sees EOF only once every copy of the write end has been closed. The ready marker mechanism is used because there’s no easy hook for after_fork in the parent process.
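The handshake above can be sketched with plain IO.pipe and fork. This is a minimal illustration, not the agent's code: the constant READY and the method run_handshake are hypothetical names, the marker is sent unframed (the real class sends it through write, which adds a length tag), and the sketch assumes a platform where Process.fork is available.

```ruby
# Illustrative handshake using a bare pipe; names here are assumptions.
READY = 'READY' # mirrors the documented READY_MARKER value

def run_handshake
  reader, writer = IO.pipe           # 1. pipe created in the parent

  pid = fork do                      # 2. parent forks
    reader.close                     # 4. child closes its read end...
    writer.write(READY)              #    ...and announces that it is ready
    writer.flush
    exit!(0)                         # child exits; the OS closes its write end
  end

  marker = reader.read(READY.bytesize) # 5. parent receives the marker...
  writer.close                         #    ...and closes its own write end

  # Every write end is now closed once the child exits, so the pipe
  # becomes readable and the next read reports EOF as nil.
  IO.select([reader])
  eof_value = reader.read(1)

  Process.wait(pid)
  reader.close
  [marker, eof_value]
end
```

Calling run_handshake returns the marker the parent received together with the value of the read that follows the child's exit. If the parent skipped writer.close, the final read would block instead of returning nil, because one write end would still be open.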
This class provides message framing (separation of individual messages), but not serialization. Serialization / deserialization is the responsibility of clients.
Message framing works like this:
Each message sent across the pipe is preceded by a length tag that specifies the length, in bytes, of the message that immediately follows. The length tags are serialized as unsigned big-endian 32-bit integers (4 bytes each; pack directive 'L>'). This means that the maximum theoretical message size is 4 GB, much larger than we’d ever need or want for this application.
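As a rough illustration of this framing scheme, the sketch below encodes and decodes length-prefixed messages over a StringIO. The helper names frame and read_frame are hypothetical and not part of the Pipe API; only the 4-byte tag size and the 'L>' directive come from the class itself.

```ruby
require 'stringio'

NUM_LENGTH_BYTES = 4 # same value as the documented constant

# Prefix the payload with its byte length as a 32-bit unsigned
# big-endian integer. (Hypothetical helper, for illustration only.)
def frame(payload)
  [payload.bytesize].pack('L>') + payload.b
end

# Read one framed message from io. Returns nil at EOF or when the
# length tag is truncated. (Hypothetical helper, for illustration only.)
def read_frame(io)
  tag = io.read(NUM_LENGTH_BYTES)
  return nil if tag.nil? || tag.bytesize < NUM_LENGTH_BYTES

  length = tag.unpack('L>').first
  io.read(length)
end

io = StringIO.new(frame('hello') + frame("h\u00e9llo"))
first  = read_frame(io) # the 5 bytes of 'hello'
second = read_frame(io) # 6 bytes: the length tag counts bytes, not characters
third  = read_frame(io) # nil, nothing left to read
```

Because the tag counts bytes rather than characters, multi-byte payloads survive intact, and the receiver gets raw bytes back that it must deserialize itself.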
Constant Summary
- READY_MARKER = 'READY'
- NUM_LENGTH_BYTES = 4
Instance Attribute Summary
- #in ⇒ Object
  Returns the value of attribute in.
- #last_read ⇒ Object (readonly)
  Returns the value of attribute last_read.
- #out ⇒ Object
  Returns the value of attribute out.
- #parent_pid ⇒ Object (readonly)
  Returns the value of attribute parent_pid.
Instance Method Summary
- #after_fork_in_child ⇒ Object
- #after_fork_in_parent ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deserialize_message_length(data) ⇒ Object
- #eof? ⇒ Boolean
- #initialize ⇒ Pipe (constructor)
  A new instance of Pipe.
- #read ⇒ Object
- #serialize_message_length(data) ⇒ Object
- #write(data) ⇒ Object
Constructor Details
#initialize ⇒ Pipe
Returns a new instance of Pipe.
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 64
def initialize
  # IO.pipe returns [read_end, write_end], so @out is the end that is
  # read from and @in is the end that is written to.
  @out, @in = IO.pipe
  if defined?(::Encoding::ASCII_8BIT)
    # Treat written data as raw bytes.
    @in.set_encoding(::Encoding::ASCII_8BIT)
  end
  @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
  @parent_pid = $$
end
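The attribute names can be counterintuitive, so here is a small standalone sketch of what the constructor sets up. It is not the agent's code: read_end and write_end are illustrative names standing in for @out and @in respectively.

```ruby
# IO.pipe returns the read end first and the write end second.
read_end, write_end = IO.pipe

# As in the constructor, mark the write end as binary so that
# bytes are passed through without transcoding.
write_end.set_encoding(Encoding::ASCII_8BIT)

write_end << "\x00\x00\x01\xFF".b
write_end.close

# A read with an explicit length returns a binary string.
received = read_end.read(4)
read_end.close
```

Here received holds the same four bytes that were written, which is the property the length tags rely on.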
Instance Attribute Details
#in ⇒ Object
Returns the value of attribute in.
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61
def in
  @in
end

#last_read ⇒ Object (readonly)
Returns the value of attribute last_read.
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62
def last_read
  @last_read
end

#out ⇒ Object
Returns the value of attribute out.
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61
def out
  @out
end

#parent_pid ⇒ Object (readonly)
Returns the value of attribute parent_pid.
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62
def parent_pid
  @parent_pid
end
Instance Method Details
#after_fork_in_child ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 115
def after_fork_in_child
  @out.close unless @out.closed?
  write(READY_MARKER)
end

#after_fork_in_parent ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 120
def after_fork_in_parent
  @in.close unless @in.closed?
end

#close ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 73
def close
  @out.close unless @out.closed?
  @in.close unless @in.closed?
end

#closed? ⇒ Boolean
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 124
def closed?
  @out.closed? && @in.closed?
end
#deserialize_message_length(data) ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 82
def deserialize_message_length(data)
  data.unpack('L>').first
end
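To make the 'L>' directive concrete, the following sketch shows what a length tag looks like on the wire and what unpacking yields for a complete and for a truncated tag. The variable names are illustrative only.

```ruby
# A length of 5 becomes four bytes, most significant byte first.
tag = [5].pack('L>')
tag_bytes = tag.bytes

# Unpacking a complete tag recovers the integer.
decoded = tag.unpack('L>').first

# The largest value a 4-byte tag can carry.
max_length = "\xFF\xFF\xFF\xFF".b.unpack('L>').first

# Fewer than four bytes cannot be decoded, so unpack yields nil.
truncated = "\x00\x01".b.unpack('L>').first
```

The nil result for a truncated tag is the case a reader has to guard against before using the decoded value as a read length.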
#eof? ⇒ Boolean
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 111
def eof?
  !@out.closed? && @out.eof?
end
#read ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 92
def read
  # The reader never writes, so make sure the write end is closed.
  @in.close unless @in.closed?
  @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
  # Read the length tag first, then exactly that many bytes of payload.
  length_bytes = @out.read(NUM_LENGTH_BYTES)
  if length_bytes
    message_length = deserialize_message_length(length_bytes)
    if message_length
      @out.read(message_length)
    else
      length_hex = length_bytes.bytes.map { |b| b.to_s(16) }.join(' ')
      NewRelic::Agent.logger.error("Failed to deserialize message length from pipe. Bytes: [#{length_hex}]")
      nil
    end
  else
    NewRelic::Agent.logger.error('Failed to read bytes for length from pipe.')
    nil
  end
end
#serialize_message_length(data) ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 78
def serialize_message_length(data)
  [data.bytesize].pack('L>')
end
#write(data) ⇒ Object
# File 'lib/new_relic/agent/pipe_channel_manager.rb', line 86
def write(data)
  # The writer never reads, so make sure the read end is closed.
  @out.close unless @out.closed?
  # Send the length tag, then the payload itself.
  @in << serialize_message_length(data)
  @in << data
end