Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions generator/mrt/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"sync"
)
Expand Down Expand Up @@ -130,44 +132,41 @@ func (p *Processor) processRIBEntry(subType uint16, reader *bytes.Reader, result
// Read sequence number (skip)
var seqNum uint32
if err := binary.Read(reader, binary.BigEndian, &seqNum); err != nil {
return err
return fmt.Errorf("EOF at seqNum: %w", err)
}

// Read prefix length
var prefixLen uint8
if err := binary.Read(reader, binary.BigEndian, &prefixLen); err != nil {
return err
return fmt.Errorf("EOF at prefixLen: %w", err)
}

// Calculate prefix bytes
prefixBytes := (prefixLen + 7) / 8
prefixBytes := int((prefixLen + 7) / 8)
if prefixBytes > reader.Len() {
return fmt.Errorf("prefix length %d (bits: %d) exceeds remaining bytes %d",
prefixBytes, prefixLen, reader.Len())
}
prefix := make([]byte, prefixBytes)
if _, err := reader.Read(prefix); err != nil {
return err
}

// Parse IP address
var ipStr string
if subType == 2 || subType == 3 || subType == 8 || subType == 9 { // IPv4 (unicast/multicast)
if prefixBytes < 4 {
// Extend the slice in-place instead of creating a new one
prefix = append(prefix, make([]byte, 4-prefixBytes)...)
}
ip := net.IPv4(prefix[0], prefix[1], prefix[2], prefix[3])
ipStr = ip.String()
} else { // IPv6 (type 4, 5, 10, 11)
if prefixBytes < 16 {
// Extend the slice in-place
prefix = append(prefix, make([]byte, 16-prefixBytes)...)
}
ip := net.IP(prefix)
ipStr = ip.String()
var fullIP net.IP
if subType == 2 || subType == 3 || subType == 8 || subType == 9 { // IPv4
fullIP = make(net.IP, 4)
copy(fullIP, prefix)
} else { // IPv6
fullIP = make(net.IP, 16)
copy(fullIP, prefix)
}
var ipStr = fullIP.String()

// Read entry count
var entryCount uint16
if err := binary.Read(reader, binary.BigEndian, &entryCount); err != nil {
return err
return fmt.Errorf("EOF at entryCount (Seq: %d): %w", seqNum, err)
}

// Process each entry
Expand All @@ -181,7 +180,7 @@ func (p *Processor) processRIBEntry(subType uint16, reader *bytes.Reader, result
}

if err := p.processRIBEntryDescriptor(reader, result, ipStr, uint32(prefixLen), subType, isMulticast); err != nil {
return err
return fmt.Errorf("entry %d/%d error: %w", i+1, entryCount, err)
}
}

Expand All @@ -193,25 +192,33 @@ func (p *Processor) processRIBEntryDescriptor(reader *bytes.Reader, result *Resu
// Skip peer index
var peerIndex uint16
if err := binary.Read(reader, binary.BigEndian, &peerIndex); err != nil {
return err
return fmt.Errorf("EOF at peerIndex: %w", err)
}

// Skip timestamp
var originatedTime uint32
if err := binary.Read(reader, binary.BigEndian, &originatedTime); err != nil {
return err
return fmt.Errorf("EOF at originatedTime: %w", err)
}

// Read attribute length
var attrLength uint16
if err := binary.Read(reader, binary.BigEndian, &attrLength); err != nil {
return err
return fmt.Errorf("EOF at attrLength: %w", err)
}

// Read attributes
if attrLength == 0 {
log.Printf("[WARN] Found RIB Entry with 0 attribute length for prefix %s/%d, skipping", prefix, prefixLen)
return nil
}
if int(attrLength) > reader.Len() {
return fmt.Errorf("attribute length %d exceeds remaining buffer %d", attrLength, reader.Len())
}

attributes := make([]byte, attrLength)
if _, err := reader.Read(attributes); err != nil {
return err
n, err := io.ReadFull(reader, attributes)
if err != nil {
return fmt.Errorf("failed to read %d bytes of attributes (read %d): %w", attrLength, n, err)
}

// Process attributes
Expand Down Expand Up @@ -253,16 +260,20 @@ func (p *Processor) processRIBEntryDescriptor(reader *bytes.Reader, result *Resu
}

asPathReader := bytes.NewReader(asPathData)
for asPathReader.Len() > 0 {
var segType uint8
var segLength uint8
for asPathReader.Len() >= 2 {
var segType, segLength uint8
if err := binary.Read(asPathReader, binary.BigEndian, &segType); err != nil {
break
}
if err := binary.Read(asPathReader, binary.BigEndian, &segLength); err != nil {
break
}

expectedBytes := int(segLength) * 4
if asPathReader.Len() < expectedBytes {
break
}

// Read AS number
for i := uint8(0); i < segLength; i++ {
var asn uint32
Expand Down
30 changes: 13 additions & 17 deletions generator/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"bytes"
"compress/bzip2"
"context"
Expand Down Expand Up @@ -158,25 +159,20 @@ func downloadMRTFiles(ctx context.Context, config *Config) ([]MRTDownload, error
return
}

// Decompress using bzip2 with streaming
bzReader := bzip2.NewReader(resp.Body)
// is bzip2
br := bufio.NewReader(resp.Body)
header, err := br.Peek(3)

// Use a buffer to read data in chunks
var buffer bytes.Buffer
chunk := make([]byte, 32*1024) // 32KB chunks
var reader io.Reader = br
if err == nil && string(header) == "BZh" {
reader = bzip2.NewReader(br)
}

for {
n, err := bzReader.Read(chunk)
if n > 0 {
buffer.Write(chunk[:n])
}
if err == io.EOF {
break
}
if err != nil {
errCh <- fmt.Errorf("failed to decompress %s: %v", entry.URL, err)
return
}
// read data
var buffer bytes.Buffer
if _, err := io.Copy(&buffer, reader); err != nil {
errCh <- fmt.Errorf("failed to read data from %s: %v", entry.URL, err)
return
}

// Send the decompressed data with source tag
Expand Down