Class: NewRelic::Agent::PipeChannelManager::Pipe
- Inherits: Object
  - Object
  - NewRelic::Agent::PipeChannelManager::Pipe
- Defined in: lib/new_relic/agent/pipe_channel_manager.rb
Overview
Expected initial sequence of events for Pipe usage:
- Pipe is created in parent process (read and write ends open).
- Parent process forks.
- An after_fork hook is invoked in the child.
- From the after_fork hook, the child closes the read end of the pipe and writes a ready marker on the pipe (after_fork_in_child).
- The parent receives the ready marker and closes the write end of the pipe in response (after_fork_in_parent).
After this sequence of steps, an exit of the child (whether clean or not) causes the pipe to become readable again in the parent, and a read on it returns the EOF marker (nil). Note that closing the unused ends of the pipe in both the parent and child processes is essential for the EOF to be triggered correctly. The ready marker mechanism is used because there is no easy hook for after_fork in the parent process.
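The handshake described above can be sketched with a bare IO.pipe. This is an illustration only, not the agent's code: the method name run_handshake and the constant READY are made up for the example, and it assumes a platform where fork is available. Keep in mind that IO.pipe returns the read end first, which is why this class stores the read end in @out and the write end in @in.

```ruby
# Minimal sketch of the ready-marker handshake using a plain IO.pipe.
# READY and run_handshake are hypothetical names for this example.
READY = 'READY' # same value as READY_MARKER in this class

def run_handshake
  reader, writer = IO.pipe

  pid = fork do
    # Child: close the unused read end, then announce readiness.
    reader.close
    writer.write(READY)
    writer.flush
    # Exiting (cleanly or not) closes the child's copy of the write end.
    exit!(0)
  end

  # Parent: wait for the marker, then close its own copy of the write end.
  marker = reader.read(READY.bytesize)
  writer.close

  Process.wait(pid)

  # Every write end is now closed, so the next read reports EOF as nil.
  eof_value = reader.read(1)
  reader.close

  [marker, eof_value]
end

marker, eof_value = run_handshake
puts "marker=#{marker.inspect} eof=#{eof_value.inspect}"
```

If the parent skipped writer.close, the final read would block forever, because the kernel only signals EOF once no process holds the write end open.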
This class provides message framing (separation of individual messages), but not serialization. Serialization / deserialization is the responsibility of clients.
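As a rough illustration of the client's side of that contract, the sketch below turns an object into a binary string before it would be handed to #write, and back into an object after #read. Marshal is an assumption chosen for the example; this page does not say which serializer the agent's own clients use, and the payload contents are invented.

```ruby
# Hypothetical payload; any object Marshal can handle would do.
payload = { 'metric' => 'Custom/example', 'count' => 3 }

# Before writing: serialize the object into a binary string.
bytes = Marshal.dump(payload)

# After reading: rebuild the object from the received bytes.
restored = Marshal.load(bytes)

puts restored == payload
```

Marshal.load should only be used on data from a trusted source, which holds here because both ends of the pipe belong to the same application.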
Message framing works like this:
Each message sent across the pipe is preceded by a length tag that specifies the length, in bytes, of the message that immediately follows. The length tags are serialized as unsigned big-endian long values (4 bytes each). This means that the maximum theoretical message size is just under 4 GiB (2^32 - 1 bytes), much larger than we’d ever need or want for this application.
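The framing scheme can be tried out in isolation. The sketch below is not the class itself: frame, read_frame and LENGTH_TAG_BYTES are hypothetical names, and a StringIO stands in for the pipe. It uses the same 'L>' pack directive (32-bit unsigned, big-endian) that the methods on this page use.

```ruby
require 'stringio'

LENGTH_TAG_BYTES = 4 # mirrors NUM_LENGTH_BYTES

# Prefix the payload with its byte length as a 4-byte big-endian tag.
# String#b yields a binary copy so the concatenation never mixes encodings.
def frame(payload)
  [payload.bytesize].pack('L>') + payload.b
end

# Read one framed message from io; returns nil when no full tag is left.
def read_frame(io)
  tag = io.read(LENGTH_TAG_BYTES)
  return nil if tag.nil? || tag.bytesize < LENGTH_TAG_BYTES

  io.read(tag.unpack1('L>'))
end

# Two messages written back to back are recovered as separate messages.
stream = StringIO.new(frame('hello') + frame("caf\u00e9"))
first  = read_frame(stream)
second = read_frame(stream)
third  = read_frame(stream)

puts first.inspect, second.bytesize, third.inspect
```

The length is taken from bytesize rather than length, so multi-byte characters are counted correctly; the second message above has four characters but occupies five bytes.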
Constant Summary
- READY_MARKER = 'READY'
- NUM_LENGTH_BYTES = 4
Instance Attribute Summary
- #in ⇒ Object
  Returns the value of attribute in.
- #last_read ⇒ Object (readonly)
  Returns the value of attribute last_read.
- #out ⇒ Object
  Returns the value of attribute out.
- #parent_pid ⇒ Object (readonly)
  Returns the value of attribute parent_pid.
Instance Method Summary
- #after_fork_in_child ⇒ Object
- #after_fork_in_parent ⇒ Object
- #close ⇒ Object
- #closed? ⇒ Boolean
- #deserialize_message_length(data) ⇒ Object
- #eof? ⇒ Boolean
- #initialize ⇒ Pipe (constructor)
  A new instance of Pipe.
- #read ⇒ Object
- #serialize_message_length(data) ⇒ Object
- #write(data) ⇒ Object
Constructor Details
#initialize ⇒ Pipe
Returns a new instance of Pipe.
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 64
    def initialize
      @out, @in = IO.pipe
      if defined?(::Encoding::ASCII_8BIT)
        @in.set_encoding(::Encoding::ASCII_8BIT)
      end
      @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
      @parent_pid = $$
    end
Instance Attribute Details
#in ⇒ Object
Returns the value of attribute in.
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61
    def in
      @in
    end
#last_read ⇒ Object (readonly)
Returns the value of attribute last_read.
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62
    def last_read
      @last_read
    end
#out ⇒ Object
Returns the value of attribute out.
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 61
    def out
      @out
    end
#parent_pid ⇒ Object (readonly)
Returns the value of attribute parent_pid.
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 62
    def parent_pid
      @parent_pid
    end
Instance Method Details
#after_fork_in_child ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 115
    def after_fork_in_child
      @out.close unless @out.closed?
      write(READY_MARKER)
    end
#after_fork_in_parent ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 120
    def after_fork_in_parent
      @in.close unless @in.closed?
    end
#close ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 73
    def close
      @out.close unless @out.closed?
      @in.close unless @in.closed?
    end
#closed? ⇒ Boolean
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 124
    def closed?
      @out.closed? && @in.closed?
    end
#deserialize_message_length(data) ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 82
    def deserialize_message_length(data)
      data.unpack('L>').first
    end
#eof? ⇒ Boolean
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 111
    def eof?
      !@out.closed? && @out.eof?
    end
#read ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 92
    def read
      @in.close unless @in.closed?
      @last_read = Process.clock_gettime(Process::CLOCK_REALTIME)
      length_bytes = @out.read(NUM_LENGTH_BYTES)
      if length_bytes
        message_length = deserialize_message_length(length_bytes)
        if message_length
          @out.read(message_length)
        else
          length_hex = length_bytes.bytes.map { |b| b.to_s(16) }.join(' ')
          NewRelic::Agent.logger.error("Failed to deserialize message length from pipe. Bytes: [#{length_hex}]")
          nil
        end
      else
        NewRelic::Agent.logger.error('Failed to read bytes for length from pipe.')
        nil
      end
    end
#serialize_message_length(data) ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 78
    def serialize_message_length(data)
      [data.bytesize].pack('L>')
    end
#write(data) ⇒ Object
    # File 'lib/new_relic/agent/pipe_channel_manager.rb', line 86
    def write(data)
      @out.close unless @out.closed?
      @in << serialize_message_length(data)
      @in << data
    end