From 3a642d3c8b331381f4f2a1e0ee9e154cd45e7e29 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Thu, 18 Sep 2025 18:39:53 +0100 Subject: [PATCH 1/2] HTTP connection reset --- src/ahttpx/_parsers.py | 5 +++-- src/ahttpx/_pool.py | 8 ++++---- src/ahttpx/_server.py | 9 +++++---- src/httpx/_network.py | 12 +++++++++++- src/httpx/_parsers.py | 5 +++-- src/httpx/_pool.py | 8 ++++---- src/httpx/_server.py | 7 ++++--- tests/test_parsers.py | 10 +++++----- 8 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index 8a52a56fdf..ad06fbc116 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -375,13 +375,13 @@ async def recv_body(self) -> bytes: self.recv_state = State.DONE return body - async def complete(self): + async def reset(self) -> bool: is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE is_keepalive = self.send_keep_alive and self.recv_keep_alive if not (is_fully_complete and is_keepalive): await self.close() - return + return False if self.mode == Mode.CLIENT: self.send_state = State.SEND_METHOD_LINE @@ -397,6 +397,7 @@ async def complete(self): self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + return True async def close(self): if self.send_state != State.CLOSED: diff --git a/src/ahttpx/_pool.py b/src/ahttpx/_pool.py index f712cfac27..4a9f787b52 100644 --- a/src/ahttpx/_pool.py +++ b/src/ahttpx/_pool.py @@ -170,7 +170,7 @@ async def send(self, request: Request) -> Response: await self._send_head(request) await self._send_body(request) code, headers = await self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -235,9 +235,9 @@ async def _recv_head(self) -> tuple[int, Headers]: async def _recv_body(self) -> bytes: return await self._parser.recv_body() - # Request/response cycle complete... - async def _complete(self) -> None: - await self._parser.complete() + # Request/response cycle reset... + async def _reset(self) -> None: + await self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration async def _close(self) -> None: diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index a9103cc97f..02098cb2e6 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -33,7 +33,7 @@ async def handle_requests(self): try: while not self._parser.is_closed(): method, url, headers = await self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO: Handle endpoint exceptions async with Request(method, url, headers=headers, content=stream) as request: try: @@ -43,12 +43,13 @@ async def handle_requests(self): except Exception: logger.error("Internal Server Error", exc_info=True) content = Text("Internal Server Error") - err = Response(code=500, content=content) + err = Response(500, content=content) await self._send_head(err) await self._send_body(err) else: await self._send_head(response) await self._send_body(response) + await self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -88,8 +89,8 @@ async def _send_body(self, response: Response): await self._parser.send_body(b'') # Start it all over again... - async def _complete(self): - await self._parser.complete + async def _reset(self): + await self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration diff --git a/src/httpx/_network.py b/src/httpx/_network.py index 5ea9bb5472..a204d2068b 100644 --- a/src/httpx/_network.py +++ b/src/httpx/_network.py @@ -83,6 +83,9 @@ def close(self) -> None: self._is_closed = True self._socket.close() + def is_closed(self) -> bool: + return self._is_closed + def __repr__(self): description = "" description += " TLS" if self._is_tls else "" @@ -160,7 +163,7 @@ def __init__(self, listener: NetworkListener, handler: typing.Callable[[NetworkS self._max_workers = 5 self._executor = None self._thread = None - self._streams = list[NetworkStream] + self._streams: list[NetworkStream] = [] @property def host(self): @@ -177,11 +180,18 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.listener.close() + for stream in self._streams: + stream.close() self._executor.shutdown(wait=True) def _serve(self): while stream := self.listener.accept(): self._executor.submit(self._handler, stream) + self._streams = [ + stream for stream in self._streams + if not stream.is_closed() + ] + self._streams.append(stream) def _handler(self, stream): try: diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 830fccd901..1d9999f56d 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -375,13 +375,13 @@ def recv_body(self) -> bytes: self.recv_state = State.DONE return body - def complete(self): + def reset(self) -> bool: is_fully_complete = self.send_state == State.DONE and self.recv_state == State.DONE is_keepalive = self.send_keep_alive and self.recv_keep_alive if not (is_fully_complete and is_keepalive): self.close() - return + return False if self.mode == Mode.CLIENT: self.send_state = State.SEND_METHOD_LINE @@ -397,6 +397,7 @@ def complete(self): self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + return True def close(self): if self.send_state != State.CLOSED: diff --git a/src/httpx/_pool.py b/src/httpx/_pool.py index 7193f8d81c..b56f29eaad 100644 --- a/src/httpx/_pool.py +++ b/src/httpx/_pool.py @@ -170,7 +170,7 @@ def send(self, request: Request) -> Response: self._send_head(request) self._send_body(request) code, headers = self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -235,9 +235,9 @@ def _recv_head(self) -> tuple[int, Headers]: def _recv_body(self) -> bytes: return self._parser.recv_body() - # Request/response cycle complete... - def _complete(self) -> None: - self._parser.complete() + # Request/response cycle reset... + def _reset(self) -> None: + self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration def _close(self) -> None: diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 95226d9914..6d1a53be01 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -33,7 +33,7 @@ def handle_requests(self): try: while not self._parser.is_closed(): method, url, headers = self._recv_head() - stream = HTTPStream(self._recv_body, self._complete) + stream = HTTPStream(self._recv_body, self._reset) # TODO: Handle endpoint exceptions with Request(method, url, headers=headers, content=stream) as request: try: @@ -49,6 +49,7 @@ def handle_requests(self): else: self._send_head(response) self._send_body(response) + self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -88,8 +89,8 @@ def _send_body(self, response: Response): self._parser.send_body(b'') # Start it all over again... - def _complete(self): - self._parser.complete + def _reset(self): + self._parser.reset() self._idle_expiry = time.monotonic() + self._keepalive_duration diff --git a/tests/test_parsers.py b/tests/test_parsers.py index e2a321e2a2..32b20deba4 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -67,7 +67,7 @@ def test_parser(): assert terminator == b'' assert not p.is_idle() - p.complete() + p.reset() assert p.is_idle() @@ -113,7 +113,7 @@ def test_parser_server(): ) assert not p.is_idle() - p.complete() + p.reset() assert p.is_idle() @@ -315,7 +315,7 @@ def test_parser_repr(): p.recv_body() assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == "" @@ -554,7 +554,7 @@ def test_client_connection_close(): assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == "" assert p.is_closed() @@ -591,7 +591,7 @@ def test_server_connection_close(): assert terminator == b"" assert repr(p) == "" - p.complete() + p.reset() assert repr(p) == "" From a711667febc881914bd117d6e86dfbf0b91c9f47 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Fri, 19 Sep 2025 11:56:15 +0100 Subject: [PATCH 2/2] Tidy up Connection resets --- src/ahttpx/_parsers.py | 7 +++++++ src/ahttpx/_server.py | 2 ++ src/httpx/_parsers.py | 7 +++++++ src/httpx/_server.py | 9 +++++++-- 4 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index ad06fbc116..371b870201 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -405,6 +405,13 @@ async def close(self): self.recv_state = State.CLOSED await self.stream.close() + def is_keepalive(self) -> bool: + return ( + self.send_keep_alive and + self.recv_keep_alive and + self.send_state != State.CLOSED + ) + def is_idle(self) -> bool: return ( self.send_state == State.SEND_METHOD_LINE or diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index 02098cb2e6..d2dafe99f7 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -49,6 +49,8 @@ async def handle_requests(self): else: await self._send_head(response) await self._send_body(response) + if self._parser.is_keepalive(): + await stream.read() await self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 1d9999f56d..1e92f5347c 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -405,6 +405,13 @@ def close(self): self.recv_state = State.CLOSED self.stream.close() + def is_keepalive(self) -> bool: + return ( + self.send_keep_alive and + self.recv_keep_alive and + self.send_state != State.CLOSED + ) + def is_idle(self) -> bool: return ( self.send_state == State.SEND_METHOD_LINE or diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 6d1a53be01..44ec3bff3e 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -43,12 +43,14 @@ def handle_requests(self): except Exception: logger.error("Internal Server Error", exc_info=True) content = Text("Internal Server Error") - err = Response(code=500, content=content) + err = Response(500, content=content) self._send_head(err) self._send_body(err) else: self._send_head(response) self._send_body(response) + if self._parser.is_keepalive(): + stream.read() self._reset() except Exception: logger.error("Internal Server Error", exc_info=True) @@ -100,7 +102,10 @@ def __init__(self, host, port): def wait(self): while(True): - sleep(1) + try: + sleep(1) + except KeyboardInterrupt: + break @contextlib.contextmanager