Class: Mu::Pcap::IPv6

Inherits:
IP show all
Defined in:
lib/diy/parser/mu/pcap/ipv6.rb

Constant Summary collapse

FORMAT =
'NnCCa16a16'

Constants inherited from IP

Mu::Pcap::IP::IPPROTO_AH, Mu::Pcap::IP::IPPROTO_DSTOPTS, Mu::Pcap::IP::IPPROTO_FRAGMENT, Mu::Pcap::IP::IPPROTO_HOPOPTS, Mu::Pcap::IP::IPPROTO_NONE, Mu::Pcap::IP::IPPROTO_ROUTING, Mu::Pcap::IP::IPPROTO_SCTP, Mu::Pcap::IP::IPPROTO_TCP, Mu::Pcap::IP::IPPROTO_UDP

Constants inherited from Packet

Packet::IGNORE_UDP_PORTS

Instance Attribute Summary collapse

Attributes inherited from IP

#dst, #src

Attributes inherited from Packet

#payload, #payload_raw

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from IP

checksum, #v4?

Methods inherited from Packet

#deepdup, isolate_l7, normalize, #payload_bytes, #to_bytes

Constructor Details

#initializeIPv6

Returns a new instance of IPv6.



15
16
17
18
19
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 15

def initialize
    super
    @next_header = 0
    @hop_limit   = 64
end

Instance Attribute Details

#hop_limitObject Also known as: ttl

Returns the value of attribute hop_limit.



13
14
15
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 13

def hop_limit
  @hop_limit
end

#next_headerObject Also known as: proto

Returns the value of attribute next_header.



13
14
15
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 13

def next_header
  @next_header
end

Class Method Details

.ah_header_from_bytes(ipv6, bytes, name) ⇒ Object

Parse authentication header (whose length field is interpeted differently)



111
112
113
114
115
116
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 111

def self.ah_header_from_bytes ipv6, bytes, name
    Pcap.assert bytes.length >= 8, "Truncated IPv6 #{name} header"
    length = (bytes[1].ord + 2) * 4
    Pcap.assert bytes.length >= length, "Truncated IPv6 #{name} header"
    return payload_from_bytes(ipv6, bytes[0].ord, bytes[length..-1])
end

.eight_byte_header_from_bytes(ipv6, bytes, name) ⇒ Object

Parse extension header that’s a multiple of 8 bytes



103
104
105
106
107
108
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 103

def self.eight_byte_header_from_bytes ipv6, bytes, name
    Pcap.assert bytes.length >= 8, "Truncated IPv6 #{name} header"
    length = (bytes[1].ord + 1) * 8
    Pcap.assert bytes.length >= length, "Truncated IPv6 #{name} header"
    return payload_from_bytes(ipv6, bytes[0].ord, bytes[length..-1])
end

.from_bytes(bytes) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 36

def self.from_bytes bytes
    Pcap.assert bytes.length >= 40, 'Truncated IPv6 header: ' +
        "expected at least 40 bytes, got #{bytes.length} bytes"

    vcl, length, next_header, hop_limit, src, dst = 
        bytes[0, 40].unpack FORMAT
    version = vcl >> 28 & 0x0f
    traffic_class = vcl >> 20 & 0xff
    flow_label = vcl & 0xfffff

    Pcap.assert version == 6, "Wrong IPv6 version: got (#{version})"
    Pcap.assert bytes.length >= (40 + length), 'Truncated IPv6 header: ' +
        "expected #{length + 40} bytes, got #{bytes.length} bytes"

    ipv6 = IPv6.new
    ipv6.next_header = next_header
    ipv6.hop_limit = hop_limit
    ipv6.src = IPAddr.new_ntoh(src).to_s
    ipv6.dst = IPAddr.new_ntoh(dst).to_s

    ipv6.payload_raw = bytes[40..-1]
    ipv6.next_header, ipv6.payload =
        payload_from_bytes ipv6, ipv6.next_header, bytes[40...40+length]

    return ipv6
end

.payload_from_bytes(ipv6, next_header, bytes) ⇒ Object

Parse bytes and returns next_header and payload. Skips extension headers.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 65

def self.payload_from_bytes ipv6, next_header, bytes
    begin
        case next_header
        when IPPROTO_TCP
            payload = TCP.from_bytes bytes
        when IPPROTO_UDP
            payload = UDP.from_bytes bytes
        when IPPROTO_SCTP
            payload = SCTP.from_bytes bytes
        when IPPROTO_HOPOPTS
            next_header, payload = eight_byte_header_from_bytes(ipv6,
                bytes, 'hop-by-hop options')
        when IPPROTO_ROUTING
            next_header, payload = eight_byte_header_from_bytes(ipv6,
                bytes, 'routing')
        when IPPROTO_DSTOPTS
            next_header, payload = eight_byte_header_from_bytes(ipv6,
                bytes, 'destination options')
        when IPPROTO_FRAGMENT
            Pcap.assert bytes.length >= 8,
                "Truncated IPv6 fragment header"
            Pcap.assert false, 'IPv6 fragments are not supported'
        when IPPROTO_AH
            next_header, payload = ah_header_from_bytes(ipv6,
                bytes, 'authentication header')
        when IPPROTO_NONE
            payload = ''
        else
            payload = bytes
        end
    rescue ParseError => e
        Pcap.warning e
        payload = bytes
    end
    return [next_header, payload]
end

Instance Method Details

#==(other) ⇒ Object



140
141
142
143
144
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 140

def == other
    return super &&
        self.next_header == other.next_header &&
        self.hop_limit   == other.hop_limit
end

#flow_idObject



28
29
30
31
32
33
34
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 28

def flow_id
    if not @payload or @payload.is_a? String
        return [:ipv6, @next_header, @src, @dst]
    else
        return [:ipv6, @src, @dst, @payload.flow_id]
    end
end

#pseudo_header(payload_length) ⇒ Object



134
135
136
137
138
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 134

def pseudo_header payload_length
    return IPAddr.new(@src, Socket::AF_INET6).hton +
        IPAddr.new(@dst, Socket::AF_INET6).hton +
        [payload_length, '', @next_header].pack('Na3C')
end

#v6?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 21

def v6?
    return true
end

#write(io) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/diy/parser/mu/pcap/ipv6.rb', line 118

def write io
    if @payload.is_a? String
        payload = @payload
    else
        string_io = StringIO.new
        @payload.write string_io, self
        payload = string_io.string
    end
    src = IPAddr.new(@src, Socket::AF_INET6).hton
    dst = IPAddr.new(@dst, Socket::AF_INET6).hton
    header = [0x60000000, payload.length, @next_header, @hop_limit, 
              src, dst].pack FORMAT
    io.write header
    io.write payload
end