e2e/rust/tests/host_gateway_alias.rs: 210 changes (130 additions, 80 deletions)
@@ -3,18 +3,20 @@

#![cfg(feature = "e2e")]

use std::io::{Read, Write};
use std::net::TcpListener;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use openshell_e2e::harness::binary::openshell_cmd;
use openshell_e2e::harness::port::find_free_port;
use openshell_e2e::harness::sandbox::SandboxGuard;
use tempfile::NamedTempFile;
use tokio::time::{interval, timeout};

const INFERENCE_PROVIDER_NAME: &str = "e2e-host-inference";
const TEST_SERVER_IMAGE: &str = "python:3.13-alpine";
static INFERENCE_ROUTE_LOCK: Mutex<()> = Mutex::new(());

async fn run_cli(args: &[&str]) -> Result<String, String> {
@@ -41,68 +43,118 @@ async fn run_cli(args: &[&str]) -> Result<String, String> {
Ok(combined)
}

fn spawn_server(
response_body: fn(&str) -> String,
) -> Result<(u16, thread::JoinHandle<()>), String> {
let listener = TcpListener::bind("0.0.0.0:0")
.map_err(|e| format!("bind echo server on 0.0.0.0:0: {e}"))?;
listener
.set_nonblocking(false)
.map_err(|e| format!("configure echo server blocking mode: {e}"))?;
let port = listener
.local_addr()
.map_err(|e| format!("read echo server address: {e}"))?
.port();

let handle = thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept echo request");
stream
.set_read_timeout(Some(Duration::from_secs(30)))
.expect("set read timeout");
stream
.set_write_timeout(Some(Duration::from_secs(30)))
.expect("set write timeout");

let mut request = Vec::new();
let mut buf = [0_u8; 1024];
loop {
let read = stream.read(&mut buf).expect("read echo request");
if read == 0 {
break;
}
request.extend_from_slice(&buf[..read]);
if request.windows(4).any(|window| window == b"\r\n\r\n") {
break;
}
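/// Handle to a detached Docker container running a small Python HTTP stub that
/// returns a fixed JSON body supplied via the RESPONSE_BODY environment variable.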
struct DockerServer {
port: u16,
container_id: String,
}

impl DockerServer {
async fn start(response_body: &str) -> Result<Self, String> {
let port = find_free_port();
let script = r#"from http.server import BaseHTTPRequestHandler, HTTPServer
import os

BODY = os.environ["RESPONSE_BODY"].encode()

class Handler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(BODY)))
self.end_headers()
self.wfile.write(BODY)

def do_POST(self):
length = int(self.headers.get("Content-Length", "0"))
if length:
self.rfile.read(length)
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(BODY)))
self.end_headers()
self.wfile.write(BODY)

def log_message(self, format, *args):
pass

HTTPServer(("0.0.0.0", 8000), Handler).serve_forever()
"#;

let output = Command::new("docker")
.args([
"run",
"--detach",
"--rm",
"-e",
&format!("RESPONSE_BODY={response_body}"),
"-p",
&format!("{port}:8000"),
TEST_SERVER_IMAGE,
"python3",
"-c",
script,
])
.output()
.map_err(|e| format!("start docker test server: {e}"))?;

let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();

if !output.status.success() {
return Err(format!(
"docker run failed (exit {:?}):\n{stderr}",
output.status.code()
));
}

let request_text = String::from_utf8_lossy(&request);
let body = response_body(&request_text);
let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
body.len(),
body
);
stream
.write_all(response.as_bytes())
.expect("write echo response");
stream.flush().expect("flush echo response");
});

Ok((port, handle))
}
let server = Self {
port,
container_id: stdout,
};
server.wait_until_ready().await?;
Ok(server)
}

fn spawn_echo_server() -> Result<(u16, thread::JoinHandle<()>), String> {
spawn_server(|request_text| {
let request_line = request_text.lines().next().unwrap_or_default();
format!(r#"{{"message":"hello-from-host","request_line":"{request_line}"}}"#)
})
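    /// Polls the stub from inside the container until it answers, failing after 30 seconds.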
async fn wait_until_ready(&self) -> Result<(), String> {
let container_id = self.container_id.clone();
timeout(Duration::from_secs(30), async move {
let mut tick = interval(Duration::from_millis(500));
loop {
tick.tick().await;
let output = Command::new("docker")
.args([
"exec",
&container_id,
"python3",
"-c",
"import urllib.request; urllib.request.urlopen('http://127.0.0.1:8000', timeout=1).read()",
])
.output();

match output {
Ok(result) if result.status.success() => return Ok(()),
Ok(_) | Err(_) => continue,
}
}
})
.await
.map_err(|_| {
format!(
"docker test server {} did not become ready within 30s",
self.container_id
)
})?
}
}

fn spawn_inference_server() -> Result<(u16, thread::JoinHandle<()>), String> {
spawn_server(|_| {
r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"#.to_string()
})
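// Best-effort cleanup: force-remove the container so a failed or panicked test run does not leak it.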
impl Drop for DockerServer {
fn drop(&mut self) {
let _ = Command::new("docker")
.args(["rm", "-f", &self.container_id])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
}
}

async fn provider_exists(name: &str) -> bool {
@@ -173,8 +225,10 @@ network_policies:

#[tokio::test]
async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() {
let (port, server) = spawn_echo_server().expect("start host echo server");
let policy = write_policy(port).expect("write custom policy");
let server = DockerServer::start(r#"{"message":"hello-from-host"}"#)
.await
.expect("start host echo server");
let policy = write_policy(server.port).expect("write custom policy");
let policy_path = policy
.path()
.to_str()
@@ -188,29 +242,20 @@ async fn sandbox_reaches_host_openshell_internal_via_host_gateway_alias() {
"curl",
"--silent",
"--show-error",
&format!("http://host.openshell.internal:{port}/"),
"--max-time",
"15",
&format!("http://host.openshell.internal:{}/", server.port),
])
.await
.expect("sandbox create with host.openshell.internal echo request");

server
.join()
.expect("echo server thread should exit cleanly");

assert!(
guard
.create_output
.contains("\"message\":\"hello-from-host\""),
"expected sandbox to receive host echo response:\n{}",
guard.create_output
);
assert!(
guard
.create_output
.contains("\"request_line\":\"GET / HTTP/1.1\""),
"expected host echo server to receive sandbox HTTP request:\n{}",
guard.create_output
);
}

#[tokio::test]
@@ -227,7 +272,11 @@ async fn sandbox_inference_local_routes_to_host_openshell_internal() {
return;
}

let (port, server) = spawn_inference_server().expect("start host inference echo server");
let server = DockerServer::start(
r#"{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"host-echo","choices":[{"index":0,"message":{"role":"assistant","content":"hello-from-host"},"finish_reason":"stop"}]}"#,
)
.await
.expect("start host inference echo server");

if provider_exists(INFERENCE_PROVIDER_NAME).await {
delete_provider(INFERENCE_PROVIDER_NAME).await;
@@ -243,7 +292,10 @@
"--credential",
"OPENAI_API_KEY=dummy",
"--config",
&format!("OPENAI_BASE_URL=http://host.openshell.internal:{port}/v1"),
&format!(
"OPENAI_BASE_URL=http://host.openshell.internal:{}/v1",
server.port
),
])
.await
.expect("create host-backed OpenAI provider");
@@ -265,17 +317,15 @@
"curl",
"--silent",
"--show-error",
"--max-time",
"15",
"https://inference.local/v1/chat/completions",
"--json",
r#"{"messages":[{"role":"user","content":"hello"}]}"#,
])
.await
.expect("sandbox create with inference.local request");

server
.join()
.expect("inference echo server thread should exit cleanly");

assert!(
guard
.create_output