Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,22 @@ func (s *Session) EnqueueTx(data []byte) {
}
}

// EnqueueInitialData prepends data to the tx buffer if the SYN hasn't been sent
// yet. Used by the connect_data optimization.
// EnqueueInitialData appends data to the tx buffer while synNeeded is still
// true, so the first DrainTx call bundles it into the SYN frame's payload
// (the connect_data optimization — saves one round-trip on every TLS
// handshake and HTTP request).
//
// Previously this prepended, on the assumption it'd be called once before
// any other write. But the SOCKS5 adapter calls it on every Write, and the
// local write loop is frequently faster than poll workers — multiple calls
// land before the SYN drains, and prepending REVERSES byte order. For a
// payload whose first bytes carry framing/length (a TLS record header, an
// HTTP request line, the bench harness's 8-byte size prefix), reordering
// silently corrupts the upstream stream. The bug also rendered every
// upload-throughput benchmark we have meaningless: with a size-prefixed
// payload of zeros, the upstream parsed a body chunk's leading zeros as
// "expect 0 bytes" and ACKed immediately, making upload look ~5× faster
// than it actually was.
func (s *Session) EnqueueInitialData(data []byte) {
s.mu.Lock()
if !s.synNeeded {
Expand All @@ -159,8 +173,7 @@ func (s *Session) EnqueueInitialData(data []byte) {
s.EnqueueTx(data)
return
}
// Prepend to txBuf so it's picked up by the first DrainTx call.
s.txBuf = append(data, s.txBuf...)
s.txBuf = append(s.txBuf, data...)
if s.firstQueuedAt.IsZero() {
s.firstQueuedAt = time.Now()
}
Expand Down
34 changes: 34 additions & 0 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,37 @@ func TestRollbackDrain_PreservesConcurrentEnqueue(t *testing.T) {
t.Fatalf("merged payload: got %q want %q", got, want)
}
}

// TestEnqueueInitialData_PreservesOrderAcrossMultipleCalls catches the regression
// where EnqueueInitialData prepended (instead of appending) while synNeeded was
// true. The SOCKS5 adapter calls this on every Write, and a fast local writer
// can fit many calls between session creation and the SYN drain. Prepend
// reversed byte order on the wire; for any protocol whose first bytes carry
// framing (TLS records, HTTP request lines, length prefixes), the upstream
// would either error or parse garbage. The bench harness's sized upload sink
// silently masked this by ACKing on a 0-length body when the body's leading
// zeros were read as the size header, producing wildly optimistic upload
// throughput measurements.
func TestEnqueueInitialData_PreservesOrderAcrossMultipleCalls(t *testing.T) {
s := New(sid(10), "example.com:443", true)

// Simulate a SOCKS5 writer calling Write multiple times before the
// carrier's poll worker drains the SYN: a length-prefix header followed
// by body chunks. The order matters; reverse order would put the body
// before the header and the upstream would misparse.
s.EnqueueInitialData([]byte("HDR_"))
s.EnqueueInitialData([]byte("body_chunk_1_"))
s.EnqueueInitialData([]byte("body_chunk_2"))

frames := s.DrainTx(64 * 1024)
if len(frames) != 1 {
t.Fatalf("want 1 bundled frame, got %d", len(frames))
}
if !frames[0].HasFlag(frame.FlagSYN) {
t.Fatal("first frame missing SYN")
}
want := []byte("HDR_body_chunk_1_body_chunk_2")
if !bytes.Equal(frames[0].Payload, want) {
t.Fatalf("payload ordering corrupt: got %q want %q", frames[0].Payload, want)
}
}
Loading