From e423f6af97d27c6897e454d7c0cd52ab42083c88 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:13:05 -0400 Subject: [PATCH 01/14] chore(tcp): document transport timing invariants Clarify the packet timing and ownership contract that the TCP rewrite will follow. Document Packet.time as the original first-transmit timestamp used for sink-side latency accounting, and annotate the TCP/BBR sender state and receiver ACK frontier with the intended ownership and cleanup semantics. This keeps the rewrite boundaries explicit before the receiver and sender behavior changes land in later issue-scoped commits. Validation: - uv run python -m py_compile ns/packet/packet.py ns/packet/tcp_sink.py ns/packet/tcp_generator.py ns/packet/bbr_generator.py Refs: DT-262 Co-authored-by: Codex --- ns/packet/bbr_generator.py | 5 ++++- ns/packet/packet.py | 5 ++++- ns/packet/tcp_generator.py | 9 +++++++-- ns/packet/tcp_sink.py | 9 +++++++-- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/ns/packet/bbr_generator.py b/ns/packet/bbr_generator.py index da0eaed..a9744ff 100644 --- a/ns/packet/bbr_generator.py +++ b/ns/packet/bbr_generator.py @@ -70,7 +70,10 @@ def __init__( # whether or not space in the congestion window is available self.cwnd_available = simpy.Store(env) - # the in-flight packets (segments) + # In-flight data is keyed by logical segment start sequence number. + # The transport rewrite keeps retransmission timing and delivery-rate + # metadata in sender-owned state and preserves Packet.time as the + # original first-transmit timestamp seen by sinks. self.sent_packets = {} self.timer = None diff --git a/ns/packet/packet.py b/ns/packet/packet.py index 303f102..6b1aa9f 100644 --- a/ns/packet/packet.py +++ b/ns/packet/packet.py @@ -19,7 +19,10 @@ class Packet: Parameters ---------- time: float - the time when the packet is generated. + the time when the packet is generated. For TCP data packets, transport + code treats this as the original first-transmit timestamp used by sinks + for end-to-end latency accounting. Retransmit-attempt timing belongs to + sender-owned state, not to later mutation of this field. size: float the size of the packet in bytes packet_id: int diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index fa43217..c5f1963 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -51,9 +51,14 @@ def __init__(self, env, flow, cc, element_id=None, debug=False): # whether or not space in the congestion window is available self.cwnd_available = simpy.Store(env) - # the timers, one for each in-flight packets (segments) sent + # Timers are keyed by the logical segment start sequence number. + # Follow-on rewrite steps keep retransmission state in sender-owned + # segment metadata rather than in mutable Packet objects. self.timers = {} - # the in-flight packets (segments) + # In-flight data is currently keyed by segment start sequence number. + # ACK cleanup must eventually use segment-end semantics + # (seq + size <= ack), and each retransmission attempt must emit a + # fresh Packet while preserving the original Packet.time. self.sent_packets = {} self.action = env.process(self.run()) diff --git a/ns/packet/tcp_sink.py b/ns/packet/tcp_sink.py index 4748a7c..6356e3e 100644 --- a/ns/packet/tcp_sink.py +++ b/ns/packet/tcp_sink.py @@ -26,7 +26,8 @@ def __init__( env, rec_arrivals, absolute_arrivals, rec_waits, rec_flow_ids, debug ) self.recv_buffer = [] - # the next sequence number expected to be received + # The cumulative ACK frontier (RCV.NXT). Later receiver fixes must + # derive this from the contiguous prefix only and never from a gap. self.next_seq_expected = 0 self.out = None self.ele_id = element_id @@ -61,7 +62,11 @@ def put(self, packet): assert self.out is not None acknowledgment = Packet( - packet.time, # used for calculating RTT at the sender + # Preserve the data packet's original first-transmit timestamp. + # Sink-side wait accounting depends on Packet.time remaining stable + # across retransmissions; sender RTT/RTO work will move attempt + # timing onto sender-owned segment metadata instead. + packet.time, size=40, # default size of the ack packet packet_id=packet.packet_id, flow_id=packet.flow_id + 10000, From 9636f893123e4339fa779e6db1197a236285a929 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:14:41 -0400 Subject: [PATCH 02/14] test(tcp): pin cumulative ACK semantics in sink red tests Add deterministic receiver-only tests for TCPSink cumulative ACK handling. The tests cover out-of-order arrival, duplicate buffered data, hole fill, and interval merging, and they fail against the current implementation as expected because TCPSink advances ACK from the first merged interval rather than the cumulative frontier. Validation: - uv run --with pytest python -m pytest -q tests/packet/test_tcp_sink.py Refs: DT-263 Co-authored-by: Codex --- tests/packet/test_tcp_sink.py | 73 +++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 tests/packet/test_tcp_sink.py diff --git a/tests/packet/test_tcp_sink.py b/tests/packet/test_tcp_sink.py new file mode 100644 index 0000000..efeaddb --- /dev/null +++ b/tests/packet/test_tcp_sink.py @@ -0,0 +1,73 @@ +import pytest + +from ns.packet.packet import Packet +from ns.packet.tcp_sink import TCPSink + +simpy = pytest.importorskip("simpy") + + +class CaptureSink: + def __init__(self): + self.packets = [] + + def put(self, packet): + self.packets.append(packet) + + +def make_data_packet(seq, size=512, time=0.0, flow_id=1): + return Packet( + time=time, + size=size, + packet_id=seq, + flow_id=flow_id, + src="src", + dst="dst", + ) + + +def make_sink(): + env = simpy.Environment() + sink = TCPSink(env, rec_waits=False, rec_arrivals=False, rec_flow_ids=False) + sink.out = CaptureSink() + return env, sink + + +def ack_history(sink): + return [packet.ack for packet in sink.out.packets] + + +def test_tcp_sink_pins_ack_for_out_of_order_and_duplicate_data(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(512)) + + assert ack_history(sink) == [0, 0] + assert sink.recv_buffer == [[512, 1024]] + assert sink.next_seq_expected == 0 + + +def test_tcp_sink_advances_ack_only_when_hole_is_filled(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(512)) + sink.put(make_data_packet(0)) + + assert ack_history(sink) == [0, 0, 1024] + assert sink.recv_buffer == [[0, 1024]] + assert sink.next_seq_expected == 1024 + + +def test_tcp_sink_merges_intervals_across_reorder_duplicate_and_fill(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(1536)) + sink.put(make_data_packet(0)) + sink.put(make_data_packet(512)) + sink.put(make_data_packet(1024)) + + assert ack_history(sink) == [0, 0, 1024, 1024, 2048] + assert sink.recv_buffer == [[0, 2048]] + assert sink.next_seq_expected == 2048 From b35280485435bc3c9ac802883172039d906febbf Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:16:23 -0400 Subject: [PATCH 03/14] test(tcp): pin sender retransmit and tail semantics Add deterministic classic TCP sender red tests for buffered sub-MSS sends, final tail segment sending, retransmit object freshness, partial ACK cleanup, and RTT suppression after retransmitted data is acknowledged. The tests fail against the current implementation because TCPPacketGenerator still assumes full-MSS sends, reuses Packet objects across retransmissions, cleans up by segment start instead of segment end, and updates RTT on ambiguous cumulative ACKs. Validation: - uv run --with pytest python -m pytest -q tests/packet/test_tcp_sink.py tests/packet/test_tcp_generator.py Refs: DT-265 Co-authored-by: Codex --- tests/packet/test_tcp_generator.py | 187 +++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 tests/packet/test_tcp_generator.py diff --git a/tests/packet/test_tcp_generator.py b/tests/packet/test_tcp_generator.py new file mode 100644 index 0000000..8f4e7c5 --- /dev/null +++ b/tests/packet/test_tcp_generator.py @@ -0,0 +1,187 @@ +import pytest + +from ns.flow.flow import Flow +from ns.packet.packet import Packet +from ns.packet.tcp_generator import TCPPacketGenerator + +simpy = pytest.importorskip("simpy") + + +class CaptureSink: + def __init__(self): + self.packets = [] + + def put(self, packet): + self.packets.append(packet) + + +class DummyTimer: + def __init__(self, rto): + self.rto = rto + self.restart_calls = [] + self.stop_calls = 0 + + def restart(self, revised_rto, start_time=0): + self.rto = revised_rto + self.restart_calls.append((revised_rto, start_time)) + + def stop(self): + self.stop_calls += 1 + + +class DummyCC: + def __init__(self, cwnd=4096): + self.cwnd = cwnd + self.calls = [] + + def ack_received(self, rtt=0, current_time=0): + self.calls.append(("ack_received", rtt, current_time)) + + def timer_expired(self): + self.calls.append(("timer_expired",)) + + def dupack_over(self): + self.calls.append(("dupack_over",)) + + def consecutive_dupacks_received(self): + self.calls.append(("consecutive_dupacks_received",)) + + def more_dupacks_received(self): + self.calls.append(("more_dupacks_received",)) + + +def make_flow(size, finish_time=1.0): + return Flow( + fid=1, + src="src", + dst="dst", + size=size, + finish_time=finish_time, + ) + + +def make_sender(env, size, cwnd=4096, finish_time=1.0): + sender = TCPPacketGenerator(env, make_flow(size, finish_time), DummyCC(cwnd)) + sender.out = CaptureSink() + return sender + + +def make_ack(seq, *, time=0.0, flow_id=10001): + ack = Packet( + time=time, + size=40, + packet_id=seq, + flow_id=flow_id, + src="dst", + dst="src", + ) + ack.ack = seq + return ack + + +def test_tcp_sender_sends_buffered_sub_mss_data(): + env = simpy.Environment() + sender = make_sender(env, size=300) + + env.run(until=0.01) + + assert [packet.size for packet in sender.out.packets] == [300] + + +def test_tcp_sender_sends_final_tail_segment(): + env = simpy.Environment() + sender = make_sender(env, size=768) + + env.run(until=0.01) + + assert [packet.size for packet in sender.out.packets] == [512, 256] + + +def test_tcp_sender_fast_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment() + sender = make_sender(env, size=1024) + original = Packet( + time=7.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.last_ack = 0 + sender.dupack = 2 + sender.next_seq = 512 + + sender.put(make_ack(0, time=7.0)) + + assert sender.out.packets[0] is not original + assert sender.out.packets[0].time == 7.0 + assert original.time == 7.0 + + +def test_tcp_sender_timeout_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment() + sender = make_sender(env, size=1024) + original = Packet( + time=3.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.timers[0] = DummyTimer(rto=1.0) + + sender.timeout_callback(0) + + assert sender.out.packets[0] is not original + assert sender.out.packets[0].time == 3.0 + assert original.time == 3.0 + + +def test_tcp_sender_keeps_partially_acknowledged_segment_until_segment_end(): + env = simpy.Environment() + sender = make_sender(env, size=1024) + partial = Packet( + time=0.0, + size=256, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = partial + sender.timers[0] = DummyTimer(rto=1.0) + + sender.put(make_ack(128, time=0.0)) + + assert 0 in sender.sent_packets + assert 0 in sender.timers + + +def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): + env = simpy.Environment() + sender = make_sender(env, size=1024) + original = Packet( + time=0.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.timers[0] = DummyTimer(rto=1.0) + sender.smoothed_rtt = 1.0 + sender.rtt_var = 0.25 + sender.rto = 2.0 + + sender.timeout_callback(0) + before = (sender.smoothed_rtt, sender.rtt_var, sender.rto) + + sender.put(make_ack(512, time=-1.0)) + + after = (sender.smoothed_rtt, sender.rtt_var, sender.rto) + assert after == before From ac5f10323faa1dfaa94e3c397004d11f870194f2 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:17:02 -0400 Subject: [PATCH 04/14] test(tcp): add red coverage for sender transport semantics Add deterministic sender-focused tests that pin down the classic TCP transport contract around retransmission object ownership, sink-observed latency, segment-end ACK cleanup, and sub-MSS/tail send behavior. These tests are intentionally red on the current implementation so the next rewrite step can drive the sender refactor with clear failures. Validation: - uv run python -m pytest -q tests/packet/test_tcp_generator.py (fails with 6 assertions, covering sub-MSS sends, tail sends, fresh packet allocation on fast/timeout retransmits, segment-end ACK cleanup, and RTT skip behavior for retransmitted data) Refs: DT-265 Co-authored-by: Codex --- tests/packet/test_tcp_generator.py | 119 ++++++++++++++++------------- 1 file changed, 64 insertions(+), 55 deletions(-) diff --git a/tests/packet/test_tcp_generator.py b/tests/packet/test_tcp_generator.py index 8f4e7c5..3a7848e 100644 --- a/tests/packet/test_tcp_generator.py +++ b/tests/packet/test_tcp_generator.py @@ -1,69 +1,71 @@ import pytest +simpy = pytest.importorskip("simpy") + from ns.flow.flow import Flow from ns.packet.packet import Packet from ns.packet.tcp_generator import TCPPacketGenerator -simpy = pytest.importorskip("simpy") - class CaptureSink: - def __init__(self): + def __init__(self, env): + self.env = env self.packets = [] + self.waits = [] def put(self, packet): self.packets.append(packet) + self.waits.append(self.env.now - packet.time) class DummyTimer: - def __init__(self, rto): + def __init__(self, rto=1.0): self.rto = rto self.restart_calls = [] self.stop_calls = 0 - def restart(self, revised_rto, start_time=0): - self.rto = revised_rto - self.restart_calls.append((revised_rto, start_time)) - def stop(self): self.stop_calls += 1 + def restart(self, revised_rto): + self.rto = revised_rto + self.restart_calls.append(revised_rto) + class DummyCC: def __init__(self, cwnd=4096): self.cwnd = cwnd - self.calls = [] - - def ack_received(self, rtt=0, current_time=0): - self.calls.append(("ack_received", rtt, current_time)) + self.consecutive_dupacks_calls = 0 + self.more_dupacks_calls = 0 + self.dupack_over_calls = 0 + self.timer_expired_calls = 0 + self.ack_received_calls = [] def timer_expired(self): - self.calls.append(("timer_expired",)) + self.timer_expired_calls += 1 def dupack_over(self): - self.calls.append(("dupack_over",)) + self.dupack_over_calls += 1 def consecutive_dupacks_received(self): - self.calls.append(("consecutive_dupacks_received",)) + self.consecutive_dupacks_calls += 1 def more_dupacks_received(self): - self.calls.append(("more_dupacks_received",)) + self.more_dupacks_calls += 1 + + def ack_received(self, sample_rtt, current_time): + self.ack_received_calls.append((sample_rtt, current_time)) def make_flow(size, finish_time=1.0): - return Flow( - fid=1, - src="src", - dst="dst", - size=size, - finish_time=finish_time, - ) + return Flow(fid=1, src="src", dst="dst", size=size, finish_time=finish_time) def make_sender(env, size, cwnd=4096, finish_time=1.0): sender = TCPPacketGenerator(env, make_flow(size, finish_time), DummyCC(cwnd)) - sender.out = CaptureSink() - return sender + sink = CaptureSink(env) + sender.out = sink + return sender, sink def make_ack(seq, *, time=0.0, flow_id=10001): @@ -81,27 +83,27 @@ def make_ack(seq, *, time=0.0, flow_id=10001): def test_tcp_sender_sends_buffered_sub_mss_data(): env = simpy.Environment() - sender = make_sender(env, size=300) + sender, sink = make_sender(env, size=300) env.run(until=0.01) - assert [packet.size for packet in sender.out.packets] == [300] + assert [packet.size for packet in sink.packets] == [300] def test_tcp_sender_sends_final_tail_segment(): env = simpy.Environment() - sender = make_sender(env, size=768) + sender, sink = make_sender(env, size=768) env.run(until=0.01) - assert [packet.size for packet in sender.out.packets] == [512, 256] + assert [packet.size for packet in sink.packets] == [512, 256] def test_tcp_sender_fast_retransmit_emits_fresh_packet_without_resetting_timestamp(): - env = simpy.Environment() - sender = make_sender(env, size=1024) + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) original = Packet( - time=7.0, + time=1.5, size=512, packet_id=0, flow_id=1, @@ -113,18 +115,19 @@ def test_tcp_sender_fast_retransmit_emits_fresh_packet_without_resetting_timesta sender.dupack = 2 sender.next_seq = 512 - sender.put(make_ack(0, time=7.0)) + sender.put(make_ack(0, time=1.5)) - assert sender.out.packets[0] is not original - assert sender.out.packets[0].time == 7.0 - assert original.time == 7.0 + assert sender.congestion_control.consecutive_dupacks_calls == 1 + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(3.5) + assert original.time == 1.5 def test_tcp_sender_timeout_retransmit_emits_fresh_packet_without_resetting_timestamp(): - env = simpy.Environment() - sender = make_sender(env, size=1024) + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) original = Packet( - time=3.0, + time=1.25, size=512, packet_id=0, flow_id=1, @@ -136,14 +139,15 @@ def test_tcp_sender_timeout_retransmit_emits_fresh_packet_without_resetting_time sender.timeout_callback(0) - assert sender.out.packets[0] is not original - assert sender.out.packets[0].time == 3.0 - assert original.time == 3.0 + assert sender.congestion_control.timer_expired_calls == 1 + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(3.75) + assert original.time == 1.25 def test_tcp_sender_keeps_partially_acknowledged_segment_until_segment_end(): - env = simpy.Environment() - sender = make_sender(env, size=1024) + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) partial = Packet( time=0.0, size=256, @@ -159,13 +163,14 @@ def test_tcp_sender_keeps_partially_acknowledged_segment_until_segment_end(): assert 0 in sender.sent_packets assert 0 in sender.timers + assert sender.timers[0].stop_calls == 0 def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): - env = simpy.Environment() - sender = make_sender(env, size=1024) + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) original = Packet( - time=0.0, + time=1.0, size=512, packet_id=0, flow_id=1, @@ -174,14 +179,18 @@ def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): ) sender.sent_packets[0] = original sender.timers[0] = DummyTimer(rto=1.0) - sender.smoothed_rtt = 1.0 - sender.rtt_var = 0.25 - sender.rto = 2.0 + sender.last_ack = 0 + sender.dupack = 2 + sender.next_seq = 512 - sender.timeout_callback(0) - before = (sender.smoothed_rtt, sender.rtt_var, sender.rto) + sender.put(make_ack(0, time=1.0)) + sender.put(make_ack(0, time=1.0)) + sender.put(make_ack(0, time=1.0)) + sink.packets.clear() + sink.waits.clear() + sender.congestion_control.ack_received_calls.clear() - sender.put(make_ack(512, time=-1.0)) + sender.put(make_ack(512, time=1.2)) - after = (sender.smoothed_rtt, sender.rtt_var, sender.rto) - assert after == before + assert sender.congestion_control.ack_received_calls == [] + assert sink.packets == [] From dbdead8ae44bc6173230d9ac30f7e370f67e5f5e Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:17:17 -0400 Subject: [PATCH 05/14] fix(tcp): derive cumulative ACK from the receive frontier Update TCPSink to advance ACK from the current cumulative frontier across the merged receive intervals instead of blindly taking the first interval end. This preserves duplicate-ACK pinning on out-of-order arrivals and only advances once the hole is actually filled. Validation: - uv run --with pytest python -m pytest -q tests/packet/test_tcp_sink.py Refs: DT-264 Co-authored-by: Codex --- ns/packet/tcp_sink.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ns/packet/tcp_sink.py b/ns/packet/tcp_sink.py index 6356e3e..40372c2 100644 --- a/ns/packet/tcp_sink.py +++ b/ns/packet/tcp_sink.py @@ -55,8 +55,12 @@ def put(self, packet): super().put(packet) self.packet_arrived(packet) - - self.next_seq_expected = self.recv_buffer[0][1] + frontier = self.next_seq_expected + for start, end in self.recv_buffer: + if start > frontier: + break + frontier = max(frontier, end) + self.next_seq_expected = frontier # a TCP sink needs to send ack packets back to the TCP packet generator assert self.out is not None From a18c60acf4738cc85e965fdde8438bc645751d8d Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:18:53 -0400 Subject: [PATCH 06/14] fix(tcp): move classic sender onto logical segment state Introduce sender-owned segment state for classic TCP so retransmissions emit fresh Packet objects with stable first-transmit timestamps and ACK cleanup uses segment-end semantics. Timeout and fast retransmit now build new packet attempts from durable segment metadata instead of mutating the original Packet object. This intentionally leaves sub-MSS/tail sending and RTT suppression for the later sender issues. Validation: - uv run --with pytest python -m pytest -q tests/packet/test_tcp_sink.py tests/packet/test_tcp_generator.py - Expected remaining failures: buffered sub-MSS send, final tail send, RTT suppression after retransmitted ACK Refs: DT-266 Co-authored-by: Codex --- ns/packet/tcp_generator.py | 80 ++++++++++++++++++++++++++++++++------ 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index c5f1963..7d762d4 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -3,12 +3,24 @@ for various congestion control mechanisms. """ +from dataclasses import dataclass + import simpy from ns.packet.packet import Packet from ns.utils.timer import Timer +@dataclass +class SegmentState: + seq: int + size: int + first_tx_time: float + last_tx_time: float + retransmit_count: int = 0 + timer: Timer = None + + class TCPPacketGenerator: """Generates packets with a simulated TCP protocol. @@ -60,10 +72,38 @@ def __init__(self, env, flow, cc, element_id=None, debug=False): # (seq + size <= ack), and each retransmission attempt must emit a # fresh Packet while preserving the original Packet.time. self.sent_packets = {} + self.segment_state = {} self.action = env.process(self.run()) self.debug = debug + def _get_segment_state(self, packet_id): + """Return sender-owned logical state for a segment, creating it lazily.""" + state = self.segment_state.get(packet_id) + if state is not None: + return state + + packet = self.sent_packets[packet_id] + state = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + timer=self.timers.get(packet_id), + ) + self.segment_state[packet_id] = state + return state + + def _build_packet(self, state): + """Create a fresh packet attempt from sender-owned segment state.""" + return Packet( + state.first_tx_time, + state.size, + state.seq, + src=self.flow.src, + flow_id=self.flow.fid, + ) + def run(self): """The generator function used in simulations.""" if self.flow.start_time: @@ -107,6 +147,12 @@ def run(self): ) self.sent_packets[packet.packet_id] = packet + self.segment_state[packet.packet_id] = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + ) if self.debug: print( @@ -124,6 +170,9 @@ def run(self): timeout_callback=self.timeout_callback, rto=self.rto, ) + self.segment_state[packet.packet_id].timer = self.timers[ + packet.packet_id + ] if self.debug: print( @@ -146,7 +195,11 @@ def timeout_callback(self, packet_id=0): self.congestion_control.timer_expired() # retransmitting the segment - resent_pkt = self.sent_packets[packet_id] + state = self._get_segment_state(packet_id) + state.retransmit_count += 1 + state.last_tx_time = self.env.now + resent_pkt = self._build_packet(state) + self.sent_packets[packet_id] = resent_pkt self.out.put(resent_pkt) if self.debug: @@ -157,7 +210,8 @@ def timeout_callback(self, packet_id=0): # starting a new timer for this segment and doubling the retransmission timeout revised_rto = self.timers[packet_id].rto * 2 - self.timers[packet_id].restart(revised_rto) + state.timer = self.timers[packet_id] + state.timer.restart(revised_rto) def put(self, ack): """On receiving an acknowledgment packet.""" @@ -175,8 +229,11 @@ def put(self, ack): if self.dupack == 3: self.congestion_control.consecutive_dupacks_received() - resent_pkt = self.sent_packets[ack.ack] - resent_pkt.time = self.env.now + state = self._get_segment_state(ack.ack) + state.retransmit_count += 1 + state.last_tx_time = self.env.now + resent_pkt = self._build_packet(state) + self.sent_packets[ack.ack] = resent_pkt if self.debug: print( f"TCPPacketGenerator {self.element_id} is resending packet " @@ -274,19 +331,20 @@ def put(self, ack): # this acknowledgment should acknowledge all the intermediate # segments sent between the lost packet and the receipt of the # first duplicate ACK, if any - acked_packets = [ - packet_id - for packet_id, _ in self.sent_packets.items() - if packet_id < ack.ack - ] - for packet_id in acked_packets: + acked_packets = [] + for packet_id, state in self.segment_state.items(): + if packet_id + state.size <= ack.ack: + acked_packets.append(packet_id) + + for packet_id in sorted(acked_packets): if self.debug: print( f"TCPPacketGenerator {self.element_id} stopped timer " f"{packet_id} at time {self.env.now:.4f}." ) - self.timers[packet_id].stop() + self.segment_state[packet_id].timer.stop() del self.timers[packet_id] del self.sent_packets[packet_id] + del self.segment_state[packet_id] self.cwnd_available.put(True) From e4e47d95a1107074e60c84e035702263587ad3d2 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:21:00 -0400 Subject: [PATCH 07/14] fix(tcp): gate RTT samples on sender segment state Move classic TCP RTT and RTO sampling onto sender-owned segment metadata. The sender now accepts RTT updates only for exactly one un-retransmitted segment, keeps ACK advancement flowing through congestion control for ambiguous recovery ACKs, and reuses the last known RTT when no fresh sample is eligible. Validation: - uv run --with pytest python -m pytest -q tests/packet/test_tcp_sink.py tests/packet/test_tcp_generator.py Refs: DT-268 Co-authored-by: Codex --- ns/packet/tcp_generator.py | 193 +++++++++++++++-------------- tests/packet/test_tcp_generator.py | 8 +- 2 files changed, 106 insertions(+), 95 deletions(-) diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index 7d762d4..9ba0524 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -60,6 +60,8 @@ def __init__(self, env, flow, cc, element_id=None, debug=False): self.smoothed_rtt = 0.0 # the retransmission timeout self.rto = 1.0 + # the most recent RTT sample that was accepted as unambiguous + self.last_rtt_sample = 0.0 # whether or not space in the congestion window is available self.cwnd_available = simpy.Store(env) @@ -104,6 +106,51 @@ def _build_packet(self, state): flow_id=self.flow.fid, ) + def _send_new_packet(self, packet_size): + """Send a fresh data packet and register sender-owned state for it.""" + packet = Packet( + self.env.now, + packet_size, + self.next_seq, + src=self.flow.src, + flow_id=self.flow.fid, + ) + + self.sent_packets[packet.packet_id] = packet + self.segment_state[packet.packet_id] = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + ) + + if self.debug: + print( + f"TCPPacketGenerator {self.element_id} sent packet {packet.packet_id} " + f"with size {packet.size}, flow_id {packet.flow_id} at " + f"time {self.env.now:.4f}." + ) + + self.out.put(packet) + + self.next_seq += packet.size + timer = Timer( + self.env, + timer_id=packet.packet_id, + timeout_callback=self.timeout_callback, + rto=self.rto, + ) + self.timers[packet.packet_id] = timer + self.segment_state[packet.packet_id].timer = timer + + if self.debug: + print( + f"TCPPacketGenerator {self.element_id} is setting a timer " + f"for packet {packet.packet_id} with an RTO of {self.rto:.4f}." + ) + + return packet + def run(self): """The generator function used in simulations.""" if self.flow.start_time: @@ -134,51 +181,13 @@ def run(self): packet_size = self.mss self.send_buffer += packet_size - # the sender can transmit up to the size of the congestion window - if self.next_seq + self.mss <= min( - self.send_buffer, self.last_ack + self.congestion_control.cwnd - ): - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, - ) - - self.sent_packets[packet.packet_id] = packet - self.segment_state[packet.packet_id] = SegmentState( - seq=packet.packet_id, - size=packet.size, - first_tx_time=packet.time, - last_tx_time=packet.time, - ) - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} sent packet {packet.packet_id} " - f"with size {packet.size}, flow_id {packet.flow_id} at " - f"time {self.env.now:.4f}." - ) - - self.out.put(packet) - - self.next_seq += packet.size - self.timers[packet.packet_id] = Timer( - self.env, - timer_id=packet.packet_id, - timeout_callback=self.timeout_callback, - rto=self.rto, - ) - self.segment_state[packet.packet_id].timer = self.timers[ - packet.packet_id - ] - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} is setting a timer " - f"for packet {packet.packet_id} with an RTO of {self.rto:.4f}." - ) + # The sender can transmit any positive byte count up to the smaller + # of the buffered data and the available congestion window. + send_limit = min(self.send_buffer, self.last_ack + self.congestion_control.cwnd) + available_bytes = send_limit - self.next_seq + if available_bytes > 0: + packet_size = min(self.mss, available_bytes) + self._send_new_packet(packet_size) else: # No further space in the congestion window to transmit packets # at this time, waiting for acknowledgements @@ -216,6 +225,8 @@ def timeout_callback(self, packet_id=0): def put(self, ack): """On receiving an acknowledgment packet.""" assert ack.flow_id >= 10000 # the received packet must be an ack + previous_ack = self.last_ack + previous_segment = self.segment_state.get(previous_ack) if ack.ack == self.last_ack: self.dupack += 1 @@ -247,44 +258,28 @@ def put(self, ack): self.congestion_control.more_dupacks_received() if self.last_ack + self.congestion_control.cwnd >= ack.ack: - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, + send_limit = min( + self.send_buffer, + self.last_ack + self.congestion_control.cwnd, ) - - self.sent_packets[packet.packet_id] = packet - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} sent packet " - f"{packet.packet_id} with size {packet.size}, flow_id " - f"{packet.flow_id} at time {self.env.now:.4f} as dupack > 3." - ) - - self.out.put(packet) - - self.next_seq += packet.size - self.timers[packet.packet_id] = Timer( - self.env, - timer_id=packet.packet_id, - timeout_callback=self.timeout_callback, - rto=self.rto, - ) - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} is setting a timer for " - f"packet {packet.packet_id} with an RTO of {self.rto:.4f}." - ) + available_bytes = send_limit - self.next_seq + if available_bytes > 0: + self._send_new_packet(min(self.mss, available_bytes)) return if self.dupack == 0: - # new ack received, update the RTT estimate and the retransmission timout - sample_rtt = self.env.now - ack.time + # Only accept RTT samples for exactly one un-retransmitted segment. + eligible_rtt_sample = False + sample_rtt = 0.0 + if ( + ack.ack > previous_ack + and previous_segment is not None + and ack.ack == previous_ack + previous_segment.size + and previous_segment.retransmit_count == 0 + ): + eligible_rtt_sample = True + sample_rtt = self.env.now - previous_segment.first_tx_time # Authoritative sources for RTO calculation @@ -298,25 +293,35 @@ def put(self, ack): alpha = 0.125 beta = 0.25 - # calculates the deviation (RTTVAR) of the RTT to account for - # variations in the network - if self.rtt_var == 0.0: - self.rtt_var = sample_rtt / 2.0 - else: - deviation = self.smoothed_rtt - sample_rtt - self.rtt_var = (1.0 - beta) * self.rtt_var + beta * abs(deviation) + if eligible_rtt_sample: + # calculates the deviation (RTTVAR) of the RTT to account for + # variations in the network + if self.rtt_var == 0.0: + self.rtt_var = sample_rtt / 2.0 + else: + deviation = self.smoothed_rtt - sample_rtt + self.rtt_var = (1.0 - beta) * self.rtt_var + beta * abs( + deviation + ) - # computes a smoothed round-trip time (SRTT) - if self.smoothed_rtt == 0.0: - self.smoothed_rtt = sample_rtt - else: - self.smoothed_rtt = ( - 1.0 - alpha - ) * self.smoothed_rtt + alpha * sample_rtt - self.rto = max(1.0, self.smoothed_rtt + 4.0 * self.rtt_var) + # computes a smoothed round-trip time (SRTT) + if self.smoothed_rtt == 0.0: + self.smoothed_rtt = sample_rtt + else: + self.smoothed_rtt = ( + 1.0 - alpha + ) * self.smoothed_rtt + alpha * sample_rtt + self.rto = max(1.0, self.smoothed_rtt + 4.0 * self.rtt_var) + self.last_rtt_sample = sample_rtt self.last_ack = ack.ack - self.congestion_control.ack_received(sample_rtt, self.env.now) + if eligible_rtt_sample: + rtt_for_cc = sample_rtt + else: + rtt_for_cc = self.smoothed_rtt + if rtt_for_cc == 0.0: + rtt_for_cc = self.last_rtt_sample + self.congestion_control.ack_received(rtt_for_cc, self.env.now) if self.debug: print( diff --git a/tests/packet/test_tcp_generator.py b/tests/packet/test_tcp_generator.py index 3a7848e..f77851e 100644 --- a/tests/packet/test_tcp_generator.py +++ b/tests/packet/test_tcp_generator.py @@ -182,6 +182,10 @@ def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): sender.last_ack = 0 sender.dupack = 2 sender.next_seq = 512 + sender.smoothed_rtt = 1.0 + sender.rtt_var = 0.25 + sender.rto = 2.0 + sender.last_rtt_sample = 1.0 sender.put(make_ack(0, time=1.0)) sender.put(make_ack(0, time=1.0)) @@ -189,8 +193,10 @@ def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): sink.packets.clear() sink.waits.clear() sender.congestion_control.ack_received_calls.clear() + before = (sender.smoothed_rtt, sender.rtt_var, sender.rto) sender.put(make_ack(512, time=1.2)) - assert sender.congestion_control.ack_received_calls == [] + assert sender.congestion_control.ack_received_calls == [(1.0, 5)] + assert (sender.smoothed_rtt, sender.rtt_var, sender.rto) == before assert sink.packets == [] From 8509d635f3cd87159dd68337188f24669630cdb4 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:26:25 -0400 Subject: [PATCH 08/14] docs(tcp): add timing contract note Add a short transport timing note that documents the sender/receiver contract used by the TCP rewrite. Link it from the TCP section of the README so it is easy to find from the main project entry point. Validation: - inspected docs/tcp_timing.md - verified README pointer Refs: DT-269 Co-authored-by: Codex --- README.md | 2 ++ docs/tcp_timing.md | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 docs/tcp_timing.md diff --git a/README.md b/README.md index e6f3874..eaa244c 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,8 @@ The network components that have already been implemented include: * `TracePacketGenerator`: generates packets according to a trace file, with each row in the trace file representing a packet. * `TCPPacketGenerator`: generates packets using TCP as the transport protocol. + See [`docs/tcp_timing.md`](docs/tcp_timing.md) for the sender/receiver + timing contract used by the TCP rewrite. * `ProxyPacketGenerator`: redirects real-world packets (with fixed packet sizes) into the simulation environment. diff --git a/docs/tcp_timing.md b/docs/tcp_timing.md new file mode 100644 index 0000000..1407b27 --- /dev/null +++ b/docs/tcp_timing.md @@ -0,0 +1,23 @@ +# TCP Timing Contract + +This note captures the transport semantics the TCP rewrite relies on. + +- `TCPSink` tracks the cumulative ACK frontier, `RCV.NXT`, from the contiguous + prefix only. +- `TCPPacketGenerator` owns logical segment state keyed by segment start + sequence number. +- Every send or retransmit emits a fresh `Packet` object derived from that + logical segment state. +- `Packet.time` on TCP data packets is the original first-transmit timestamp + used by sinks for end-to-end latency accounting. +- RTT and RTO updates are sender-owned and conservative. +- In this phase, the sender does not guess RTT from retransmitted or ambiguous + ACKs. +- The transport rewrite assumes no TCP timestamps, no SACK, and no delayed-ACK + modeling. + +## Why this exists + +The classic TCP rewrite splits timing ownership between packet objects, +receiver ACK logic, and sender metadata. This note gives a stable reference for +the intended split so later changes can preserve the same contract. From 2d67f187dd0825cbb5645697f6dc3208011d9387 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:29:57 -0400 Subject: [PATCH 09/14] test(tcp): add red coverage for BBR timing semantics Add deterministic packet-level BBR tests that pin down buffered sub-MSS and tail sending, fresh packet emission on retransmission, segment-end ACK cleanup, and rate-sample stability when retransmission timing changes. These tests intentionally fail against the current BBR implementation so the follow-on rewrite can be driven by explicit red state instead of assumptions. Validation: - uv run python -m pytest -q tests/packet/test_bbr_generator.py (fails with 6 assertions) Refs: DT-270 Co-authored-by: Codex --- tests/packet/test_bbr_generator.py | 293 +++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 tests/packet/test_bbr_generator.py diff --git a/tests/packet/test_bbr_generator.py b/tests/packet/test_bbr_generator.py new file mode 100644 index 0000000..ab4eb0a --- /dev/null +++ b/tests/packet/test_bbr_generator.py @@ -0,0 +1,293 @@ +import pytest + +from ns.flow.flow import AppType, Flow +from ns.packet.bbr_generator import BBRPacketGenerator +from ns.packet.packet import Packet + +simpy = pytest.importorskip("simpy") + + +class CaptureSink: + def __init__(self, env): + self.env = env + self.packets = [] + self.waits = [] + + def put(self, packet): + self.packets.append(packet) + self.waits.append(self.env.now - packet.time) + + +class DummyTimer: + def __init__(self, rto=1.0): + self.rto = rto + self.restart_calls = [] + self.stop_calls = 0 + + def stop(self): + self.stop_calls += 1 + + def restart(self, revised_rto, start_time=0): + self.rto = revised_rto + self.restart_calls.append((revised_rto, start_time)) + + +class DummyCC: + def __init__(self, cwnd=4096): + self.cwnd = cwnd + self.pacing_rate = 0 + self.next_departure_time = 0 + self.calls = [] + + def timer_expired(self, packet=None): + self.calls.append(("timer_expired", packet)) + + def dupack_over(self): + self.calls.append(("dupack_over",)) + + def consecutive_dupacks_received(self, packet=None): + self.calls.append(("consecutive_dupacks_received", packet)) + + def more_dupacks_received(self, packet=None): + self.calls.append(("more_dupacks_received", packet)) + + def set_before_control(self, current_time, packet_in_flight): + self.calls.append(("set_before_control", current_time, packet_in_flight)) + + def ack_received(self, rtt, current_time): + self.calls.append(("ack_received", rtt, current_time)) + + +def make_flow(size, finish_time=1.0): + return Flow( + fid=1, + src="src", + dst="dst", + size=size, + finish_time=finish_time, + typ=AppType.BULK_TRANSFER, + ) + + +def make_sender(env, size, cwnd=4096, finish_time=1.0): + sender = BBRPacketGenerator( + env, + make_flow(size, finish_time), + DummyCC(cwnd), + debug=False, + ) + sink = CaptureSink(env) + sender.out = sink + return sender, sink + + +def make_ack( + packet_id, + *, + ack, + time, + delivered_time, + first_sent_time, + delivered=0, + lost=0, + is_app_limited=False, + flow_id=10001, +): + ack_packet = Packet( + time=time, + size=40, + packet_id=packet_id, + flow_id=flow_id, + src="dst", + dst="src", + ) + ack_packet.ack = ack + ack_packet.delivered_time = delivered_time + ack_packet.first_sent_time = first_sent_time + ack_packet.delivered = delivered + ack_packet.lost = lost + ack_packet.is_app_limited = is_app_limited + return ack_packet + + +def test_bbr_sender_sends_buffered_sub_mss_data(): + env = simpy.Environment() + sender, sink = make_sender(env, size=300) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [300] + + +def test_bbr_sender_sends_final_tail_segment(): + env = simpy.Environment() + sender, sink = make_sender(env, size=768) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [512, 256] + + +def test_bbr_sender_fast_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.dupack = 1 + sender.timer = DummyTimer(rto=1.0) + + sender.put( + make_ack( + 0, + ack=0, + time=1.0, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + + assert any( + call[0] == "consecutive_dupacks_received" + for call in sender.congestion_control.calls + ) + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(4.0) + assert original.time == 1.0 + + +def test_bbr_sender_timeout_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.timer = DummyTimer(rto=1.0) + + sender.timeout_callback(0) + + assert sender.congestion_control.calls[0][0] == "set_before_control" + assert sender.congestion_control.calls[1][0] == "timer_expired" + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(4.0) + assert original.time == 1.0 + + +def test_bbr_sender_cleans_up_short_segments_with_segment_end_ack(): + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) + first = Packet( + time=0.0, + size=256, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + second = Packet( + time=0.0, + size=256, + packet_id=256, + flow_id=1, + src="src", + dst="dst", + ) + for packet in (first, second): + packet.delivered_time = 1.0 + packet.first_sent_time = 0.0 + packet.delivered = packet.packet_id + packet.self_lost = False + + sender.sent_packets = {0: first, 256: second} + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + + sender.put( + make_ack( + 0, + ack=512, + time=0.0, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + + assert sender.sent_packets == {} + assert sender.packet_in_flight == 0 + assert sender.max_ack == 512 + + +def test_bbr_sender_rate_sample_stays_stable_across_retransmit_timing_changes(): + def run_scenario(ack_time, include_timeout): + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) + original = Packet( + time=0.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.timer = DummyTimer(rto=1.0) + + if include_timeout: + sender.timeout_callback(0) + + sender.put( + make_ack( + 0, + ack=512, + time=ack_time, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + return ( + tuple(sender.congestion_control.calls), + sender.congestion_control.rs.send_elapsed, + sender.congestion_control.rs.ack_elapsed, + sender.congestion_control.rs.delivery_rate, + ) + + clean = run_scenario(ack_time=0.0, include_timeout=False) + retransmitted = run_scenario(ack_time=5.0, include_timeout=True) + + assert retransmitted == clean From e76057bae485033a1d13d3d20b8d6de8a1a3e093 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:37:14 -0400 Subject: [PATCH 10/14] fix(tcp): align BBR packet timing with sender state Refactor BBRPacketGenerator to keep outstanding-segment metadata in sender-owned state instead of mutating Packet objects in place. New sends and retransmissions now emit fresh Packet instances, preserve the original first-transmit timestamp for sink latency accounting, and reuse stable rate-sample metadata across retransmission attempts. The BBR sender also now handles buffered sub-MSS and tail sends, cleans up acked data with segment-end semantics, and restarts its retransmission timer from the oldest outstanding segment state. The packet-level BBR regression test was narrowed to compare the stable rate-sample outputs and final ACK-facing RTT input rather than the entire congestion-control call history, which necessarily differs when a timeout path runs. Validation: - uv run python -m pytest -q tests/packet/test_bbr_generator.py - uv run python -m pytest -q tests/flow/test_bbrv3.py tests/flow/test_bbrv3_integration.py Refs: DT-271 Co-authored-by: Codex --- ns/packet/bbr_generator.py | 281 ++++++++++++++++++----------- tests/packet/test_bbr_generator.py | 5 +- 2 files changed, 181 insertions(+), 105 deletions(-) diff --git a/ns/packet/bbr_generator.py b/ns/packet/bbr_generator.py index a9744ff..3eeca73 100644 --- a/ns/packet/bbr_generator.py +++ b/ns/packet/bbr_generator.py @@ -4,6 +4,7 @@ """ import copy +from dataclasses import dataclass import simpy @@ -12,6 +13,21 @@ from ns.utils.timer import Timer +@dataclass +class SegmentState: + seq: int + size: int + first_tx_time: float + last_tx_time: float + first_sent_time: float = 0.0 + delivered_time: float = 0.0 + delivered: int = 0 + lost: int = 0 + is_app_limited: bool = False + tx_in_flight: int = 0 + retransmit_count: int = 0 + + class BBRPacketGenerator: """Generates packets with a simulated TCP protocol. @@ -75,6 +91,7 @@ def __init__( # metadata in sender-owned state and preserves Packet.time as the # original first-transmit timestamp seen by sinks. self.sent_packets = {} + self.segment_state = {} self.timer = None self.to_pkt_id = 0 @@ -82,6 +99,130 @@ def __init__( self.action = env.process(self.run()) self.debug = debug + def _build_packet(self, state): + """Create a fresh packet attempt from sender-owned segment state.""" + packet = Packet( + state.first_tx_time, + state.size, + state.seq, + src=self.flow.src, + flow_id=self.flow.fid, + tx_in_flight=state.tx_in_flight, + ) + packet.first_sent_time = state.first_sent_time + packet.delivered_time = state.delivered_time + packet.delivered = state.delivered + packet.lost = state.lost + packet.is_app_limited = state.is_app_limited + return packet + + def _get_segment_state(self, packet_id): + """Return sender-owned state for an outstanding BBR segment.""" + state = self.segment_state.get(packet_id) + if state is not None: + return state + + packet = self.sent_packets[packet_id] + state = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + first_sent_time=packet.first_sent_time, + delivered_time=packet.delivered_time, + delivered=packet.delivered, + lost=packet.lost, + is_app_limited=packet.is_app_limited, + tx_in_flight=packet.tx_in_flight, + ) + self.segment_state[packet_id] = state + return state + + def _send_new_packet(self, packet_size): + """Send a new BBR data packet and register its sender-owned state.""" + packet = Packet( + self.env.now, + packet_size, + self.next_seq, + src=self.flow.src, + flow_id=self.flow.fid, + tx_in_flight=self.packet_in_flight, + ) + self.congestion_control.rs.send_packet( + packet, + self.congestion_control.C, + self.max_ack - self.next_seq, + self.env.now, + ) + self.congestion_control.next_departure_time = self.env.now + if self.congestion_control.pacing_rate > 0: + self.congestion_control.next_departure_time += ( + packet.size / self.congestion_control.pacing_rate + ) + + self.sent_packets[packet.packet_id] = packet + self.segment_state[packet.packet_id] = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=self.env.now, + first_sent_time=packet.first_sent_time, + delivered_time=packet.delivered_time, + delivered=packet.delivered, + lost=packet.lost, + is_app_limited=packet.is_app_limited, + tx_in_flight=packet.tx_in_flight, + ) + self.packet_in_flight += packet.size + if self.debug: + print( + f"Send packet {packet.packet_id} with size {packet.size}, " + f"flow_id {packet.flow_id} at time {self.env.now:.4f}, " + f"and the packet delivered time is {packet.delivered_time:.4f}." + ) + self.out.put(packet) + + self.next_seq += packet.size + + self.congestion_control.C.check_if_application_limited( + self.next_seq, self.mss, self.packet_in_flight + ) + + if self.timer is None: + self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) + self.to_pkt_id = packet.packet_id + + if self.debug: + print( + f"Setting a timer for packet {packet.packet_id} with an RTO" + f" of {self.rto:.4f}." + ) + + def _retransmit_packet(self, packet_id): + """Emit a fresh retransmission attempt for an outstanding segment.""" + state = self._get_segment_state(packet_id) + state.retransmit_count += 1 + state.last_tx_time = self.env.now + state.tx_in_flight = self.packet_in_flight + resent_pkt = self._build_packet(state) + self.sent_packets[packet_id] = resent_pkt + return resent_pkt + + def _restart_oldest_timer(self): + """Point the retransmission timer at the oldest outstanding segment.""" + if not self.segment_state: + if self.timer is not None: + self.timer.stop() + self.timer = None + self.to_pkt_id = 0 + return + + oldest_packet_id = min(self.segment_state) + self.to_pkt_id = oldest_packet_id + if self.timer is None: + self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) + self.timer.restart(self.rto, self.segment_state[oldest_packet_id].last_tx_time) + def update_next_seq(self): self.send_buffer += self.flow.next_send_buffer(self.env.now) self.congestion_control.C.write_seq = self.send_buffer + 1 @@ -115,84 +256,35 @@ def run(self): yield self.env.timeout( self.congestion_control.next_departure_time - self.env.now ) - if self.next_seq + self.mss > min( - self.send_buffer, self.last_ack + self.congestion_control.cwnd - ): + send_limit = min(self.send_buffer, self.last_ack + self.congestion_control.cwnd) + available_bytes = send_limit - self.next_seq + if available_bytes <= 0: self.congestion_control.C.is_cwnd_limited = True yield self.cwnd_available.get() else: - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, - tx_in_flight=self.packet_in_flight, - ) - self.congestion_control.rs.send_packet( - packet, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - self.congestion_control.next_departure_time = self.env.now - if self.congestion_control.pacing_rate > 0: - self.congestion_control.next_departure_time += ( - packet.size / self.congestion_control.pacing_rate - ) - - self.sent_packets[packet.packet_id] = packet - self.packet_in_flight += packet.size - if self.debug: - print( - f"Send packet {packet.packet_id} with size {packet.size}, " - f"flow_id {packet.flow_id} at time {self.env.now:.4f}, " - f"and the packet delivered time is {packet.delivered_time:.4f}." - ) - self.out.put(packet) - - self.next_seq += packet.size - - self.congestion_control.C.check_if_application_limited( - self.next_seq, self.mss, self.packet_in_flight - ) - - if self.timer is None: - self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) - self.to_pkt_id = packet.packet_id - - if self.debug: - print( - f"Setting a timer for packet {packet.packet_id} with an RTO" - f" of {self.rto:.4f}." - ) + self._send_new_packet(min(self.mss, available_bytes)) def timeout_callback(self, packet_id=0): """To be called when a timer expired for a packet with 'packet_id'.""" self.update_next_seq() - packet_id = self.max_ack + if not self.segment_state: + if not self.sent_packets: + return + packet_id = self.to_pkt_id or min(self.sent_packets) + state = self._get_segment_state(packet_id) if self.debug: print( f"Timer expired for packet {packet_id} {self.flow.fid} " f"at time {self.env.now:.4f}." ) - self.congestion_control.C.lost += self.sent_packets[packet_id].size - self.sent_packets[packet_id].self_lost = True + self.congestion_control.C.lost += state.size self.congestion_control.set_before_control(self.env.now, self.packet_in_flight) self.congestion_control.timer_expired(self.sent_packets[packet_id]) # retransmitting the segment - resent_pkt = self.sent_packets[packet_id] - resent_pkt.time = self.env.now - self.congestion_control.rs.send_packet( - resent_pkt, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - + resent_pkt = self._retransmit_packet(packet_id) self.out.put(resent_pkt) self.rto *= 2 if self.rto > 60: @@ -204,7 +296,8 @@ def timeout_callback(self, packet_id=0): ) # starting a new timer for this segment and doubling the retransmission timeout - self.timer.restart(self.rto) + self.timer.restart(self.rto, self.segment_state[packet_id].last_tx_time) + self.to_pkt_id = packet_id self.congestion_control.C.check_if_application_limited( self.next_seq, self.mss, self.packet_in_flight @@ -217,7 +310,7 @@ def put(self, ack): self.next_seq, self.mss, self.packet_in_flight ) - sample_rtt = self.env.now - ack.time + sample_rtt = self.env.now - ack.first_sent_time self.congestion_control.rs.newly_acked = ack.ack - self.last_ack if ack.ack == self.last_ack: @@ -256,13 +349,8 @@ def put(self, ack): self.max_ack = max(self.max_ack, ack.ack) - if ack.packet_id == self.to_pkt_id and self.max_ack < self.next_seq: - self.timer.restart(self.rto, self.sent_packets[self.max_ack].time) - if self.dupack == 2: - self.congestion_control.C.lost += self.sent_packets[ack.ack].size - - self.sent_packets[ack.ack].self_lost = True + self.congestion_control.C.lost += self._get_segment_state(ack.ack).size self.congestion_control.set_before_control( self.env.now, self.packet_in_flight @@ -272,18 +360,7 @@ def put(self, ack): ) self.congestion_control.ack_received(sample_rtt, self.env.now) - resent_pkt = self.sent_packets[ack.ack] - resent_pkt.time = self.env.now - self.congestion_control.rs.send_packet( - resent_pkt, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - assert resent_pkt.delivered_time > 0 - # self.congestion_control.next_departure_time = self.env.now - # if(self.congestion_control.pacing_rate > 0): - # self.congestion_control.next_departure_time += resent_pkt.size / self.congestion_control.pacing_rate + resent_pkt = self._retransmit_packet(ack.ack) if self.debug: print( @@ -304,30 +381,20 @@ def put(self, ack): self.env.now, self.packet_in_flight ) - temp_pkt = copy.copy(ack) - temp_pkt.size = self.mss - self.congestion_control.rs.updaterate_sample( - temp_pkt, self.congestion_control.C, self.env.now - ) - - bbr_update = False - if ack.packet_id in self.sent_packets: - # temp_pkt = copy.copy(ack) - # temp_pkt.size = self.mss - # self.congestion_control..updaterate_sample(temp_pkt, self.congestion_control.C, self.env.now) - bbr_update = True - self.packet_in_flight -= self.sent_packets[ack.packet_id].size - self.sent_packets[ack.packet_id].delivered_time = 0 - self.sent_packets[ack.packet_id].self_lost = False - - for i in range(self.max_ack, ack.ack, self.mss): - if i in self.sent_packets.keys(): - if self.sent_packets[i].delivered_time: - self.packet_in_flight -= self.sent_packets[i].size - self.congestion_control.rs.updaterate_sample( - self.sent_packets[i], self.congestion_control.C, self.env.now - ) - del self.sent_packets[i] + acked_packet_ids = [] + for packet_id in sorted(self.sent_packets): + state = self._get_segment_state(packet_id) + if packet_id + state.size <= ack.ack: + acked_packet_ids.append(packet_id) + + bbr_update = bool(acked_packet_ids) + for packet_id in sorted(acked_packet_ids): + packet = self.sent_packets[packet_id] + if packet.delivered_time: + self.packet_in_flight -= packet.size + self.congestion_control.rs.updaterate_sample( + packet, self.congestion_control.C, self.env.now + ) self.congestion_control.rs.update_sample_group( self.congestion_control.C, sample_rtt @@ -360,10 +427,16 @@ def put(self, ack): if bbr_update: self.congestion_control.ack_received(sample_rtt, self.env.now) + for packet_id in sorted(acked_packet_ids): + del self.sent_packets[packet_id] + del self.segment_state[packet_id] + if self.max_ack == self.next_seq and self.timer is not None: self.timer.stop() del self.timer self.timer = None + elif acked_packet_ids: + self._restart_oldest_timer() self.congestion_control.C.is_cwnd_limited = False self.cwnd_available.put(True) diff --git a/tests/packet/test_bbr_generator.py b/tests/packet/test_bbr_generator.py index ab4eb0a..93941bc 100644 --- a/tests/packet/test_bbr_generator.py +++ b/tests/packet/test_bbr_generator.py @@ -280,8 +280,11 @@ def run_scenario(ack_time, include_timeout): first_sent_time=0.0, ) ) + ack_calls = [ + call for call in sender.congestion_control.calls if call[0] == "ack_received" + ] return ( - tuple(sender.congestion_control.calls), + ack_calls[-1][1:], sender.congestion_control.rs.send_elapsed, sender.congestion_control.rs.ack_elapsed, sender.congestion_control.rs.delivery_rate, From 6808996ef8146bbb3012de1f0898cedb573ad090 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 18:40:57 -0400 Subject: [PATCH 11/14] fix(tcp): ignore dupacks beyond outstanding data Guard fast retransmit when a duplicate ACK no longer maps to a live outstanding segment. Validation exposed a KeyError in examples/tcp.py after a cumulative ACK advanced past the stored sender frontier while dupack handling still tried to resolve segment state for that ACK. Add a regression test that reproduces the stale-frontier dupack path and keeps the sender side effect free when no resendable segment remains. Validation: - uv run python -m pytest -q - uv run python examples/tcp.py - uv run python examples/bbr.py Refs: DT-272 Co-authored-by: Codex --- ns/packet/tcp_generator.py | 6 +++++- tests/packet/test_tcp_generator.py | 13 +++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index 9ba0524..134d904 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -240,7 +240,11 @@ def put(self, ack): if self.dupack == 3: self.congestion_control.consecutive_dupacks_received() - state = self._get_segment_state(ack.ack) + state = self.segment_state.get(ack.ack) + if state is None and ack.ack in self.sent_packets: + state = self._get_segment_state(ack.ack) + if state is None: + return state.retransmit_count += 1 state.last_tx_time = self.env.now resent_pkt = self._build_packet(state) diff --git a/tests/packet/test_tcp_generator.py b/tests/packet/test_tcp_generator.py index f77851e..4114891 100644 --- a/tests/packet/test_tcp_generator.py +++ b/tests/packet/test_tcp_generator.py @@ -200,3 +200,16 @@ def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): assert sender.congestion_control.ack_received_calls == [(1.0, 5)] assert (sender.smoothed_rtt, sender.rtt_var, sender.rto) == before assert sink.packets == [] + + +def test_tcp_sender_ignores_dupacks_for_unknown_segment_frontier(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + sender.last_ack = 1024 + sender.dupack = 2 + sender.next_seq = 1536 + + sender.put(make_ack(1024, time=1.0)) + + assert sender.congestion_control.consecutive_dupacks_calls == 1 + assert sink.packets == [] From a69f30c69232d641e188b5c1a8d40f5be0468ffa Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 19:14:53 -0400 Subject: [PATCH 12/14] fix(bbr): preserve timeout order and ACK RTT sampling Keep the single retransmission timer anchored to the oldest outstanding segment instead of retargeting it to each newly sent packet. This restores loss-recovery ordering for multi-packet flights and prevents timeout retransmits from skipping the earliest loss. Also restore ACK RTT sampling to the acknowledged packet's transport timestamp. In this design `ack.time` carries that segment timestamp while `first_sent_time` remains the flight-level marker for delivery-rate sampling, so using `first_sent_time` inflated RTT and RTO on paced flights. Add regression coverage for both review findings and align the existing rate-sample stability fixture with the sender-owned timestamp contract. Validation: - uv run python -m pytest -q tests/packet/test_bbr_generator.py tests/flow/test_bbrv3.py tests/flow/test_bbrv3_integration.py - uv run python -m pytest -q - uv run python examples/bbr.py - uv run python examples/tcp.py Co-authored-by: Codex --- ns/packet/bbr_generator.py | 6 ++- tests/packet/test_bbr_generator.py | 73 ++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 6 deletions(-) diff --git a/ns/packet/bbr_generator.py b/ns/packet/bbr_generator.py index 3eeca73..5708547 100644 --- a/ns/packet/bbr_generator.py +++ b/ns/packet/bbr_generator.py @@ -190,7 +190,7 @@ def _send_new_packet(self, packet_size): if self.timer is None: self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) - self.to_pkt_id = packet.packet_id + self.to_pkt_id = packet.packet_id if self.debug: print( @@ -310,7 +310,9 @@ def put(self, ack): self.next_seq, self.mss, self.packet_in_flight ) - sample_rtt = self.env.now - ack.first_sent_time + # ACK RTT follows the acknowledged segment's transport timestamp. + # first_sent_time remains the flight-level marker for RateSample only. + sample_rtt = self.env.now - ack.time self.congestion_control.rs.newly_acked = ack.ack - self.last_ack if ack.ack == self.last_ack: diff --git a/tests/packet/test_bbr_generator.py b/tests/packet/test_bbr_generator.py index 93941bc..dc09093 100644 --- a/tests/packet/test_bbr_generator.py +++ b/tests/packet/test_bbr_generator.py @@ -199,6 +199,22 @@ def test_bbr_sender_timeout_retransmit_emits_fresh_packet_without_resetting_time assert original.time == 1.0 +def test_bbr_sender_keeps_timeout_pointer_on_oldest_outstanding_segment(): + env = simpy.Environment() + sender, sink = make_sender(env, size=1024) + + env.run(until=0.01) + + sender.timer = DummyTimer(rto=sender.rto) + + assert sender.to_pkt_id == 0 + + sender.timeout_callback(0) + + assert sink.packets[-1].packet_id == 0 + assert sender.to_pkt_id == 0 + + def test_bbr_sender_cleans_up_short_segments_with_segment_end_ack(): env = simpy.Environment(initial_time=5) sender, _ = make_sender(env, size=1024) @@ -246,7 +262,7 @@ def test_bbr_sender_cleans_up_short_segments_with_segment_end_ack(): def test_bbr_sender_rate_sample_stays_stable_across_retransmit_timing_changes(): - def run_scenario(ack_time, include_timeout): + def run_scenario(include_timeout): env = simpy.Environment(initial_time=5) sender, _ = make_sender(env, size=1024) original = Packet( @@ -275,7 +291,9 @@ def run_scenario(ack_time, include_timeout): make_ack( 0, ack=512, - time=ack_time, + # ACKs keep the segment's original transmit timestamp even + # after a retransmission attempt. + time=0.0, delivered_time=1.0, first_sent_time=0.0, ) @@ -290,7 +308,54 @@ def run_scenario(ack_time, include_timeout): sender.congestion_control.rs.delivery_rate, ) - clean = run_scenario(ack_time=0.0, include_timeout=False) - retransmitted = run_scenario(ack_time=5.0, include_timeout=True) + clean = run_scenario(include_timeout=False) + retransmitted = run_scenario(include_timeout=True) assert retransmitted == clean + + +def test_bbr_sender_samples_rtt_from_the_acked_packet_time(): + env = simpy.Environment(initial_time=8) + sender, _ = make_sender(env, size=1024) + first = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + second = Packet( + time=3.0, + size=512, + packet_id=512, + flow_id=1, + src="src", + dst="dst", + ) + for packet in (first, second): + packet.delivered_time = 1.0 + packet.delivered = packet.packet_id + packet.first_sent_time = 1.0 + packet.self_lost = False + + sender.sent_packets = {0: first, 512: second} + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 1024 + sender.packet_in_flight = 1024 + sender.timer = DummyTimer(rto=1.0) + + sender.put( + make_ack( + 512, + ack=1024, + time=3.0, + delivered_time=1.0, + first_sent_time=1.0, + ) + ) + + assert sender.congestion_control.calls[-1] == ("ack_received", 5.0, 8) + assert sender.rtt_estimate == pytest.approx(5.0) + assert sender.rto == pytest.approx(15.0) From 592e1f15c5be0de4b670b6fbb54093fa48cf9fdf Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 19:37:08 -0400 Subject: [PATCH 13/14] test(tcp): add end-to-end retransmit latency coverage Add a deterministic TCP integration test that drops the first transmission of a single segment, lets the timeout retransmission through, and asserts that the receiver-observed wait is still measured from the original first transmit time. This turns the issue #34 behavior into an end-to-end regression instead of an example-only check. Validation: - uv run --with pytest python -m pytest -q tests/flow/test_tcp_integration.py - uv run --with pytest python -m pytest -q tests/packet/test_tcp_generator.py tests/packet/test_tcp_sink.py tests/flow/test_tcp_integration.py - uv run --with pytest python -m pytest -q Refs: #34 Co-authored-by: Codex --- tests/flow/test_tcp_integration.py | 55 ++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 tests/flow/test_tcp_integration.py diff --git a/tests/flow/test_tcp_integration.py b/tests/flow/test_tcp_integration.py new file mode 100644 index 0000000..4ca29b7 --- /dev/null +++ b/tests/flow/test_tcp_integration.py @@ -0,0 +1,55 @@ +import pytest + +from ns.flow.cc import TCPReno +from ns.flow.flow import Flow +from ns.packet.tcp_generator import TCPPacketGenerator +from ns.packet.tcp_sink import TCPSink +from ns.port.wire import Wire + +simpy = pytest.importorskip("simpy") + + +def const_delay(value): + return lambda: value + + +class DropFirstTransmission: + def __init__(self, packet_id): + self.packet_id = packet_id + self.out = None + self.dropped = False + + def put(self, packet): + if packet.packet_id == self.packet_id and not self.dropped: + self.dropped = True + return + + self.out.put(packet) + + +def test_tcp_end_to_end_retransmit_latency_uses_first_transmit_time(): + env = simpy.Environment() + flow = Flow(fid=7, src="src", dst="dst", size=512, finish_time=5) + sender = TCPPacketGenerator( + env, + flow=flow, + cc=TCPReno(), + element_id="tcp-flow", + debug=False, + ) + receiver = TCPSink(env, rec_waits=True, debug=False) + down = Wire(env, const_delay(0.1)) + up = Wire(env, const_delay(0.1)) + drop_first = DropFirstTransmission(packet_id=0) + + sender.out = down + down.out = drop_first + drop_first.out = receiver + receiver.out = up + up.out = sender + + env.run(until=2.0) + + assert receiver.packet_times[flow.fid] == [0.0] + assert receiver.waits[flow.fid] == [pytest.approx(1.1)] + assert sender.last_ack == 512 From fd204c7b67dfcb8a2bcb85b52dbc2644021a7a82 Mon Sep 17 00:00:00 2001 From: Baochun Li Date: Tue, 31 Mar 2026 19:39:28 -0400 Subject: [PATCH 14/14] docs(test): add additional instructions in README about running tests. --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index eaa244c..bd241dc 100644 --- a/README.md +++ b/README.md @@ -288,3 +288,11 @@ self.deficit[flow_id] += self.quantum[flow_id] ``` Most often, the mapping between flow IDs and per-flow parameters, such as weights in a Weighted Fair Queueing scheduler or priorities in a Static Priority scheduler, need to be stored in a dictionary, and then used to initialized these schedulers. An optional (but not recommended) style is to assign consecutive integers as flow IDs to the flows throughout the entire network, and then use simple lists of per-flow parameters to initialize the schedulers. In this case, flow IDs will be directly used as indices to look up these lists to find the parameter values. + +## Running Tests + +A few dozen tests have been included in the project. To run them, use the command: + +```bash +uv run --with pytest python -m pytest -q +```