diff --git a/README.md b/README.md index e6f3874..bd241dc 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,8 @@ The network components that have already been implemented include: * `TracePacketGenerator`: generates packets according to a trace file, with each row in the trace file representing a packet. * `TCPPacketGenerator`: generates packets using TCP as the transport protocol. + See [`docs/tcp_timing.md`](docs/tcp_timing.md) for the sender/receiver + timing contract used by the TCP rewrite. * `ProxyPacketGenerator`: redirects real-world packets (with fixed packet sizes) into the simulation environment. @@ -286,3 +288,11 @@ self.deficit[flow_id] += self.quantum[flow_id] ``` Most often, the mapping between flow IDs and per-flow parameters, such as weights in a Weighted Fair Queueing scheduler or priorities in a Static Priority scheduler, need to be stored in a dictionary, and then used to initialized these schedulers. An optional (but not recommended) style is to assign consecutive integers as flow IDs to the flows throughout the entire network, and then use simple lists of per-flow parameters to initialize the schedulers. In this case, flow IDs will be directly used as indices to look up these lists to find the parameter values. + +## Running Tests + +A few dozen tests have been included in the project. To run them, use the command: + +```bash +uv run --with pytest python -m pytest -q +``` diff --git a/docs/tcp_timing.md b/docs/tcp_timing.md new file mode 100644 index 0000000..1407b27 --- /dev/null +++ b/docs/tcp_timing.md @@ -0,0 +1,23 @@ +# TCP Timing Contract + +This note captures the transport semantics the TCP rewrite relies on. + +- `TCPSink` tracks the cumulative ACK frontier, `RCV.NXT`, from the contiguous + prefix only. +- `TCPPacketGenerator` owns logical segment state keyed by segment start + sequence number. +- Every send or retransmit emits a fresh `Packet` object derived from that + logical segment state. +- `Packet.time` on TCP data packets is the original first-transmit timestamp + used by sinks for end-to-end latency accounting. +- RTT and RTO updates are sender-owned and conservative. +- In this phase, the sender does not guess RTT from retransmitted or ambiguous + ACKs. +- The transport rewrite assumes no TCP timestamps, no SACK, and no delayed-ACK + modeling. + +## Why this exists + +The classic TCP rewrite splits timing ownership between packet objects, +receiver ACK logic, and sender metadata. This note gives a stable reference for +the intended split so later changes can preserve the same contract. diff --git a/ns/packet/bbr_generator.py b/ns/packet/bbr_generator.py index da0eaed..5708547 100644 --- a/ns/packet/bbr_generator.py +++ b/ns/packet/bbr_generator.py @@ -4,6 +4,7 @@ """ import copy +from dataclasses import dataclass import simpy @@ -12,6 +13,21 @@ from ns.utils.timer import Timer +@dataclass +class SegmentState: + seq: int + size: int + first_tx_time: float + last_tx_time: float + first_sent_time: float = 0.0 + delivered_time: float = 0.0 + delivered: int = 0 + lost: int = 0 + is_app_limited: bool = False + tx_in_flight: int = 0 + retransmit_count: int = 0 + + class BBRPacketGenerator: """Generates packets with a simulated TCP protocol. @@ -70,8 +86,12 @@ def __init__( # whether or not space in the congestion window is available self.cwnd_available = simpy.Store(env) - # the in-flight packets (segments) + # In-flight data is keyed by logical segment start sequence number. + # The transport rewrite keeps retransmission timing and delivery-rate + # metadata in sender-owned state and preserves Packet.time as the + # original first-transmit timestamp seen by sinks. self.sent_packets = {} + self.segment_state = {} self.timer = None self.to_pkt_id = 0 @@ -79,6 +99,130 @@ def __init__( self.action = env.process(self.run()) self.debug = debug + def _build_packet(self, state): + """Create a fresh packet attempt from sender-owned segment state.""" + packet = Packet( + state.first_tx_time, + state.size, + state.seq, + src=self.flow.src, + flow_id=self.flow.fid, + tx_in_flight=state.tx_in_flight, + ) + packet.first_sent_time = state.first_sent_time + packet.delivered_time = state.delivered_time + packet.delivered = state.delivered + packet.lost = state.lost + packet.is_app_limited = state.is_app_limited + return packet + + def _get_segment_state(self, packet_id): + """Return sender-owned state for an outstanding BBR segment.""" + state = self.segment_state.get(packet_id) + if state is not None: + return state + + packet = self.sent_packets[packet_id] + state = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + first_sent_time=packet.first_sent_time, + delivered_time=packet.delivered_time, + delivered=packet.delivered, + lost=packet.lost, + is_app_limited=packet.is_app_limited, + tx_in_flight=packet.tx_in_flight, + ) + self.segment_state[packet_id] = state + return state + + def _send_new_packet(self, packet_size): + """Send a new BBR data packet and register its sender-owned state.""" + packet = Packet( + self.env.now, + packet_size, + self.next_seq, + src=self.flow.src, + flow_id=self.flow.fid, + tx_in_flight=self.packet_in_flight, + ) + self.congestion_control.rs.send_packet( + packet, + self.congestion_control.C, + self.max_ack - self.next_seq, + self.env.now, + ) + self.congestion_control.next_departure_time = self.env.now + if self.congestion_control.pacing_rate > 0: + self.congestion_control.next_departure_time += ( + packet.size / self.congestion_control.pacing_rate + ) + + self.sent_packets[packet.packet_id] = packet + self.segment_state[packet.packet_id] = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=self.env.now, + first_sent_time=packet.first_sent_time, + delivered_time=packet.delivered_time, + delivered=packet.delivered, + lost=packet.lost, + is_app_limited=packet.is_app_limited, + tx_in_flight=packet.tx_in_flight, + ) + self.packet_in_flight += packet.size + if self.debug: + print( + f"Send packet {packet.packet_id} with size {packet.size}, " + f"flow_id {packet.flow_id} at time {self.env.now:.4f}, " + f"and the packet delivered time is {packet.delivered_time:.4f}." + ) + self.out.put(packet) + + self.next_seq += packet.size + + self.congestion_control.C.check_if_application_limited( + self.next_seq, self.mss, self.packet_in_flight + ) + + if self.timer is None: + self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) + self.to_pkt_id = packet.packet_id + + if self.debug: + print( + f"Setting a timer for packet {packet.packet_id} with an RTO" + f" of {self.rto:.4f}." + ) + + def _retransmit_packet(self, packet_id): + """Emit a fresh retransmission attempt for an outstanding segment.""" + state = self._get_segment_state(packet_id) + state.retransmit_count += 1 + state.last_tx_time = self.env.now + state.tx_in_flight = self.packet_in_flight + resent_pkt = self._build_packet(state) + self.sent_packets[packet_id] = resent_pkt + return resent_pkt + + def _restart_oldest_timer(self): + """Point the retransmission timer at the oldest outstanding segment.""" + if not self.segment_state: + if self.timer is not None: + self.timer.stop() + self.timer = None + self.to_pkt_id = 0 + return + + oldest_packet_id = min(self.segment_state) + self.to_pkt_id = oldest_packet_id + if self.timer is None: + self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) + self.timer.restart(self.rto, self.segment_state[oldest_packet_id].last_tx_time) + def update_next_seq(self): self.send_buffer += self.flow.next_send_buffer(self.env.now) self.congestion_control.C.write_seq = self.send_buffer + 1 @@ -112,84 +256,35 @@ def run(self): yield self.env.timeout( self.congestion_control.next_departure_time - self.env.now ) - if self.next_seq + self.mss > min( - self.send_buffer, self.last_ack + self.congestion_control.cwnd - ): + send_limit = min(self.send_buffer, self.last_ack + self.congestion_control.cwnd) + available_bytes = send_limit - self.next_seq + if available_bytes <= 0: self.congestion_control.C.is_cwnd_limited = True yield self.cwnd_available.get() else: - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, - tx_in_flight=self.packet_in_flight, - ) - self.congestion_control.rs.send_packet( - packet, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - self.congestion_control.next_departure_time = self.env.now - if self.congestion_control.pacing_rate > 0: - self.congestion_control.next_departure_time += ( - packet.size / self.congestion_control.pacing_rate - ) - - self.sent_packets[packet.packet_id] = packet - self.packet_in_flight += packet.size - if self.debug: - print( - f"Send packet {packet.packet_id} with size {packet.size}, " - f"flow_id {packet.flow_id} at time {self.env.now:.4f}, " - f"and the packet delivered time is {packet.delivered_time:.4f}." - ) - self.out.put(packet) - - self.next_seq += packet.size - - self.congestion_control.C.check_if_application_limited( - self.next_seq, self.mss, self.packet_in_flight - ) - - if self.timer is None: - self.timer = Timer(self.env, 0, self.timeout_callback, self.rto) - self.to_pkt_id = packet.packet_id - - if self.debug: - print( - f"Setting a timer for packet {packet.packet_id} with an RTO" - f" of {self.rto:.4f}." - ) + self._send_new_packet(min(self.mss, available_bytes)) def timeout_callback(self, packet_id=0): """To be called when a timer expired for a packet with 'packet_id'.""" self.update_next_seq() - packet_id = self.max_ack + if not self.segment_state: + if not self.sent_packets: + return + packet_id = self.to_pkt_id or min(self.sent_packets) + state = self._get_segment_state(packet_id) if self.debug: print( f"Timer expired for packet {packet_id} {self.flow.fid} " f"at time {self.env.now:.4f}." ) - self.congestion_control.C.lost += self.sent_packets[packet_id].size - self.sent_packets[packet_id].self_lost = True + self.congestion_control.C.lost += state.size self.congestion_control.set_before_control(self.env.now, self.packet_in_flight) self.congestion_control.timer_expired(self.sent_packets[packet_id]) # retransmitting the segment - resent_pkt = self.sent_packets[packet_id] - resent_pkt.time = self.env.now - self.congestion_control.rs.send_packet( - resent_pkt, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - + resent_pkt = self._retransmit_packet(packet_id) self.out.put(resent_pkt) self.rto *= 2 if self.rto > 60: @@ -201,7 +296,8 @@ def timeout_callback(self, packet_id=0): ) # starting a new timer for this segment and doubling the retransmission timeout - self.timer.restart(self.rto) + self.timer.restart(self.rto, self.segment_state[packet_id].last_tx_time) + self.to_pkt_id = packet_id self.congestion_control.C.check_if_application_limited( self.next_seq, self.mss, self.packet_in_flight @@ -214,6 +310,8 @@ def put(self, ack): self.next_seq, self.mss, self.packet_in_flight ) + # ACK RTT follows the acknowledged segment's transport timestamp. + # first_sent_time remains the flight-level marker for RateSample only. sample_rtt = self.env.now - ack.time self.congestion_control.rs.newly_acked = ack.ack - self.last_ack @@ -253,13 +351,8 @@ def put(self, ack): self.max_ack = max(self.max_ack, ack.ack) - if ack.packet_id == self.to_pkt_id and self.max_ack < self.next_seq: - self.timer.restart(self.rto, self.sent_packets[self.max_ack].time) - if self.dupack == 2: - self.congestion_control.C.lost += self.sent_packets[ack.ack].size - - self.sent_packets[ack.ack].self_lost = True + self.congestion_control.C.lost += self._get_segment_state(ack.ack).size self.congestion_control.set_before_control( self.env.now, self.packet_in_flight @@ -269,18 +362,7 @@ def put(self, ack): ) self.congestion_control.ack_received(sample_rtt, self.env.now) - resent_pkt = self.sent_packets[ack.ack] - resent_pkt.time = self.env.now - self.congestion_control.rs.send_packet( - resent_pkt, - self.congestion_control.C, - self.max_ack - self.next_seq, - self.env.now, - ) - assert resent_pkt.delivered_time > 0 - # self.congestion_control.next_departure_time = self.env.now - # if(self.congestion_control.pacing_rate > 0): - # self.congestion_control.next_departure_time += resent_pkt.size / self.congestion_control.pacing_rate + resent_pkt = self._retransmit_packet(ack.ack) if self.debug: print( @@ -301,30 +383,20 @@ def put(self, ack): self.env.now, self.packet_in_flight ) - temp_pkt = copy.copy(ack) - temp_pkt.size = self.mss - self.congestion_control.rs.updaterate_sample( - temp_pkt, self.congestion_control.C, self.env.now - ) - - bbr_update = False - if ack.packet_id in self.sent_packets: - # temp_pkt = copy.copy(ack) - # temp_pkt.size = self.mss - # self.congestion_control..updaterate_sample(temp_pkt, self.congestion_control.C, self.env.now) - bbr_update = True - self.packet_in_flight -= self.sent_packets[ack.packet_id].size - self.sent_packets[ack.packet_id].delivered_time = 0 - self.sent_packets[ack.packet_id].self_lost = False - - for i in range(self.max_ack, ack.ack, self.mss): - if i in self.sent_packets.keys(): - if self.sent_packets[i].delivered_time: - self.packet_in_flight -= self.sent_packets[i].size - self.congestion_control.rs.updaterate_sample( - self.sent_packets[i], self.congestion_control.C, self.env.now - ) - del self.sent_packets[i] + acked_packet_ids = [] + for packet_id in sorted(self.sent_packets): + state = self._get_segment_state(packet_id) + if packet_id + state.size <= ack.ack: + acked_packet_ids.append(packet_id) + + bbr_update = bool(acked_packet_ids) + for packet_id in sorted(acked_packet_ids): + packet = self.sent_packets[packet_id] + if packet.delivered_time: + self.packet_in_flight -= packet.size + self.congestion_control.rs.updaterate_sample( + packet, self.congestion_control.C, self.env.now + ) self.congestion_control.rs.update_sample_group( self.congestion_control.C, sample_rtt @@ -357,10 +429,16 @@ def put(self, ack): if bbr_update: self.congestion_control.ack_received(sample_rtt, self.env.now) + for packet_id in sorted(acked_packet_ids): + del self.sent_packets[packet_id] + del self.segment_state[packet_id] + if self.max_ack == self.next_seq and self.timer is not None: self.timer.stop() del self.timer self.timer = None + elif acked_packet_ids: + self._restart_oldest_timer() self.congestion_control.C.is_cwnd_limited = False self.cwnd_available.put(True) diff --git a/ns/packet/packet.py b/ns/packet/packet.py index 303f102..6b1aa9f 100644 --- a/ns/packet/packet.py +++ b/ns/packet/packet.py @@ -19,7 +19,10 @@ class Packet: Parameters ---------- time: float - the time when the packet is generated. + the time when the packet is generated. For TCP data packets, transport + code treats this as the original first-transmit timestamp used by sinks + for end-to-end latency accounting. Retransmit-attempt timing belongs to + sender-owned state, not to later mutation of this field. size: float the size of the packet in bytes packet_id: int diff --git a/ns/packet/tcp_generator.py b/ns/packet/tcp_generator.py index fa43217..134d904 100644 --- a/ns/packet/tcp_generator.py +++ b/ns/packet/tcp_generator.py @@ -3,12 +3,24 @@ for various congestion control mechanisms. """ +from dataclasses import dataclass + import simpy from ns.packet.packet import Packet from ns.utils.timer import Timer +@dataclass +class SegmentState: + seq: int + size: int + first_tx_time: float + last_tx_time: float + retransmit_count: int = 0 + timer: Timer = None + + class TCPPacketGenerator: """Generates packets with a simulated TCP protocol. @@ -48,17 +60,97 @@ def __init__(self, env, flow, cc, element_id=None, debug=False): self.smoothed_rtt = 0.0 # the retransmission timeout self.rto = 1.0 + # the most recent RTT sample that was accepted as unambiguous + self.last_rtt_sample = 0.0 # whether or not space in the congestion window is available self.cwnd_available = simpy.Store(env) - # the timers, one for each in-flight packets (segments) sent + # Timers are keyed by the logical segment start sequence number. + # Follow-on rewrite steps keep retransmission state in sender-owned + # segment metadata rather than in mutable Packet objects. self.timers = {} - # the in-flight packets (segments) + # In-flight data is currently keyed by segment start sequence number. + # ACK cleanup must eventually use segment-end semantics + # (seq + size <= ack), and each retransmission attempt must emit a + # fresh Packet while preserving the original Packet.time. self.sent_packets = {} + self.segment_state = {} self.action = env.process(self.run()) self.debug = debug + def _get_segment_state(self, packet_id): + """Return sender-owned logical state for a segment, creating it lazily.""" + state = self.segment_state.get(packet_id) + if state is not None: + return state + + packet = self.sent_packets[packet_id] + state = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + timer=self.timers.get(packet_id), + ) + self.segment_state[packet_id] = state + return state + + def _build_packet(self, state): + """Create a fresh packet attempt from sender-owned segment state.""" + return Packet( + state.first_tx_time, + state.size, + state.seq, + src=self.flow.src, + flow_id=self.flow.fid, + ) + + def _send_new_packet(self, packet_size): + """Send a fresh data packet and register sender-owned state for it.""" + packet = Packet( + self.env.now, + packet_size, + self.next_seq, + src=self.flow.src, + flow_id=self.flow.fid, + ) + + self.sent_packets[packet.packet_id] = packet + self.segment_state[packet.packet_id] = SegmentState( + seq=packet.packet_id, + size=packet.size, + first_tx_time=packet.time, + last_tx_time=packet.time, + ) + + if self.debug: + print( + f"TCPPacketGenerator {self.element_id} sent packet {packet.packet_id} " + f"with size {packet.size}, flow_id {packet.flow_id} at " + f"time {self.env.now:.4f}." + ) + + self.out.put(packet) + + self.next_seq += packet.size + timer = Timer( + self.env, + timer_id=packet.packet_id, + timeout_callback=self.timeout_callback, + rto=self.rto, + ) + self.timers[packet.packet_id] = timer + self.segment_state[packet.packet_id].timer = timer + + if self.debug: + print( + f"TCPPacketGenerator {self.element_id} is setting a timer " + f"for packet {packet.packet_id} with an RTO of {self.rto:.4f}." + ) + + return packet + def run(self): """The generator function used in simulations.""" if self.flow.start_time: @@ -89,42 +181,13 @@ def run(self): packet_size = self.mss self.send_buffer += packet_size - # the sender can transmit up to the size of the congestion window - if self.next_seq + self.mss <= min( - self.send_buffer, self.last_ack + self.congestion_control.cwnd - ): - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, - ) - - self.sent_packets[packet.packet_id] = packet - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} sent packet {packet.packet_id} " - f"with size {packet.size}, flow_id {packet.flow_id} at " - f"time {self.env.now:.4f}." - ) - - self.out.put(packet) - - self.next_seq += packet.size - self.timers[packet.packet_id] = Timer( - self.env, - timer_id=packet.packet_id, - timeout_callback=self.timeout_callback, - rto=self.rto, - ) - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} is setting a timer " - f"for packet {packet.packet_id} with an RTO of {self.rto:.4f}." - ) + # The sender can transmit any positive byte count up to the smaller + # of the buffered data and the available congestion window. + send_limit = min(self.send_buffer, self.last_ack + self.congestion_control.cwnd) + available_bytes = send_limit - self.next_seq + if available_bytes > 0: + packet_size = min(self.mss, available_bytes) + self._send_new_packet(packet_size) else: # No further space in the congestion window to transmit packets # at this time, waiting for acknowledgements @@ -141,7 +204,11 @@ def timeout_callback(self, packet_id=0): self.congestion_control.timer_expired() # retransmitting the segment - resent_pkt = self.sent_packets[packet_id] + state = self._get_segment_state(packet_id) + state.retransmit_count += 1 + state.last_tx_time = self.env.now + resent_pkt = self._build_packet(state) + self.sent_packets[packet_id] = resent_pkt self.out.put(resent_pkt) if self.debug: @@ -152,11 +219,14 @@ def timeout_callback(self, packet_id=0): # starting a new timer for this segment and doubling the retransmission timeout revised_rto = self.timers[packet_id].rto * 2 - self.timers[packet_id].restart(revised_rto) + state.timer = self.timers[packet_id] + state.timer.restart(revised_rto) def put(self, ack): """On receiving an acknowledgment packet.""" assert ack.flow_id >= 10000 # the received packet must be an ack + previous_ack = self.last_ack + previous_segment = self.segment_state.get(previous_ack) if ack.ack == self.last_ack: self.dupack += 1 @@ -170,8 +240,15 @@ def put(self, ack): if self.dupack == 3: self.congestion_control.consecutive_dupacks_received() - resent_pkt = self.sent_packets[ack.ack] - resent_pkt.time = self.env.now + state = self.segment_state.get(ack.ack) + if state is None and ack.ack in self.sent_packets: + state = self._get_segment_state(ack.ack) + if state is None: + return + state.retransmit_count += 1 + state.last_tx_time = self.env.now + resent_pkt = self._build_packet(state) + self.sent_packets[ack.ack] = resent_pkt if self.debug: print( f"TCPPacketGenerator {self.element_id} is resending packet " @@ -185,44 +262,28 @@ def put(self, ack): self.congestion_control.more_dupacks_received() if self.last_ack + self.congestion_control.cwnd >= ack.ack: - packet = Packet( - self.env.now, - self.mss, - self.next_seq, - src=self.flow.src, - flow_id=self.flow.fid, - ) - - self.sent_packets[packet.packet_id] = packet - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} sent packet " - f"{packet.packet_id} with size {packet.size}, flow_id " - f"{packet.flow_id} at time {self.env.now:.4f} as dupack > 3." - ) - - self.out.put(packet) - - self.next_seq += packet.size - self.timers[packet.packet_id] = Timer( - self.env, - timer_id=packet.packet_id, - timeout_callback=self.timeout_callback, - rto=self.rto, + send_limit = min( + self.send_buffer, + self.last_ack + self.congestion_control.cwnd, ) - - if self.debug: - print( - f"TCPPacketGenerator {self.element_id} is setting a timer for " - f"packet {packet.packet_id} with an RTO of {self.rto:.4f}." - ) + available_bytes = send_limit - self.next_seq + if available_bytes > 0: + self._send_new_packet(min(self.mss, available_bytes)) return if self.dupack == 0: - # new ack received, update the RTT estimate and the retransmission timout - sample_rtt = self.env.now - ack.time + # Only accept RTT samples for exactly one un-retransmitted segment. + eligible_rtt_sample = False + sample_rtt = 0.0 + if ( + ack.ack > previous_ack + and previous_segment is not None + and ack.ack == previous_ack + previous_segment.size + and previous_segment.retransmit_count == 0 + ): + eligible_rtt_sample = True + sample_rtt = self.env.now - previous_segment.first_tx_time # Authoritative sources for RTO calculation @@ -236,25 +297,35 @@ def put(self, ack): alpha = 0.125 beta = 0.25 - # calculates the deviation (RTTVAR) of the RTT to account for - # variations in the network - if self.rtt_var == 0.0: - self.rtt_var = sample_rtt / 2.0 - else: - deviation = self.smoothed_rtt - sample_rtt - self.rtt_var = (1.0 - beta) * self.rtt_var + beta * abs(deviation) + if eligible_rtt_sample: + # calculates the deviation (RTTVAR) of the RTT to account for + # variations in the network + if self.rtt_var == 0.0: + self.rtt_var = sample_rtt / 2.0 + else: + deviation = self.smoothed_rtt - sample_rtt + self.rtt_var = (1.0 - beta) * self.rtt_var + beta * abs( + deviation + ) - # computes a smoothed round-trip time (SRTT) - if self.smoothed_rtt == 0.0: - self.smoothed_rtt = sample_rtt - else: - self.smoothed_rtt = ( - 1.0 - alpha - ) * self.smoothed_rtt + alpha * sample_rtt - self.rto = max(1.0, self.smoothed_rtt + 4.0 * self.rtt_var) + # computes a smoothed round-trip time (SRTT) + if self.smoothed_rtt == 0.0: + self.smoothed_rtt = sample_rtt + else: + self.smoothed_rtt = ( + 1.0 - alpha + ) * self.smoothed_rtt + alpha * sample_rtt + self.rto = max(1.0, self.smoothed_rtt + 4.0 * self.rtt_var) + self.last_rtt_sample = sample_rtt self.last_ack = ack.ack - self.congestion_control.ack_received(sample_rtt, self.env.now) + if eligible_rtt_sample: + rtt_for_cc = sample_rtt + else: + rtt_for_cc = self.smoothed_rtt + if rtt_for_cc == 0.0: + rtt_for_cc = self.last_rtt_sample + self.congestion_control.ack_received(rtt_for_cc, self.env.now) if self.debug: print( @@ -269,19 +340,20 @@ def put(self, ack): # this acknowledgment should acknowledge all the intermediate # segments sent between the lost packet and the receipt of the # first duplicate ACK, if any - acked_packets = [ - packet_id - for packet_id, _ in self.sent_packets.items() - if packet_id < ack.ack - ] - for packet_id in acked_packets: + acked_packets = [] + for packet_id, state in self.segment_state.items(): + if packet_id + state.size <= ack.ack: + acked_packets.append(packet_id) + + for packet_id in sorted(acked_packets): if self.debug: print( f"TCPPacketGenerator {self.element_id} stopped timer " f"{packet_id} at time {self.env.now:.4f}." ) - self.timers[packet_id].stop() + self.segment_state[packet_id].timer.stop() del self.timers[packet_id] del self.sent_packets[packet_id] + del self.segment_state[packet_id] self.cwnd_available.put(True) diff --git a/ns/packet/tcp_sink.py b/ns/packet/tcp_sink.py index 4748a7c..40372c2 100644 --- a/ns/packet/tcp_sink.py +++ b/ns/packet/tcp_sink.py @@ -26,7 +26,8 @@ def __init__( env, rec_arrivals, absolute_arrivals, rec_waits, rec_flow_ids, debug ) self.recv_buffer = [] - # the next sequence number expected to be received + # The cumulative ACK frontier (RCV.NXT). Later receiver fixes must + # derive this from the contiguous prefix only and never from a gap. self.next_seq_expected = 0 self.out = None self.ele_id = element_id @@ -54,14 +55,22 @@ def put(self, packet): super().put(packet) self.packet_arrived(packet) - - self.next_seq_expected = self.recv_buffer[0][1] + frontier = self.next_seq_expected + for start, end in self.recv_buffer: + if start > frontier: + break + frontier = max(frontier, end) + self.next_seq_expected = frontier # a TCP sink needs to send ack packets back to the TCP packet generator assert self.out is not None acknowledgment = Packet( - packet.time, # used for calculating RTT at the sender + # Preserve the data packet's original first-transmit timestamp. + # Sink-side wait accounting depends on Packet.time remaining stable + # across retransmissions; sender RTT/RTO work will move attempt + # timing onto sender-owned segment metadata instead. + packet.time, size=40, # default size of the ack packet packet_id=packet.packet_id, flow_id=packet.flow_id + 10000, diff --git a/tests/flow/test_tcp_integration.py b/tests/flow/test_tcp_integration.py new file mode 100644 index 0000000..4ca29b7 --- /dev/null +++ b/tests/flow/test_tcp_integration.py @@ -0,0 +1,55 @@ +import pytest + +from ns.flow.cc import TCPReno +from ns.flow.flow import Flow +from ns.packet.tcp_generator import TCPPacketGenerator +from ns.packet.tcp_sink import TCPSink +from ns.port.wire import Wire + +simpy = pytest.importorskip("simpy") + + +def const_delay(value): + return lambda: value + + +class DropFirstTransmission: + def __init__(self, packet_id): + self.packet_id = packet_id + self.out = None + self.dropped = False + + def put(self, packet): + if packet.packet_id == self.packet_id and not self.dropped: + self.dropped = True + return + + self.out.put(packet) + + +def test_tcp_end_to_end_retransmit_latency_uses_first_transmit_time(): + env = simpy.Environment() + flow = Flow(fid=7, src="src", dst="dst", size=512, finish_time=5) + sender = TCPPacketGenerator( + env, + flow=flow, + cc=TCPReno(), + element_id="tcp-flow", + debug=False, + ) + receiver = TCPSink(env, rec_waits=True, debug=False) + down = Wire(env, const_delay(0.1)) + up = Wire(env, const_delay(0.1)) + drop_first = DropFirstTransmission(packet_id=0) + + sender.out = down + down.out = drop_first + drop_first.out = receiver + receiver.out = up + up.out = sender + + env.run(until=2.0) + + assert receiver.packet_times[flow.fid] == [0.0] + assert receiver.waits[flow.fid] == [pytest.approx(1.1)] + assert sender.last_ack == 512 diff --git a/tests/packet/test_bbr_generator.py b/tests/packet/test_bbr_generator.py new file mode 100644 index 0000000..dc09093 --- /dev/null +++ b/tests/packet/test_bbr_generator.py @@ -0,0 +1,361 @@ +import pytest + +from ns.flow.flow import AppType, Flow +from ns.packet.bbr_generator import BBRPacketGenerator +from ns.packet.packet import Packet + +simpy = pytest.importorskip("simpy") + + +class CaptureSink: + def __init__(self, env): + self.env = env + self.packets = [] + self.waits = [] + + def put(self, packet): + self.packets.append(packet) + self.waits.append(self.env.now - packet.time) + + +class DummyTimer: + def __init__(self, rto=1.0): + self.rto = rto + self.restart_calls = [] + self.stop_calls = 0 + + def stop(self): + self.stop_calls += 1 + + def restart(self, revised_rto, start_time=0): + self.rto = revised_rto + self.restart_calls.append((revised_rto, start_time)) + + +class DummyCC: + def __init__(self, cwnd=4096): + self.cwnd = cwnd + self.pacing_rate = 0 + self.next_departure_time = 0 + self.calls = [] + + def timer_expired(self, packet=None): + self.calls.append(("timer_expired", packet)) + + def dupack_over(self): + self.calls.append(("dupack_over",)) + + def consecutive_dupacks_received(self, packet=None): + self.calls.append(("consecutive_dupacks_received", packet)) + + def more_dupacks_received(self, packet=None): + self.calls.append(("more_dupacks_received", packet)) + + def set_before_control(self, current_time, packet_in_flight): + self.calls.append(("set_before_control", current_time, packet_in_flight)) + + def ack_received(self, rtt, current_time): + self.calls.append(("ack_received", rtt, current_time)) + + +def make_flow(size, finish_time=1.0): + return Flow( + fid=1, + src="src", + dst="dst", + size=size, + finish_time=finish_time, + typ=AppType.BULK_TRANSFER, + ) + + +def make_sender(env, size, cwnd=4096, finish_time=1.0): + sender = BBRPacketGenerator( + env, + make_flow(size, finish_time), + DummyCC(cwnd), + debug=False, + ) + sink = CaptureSink(env) + sender.out = sink + return sender, sink + + +def make_ack( + packet_id, + *, + ack, + time, + delivered_time, + first_sent_time, + delivered=0, + lost=0, + is_app_limited=False, + flow_id=10001, +): + ack_packet = Packet( + time=time, + size=40, + packet_id=packet_id, + flow_id=flow_id, + src="dst", + dst="src", + ) + ack_packet.ack = ack + ack_packet.delivered_time = delivered_time + ack_packet.first_sent_time = first_sent_time + ack_packet.delivered = delivered + ack_packet.lost = lost + ack_packet.is_app_limited = is_app_limited + return ack_packet + + +def test_bbr_sender_sends_buffered_sub_mss_data(): + env = simpy.Environment() + sender, sink = make_sender(env, size=300) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [300] + + +def test_bbr_sender_sends_final_tail_segment(): + env = simpy.Environment() + sender, sink = make_sender(env, size=768) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [512, 256] + + +def test_bbr_sender_fast_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.dupack = 1 + sender.timer = DummyTimer(rto=1.0) + + sender.put( + make_ack( + 0, + ack=0, + time=1.0, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + + assert any( + call[0] == "consecutive_dupacks_received" + for call in sender.congestion_control.calls + ) + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(4.0) + assert original.time == 1.0 + + +def test_bbr_sender_timeout_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.timer = DummyTimer(rto=1.0) + + sender.timeout_callback(0) + + assert sender.congestion_control.calls[0][0] == "set_before_control" + assert sender.congestion_control.calls[1][0] == "timer_expired" + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(4.0) + assert original.time == 1.0 + + +def test_bbr_sender_keeps_timeout_pointer_on_oldest_outstanding_segment(): + env = simpy.Environment() + sender, sink = make_sender(env, size=1024) + + env.run(until=0.01) + + sender.timer = DummyTimer(rto=sender.rto) + + assert sender.to_pkt_id == 0 + + sender.timeout_callback(0) + + assert sink.packets[-1].packet_id == 0 + assert sender.to_pkt_id == 0 + + +def test_bbr_sender_cleans_up_short_segments_with_segment_end_ack(): + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) + first = Packet( + time=0.0, + size=256, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + second = Packet( + time=0.0, + size=256, + packet_id=256, + flow_id=1, + src="src", + dst="dst", + ) + for packet in (first, second): + packet.delivered_time = 1.0 + packet.first_sent_time = 0.0 + packet.delivered = packet.packet_id + packet.self_lost = False + + sender.sent_packets = {0: first, 256: second} + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + + sender.put( + make_ack( + 0, + ack=512, + time=0.0, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + + assert sender.sent_packets == {} + assert sender.packet_in_flight == 0 + assert sender.max_ack == 512 + + +def test_bbr_sender_rate_sample_stays_stable_across_retransmit_timing_changes(): + def run_scenario(include_timeout): + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) + original = Packet( + time=0.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + original.delivered_time = 1.0 + original.first_sent_time = 0.0 + original.delivered = 0 + original.self_lost = False + sender.sent_packets[0] = original + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 512 + sender.packet_in_flight = 512 + sender.timer = DummyTimer(rto=1.0) + + if include_timeout: + sender.timeout_callback(0) + + sender.put( + make_ack( + 0, + ack=512, + # ACKs keep the segment's original transmit timestamp even + # after a retransmission attempt. + time=0.0, + delivered_time=1.0, + first_sent_time=0.0, + ) + ) + ack_calls = [ + call for call in sender.congestion_control.calls if call[0] == "ack_received" + ] + return ( + ack_calls[-1][1:], + sender.congestion_control.rs.send_elapsed, + sender.congestion_control.rs.ack_elapsed, + sender.congestion_control.rs.delivery_rate, + ) + + clean = run_scenario(include_timeout=False) + retransmitted = run_scenario(include_timeout=True) + + assert retransmitted == clean + + +def test_bbr_sender_samples_rtt_from_the_acked_packet_time(): + env = simpy.Environment(initial_time=8) + sender, _ = make_sender(env, size=1024) + first = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + second = Packet( + time=3.0, + size=512, + packet_id=512, + flow_id=1, + src="src", + dst="dst", + ) + for packet in (first, second): + packet.delivered_time = 1.0 + packet.delivered = packet.packet_id + packet.first_sent_time = 1.0 + packet.self_lost = False + + sender.sent_packets = {0: first, 512: second} + sender.max_ack = 0 + sender.last_ack = 0 + sender.next_seq = 1024 + sender.packet_in_flight = 1024 + sender.timer = DummyTimer(rto=1.0) + + sender.put( + make_ack( + 512, + ack=1024, + time=3.0, + delivered_time=1.0, + first_sent_time=1.0, + ) + ) + + assert sender.congestion_control.calls[-1] == ("ack_received", 5.0, 8) + assert sender.rtt_estimate == pytest.approx(5.0) + assert sender.rto == pytest.approx(15.0) diff --git a/tests/packet/test_tcp_generator.py b/tests/packet/test_tcp_generator.py new file mode 100644 index 0000000..4114891 --- /dev/null +++ b/tests/packet/test_tcp_generator.py @@ -0,0 +1,215 @@ +import pytest + +simpy = pytest.importorskip("simpy") + +from ns.flow.flow import Flow +from ns.packet.packet import Packet +from ns.packet.tcp_generator import TCPPacketGenerator + + +class CaptureSink: + def __init__(self, env): + self.env = env + self.packets = [] + self.waits = [] + + def put(self, packet): + self.packets.append(packet) + self.waits.append(self.env.now - packet.time) + + +class DummyTimer: + def __init__(self, rto=1.0): + self.rto = rto + self.restart_calls = [] + self.stop_calls = 0 + + def stop(self): + self.stop_calls += 1 + + def restart(self, revised_rto): + self.rto = revised_rto + self.restart_calls.append(revised_rto) + + +class DummyCC: + def __init__(self, cwnd=4096): + self.cwnd = cwnd + self.consecutive_dupacks_calls = 0 + self.more_dupacks_calls = 0 + self.dupack_over_calls = 0 + self.timer_expired_calls = 0 + self.ack_received_calls = [] + + def timer_expired(self): + self.timer_expired_calls += 1 + + def dupack_over(self): + self.dupack_over_calls += 1 + + def consecutive_dupacks_received(self): + self.consecutive_dupacks_calls += 1 + + def more_dupacks_received(self): + self.more_dupacks_calls += 1 + + def ack_received(self, sample_rtt, current_time): + self.ack_received_calls.append((sample_rtt, current_time)) + + +def make_flow(size, finish_time=1.0): + return Flow(fid=1, src="src", dst="dst", size=size, finish_time=finish_time) + + +def make_sender(env, size, cwnd=4096, finish_time=1.0): + sender = TCPPacketGenerator(env, make_flow(size, finish_time), DummyCC(cwnd)) + sink = CaptureSink(env) + sender.out = sink + return sender, sink + + +def make_ack(seq, *, time=0.0, flow_id=10001): + ack = Packet( + time=time, + size=40, + packet_id=seq, + flow_id=flow_id, + src="dst", + dst="src", + ) + ack.ack = seq + return ack + + +def test_tcp_sender_sends_buffered_sub_mss_data(): + env = simpy.Environment() + sender, sink = make_sender(env, size=300) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [300] + + +def test_tcp_sender_sends_final_tail_segment(): + env = simpy.Environment() + sender, sink = make_sender(env, size=768) + + env.run(until=0.01) + + assert [packet.size for packet in sink.packets] == [512, 256] + + +def test_tcp_sender_fast_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.5, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.last_ack = 0 + sender.dupack = 2 + sender.next_seq = 512 + + sender.put(make_ack(0, time=1.5)) + + assert sender.congestion_control.consecutive_dupacks_calls == 1 + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(3.5) + assert original.time == 1.5 + + +def test_tcp_sender_timeout_retransmit_emits_fresh_packet_without_resetting_timestamp(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.25, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.timers[0] = DummyTimer(rto=1.0) + + sender.timeout_callback(0) + + assert sender.congestion_control.timer_expired_calls == 1 + assert sink.packets[0] is not original + assert sink.waits[0] == pytest.approx(3.75) + assert original.time == 1.25 + + +def test_tcp_sender_keeps_partially_acknowledged_segment_until_segment_end(): + env = simpy.Environment(initial_time=5) + sender, _ = make_sender(env, size=1024) + partial = Packet( + time=0.0, + size=256, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = partial + sender.timers[0] = DummyTimer(rto=1.0) + + sender.put(make_ack(128, time=0.0)) + + assert 0 in sender.sent_packets + assert 0 in sender.timers + assert sender.timers[0].stop_calls == 0 + + +def test_tcp_sender_skips_rtt_update_for_ack_covering_retransmitted_data(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + original = Packet( + time=1.0, + size=512, + packet_id=0, + flow_id=1, + src="src", + dst="dst", + ) + sender.sent_packets[0] = original + sender.timers[0] = DummyTimer(rto=1.0) + sender.last_ack = 0 + sender.dupack = 2 + sender.next_seq = 512 + sender.smoothed_rtt = 1.0 + sender.rtt_var = 0.25 + sender.rto = 2.0 + sender.last_rtt_sample = 1.0 + + sender.put(make_ack(0, time=1.0)) + sender.put(make_ack(0, time=1.0)) + sender.put(make_ack(0, time=1.0)) + sink.packets.clear() + sink.waits.clear() + sender.congestion_control.ack_received_calls.clear() + before = (sender.smoothed_rtt, sender.rtt_var, sender.rto) + + sender.put(make_ack(512, time=1.2)) + + assert sender.congestion_control.ack_received_calls == [(1.0, 5)] + assert (sender.smoothed_rtt, sender.rtt_var, sender.rto) == before + assert sink.packets == [] + + +def test_tcp_sender_ignores_dupacks_for_unknown_segment_frontier(): + env = simpy.Environment(initial_time=5) + sender, sink = make_sender(env, size=1024) + sender.last_ack = 1024 + sender.dupack = 2 + sender.next_seq = 1536 + + sender.put(make_ack(1024, time=1.0)) + + assert sender.congestion_control.consecutive_dupacks_calls == 1 + assert sink.packets == [] diff --git a/tests/packet/test_tcp_sink.py b/tests/packet/test_tcp_sink.py new file mode 100644 index 0000000..efeaddb --- /dev/null +++ b/tests/packet/test_tcp_sink.py @@ -0,0 +1,73 @@ +import pytest + +from ns.packet.packet import Packet +from ns.packet.tcp_sink import TCPSink + +simpy = pytest.importorskip("simpy") + + +class CaptureSink: + def __init__(self): + self.packets = [] + + def put(self, packet): + self.packets.append(packet) + + +def make_data_packet(seq, size=512, time=0.0, flow_id=1): + return Packet( + time=time, + size=size, + packet_id=seq, + flow_id=flow_id, + src="src", + dst="dst", + ) + + +def make_sink(): + env = simpy.Environment() + sink = TCPSink(env, rec_waits=False, rec_arrivals=False, rec_flow_ids=False) + sink.out = CaptureSink() + return env, sink + + +def ack_history(sink): + return [packet.ack for packet in sink.out.packets] + + +def test_tcp_sink_pins_ack_for_out_of_order_and_duplicate_data(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(512)) + + assert ack_history(sink) == [0, 0] + assert sink.recv_buffer == [[512, 1024]] + assert sink.next_seq_expected == 0 + + +def test_tcp_sink_advances_ack_only_when_hole_is_filled(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(512)) + sink.put(make_data_packet(0)) + + assert ack_history(sink) == [0, 0, 1024] + assert sink.recv_buffer == [[0, 1024]] + assert sink.next_seq_expected == 1024 + + +def test_tcp_sink_merges_intervals_across_reorder_duplicate_and_fill(): + _, sink = make_sink() + + sink.put(make_data_packet(512)) + sink.put(make_data_packet(1536)) + sink.put(make_data_packet(0)) + sink.put(make_data_packet(512)) + sink.put(make_data_packet(1024)) + + assert ack_history(sink) == [0, 0, 1024, 1024, 2048] + assert sink.recv_buffer == [[0, 2048]] + assert sink.next_seq_expected == 2048