Class: Cosmos::StreamProtocol
- Defined in: lib/cosmos/streams/stream_protocol.rb
Overview
Processes a Stream on behalf of an Interface. A Stream is a primitive interface that simply reads and writes raw binary data. The StreamProtocol adds higher-level processing, including the ability to discard a fixed number of leading bytes from each packet read from the stream and to sync the stream on a given synchronization pattern. The StreamProtocol operates at the Packet abstraction level while the Stream operates on raw bytes.
Direct Known Subclasses
BurstStreamProtocol, FixedStreamProtocol, LengthStreamProtocol, PreidentifiedStreamProtocol, TerminatedStreamProtocol
Instance Attribute Summary
- #bytes_read ⇒ Integer
  The number of bytes read from the stream.
- #bytes_written ⇒ Integer
  The number of bytes written to the stream.
- #interface ⇒ Interface
  The interface associated with this StreamProtocol.
- #post_read_data_callback ⇒ Proc
- #post_read_packet_callback ⇒ Proc
- #post_write_data_callback ⇒ Proc
  The name of a method in the Interface that #write calls after writing the data to the stream.
- #pre_write_packet_callback ⇒ Proc
  The name of a method in the Interface that #write calls before writing the data to the stream.
- #stream ⇒ Stream (readonly)
  The stream this StreamProtocol is processing data from.
Instance Method Summary
- #connect(stream) ⇒ Object
- #connected? ⇒ Boolean
  Whether the stream attribute has been set and is connected.
- #disconnect ⇒ Object
  Disconnects from the underlying Stream by calling Cosmos::Stream#disconnect.
- #initialize(discard_leading_bytes = 0, sync_pattern = nil, fill_sync_pattern = false) ⇒ StreamProtocol (constructor)
  A new instance of StreamProtocol.
- #post_read_data(packet_data) ⇒ String
  Called to perform modifications on read data before making it into a packet.
- #post_read_packet(packet) ⇒ Packet
  Called to perform modifications on a read packet before it is given to the user.
- #post_write_data(packet, data) ⇒ Object
  Called to perform actions after writing data to the stream.
- #pre_write_packet(packet) ⇒ String
  Called to perform modifications on write data before writing it out the stream.
- #read ⇒ Packet|nil
  Reads from the stream.
- #write(packet) ⇒ Object
  Writes the packet data to the stream.
- #write_raw(data, take_mutex = true) ⇒ Object
  Writes the raw binary string to the stream.
Constructor Details
#initialize(discard_leading_bytes = 0, sync_pattern = nil, fill_sync_pattern = false) ⇒ StreamProtocol
Returns a new instance of StreamProtocol.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 61
    def initialize(discard_leading_bytes = 0, sync_pattern = nil, fill_sync_pattern = false)
      @discard_leading_bytes = discard_leading_bytes.to_i
      @sync_pattern = ConfigParser.handle_nil(sync_pattern)
      @sync_pattern = @sync_pattern.hex_to_byte_string if @sync_pattern
      @fill_sync_pattern = ConfigParser.handle_true_false(fill_sync_pattern)
      @stream = nil
      @data = ''
      @bytes_read = 0
      @bytes_written = 0
      @interface = nil
      @post_read_data_callback = nil
      @post_read_packet_callback = nil
      @pre_write_packet_callback = nil
      @post_write_data_callback = nil
      @write_mutex = Mutex.new
    end
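The constructor normalizes its arguments because they may arrive as text: the discard count goes through to_i, and the sync pattern is converted from a hex string to raw bytes. The sketch below illustrates that conversion in core Ruby only. The helpers hex_to_bytes and text_to_nil are hypothetical stand-ins for String#hex_to_byte_string and ConfigParser.handle_nil, and the "0x" prefix handling and the sample pattern are assumptions, so the real COSMOS helpers may accept other forms.

```ruby
# Hypothetical stand-in for the hex_to_byte_string extension used by the
# constructor; the real COSMOS helper may accept other input formats.
def hex_to_bytes(hex)
  digits = hex.sub(/\A0x/i, '')                # assumed optional "0x" prefix
  digits = "0#{digits}" if digits.length.odd?  # pad to whole bytes
  [digits].pack('H*')                          # binary string of raw bytes
end

# Hypothetical stand-in for ConfigParser.handle_nil: a textual "nil"
# becomes a real nil, anything else passes through unchanged.
def text_to_nil(value)
  return nil if value.nil? || value.to_s.casecmp('nil').zero?
  value
end

discard_leading_bytes = '4'.to_i          # text "4" becomes the Integer 4
sync = text_to_nil('0x1ACFFC1D')          # sample pattern, not from the source
sync = hex_to_bytes(sync) if sync
p discard_leading_bytes                   # 4
p sync.bytes                              # [26, 207, 252, 29]
p text_to_nil('nil')                      # nil
```

With a nil sync pattern the conversion is skipped entirely, which matches the `if @sync_pattern` guard in the constructor.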
Instance Attribute Details
#bytes_read ⇒ Integer
Returns the number of bytes read from the stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 25
    def bytes_read
      @bytes_read
    end
#bytes_written ⇒ Integer
Returns the number of bytes written to the stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 27
    def bytes_written
      @bytes_written
    end
#interface ⇒ Interface
Returns the interface associated with this StreamProtocol. The interface is a higher-level abstraction and is passed down to the StreamProtocol so that it can call the callbacks in the interface when processing the Cosmos::Stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 33
    def interface
      @interface
    end
#post_read_data_callback ⇒ Proc
Returns the name of a method in the Interface that #read calls after reading data from the Cosmos::Stream. It should take a String binary data buffer and return a String binary data buffer.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 40
    def post_read_data_callback
      @post_read_data_callback
    end
#post_read_packet_callback ⇒ Proc
    # File 'lib/cosmos/streams/stream_protocol.rb', line 44
    def post_read_packet_callback
      @post_read_packet_callback
    end
#post_write_data_callback ⇒ Proc
    # File 'lib/cosmos/streams/stream_protocol.rb', line 52
    def post_write_data_callback
      @post_write_data_callback
    end
#pre_write_packet_callback ⇒ Proc
    # File 'lib/cosmos/streams/stream_protocol.rb', line 48
    def pre_write_packet_callback
      @pre_write_packet_callback
    end
#stream ⇒ Stream (readonly)
Returns the stream this StreamProtocol is processing data from.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 35
    def stream
      @stream
    end
Instance Method Details
#connect(stream) ⇒ Object
    # File 'lib/cosmos/streams/stream_protocol.rb', line 102
    def connect(stream)
      @data = ''
      @data.force_encoding('ASCII-8BIT')
      @stream = stream
      @stream.connect
    end
#connected? ⇒ Boolean
Returns whether the stream attribute has been set and is connected.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 111
    def connected?
      @stream && @stream.connected?
    end
#disconnect ⇒ Object
Disconnects from the underlying Cosmos::Stream by calling Cosmos::Stream#disconnect. Clears the data attribute.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 117
    def disconnect
      @stream.disconnect if @stream
      @data = ''
    end
#post_read_data(packet_data) ⇒ String
Called to perform modifications on read data before making it into a packet.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 224
    def post_read_data(packet_data)
      packet_data
    end
#post_read_packet(packet) ⇒ Packet
Called to perform modifications on a read packet before it is given to the user.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 232
    def post_read_packet(packet)
      packet
    end
#post_write_data(packet, data) ⇒ Object
Called to perform actions after writing data to the stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 264
    def post_write_data(packet, data)
      # Default do nothing
    end
#pre_write_packet(packet) ⇒ String
Called to perform modifications on write data before writing it out the stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 240
    def pre_write_packet(packet)
      data = packet.buffer(false)
      if @fill_sync_pattern
        # Put leading bytes back on
        data = ("\x00" * @discard_leading_bytes) << data if @discard_leading_bytes > 0

        # Fill the sync pattern
        if @sync_pattern
          BinaryAccessor.write(@sync_pattern, 0, @sync_pattern.length * 8, :BLOCK, data, :BIG_ENDIAN, :ERROR)
        end
      end
      data
    end
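When fill_sync_pattern is set, the default pre_write_packet first restores zero placeholders for the bytes that #read discards and then overwrites the start of the data with the sync pattern. A minimal sketch of that byte manipulation in core Ruby follows. The helper name fill_sync and the sample values are hypothetical, a byte-range assignment stands in for BinaryAccessor.write, and the sketch performs no bounds checking.

```ruby
# Hypothetical helper mirroring the fill step of pre_write_packet.
# Uses plain String operations instead of BinaryAccessor.write.
def fill_sync(buffer, discard_leading_bytes, sync_pattern)
  data = buffer.dup.force_encoding('ASCII-8BIT')
  # Put back placeholders for the bytes that read strips from each packet
  data = ("\x00".b * discard_leading_bytes) << data if discard_leading_bytes > 0
  # Overwrite the start of the data with the sync pattern
  data[0, sync_pattern.length] = sync_pattern if sync_pattern
  data
end

payload = "\x01\x02".b
sync    = "\x1A\xCF\xFC\x1D".b            # sample pattern, not from the source

p fill_sync(payload, 4, sync).bytes       # [26, 207, 252, 29, 1, 2]
p fill_sync(payload, 2, nil).bytes        # [0, 0, 1, 2]
```

Note that when discard_leading_bytes is 0 the pattern is written over the first bytes of the packet buffer itself, so the packet definition would have to reserve room for it.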
#read ⇒ Packet|nil
Reads from the stream. It can look for a sync pattern before creating a Packet. It can discard a set number of bytes at the beginning of the stream before creating the Packet.
If the post_read_data_callback is defined (post_read_data is implemented by the interface) then it is called to translate the raw data. Otherwise post_read_data is called which does nothing unless it is implemented by a subclass.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 133
    def read
      # Loop until we have a packet to give
      loop do
        # Handle a sync pattern if present
        result = handle_sync_pattern()
        return nil unless result

        # Reduce the data to a single packet
        packet_data = reduce_to_single_packet()
        return nil unless packet_data

        # Discard leading bytes if necessary
        packet_data.replace(packet_data[@discard_leading_bytes..-1]) if @discard_leading_bytes > 0

        # Return data based on final_receive_processing
        if @post_read_data_callback
          packet_data = @post_read_data_callback.call(packet_data)
        else
          packet_data = post_read_data(packet_data)
        end
        if packet_data
          if packet_data.length > 0
            # Valid packet
            packet = Packet.new(nil, nil, :BIG_ENDIAN, nil, packet_data)
            if @post_read_packet_callback
              packet = @post_read_packet_callback.call(packet)
            else
              packet = post_read_packet(packet)
            end
            return packet
          else
            # Packet should be ignored
            next
          end
        else
          # Connection lost
          return nil
        end
      end # loop do
    end
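The listing above gives the value returned by the post_read_data hook three meanings: nil ends the read (connection lost), an empty string causes the data to be skipped and the loop to continue, and anything else becomes a Packet. The sketch below mimics that handling for a single chunk of data without depending on COSMOS. The helper process_chunk, the drop_keep_alive callback, and the sample bytes are all hypothetical.

```ruby
# Hypothetical helper mirroring the tail of #read for one chunk of data.
# Returns :connection_lost, :ignored, or the bytes that would become a Packet.
def process_chunk(raw, discard_leading_bytes, post_read_data_callback)
  return :connection_lost if raw.nil?
  data = raw.dup
  # Discard leading bytes (for example a sync word) if necessary
  data.replace(data[discard_leading_bytes..-1]) if discard_leading_bytes > 0
  data = post_read_data_callback.call(data)
  return :connection_lost if data.nil?
  return :ignored if data.empty?
  data
end

# Hypothetical callback: treat a single zero byte as a keep-alive and drop it
drop_keep_alive = ->(data) { data == "\x00".b ? '' : data }

sync = "\x1A\xCF\xFC\x1D".b               # sample pattern, not from the source
p process_chunk(sync + "\x00".b, 4, drop_keep_alive)            # :ignored
p process_chunk(sync + "\x01\x02".b, 4, drop_keep_alive).bytes  # [1, 2]
p process_chunk(nil, 4, drop_keep_alive)                        # :connection_lost
```

Returning an empty string is therefore the way for a callback to filter out data without ending the read loop.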
#write(packet) ⇒ Object
Writes the packet data to the stream.
If the pre_write_packet_callback is defined (pre_write_packet is implemented by the interface) then that is called to translate the packet into data. Otherwise pre_write_packet is called which returns the packet buffer unless it is implemented by a subclass.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 182
    def write(packet)
      @write_mutex.synchronize do
        if @pre_write_packet_callback
          data = @pre_write_packet_callback.call(packet)
        else
          data = pre_write_packet(packet)
        end
        if data
          write_raw(data, false)
          if @post_write_data_callback
            @post_write_data_callback.call(packet, data)
          else
            post_write_data(packet, data)
          end
        else
          # write aborted - don't write data
        end
      end
    end
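As the listing shows, a callback set on the protocol takes precedence over the overridable pre_write_packet method, and a nil result aborts the write. The class below is a minimal stand-in that reproduces only that dispatch. WriteSketch is hypothetical, it takes a plain String where the real method takes a Packet, and it appends to an array instead of writing to a Stream.

```ruby
# Minimal stand-in that mirrors the dispatch in #write; not the COSMOS class.
class WriteSketch
  attr_accessor :pre_write_packet_callback
  attr_reader :written

  def initialize
    @written = []
    @pre_write_packet_callback = nil
  end

  # Default hook: pass the buffer through unchanged
  def pre_write_packet(buffer)
    buffer
  end

  # Returns true if data was written, false if the hook aborted the write
  def write(buffer)
    data = if @pre_write_packet_callback
             @pre_write_packet_callback.call(buffer)
           else
             pre_write_packet(buffer)
           end
    @written << data if data # nil means the write was aborted
    !data.nil?
  end
end

sketch = WriteSketch.new
sketch.write('AB')                        # default hook, data is written
sketch.pre_write_packet_callback = ->(buf) { buf.empty? ? nil : buf + "\n" }
sketch.write('')                          # callback returns nil, nothing written
sketch.write('CD')                        # callback appends a terminator
p sketch.written                          # ["AB", "CD\n"]
```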
#write_raw(data, take_mutex = true) ⇒ Object
Writes the raw binary string to the stream.
    # File 'lib/cosmos/streams/stream_protocol.rb', line 206
    def write_raw(data, take_mutex = true)
      @write_mutex.lock if take_mutex
      begin
        if connected?()
          @stream.write(data)
          @bytes_written += data.length
        else
          raise "Stream not connected for write_raw"
        end
      ensure
        @write_mutex.unlock if take_mutex
      end
    end
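The take_mutex parameter exists because #write already holds @write_mutex when it calls write_raw, and a Ruby Mutex cannot be locked a second time by the thread that holds it: the attempt raises ThreadError. The sketch below reproduces the locking pattern with a hypothetical LockSketch class that appends to an array instead of writing to a Stream. The write_relocking method is added only to show the failure that passing false avoids.

```ruby
# Hypothetical class reproducing the locking pattern of write and write_raw.
class LockSketch
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @log = []
  end

  # Holds the mutex already, so it tells write_raw not to lock again
  def write(data)
    @mutex.synchronize { write_raw(data, false) }
  end

  def write_raw(data, take_mutex = true)
    @mutex.lock if take_mutex
    begin
      @log << data
    ensure
      @mutex.unlock if take_mutex
    end
  end

  # What write would do if it let write_raw take the mutex a second time
  def write_relocking(data)
    @mutex.synchronize { write_raw(data, true) }
  end
end

sketch = LockSketch.new
sketch.write('a')          # locked once, by write
sketch.write_raw('b')      # locked once, by write_raw itself
begin
  sketch.write_relocking('c')
rescue ThreadError => e
  puts "relock failed: #{e.message}"
end
p sketch.log               # ["a", "b"]
```

Callers outside the class would normally leave take_mutex at its default so that raw writes are serialized with packet writes.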