diff --git a/e2e/rust/tests/host_gateway_alias.rs b/e2e/rust/tests/host_gateway_alias.rs index e66ef77d..76d8be57 100644 --- a/e2e/rust/tests/host_gateway_alias.rs +++ b/e2e/rust/tests/host_gateway_alias.rs @@ -3,18 +3,20 @@ #![cfg(feature = "e2e")] -use std::io::{Read, Write}; -use std::net::TcpListener; +use std::io::Write; +use std::process::Command; use std::process::Stdio; use std::sync::Mutex; -use std::thread; use std::time::Duration; use openshell_e2e::harness::binary::openshell_cmd; +use openshell_e2e::harness::port::find_free_port; use openshell_e2e::harness::sandbox::SandboxGuard; use tempfile::NamedTempFile; +use tokio::time::{interval, timeout}; const INFERENCE_PROVIDER_NAME: &str = "e2e-host-inference"; +const TEST_SERVER_IMAGE: &str = "python:3.13-alpine"; static INFERENCE_ROUTE_LOCK: Mutex<()> = Mutex::new(()); async fn run_cli(args: &[&str]) -> Result { @@ -41,68 +43,118 @@ async fn run_cli(args: &[&str]) -> Result { Ok(combined) } -fn spawn_server( - response_body: fn(&str) -> String, -) -> Result<(u16, thread::JoinHandle<()>), String> { - let listener = TcpListener::bind("0.0.0.0:0") - .map_err(|e| format!("bind echo server on 0.0.0.0:0: {e}"))?; - listener - .set_nonblocking(false) - .map_err(|e| format!("configure echo server blocking mode: {e}"))?; - let port = listener - .local_addr() - .map_err(|e| format!("read echo server address: {e}"))? - .port(); - - let handle = thread::spawn(move || { - let (mut stream, _) = listener.accept().expect("accept echo request"); - stream - .set_read_timeout(Some(Duration::from_secs(30))) - .expect("set read timeout"); - stream - .set_write_timeout(Some(Duration::from_secs(30))) - .expect("set write timeout"); - - let mut request = Vec::new(); - let mut buf = [0_u8; 1024]; - loop { - let read = stream.read(&mut buf).expect("read echo request"); - if read == 0 { - break; - } - request.extend_from_slice(&buf[..read]); - if request.windows(4).any(|window| window == b"\r\n\r\n") { - break; - } +struct DockerServer { + port: u16, + container_id: String, +} + +impl DockerServer { + async fn start(response_body: &str) -> Result { + let port = find_free_port(); + let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer +import os + +BODY = os.environ["RESPONSE_BODY"].encode() + +class Handler(BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(BODY))) + self.end_headers() + self.wfile.write(BODY) + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0")) + if length: + self.rfile.read(length) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(BODY))) + self.end_headers() + self.wfile.write(BODY) + + def log_message(self, format, *args): + pass + +HTTPServer(("0.0.0.0", 8000), Handler).serve_forever() +"#; + + let output = Command::new("docker") + .args([ + "run", + "--detach", + "--rm", + "-e", + &format!("RESPONSE_BODY={response_body}"), + "-p", + &format!("{port}:8000"), + TEST_SERVER_IMAGE, + "python3", + "-c", + script, + ]) + .output() + .map_err(|e| format!("start docker test server: {e}"))?; + + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + + if !output.status.success() { + return Err(format!( + "docker run failed (exit {:?}):\n{stderr}", + output.status.code() + )); } - let request_text = String::from_utf8_lossy(&request); - let body = response_body(&request_text); - let response = format!( - "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", - body.len(), - body - ); - stream - .write_all(response.as_bytes()) - .expect("write echo response"); - stream.flush().expect("flush echo response"); - }); - - Ok((port, handle)) -} + let server = Self { + port, + container_id: stdout, + }; + server.wait_until_ready().await?; + Ok(server) + } -fn spawn_echo_server() -> Result<(u16, thread::JoinHandle<()>), String> { - spawn_server(|request_text| { - let request_line = request_text.lines().next().unwrap_or_default(); - format!(r#"{{"message":"hello-from-host","request_line":"{request_line}"}}"#) - }) + async fn wait_until_ready(&self) -> Result<(), String> { + let container_id = self.container_id.clone(); + timeout(Duration::from_secs(30), async move { + let mut tick = interval(Duration::from_millis(500)); + loop { + tick.tick().await; + let output = Command::new("docker") + .args([ + "exec", + &container_id, + "python3", + "-c", + "import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000', timeout=1).read()", + ]) + .output(); + + match output { + Ok(result) if result.status.success() => return Ok(()), + Ok(_) | Err(_) => continue, + } + } + }) + .await + .map_err(|_| { + format!( + "docker test server {} did not become ready within 30s", + self.container_id + ) + })? + } } -fn spawn_inference_server() -> Result<(u16, thread::JoinHandle<()>), String> { - spawn_server(|_| { - r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"#.to_string() - }) +impl Drop for DockerServer { + fn drop(&mut self) { + let _ = Command::new("docker") + .args(["rm", "-f", &self.container_id]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + } } async fn provider_exists(name: &str) -> bool { @@ -173,8 +225,10 @@ network_policies: #[tokio::test] async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() { - let (port, server) = spawn_echo_server().expect("start host echo server"); - let policy = write_policy(port).expect("write custom policy"); + let server = DockerServer::start(r#"{"message":"hello-from-host"}"#) + .await + .expect("start host echo server"); + let policy = write_policy(server.port).expect("write custom policy"); let policy_path = policy .path() .to_str() @@ -188,15 +242,13 @@ async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() { "curl", "--silent", "--show-error", - &format!("http://host.openshell.internal:{port}/"), + "--max-time", + "15", + &format!("http://host.openshell.internal:{}/", server.port), ]) .await .expect("sandbox create with host.openshell.internal echo request"); - server - .join() - .expect("echo server thread should exit cleanly"); - assert!( guard .create_output @@ -204,13 +256,6 @@ async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() { "expected sandbox to receive host echo response:\n{}", guard.create_output ); - assert!( - guard - .create_output - .contains("\"request_line\":\"GET / HTTP/1.1\""), - "expected host echo server to receive sandbox HTTP request:\n{}", - guard.create_output - ); } #[tokio::test] @@ -227,7 +272,11 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() { return; } - let (port, server) = spawn_inference_server().expect("start host inference echo server"); + let server = DockerServer::start( + r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"#, + ) + .await + .expect("start host inference echo server"); if provider_exists(INFERENCE_PROVIDER_NAME).await { delete_provider(INFERENCE_PROVIDER_NAME).await; @@ -243,7 +292,10 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() { "--credential", "OPENAI_API_KEY=dummy", "--config", - &format!("OPENAI_BASE_URL=http://host.openshell.internal:{port}/v1"), + &format!( + "OPENAI_BASE_URL=http://host.openshell.internal:{}/v1", + server.port + ), ]) .await .expect("create host-backed OpenAI provider"); @@ -265,6 +317,8 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() { "curl", "--silent", "--show-error", + "--max-time", + "15", "https://inference.local/v1/chat/completions", "--json", r#"{"messages":[{"role":"user","content":"hello"}]}"#, @@ -272,10 +326,6 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() { .await .expect("sandbox create with inference.local request"); - server - .join() - .expect("inference echo server thread should exit cleanly"); - assert!( guard .create_output