diff --git a/README.md b/README.md index 4f336ab..6c6906b 100644 --- a/README.md +++ b/README.md @@ -545,7 +545,7 @@ Use the Docker Engine `/_ping` endpoint via HaRP’s ExApps HTTP frontend to con curl -fsS \ -H "harp-shared-key: " \ -H "docker-engine-port: 24000" \ - http://127.0.0.1:8780/exapps/app_api/v1.41/_ping + http://127.0.0.1:8780/exapps/app_api/v1.44/_ping ``` * `24000` is the **default** FRP remote port used by the HaRP container for the **built‑in/local** Docker Engine (enabled when `/var/run/docker.sock` is mounted). diff --git a/development/redeploy_host_k8s.sh b/development/redeploy_host_k8s.sh new file mode 100755 index 0000000..f8da05e --- /dev/null +++ b/development/redeploy_host_k8s.sh @@ -0,0 +1,88 @@ +#!/bin/sh +# SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +# Redeploy HaRP with Kubernetes backend for local development. +# +# Prerequisites: +# - kind cluster "nc-exapps" running (see docs/kubernetes-local-setup.md) +# - kubectl context set to kind-nc-exapps +# - Nextcloud Docker-Dev running with nginx proxy +# - nginx vhost configured to proxy /exapps/ to HaRP (see docs) + +set -e + +# ── Configuration ────────────────────────────────────────────────────── +KIND_CLUSTER="nc-exapps" +KIND_NODE="${KIND_CLUSTER}-control-plane" +K8S_CONTEXT="kind-${KIND_CLUSTER}" +K8S_NAMESPACE="nextcloud-exapps" +K8S_SA="harp-exapps" +NC_DOCKER_NETWORK="master_default" + +HP_SHARED_KEY="some_very_secure_password" +NC_INSTANCE_URL="http://nextcloud.local" +# ─────────────────────────────────────────────────────────────────────── + +echo "==> Obtaining K8s API server URL..." +K8S_API_SERVER=$(kubectl --context "$K8S_CONTEXT" config view --minify -o jsonpath='{.clusters[0].cluster.server}') +echo " API server: $K8S_API_SERVER" + +echo "==> Generating fresh bearer token for SA '$K8S_SA' (valid 1 year)..." 
+K8S_BEARER_TOKEN=$(kubectl --context "$K8S_CONTEXT" -n "$K8S_NAMESPACE" create token "$K8S_SA" --duration=8760h) +echo " Token generated (${#K8S_BEARER_TOKEN} chars)" + +# ── Ensure kind node can reach the Nextcloud Docker network ─────────── +echo "==> Connecting kind node '$KIND_NODE' to Docker network '$NC_DOCKER_NETWORK'..." +if docker network connect "$NC_DOCKER_NETWORK" "$KIND_NODE" 2>/dev/null; then + echo " Connected." +else + echo " Already connected (or network not found)." +fi + +# Detect the nginx proxy IP on NC_DOCKER_NETWORK for pod DNS resolution. +# Pods inside the kind cluster cannot resolve hostnames like "nextcloud.local" that only exist in the host's /etc/hosts. +# Try to inject hostAliases so that ExApp pods can reach Nextcloud. +echo "==> Detecting nginx proxy IP for host aliases..." +PROXY_IP=$(docker inspect master-proxy-1 \ + --format "{{(index .NetworkSettings.Networks \"$NC_DOCKER_NETWORK\").IPAddress}}" 2>/dev/null || true) +K8S_HOST_ALIASES="" +if [ -n "$PROXY_IP" ]; then + K8S_HOST_ALIASES="nextcloud.local:${PROXY_IP}" + echo " nextcloud.local -> $PROXY_IP" +else + echo " WARNING: Could not detect proxy IP. ExApp pods may not resolve nextcloud.local." +fi + +echo "==> Removing old HaRP container..." +docker container remove --force appapi-harp 2>/dev/null || true + +echo "==> Building HaRP image..." +docker build -t nextcloud-appapi-harp:local . + +echo "==> Starting HaRP container..." 
+docker run \ + -e HP_SHARED_KEY="$HP_SHARED_KEY" \ + -e NC_INSTANCE_URL="$NC_INSTANCE_URL" \ + -e HP_LOG_LEVEL="info" \ + -e HP_VERBOSE_START="1" \ + -e HP_K8S_ENABLED="true" \ + -e HP_K8S_API_SERVER="$K8S_API_SERVER" \ + -e HP_K8S_BEARER_TOKEN="$K8S_BEARER_TOKEN" \ + -e HP_K8S_NAMESPACE="$K8S_NAMESPACE" \ + -e HP_K8S_VERIFY_SSL="false" \ + -e HP_K8S_HOST_ALIASES="$K8S_HOST_ALIASES" \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v "$(pwd)/certs:/certs" \ + --name appapi-harp -h appapi-harp \ + --restart unless-stopped \ + --network=host \ + -d nextcloud-appapi-harp:local + +echo "==> HaRP container started. Waiting for health check..." +sleep 5 +if docker inspect appapi-harp --format '{{.State.Health.Status}}' 2>/dev/null | grep -q healthy; then + echo "==> HaRP is healthy!" +else + echo "==> HaRP still starting... check with: docker ps | grep harp" +fi diff --git a/haproxy_agent.py b/haproxy_agent.py index f0a35ca..9ea2636 100644 --- a/haproxy_agent.py +++ b/haproxy_agent.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import asyncio +import collections import contextlib import io import ipaddress @@ -12,6 +13,7 @@ import os import re import socket +import ssl import tarfile import time from base64 import b64encode @@ -31,8 +33,27 @@ SPOA_ADDRESS = os.environ.get("HP_SPOA_ADDRESS", "127.0.0.1:9600") SPOA_HOST, SPOA_PORT = SPOA_ADDRESS.rsplit(":", 1) SPOA_PORT = int(SPOA_PORT) +# Kubernetes environment variables +K8S_ENABLED = os.environ.get("HP_K8S_ENABLED", "false").lower() in {"1", "true", "yes"} +K8S_NAMESPACE = os.environ.get("HP_K8S_NAMESPACE", "nextcloud-exapps") +K8S_API_SERVER = os.environ.get("HP_K8S_API_SERVER") # e.g. 
https://kubernetes.default.svc +K8S_CA_FILE = os.environ.get("HP_K8S_CA_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") +K8S_TOKEN_ENV = os.environ.get("HP_K8S_BEARER_TOKEN") # static token from env (never rotated) +K8S_TOKEN_FILE = os.environ.get("HP_K8S_BEARER_TOKEN_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/token") +K8S_VERIFY_SSL = os.environ.get("HP_K8S_VERIFY_SSL", "true").lower() != "false" +K8S_STORAGE_CLASS = os.environ.get("HP_K8S_STORAGE_CLASS", "") +K8S_DEFAULT_STORAGE_SIZE = os.environ.get("HP_K8S_DEFAULT_STORAGE_SIZE", "10Gi") +K8S_HOST_ALIASES_RAW = os.environ.get("HP_K8S_HOST_ALIASES", "") # "hostname:ip,hostname2:ip2" +if not K8S_API_SERVER and os.environ.get("KUBERNETES_SERVICE_HOST"): + host = os.environ["KUBERNETES_SERVICE_HOST"] + port = os.environ.get("KUBERNETES_SERVICE_PORT", "443") + K8S_API_SERVER = f"https://{host}:{port}" + +K8S_HTTP_TIMEOUT = aiohttp.ClientTimeout(total=60.0) +_k8s_session: aiohttp.ClientSession | None = None +K8S_NAME_MAX_LENGTH = 63 # Set up the logging configuration -LOG_LEVEL = os.environ["HP_LOG_LEVEL"].upper() +LOG_LEVEL = os.environ.get("HP_LOG_LEVEL", "INFO").upper() logging.basicConfig(level=LOG_LEVEL) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(level=LOG_LEVEL) @@ -59,7 +80,8 @@ LOGGER.error( "Invalid value for HP_TRUSTED_PROXY_IPS: %s. Client IP detection from headers is disabled. " "The X-Forwarded-For and X-Real-IP headers will not be respected. 
" - "This can lead to the outer proxy's IP being blocked during a bruteforce attempt instead of the actual client's IP.", + "This can lead to the outer proxy's IP being blocked " + "during a bruteforce attempt instead of the actual client's IP.", e, ) TRUSTED_PROXIES = [] @@ -105,20 +127,51 @@ class NcUser(BaseModel): access_level: AccessLevel = Field(..., description="ADMIN(2), USER(1), or PUBLIC(0)") +def _sanitize_k8s_name(raw: str) -> str: + """Convert an arbitrary string into a DNS-1123 compatible name for Kubernetes.""" + name = raw.lower().replace("_", "-") + name = re.sub(r"[^a-z0-9-]", "-", name) + name = re.sub(r"-+", "-", name).strip("-") + if not name: + name = "exapp" + if len(name) > K8S_NAME_MAX_LENGTH: + name = name[:K8S_NAME_MAX_LENGTH].rstrip("-") + return name + + class ExAppName(BaseModel): name: str = Field(..., description="ExApp name.") instance_id: str = Field("", description="Nextcloud instance ID.") + role_suffix: str = Field("", description="Role suffix, e.g. 'rp', 'idx'.") @computed_field @property def exapp_container_name(self) -> str: - return f"nc_app_{self.instance_id}_{self.name}" if self.instance_id else f"nc_app_{self.name}" + base = f"nc_app_{self.instance_id}_{self.name}" if self.instance_id else f"nc_app_{self.name}" + if self.role_suffix: + base = f"{base}_{self.role_suffix}" + return base @computed_field @property def exapp_container_volume(self) -> str: return f"{self.exapp_container_name}_data" + @computed_field + @property + def exapp_k8s_name(self) -> str: + """K8s Deployment name.""" + return _sanitize_k8s_name(self.exapp_container_name) + + @computed_field + @property + def exapp_k8s_volume_name(self) -> str: + """K8s PVC name.""" + base = _sanitize_k8s_name(self.exapp_container_volume) + if len(base) > K8S_NAME_MAX_LENGTH: + base = base[:K8S_NAME_MAX_LENGTH].rstrip("-") + return base + class CreateExAppMounts(BaseModel): source: str = Field(...) 
@@ -137,6 +190,17 @@ class CreateExAppPayload(ExAppName): mount_points: list[CreateExAppMounts] = Field([], description="List of mount points for the container.") resource_limits: dict[str, Any] = Field({}, description="Resource limits for the container.") + @model_validator(mode="before") + @classmethod + def accept_k8s_friendly_payload(cls, data: Any) -> Any: + """Map 'image' -> 'image_id' and default network_mode for K8s payloads.""" + if isinstance(data, dict): + if "image_id" not in data and "image" in data: + data = {**data, "image_id": data["image"]} # Allow 'image' instead of 'image_id' + if "network_mode" not in data: + data = {**data, "network_mode": "bridge"} # Default network_mode (used only for Docker) + return data + class RemoveExAppPayload(ExAppName): remove_data: bool = Field(False, description="Flag indicating whether the Docker ExApp volume should be deleted.") @@ -147,12 +211,38 @@ class InstallCertificatesPayload(ExAppName): install_frp_certs: bool = Field(True, description="Flag to control installation of FRP certificates.") +class ExposeExAppPayload(ExAppName): + port: int = Field(..., ge=1, le=65535, description="Port on which the ExApp listens inside the Pod/container.") + expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("nodeport") + upstream_host: str | None = Field(None, description="Host override. 
Required for expose_type=manual.") + upstream_port: int | None = Field(None, ge=1, le=65535, description="Port override (manual only).") + service_port: int | None = Field(None, ge=1, le=65535, description="Service port (defaults to payload.port).") + node_port: int | None = Field(None, ge=30000, le=32767) + external_traffic_policy: Literal["Cluster", "Local"] | None = Field(None) + load_balancer_ip: str | None = Field(None) + service_annotations: dict[str, str] = Field(default_factory=dict) + service_labels: dict[str, str] = Field(default_factory=dict) + wait_timeout_seconds: float = Field(60.0, ge=0, le=600) + wait_interval_seconds: float = Field(1.0, ge=0.1, le=10.0) + node_address_type: Literal["InternalIP", "ExternalIP"] = Field("InternalIP") + node_name: str | None = Field(None) + node_label_selector: str | None = Field(None) + + @model_validator(mode="after") + def validate_expose_payload(self) -> Self: + if self.expose_type == "manual" and not self.upstream_host: + raise ValueError("upstream_host is required when expose_type='manual'") + return self + + ############################################################################### # In-memory caches ############################################################################### -EXAPP_CACHE_LOCK = asyncio.Lock() EXAPP_CACHE: dict[str, ExApp] = {} +_EXAPP_INFLIGHT: dict[str, asyncio.Future[ExApp | None]] = {} +_EXAPP_NEGATIVE_CACHE: collections.OrderedDict[str, float] = collections.OrderedDict() +_NEGATIVE_CACHE_TTL = 15.0 # seconds to cache "ExApp not found" results SESSION_CACHE_LOCK = asyncio.Lock() SESSION_CACHE: dict[str, tuple[NcUser, float]] = {} # Stores NcUser and timestamp @@ -241,7 +331,7 @@ async def record_session(pass_cookie: str, nc_user: NcUser) -> None: now = time.time() async with SESSION_CACHE_LOCK: SESSION_CACHE[pass_cookie] = (nc_user, now) - LOGGER.error("Recorded session for cookie %s, User %s", pass_cookie, nc_user.user_id) + LOGGER.info("Recorded session for cookie %s, User %s", 
pass_cookie, nc_user.user_id) async def get_session(pass_cookie: str) -> NcUser | None: @@ -256,7 +346,7 @@ async def get_session(pass_cookie: str) -> NcUser | None: return nc_user # Session expired, remove it del SESSION_CACHE[pass_cookie] - LOGGER.error("Session for cookie %s expired", pass_cookie) + LOGGER.info("Session for cookie %s expired", pass_cookie) return None @@ -312,32 +402,34 @@ async def exapps_msg( if request_headers["harp-shared-key"] != SHARED_KEY: await record_ip_failure(client_ip) return reply.set_txn_var("bad_request", 1) - exapp_record = ExApp( - exapp_token="", - exapp_version=request_headers["ex-app-version"], - host=request_headers["ex-app-host"], - port=int(request_headers["ex-app-port"]), - ) authorization_app_api = request_headers["authorization-app-api"] + # Use unified fetch path (cache + singleflight + K8s resolution) + try: + exapp_record = await _get_or_fetch_exapp(exapp_id_lower) + except Exception: + exapp_record = None + if not exapp_record: + # ExApp not in Nextcloud yet (e.g. 
during /init) — use headers as fallback + exapp_record = ExApp( + exapp_token="", + exapp_version=request_headers["ex-app-version"], + host=request_headers["ex-app-host"], + port=int(request_headers["ex-app-port"]), + ) if not exapp_record: - async with EXAPP_CACHE_LOCK: - exapp_record = EXAPP_CACHE.get(exapp_id_lower) + try: + exapp_record = await _get_or_fetch_exapp(exapp_id_lower) if not exapp_record: - try: - exapp_record = await nc_get_exapp(exapp_id_lower) - if not exapp_record: - LOGGER.error("No such ExApp enabled: %s", exapp_id) - await record_ip_failure(client_ip_str) - return reply.set_txn_var("not_found", 1) - LOGGER.info("Received new ExApp record: %s", exapp_record) - EXAPP_CACHE[exapp_id_lower] = exapp_record - except ValidationError as e: - LOGGER.error("Invalid ExApp metadata from Nextcloud: %s", e) - return reply.set_txn_var("not_found", 1) - except Exception as e: - LOGGER.exception("Failed to fetch ExApp metadata from Nextcloud", exc_info=e) - return reply.set_txn_var("not_found", 1) + LOGGER.error("No such ExApp enabled: %s", exapp_id) + await record_ip_failure(client_ip_str) + return reply.set_txn_var("not_found", 1) + except ValidationError as e: + LOGGER.error("Invalid ExApp metadata from Nextcloud: %s", e) + return reply.set_txn_var("not_found", 1) + except Exception as e: + LOGGER.exception("Failed to fetch ExApp metadata from Nextcloud", exc_info=e) + return reply.set_txn_var("not_found", 1) route_allowed = False if authorization_app_api: @@ -475,13 +567,94 @@ async def nc_get_exapp(app_id: str) -> ExApp | None: EX_APP_URL, headers={"harp-shared-key": SHARED_KEY}, params={"appId": app_id} ) as resp: if not resp.ok: - if resp.status == 404: + if resp.status in (401, 404): + # 404 = ExApp not found; 401 = ExApp not found so key can't be + # validated against a daemon config (Nextcloud returns 401 for + # non-existent apps because it has no daemon to check the key). 
return None raise Exception("Failed to fetch ExApp metadata from Nextcloud.", await resp.text()) data = await resp.json() return ExApp.model_validate(data) +async def _fetch_exapp_record(exapp_id: str) -> ExApp | None: + """Fetch ExApp metadata from Nextcloud and optionally resolve K8s upstream. + + Performs network I/O (Nextcloud HTTP + K8s API). Must NEVER be called + under a lock. Returns a fully-populated ExApp ready for caching, or + None if the ExApp is not found/enabled in Nextcloud. + """ + exapp_record = await nc_get_exapp(exapp_id) + if not exapp_record: + return None + k8s_upstream = await _k8s_resolve_exapp_upstream(exapp_id) + if k8s_upstream: + exapp_record.host, exapp_record.port = k8s_upstream + exapp_record.resolved_host = "" + LOGGER.info("Resolved K8s upstream for '%s': %s:%d", exapp_id, *k8s_upstream) + return exapp_record + + +def _negative_cache_evict_expired() -> None: + """Remove expired entries from the front of the ordered negative cache.""" + now = time.monotonic() + while _EXAPP_NEGATIVE_CACHE: + _, expiry = next(iter(_EXAPP_NEGATIVE_CACHE.items())) + if now < expiry: + break + _EXAPP_NEGATIVE_CACHE.popitem(last=False) + + +async def _get_or_fetch_exapp(exapp_id: str) -> ExApp | None: + """Get ExApp from cache, or fetch with request coalescing. + + Fast path: returns cached record immediately. + Negative-cache path: if we recently confirmed this ExApp doesn't exist, + return None without network I/O. + On cache miss: if another coroutine is already fetching this ExApp, + awaits its result (singleflight). Otherwise initiates the fetch and + lets other coroutines await our result. + + Safety: between the _EXAPP_INFLIGHT.get() check and the dict insert + there is no ``await``, so no coroutine interleaving can occur in + single-threaded asyncio — only one coroutine wins the race. 
+ """ + cached = EXAPP_CACHE.get(exapp_id) + if cached is not None: + return cached + + neg_expiry = _EXAPP_NEGATIVE_CACHE.get(exapp_id) + if neg_expiry is not None: + if time.monotonic() < neg_expiry: + return None + _EXAPP_NEGATIVE_CACHE.pop(exapp_id, None) + + inflight = _EXAPP_INFLIGHT.get(exapp_id) + if inflight is not None: + return await inflight + + loop = asyncio.get_running_loop() + fut: asyncio.Future[ExApp | None] = loop.create_future() + _EXAPP_INFLIGHT[exapp_id] = fut + + try: + exapp_record = await _fetch_exapp_record(exapp_id) + if exapp_record is not None: + LOGGER.info("Received new ExApp record: %s", exapp_record) + EXAPP_CACHE[exapp_id] = exapp_record + else: + _negative_cache_evict_expired() + _EXAPP_NEGATIVE_CACHE[exapp_id] = time.monotonic() + _NEGATIVE_CACHE_TTL + fut.set_result(exapp_record) + return exapp_record + except BaseException as exc: + if not fut.done(): + fut.set_exception(exc) + raise + finally: + _EXAPP_INFLIGHT.pop(exapp_id, None) + + async def nc_get_user(app_id: str, all_headers: dict[str, str]) -> NcUser | None: ext_headers = {k: v for k, v in all_headers.items() if k.lower() not in EXCLUDE_HEADERS_USER_INFO} LOGGER.debug("all_headers = %s\next_headers = %s", str(all_headers), str(ext_headers)) @@ -491,10 +664,11 @@ async def nc_get_user(app_id: str, all_headers: dict[str, str]) -> NcUser | None params={"appId": app_id}, ) as resp: if not resp.ok: - LOGGER.info("Failed to fetch ExApp metadata from Nextcloud.", await resp.text()) + error_text = await resp.text() + LOGGER.info("Failed to fetch user info from Nextcloud: %s", error_text) if resp.status // 100 == 4: return None - raise Exception("Failed to fetch ExApp metadata from Nextcloud.", await resp.text()) + raise Exception("Failed to fetch user info from Nextcloud.", error_text) data = await resp.json() return NcUser.model_validate(data) @@ -518,7 +692,21 @@ def resolve_ip(hostname: str) -> str: async def get_info(request: web.Request): - return 
web.json_response({"version": 0.3}) + k8s_status: dict[str, Any] = {"enabled": K8S_ENABLED} + if K8S_ENABLED: + k8s_status["api_server"] = K8S_API_SERVER or "" + try: + _ensure_k8s_configured() + status, _, _ = await _k8s_request("GET", "/api") + k8s_status["reachable"] = status == 200 + except Exception: + k8s_status["reachable"] = False + + return web.json_response({ + "version": 0.3, + "docker": True, + "kubernetes": k8s_status, + }) ############################################################################### @@ -528,18 +716,17 @@ async def get_info(request: web.Request): async def add_exapp(request: web.Request): data = await request.json() - # Overwrite if already exists - async with EXAPP_CACHE_LOCK: - try: - EXAPP_CACHE[request.match_info["app_id"].lower()] = ExApp.model_validate(data) - except ValidationError: - raise web.HTTPBadRequest() from None + app_id = request.match_info["app_id"].lower() + try: + EXAPP_CACHE[app_id] = ExApp.model_validate(data) + _EXAPP_NEGATIVE_CACHE.pop(app_id, None) + except ValidationError: + raise web.HTTPBadRequest() from None return web.HTTPNoContent() async def delete_exapp(request: web.Request): - async with EXAPP_CACHE_LOCK: - old = EXAPP_CACHE.pop(request.match_info["app_id"].lower(), None) + old = EXAPP_CACHE.pop(request.match_info["app_id"].lower(), None) if old is None: raise web.HTTPNotFound() return web.HTTPNoContent() @@ -1663,6 +1850,1013 @@ def _get_certificate_update_command(os_info_content: str | None) -> list[str] | return None +############################################################################### +# Kubernetes helper functions +############################################################################### + + +async def _parse_json_payload(request: web.Request, model: type[BaseModel]) -> Any: + """Parse JSON body and validate against a Pydantic model.""" + try: + payload_dict = await request.json() + except json.JSONDecodeError: + raise web.HTTPBadRequest(text="Invalid JSON body") from None + 
try: + return model.model_validate(payload_dict) + except ValidationError as e: + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + +def _k8s_error_msg(data: dict[str, Any] | None, text: str) -> str: + """Extract a human-readable message from a K8s API response.""" + if isinstance(data, dict): + return (data or {}).get("message", text) + return text + + +def _get_k8s_token() -> str | None: + """Get the Kubernetes bearer token. + + If HP_K8S_BEARER_TOKEN env var is set, returns the static value. + Otherwise re-reads the token file on every call so that kubelet-rotated + projected service account tokens (default ~1h lifetime) are picked up automatically. + """ + if K8S_TOKEN_ENV: + return K8S_TOKEN_ENV.strip() + if K8S_TOKEN_FILE and os.path.exists(K8S_TOKEN_FILE): + try: + with open(K8S_TOKEN_FILE, encoding="utf-8") as f: + token = f.read().strip() + if token: + return token + except Exception as e: + LOGGER.error("Failed to read Kubernetes token file '%s': %s", K8S_TOKEN_FILE, e) + LOGGER.error( + "Kubernetes bearer token not found. " + "Set HP_K8S_BEARER_TOKEN or HP_K8S_BEARER_TOKEN_FILE when HP_K8S_ENABLED=true." 
+ ) + return None + + +def _get_k8s_ssl_context() -> ssl.SSLContext | bool: + """Return SSL context (or False to disable verification) for K8s API.""" + if not K8S_API_SERVER or not K8S_API_SERVER.startswith("https"): + return False + if not K8S_VERIFY_SSL: + return False + try: + cafile = K8S_CA_FILE if K8S_CA_FILE and os.path.exists(K8S_CA_FILE) else None + return ssl.create_default_context(cafile=cafile) + except Exception as e: + LOGGER.warning("Failed to create SSL context for Kubernetes API: %s", e) + return ssl.create_default_context() + + +def _ensure_k8s_configured() -> None: + if not K8S_ENABLED: + LOGGER.error("Kubernetes backend requested but HP_K8S_ENABLED is not true.") + raise web.HTTPServiceUnavailable(text="Kubernetes backend is disabled in HaRP.") + if not K8S_API_SERVER: + LOGGER.error("Kubernetes backend requested but HP_K8S_API_SERVER is not configured.") + raise web.HTTPServiceUnavailable(text="Kubernetes API server is not configured.") + if not _get_k8s_token(): + raise web.HTTPServiceUnavailable(text="Kubernetes token is not configured.") + + +def _get_k8s_session() -> aiohttp.ClientSession: + """Return a reusable aiohttp session for the Kubernetes API. + + Lazily creates a module-level singleton with connection pooling. + The session is reused across all K8s API calls, avoiding the overhead + of a new TCP+TLS handshake per request. 
+ """ + global _k8s_session + if _k8s_session is None or _k8s_session.closed: + ssl_ctx = _get_k8s_ssl_context() + connector = aiohttp.TCPConnector(ssl=ssl_ctx) + _k8s_session = aiohttp.ClientSession( + timeout=K8S_HTTP_TIMEOUT, + connector=connector, + ) + return _k8s_session + + +async def _close_k8s_session(_app: web.Application = None) -> None: + """Close the K8s session on shutdown to avoid ResourceWarning.""" + global _k8s_session + if _k8s_session is not None and not _k8s_session.closed: + await _k8s_session.close() + _k8s_session = None + + +async def _k8s_request( + method: str, + path: str, + *, + query: dict[str, str] | None = None, + json_body: Any | None = None, + content_type: str | None = None, +) -> tuple[int, dict[str, Any] | None, str]: + """Low-level helper for talking to the Kubernetes API.""" + _ensure_k8s_configured() + token = _get_k8s_token() + headers: dict[str, str] = { + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + if json_body is not None: + headers["Content-Type"] = content_type or "application/json" + + url = f"{K8S_API_SERVER}{path}" + session = _get_k8s_session() + + try: + async with session.request(method.upper(), url, headers=headers, params=query, json=json_body) as resp: + text = await resp.text() + data: dict[str, Any] | None = None + if "application/json" in resp.headers.get("Content-Type", "") and text: + try: + data = json.loads(text) + except json.JSONDecodeError: + LOGGER.warning("Failed to parse JSON from Kubernetes API %s %s: %s", method, url, text[:200]) + return resp.status, data, text + except aiohttp.ClientError as e: + LOGGER.error("Error communicating with Kubernetes API (%s %s): %s", method, url, e) + raise web.HTTPServiceUnavailable(text="Error communicating with Kubernetes API") from e + + +def _k8s_parse_env(env_list: list[str]) -> list[dict[str, str]]: + """Convert ['KEY=VALUE', ...] 
to Kubernetes env entries.""" + result: list[dict[str, str]] = [] + for raw in env_list: + if not raw: + continue + if "=" in raw: + name, value = raw.split("=", 1) + result.append({"name": name, "value": value}) + else: + result.append({"name": raw, "value": ""}) # No '=', keep name and use empty value + return result + + +def _k8s_build_resources(resource_limits: dict[str, Any], compute_device: str = "cpu") -> dict[str, Any]: + """Convert resource_limits dict to K8s resources spec.""" + if not resource_limits and compute_device == "cpu": + return {} + limits: dict[str, str] = {} + requests: dict[str, str] = {} + + # Memory + mem_val = resource_limits.get("memory") + mem_str: str | None = None + if isinstance(mem_val, int) and mem_val > 0: + # bytes -> Mi (ceil) + mem_mi = (mem_val + (1024 * 1024 - 1)) // (1024 * 1024) + mem_str = f"{mem_mi}Mi" + elif isinstance(mem_val, str) and mem_val: + mem_str = mem_val # Already in K8s units, e.g. "512Mi" + + if mem_str: + limits["memory"] = mem_str + requests["memory"] = mem_str # conservative: same as limit + + # CPU + cpu_str: str | None = None + nano_cpus = resource_limits.get("nanoCPUs") + if isinstance(nano_cpus, int) and nano_cpus > 0: + milli = (nano_cpus * 1000 + 1_000_000_000 - 1) // 1_000_000_000 # 1e9 nanoCPUs = 1 CPU => millicores + milli = max(1, milli) + cpu_str = f"{milli}m" + else: + cpu_val = resource_limits.get("cpu") + if isinstance(cpu_val, str) and cpu_val: + cpu_str = cpu_val # Already in K8s units, e.g. 
"500m" + + if cpu_str: + limits["cpu"] = cpu_str + requests["cpu"] = cpu_str + + if compute_device == "cuda": + limits["nvidia.com/gpu"] = "1" + elif compute_device == "rocm": + limits["amd.com/gpu"] = "1" + + res: dict[str, Any] = {} + if limits: + res["limits"] = limits + if requests: + res["requests"] = requests + return res + + +def _k8s_parse_host_aliases() -> list[dict[str, Any]]: + """Parse HP_K8S_HOST_ALIASES into K8s hostAliases format.""" + if not K8S_HOST_ALIASES_RAW.strip(): + return [] + + ip_to_hosts: dict[str, list[str]] = {} + for entry in K8S_HOST_ALIASES_RAW.split(","): + entry = entry.strip() + if not entry or ":" not in entry: + continue + hostname, ip_addr = entry.rsplit(":", 1) + hostname = hostname.strip() + ip_addr = ip_addr.strip() + if hostname and ip_addr: + ip_to_hosts.setdefault(ip_addr, []).append(hostname) + + return [{"ip": ip, "hostnames": hosts} for ip, hosts in ip_to_hosts.items()] + + +async def _k8s_ensure_coredns_host_aliases() -> None: + """Patch CoreDNS to resolve HP_K8S_HOST_ALIASES cluster-wide.""" + if not K8S_ENABLED or not K8S_HOST_ALIASES_RAW.strip(): + return + + host_aliases = _k8s_parse_host_aliases() + if not host_aliases: + return + + LOGGER.info("Ensuring CoreDNS resolves host aliases: %s", K8S_HOST_ALIASES_RAW) + + try: + status, data, _text = await _k8s_request( + "GET", "/api/v1/namespaces/kube-system/configmaps/coredns", + ) + if status != 200 or not data: + LOGGER.warning("Could not read CoreDNS ConfigMap (HTTP %d), skipping.", status) + return + + corefile = data.get("data", {}).get("Corefile", "") + if not corefile: + LOGGER.warning("CoreDNS ConfigMap has no Corefile entry, skipping.") + return + + hosts_lines: list[str] = [] + for alias in host_aliases: + for hostname in alias["hostnames"]: + hosts_lines.append(f" {alias['ip']} {hostname}") + hosts_block = "hosts {\n" + "\n".join(hosts_lines) + "\n fallthrough\n }" + + hosts_re = re.compile(r"hosts\s*\{[^}]*\}") + if hosts_re.search(corefile): + 
new_corefile = hosts_re.sub(hosts_block, corefile, count=1) + elif "forward ." in corefile: + new_corefile = corefile.replace("forward .", f"{hosts_block}\n forward .", 1) + else: + LOGGER.warning( + "CoreDNS Corefile has no 'hosts' block and no 'forward' directive, cannot patch." + ) + return + + if new_corefile == corefile: + LOGGER.info("CoreDNS already has correct host aliases, no patch needed.") + return + + status, _, _text = await _k8s_request( + "PATCH", + "/api/v1/namespaces/kube-system/configmaps/coredns", + json_body={"data": {"Corefile": new_corefile}}, + content_type="application/strategic-merge-patch+json", + ) + if status != 200: + LOGGER.warning("Failed to patch CoreDNS ConfigMap (HTTP %d): %s", status, _text[:200]) + return + + restart_annotation = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + status, _, _text = await _k8s_request( + "PATCH", + "/apis/apps/v1/namespaces/kube-system/deployments/coredns", + json_body={ + "spec": {"template": {"metadata": {"annotations": { + "harp.nextcloud.com/restartedAt": restart_annotation, + }}}} + }, + content_type="application/strategic-merge-patch+json", + ) + if status != 200: + LOGGER.warning("Failed to restart CoreDNS Deployment (HTTP %d): %s", status, _text[:200]) + return + + LOGGER.info("CoreDNS patched and restarted with host aliases: %s", K8S_HOST_ALIASES_RAW) + + except Exception as exc: + LOGGER.warning("Failed to configure CoreDNS host aliases (non-fatal): %s", exc) + + +def _k8s_build_deployment_manifest(payload: CreateExAppPayload, replicas: int) -> dict[str, Any]: + """Build a Deployment manifest from CreateExAppPayload.""" + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + # Base ExApp name (without role suffix) - used for grouping multi-role Deployments. 
+ base_exapp_name = _sanitize_k8s_name( + f"nc_app_{payload.instance_id}_{payload.name}" if payload.instance_id else f"nc_app_{payload.name}" + ) + + labels = { + "app": deployment_name, + "app.kubernetes.io/name": deployment_name, + "app.kubernetes.io/part-of": base_exapp_name, + "app.kubernetes.io/component": "exapp", + } + if payload.instance_id: + labels["app.kubernetes.io/instance"] = payload.instance_id + if payload.role_suffix: + labels["app.kubernetes.io/role"] = _sanitize_k8s_name(payload.role_suffix) + + container: dict[str, Any] = { + "name": "app", + "image": payload.image_id, + "imagePullPolicy": "IfNotPresent", + "env": _k8s_parse_env(payload.environment_variables), + } + + resources = _k8s_build_resources(payload.resource_limits, payload.compute_device) + if resources: + container["resources"] = resources + + # Main data volume + volumes = [ + { + "name": "data", + "persistentVolumeClaim": {"claimName": pvc_name}, + } + ] + volume_mounts = [ + { + "name": "data", + "mountPath": f"/{payload.exapp_container_volume}", + } + ] + + if payload.mount_points: + LOGGER.warning( + "Kubernetes backend currently ignores additional mount_points for ExApp '%s'.", + deployment_name, + ) + + container["volumeMounts"] = volume_mounts + + pod_spec: dict[str, Any] = {"containers": [container], "volumes": volumes} + host_aliases = _k8s_parse_host_aliases() + if host_aliases: + pod_spec["hostAliases"] = host_aliases + + return { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": {"name": deployment_name, "labels": labels}, + "spec": { + "replicas": replicas, + "selector": {"matchLabels": {"app": deployment_name}}, + "template": {"metadata": {"labels": labels}, "spec": pod_spec}, + }, + } + + +def _k8s_build_service_manifest( + payload: ExposeExAppPayload, service_type: Literal["NodePort", "ClusterIP", "LoadBalancer"] +) -> dict[str, Any]: + service_name = payload.exapp_k8s_name + base_exapp_name = _sanitize_k8s_name( + 
f"nc_app_{payload.instance_id}_{payload.name}" if payload.instance_id else f"nc_app_{payload.name}" + ) + labels = { + "app": service_name, + "app.kubernetes.io/name": service_name, + "app.kubernetes.io/part-of": base_exapp_name, + "app.kubernetes.io/component": "exapp", + **(payload.service_labels or {}), + } + if payload.instance_id: + labels.setdefault("app.kubernetes.io/instance", payload.instance_id) + if payload.role_suffix: + labels["app.kubernetes.io/role"] = _sanitize_k8s_name(payload.role_suffix) + + metadata: dict[str, Any] = {"name": service_name, "labels": labels} + if payload.service_annotations: + metadata["annotations"] = payload.service_annotations + + svc_port = payload.service_port or payload.port + port_entry: dict[str, Any] = { + "name": "http", + "port": svc_port, + "targetPort": payload.port, + } + if service_type == "NodePort" and payload.node_port: + port_entry["nodePort"] = payload.node_port + + spec: dict[str, Any] = { + "type": service_type, + "selector": {"app": service_name}, + "ports": [port_entry], + } + + if payload.external_traffic_policy and service_type in ("NodePort", "LoadBalancer"): + spec["externalTrafficPolicy"] = payload.external_traffic_policy + + if service_type == "LoadBalancer" and payload.load_balancer_ip: + spec["loadBalancerIP"] = payload.load_balancer_ip + + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": metadata, + "spec": spec, + } + + +async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None: + """Look up K8s Service for an ExApp, return (host, port) or None. + + Returns None when K8s is disabled or no Service exists (Docker ExApp). + Raises web.HTTPServiceUnavailable when the K8s API returned an error + (e.g. 403 token expired, 500 server error) so the caller knows not to + cache the result with stale NC-provided host/port. + Connection-level errors already raise via _k8s_request. 
+ """ + if not K8S_ENABLED or not K8S_API_SERVER or not _get_k8s_token(): + return None + + try: + exapp = ExAppName(name=app_name) + except Exception: + return None + + svc: dict[str, Any] | None = None + got_k8s_error = False + + # Try the base Service name first (single-role / legacy ExApps). + service_name = exapp.exapp_k8s_name + status, data, _ = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status == 200 and isinstance(data, dict): + svc = data + elif status != 404: + got_k8s_error = True + + # Fallback: search for any Service labelled for this ExApp (multi-role). + if svc is None: + base_k8s_name = _sanitize_k8s_name(f"nc_app_{app_name}") + label_selector = f"app.kubernetes.io/part-of={base_k8s_name}" + status, svc_list, _ = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services", + query={"labelSelector": label_selector}, + ) + if status == 200 and isinstance(svc_list, dict): + items = svc_list.get("items") or [] + if items: + svc = items[0] + service_name = (svc.get("metadata") or {}).get("name", service_name) + elif status != 200: + got_k8s_error = True + + if svc is None: + if got_k8s_error: + LOGGER.warning( + "K8s API returned error during Service lookup for '%s'; " + "will not cache stale record", + app_name, + ) + raise web.HTTPServiceUnavailable( + text=f"K8s API error during Service lookup for '{app_name}'" + ) + return None + + svc_type = (svc.get("spec") or {}).get("type", "ClusterIP") + try: + if svc_type == "NodePort": + port = _k8s_extract_nodeport(svc) + host = await _k8s_pick_node_address(preferred_type="InternalIP") + return (host, port) + if svc_type == "ClusterIP": + port = _k8s_extract_service_port(svc) + host = _k8s_service_dns_name(service_name, K8S_NAMESPACE) + return (host, port) + if svc_type == "LoadBalancer": + port = _k8s_extract_service_port(svc) + host = _k8s_extract_loadbalancer_host(svc) + if host: + return (host, port) + except Exception as e: + 
LOGGER.warning("Failed to resolve K8s upstream for '%s': %s", app_name, e) + return None + + +def _k8s_service_dns_name(service_name: str, namespace: str) -> str: + # Cluster domain suffix is typically .svc.cluster.local, but .svc is enough inside most resolvers. + return f"{service_name}.{namespace}.svc" + + +async def _k8s_pick_node_address( + *, + preferred_type: Literal["InternalIP", "ExternalIP"], + node_name: str | None = None, + label_selector: str | None = None, +) -> str: + query = {"labelSelector": label_selector} if label_selector else None + status, nodes_data, text = await _k8s_request("GET", "/api/v1/nodes", query=query) + if status != 200 or not isinstance(nodes_data, dict): + raise web.HTTPServiceUnavailable( + text=f"Failed to list K8s nodes: Status {status}, {_k8s_error_msg(nodes_data, text)}" + ) + + items = nodes_data.get("items", []) + if node_name: + items = [n for n in items if n.get("metadata", {}).get("name") == node_name] + + if not items: + raise web.HTTPServiceUnavailable(text="No Kubernetes nodes found (after filtering).") + + def is_ready(node: dict[str, Any]) -> bool: + for cond in node.get("status", {}).get("conditions", []) or []: + if cond.get("type") == "Ready" and cond.get("status") == "True": + return True + return False + + ready_nodes = [n for n in items if is_ready(n)] + nodes = ready_nodes or items + + fallback_type = "ExternalIP" if preferred_type == "InternalIP" else "InternalIP" + address_type_order = [preferred_type, fallback_type, "Hostname"] + + for node in nodes: + for t in address_type_order: + for addr in node.get("status", {}).get("addresses", []) or []: + if addr.get("type") == t and addr.get("address"): + return str(addr["address"]) + + raise web.HTTPServiceUnavailable(text="Could not determine a node address (no InternalIP/ExternalIP/Hostname).") + + +def _k8s_extract_nodeport(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "nodePort" not in ports[0]: + 
raise web.HTTPServiceUnavailable(text="Service has no nodePort assigned.") + return int(ports[0]["nodePort"]) + + +def _k8s_extract_service_port(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "port" not in ports[0]: + raise web.HTTPServiceUnavailable(text="Service has no port defined.") + return int(ports[0]["port"]) + + +def _k8s_extract_loadbalancer_host(service: dict[str, Any]) -> str | None: + ingress = ((service.get("status") or {}).get("loadBalancer") or {}).get("ingress") or [] + if not ingress: + return None + first = ingress[0] or {} + return first.get("ip") or first.get("hostname") + + +async def _k8s_wait_for_loadbalancer_host(service_name: str, timeout_s: float, interval_s: float) -> str: + deadline = time.time() + max(0.0, timeout_s) + while True: + status, svc, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status != 200 or not isinstance(svc, dict): + raise web.HTTPServiceUnavailable( + text=f"Failed to read Service '{service_name}': Status {status}, {_k8s_error_msg(svc, text)}" + ) + + host = _k8s_extract_loadbalancer_host(svc) + if host: + return host + + if time.time() >= deadline: + raise web.HTTPServiceUnavailable( + text=f"Timed out waiting for LoadBalancer address for Service '{service_name}'" + ) + + await asyncio.sleep(interval_s) + + +############################################################################### +# Endpoints for AppAPI to work with the Kubernetes API +############################################################################### + + +async def k8s_exapp_exists(request: web.Request): + payload = await _parse_json_payload(request, ExAppName) + deployment_name = payload.exapp_k8s_name + + status, data, text = await _k8s_request( + "GET", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + ) + if status == 200: + return web.json_response({"exists": True}) + if status == 404: + 
return web.json_response({"exists": False}) + LOGGER.error("Error checking deployment '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error checking deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_create(request: web.Request): + payload = await _parse_json_payload(request, CreateExAppPayload) + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + LOGGER.info("Creating K8s resources for '%s' (ns=%s).", payload.name, K8S_NAMESPACE) + + pvc_manifest: dict[str, Any] = { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": { + "name": pvc_name, + "labels": { + "app": deployment_name, + "app.kubernetes.io/name": deployment_name, + "app.kubernetes.io/component": "exapp", + }, + }, + "spec": { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": K8S_DEFAULT_STORAGE_SIZE}}, + }, + } + if K8S_STORAGE_CLASS: + pvc_manifest["spec"]["storageClassName"] = K8S_STORAGE_CLASS + + status, data, text = await _k8s_request( + "POST", + f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims", + json_body=pvc_manifest, + ) + if status in (200, 201): + LOGGER.info("PVC '%s' created.", pvc_name) + elif status == 409: + LOGGER.info("PVC '%s' already exists.", pvc_name) + else: + LOGGER.error("Failed to create PVC '%s' (status %s): %s", pvc_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Failed to create PVC '{pvc_name}': Status {status}") + + deployment_manifest = _k8s_build_deployment_manifest(payload, replicas=0) + status, data, text = await _k8s_request( + "POST", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments", + json_body=deployment_manifest, + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' created.", deployment_name) + return web.json_response({"name": deployment_name}, status=201) + if status == 409: + raise web.HTTPConflict(text=f"Deployment 
'{deployment_name}' already exists.") + LOGGER.error("Error creating deployment '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error creating deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_start(request: web.Request): + payload = await _parse_json_payload(request, ExAppName) + deployment_name = payload.exapp_k8s_name + + status, data, text = await _k8s_request( + "PATCH", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + json_body={"spec": {"replicas": 1}}, + content_type="application/strategic-merge-patch+json", + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' scaled to 1.", deployment_name) + return web.HTTPNoContent() + if status == 404: + raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.") + LOGGER.error("Error starting '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error starting deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_stop(request: web.Request): + payload = await _parse_json_payload(request, ExAppName) + deployment_name = payload.exapp_k8s_name + + status, data, text = await _k8s_request( + "PATCH", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + json_body={"spec": {"replicas": 0}}, + content_type="application/strategic-merge-patch+json", + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' scaled to 0.", deployment_name) + return web.HTTPNoContent() + if status == 404: + raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.") + LOGGER.error("Error stopping '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error stopping deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_wait_for_start(request: web.Request): + payload = await 
_parse_json_payload(request, ExAppName) + deployment_name = payload.exapp_k8s_name + label_selector = f"app={deployment_name}" + + # Startup timeout (seconds) applies only *after* the image has been pulled. While the image is being pulled + # we wait indefinitely (image sizes vary and we cannot predict download speed). + startup_timeout = 90.0 + sleep_interval = 1.0 + # Safety cap: abort even during image pull after this many seconds to avoid waiting forever if something + # is truly stuck (e.g. registry unreachable but not yet in ImagePullBackOff). + image_pull_max_wait = 3600.0 + + LOGGER.info( + "Waiting for Kubernetes pod(s) of deployment '%s' to become Ready " + "(namespace=%s, startup_timeout=%.0fs, image_pull_max_wait=%.0fs).", + deployment_name, + K8S_NAMESPACE, + startup_timeout, + image_pull_max_wait, + ) + + last_phase: str | None = None + last_reason: str | None = None + last_message: str | None = None + image_pulled = False + start_time = time.monotonic() + post_pull_start: float | None = None + + while True: + elapsed = time.monotonic() - start_time + + if image_pulled: + assert post_pull_start is not None + if time.monotonic() - post_pull_start > startup_timeout: + LOGGER.warning( + "Deployment '%s' did not become Ready within %.0fs after image pull.", + deployment_name, + startup_timeout, + ) + break + elif elapsed > image_pull_max_wait: + LOGGER.warning( + "Deployment '%s' image pull did not complete within %.0fs.", + deployment_name, + image_pull_max_wait, + ) + break + + status, data, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/pods", + query={"labelSelector": label_selector}, + ) + if status != 200: + LOGGER.error( + "Error listing pods for '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text) + ) + raise web.HTTPServiceUnavailable( + text=f"Error listing pods for deployment '{deployment_name}': Status {status}" + ) + + items = (data or {}).get("items", []) if isinstance(data, dict) else [] + 
if not items: + LOGGER.debug( + "No pods yet for deployment '%s' (elapsed %.0fs).", + deployment_name, + elapsed, + ) + last_phase = "Pending" + else: + # Take the first pod; for single-replica deployments this is enough. + pod = items[0] + pod_status = pod.get("status", {}) + phase = pod_status.get("phase", "Unknown") + last_phase = phase + conditions = pod_status.get("conditions", []) + ready = any(c.get("type") == "Ready" and c.get("status") == "True" for c in conditions) + last_reason = pod_status.get("reason") + last_message = pod_status.get("message") + + # Inspect container statuses to detect image pull vs app startup. + container_statuses = pod_status.get("containerStatuses", []) + waiting_reason = None + if container_statuses: + cs = container_statuses[0] + waiting = cs.get("state", {}).get("waiting", {}) + waiting_reason = waiting.get("reason") + + LOGGER.debug( + "Pod status for '%s' (elapsed %.0fs): phase=%s, ready=%s, " + "waiting_reason=%s, reason=%s, message=%s", + deployment_name, + elapsed, + phase, + ready, + waiting_reason, + last_reason, + last_message, + ) + + if phase == "Running" and ready: + LOGGER.info("Deployment '%s' pod is Running and Ready.", deployment_name) + return web.json_response( + { + "started": True, + "status": "running", + "health": "ready", + "reason": last_reason, + "message": last_message, + } + ) + + if phase in ("Failed", "Unknown", "Succeeded"): + LOGGER.warning( + "Deployment '%s' pod is in phase '%s', treating as not successfully started.", + deployment_name, + phase, + ) + return web.json_response( + { + "started": False, + "status": phase, + "health": "not_ready", + "reason": last_reason, + "message": last_message, + } + ) + + # Fail fast on image pull errors. 
+ if waiting_reason in ("ErrImagePull", "ImagePullBackOff", "InvalidImageName"): + wait_msg = container_statuses[0].get("state", {}).get("waiting", {}).get("message", "") + LOGGER.error( + "Deployment '%s' pod has image pull error: %s - %s", + deployment_name, + waiting_reason, + wait_msg, + ) + return web.json_response( + { + "started": False, + "status": waiting_reason, + "health": "image_error", + "reason": waiting_reason, + "message": wait_msg, + } + ) + + # Detect when image pull is done: once the container has a non-pull waiting reason (e.g. CrashLoopBackOff) + # or is running/terminated, the image has been pulled. + if not image_pulled: + # Still pulling: no container statuses yet, or waiting with ContainerCreating / PodInitializing. + still_pulling = ( + not container_statuses + or waiting_reason in ("ContainerCreating", "PodInitializing", None) + ) + if not still_pulling: + image_pulled = True + post_pull_start = time.monotonic() + LOGGER.info( + "Deployment '%s' image pull completed after %.0fs. 
" + "Starting startup countdown (%.0fs).", + deployment_name, + elapsed, + startup_timeout, + ) + + await asyncio.sleep(sleep_interval) + + return web.json_response( + { + "started": False, + "status": last_phase or "unknown", + "health": "timeout", + "reason": last_reason, + "message": last_message, + } + ) + + +async def k8s_exapp_remove(request: web.Request): + payload = await _parse_json_payload(request, RemoveExAppPayload) + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + LOGGER.info( + "Removing K8s resources for '%s' (ns=%s, remove_data=%s).", deployment_name, K8S_NAMESPACE, payload.remove_data + ) + + status, data, text = await _k8s_request( + "DELETE", f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + ) + if status not in (200, 202, 404): + LOGGER.error( + "Error removing deployment '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text) + ) + raise web.HTTPServiceUnavailable(text=f"Error removing deployment '{deployment_name}': Status {status}") + + if payload.remove_data: + status, data, text = await _k8s_request( + "DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims/{pvc_name}", + ) + if status not in (200, 202, 404): + LOGGER.error("Error removing PVC '%s' (status %s): %s", pvc_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error removing PVC '{pvc_name}': Status {status}") + + status, data, text = await _k8s_request( + "DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}", + ) + if status not in (200, 202, 404): + LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text)) + raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {status}") + return web.HTTPNoContent() + + +async def k8s_exapp_install_certificates(request: web.Request): + # K8s: certificates are handled via Secrets/volume mounts, this is a 
no-op. + await _parse_json_payload(request, InstallCertificatesPayload) + return web.HTTPNoContent() + + +async def k8s_exapp_expose(request: web.Request): + payload = await _parse_json_payload(request, ExposeExAppPayload) + app_id = payload.name.lower() + service_name = payload.exapp_k8s_name + + if payload.expose_type == "manual": + upstream_host = payload.upstream_host + upstream_port = int(payload.upstream_port or payload.port) + else: + _ensure_k8s_configured() + + type_map: dict[str, Literal["NodePort", "ClusterIP", "LoadBalancer"]] = { + "nodeport": "NodePort", "clusterip": "ClusterIP", "loadbalancer": "LoadBalancer", + } + desired_type = type_map.get(payload.expose_type) + if not desired_type: + raise web.HTTPBadRequest(text=f"Unknown expose_type '{payload.expose_type}'") + + service_manifest = _k8s_build_service_manifest(payload, desired_type) + status, data, text = await _k8s_request( + "POST", f"/api/v1/namespaces/{K8S_NAMESPACE}/services", json_body=service_manifest, + ) + if status not in (200, 201, 409): + LOGGER.error( + "Failed to create Service '%s' (status %s): %s", service_name, status, _k8s_error_msg(data, text) + ) + raise web.HTTPServiceUnavailable(text=f"Failed to create Service '{service_name}': Status {status}") + + status, svc, text = await _k8s_request( + "GET", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status != 200 or not isinstance(svc, dict): + LOGGER.error("Failed to read Service '%s' (status %s): %s", service_name, status, _k8s_error_msg(svc, text)) + raise web.HTTPServiceUnavailable(text=f"Failed to read Service '{service_name}': Status {status}") + + if payload.expose_type == "nodeport": + upstream_port = _k8s_extract_nodeport(svc) + upstream_host = payload.upstream_host or await _k8s_pick_node_address( + preferred_type=payload.node_address_type, + node_name=payload.node_name, + label_selector=payload.node_label_selector, + ) + elif payload.expose_type == "clusterip": + upstream_port = 
_k8s_extract_service_port(svc) + upstream_host = payload.upstream_host or _k8s_service_dns_name(service_name, K8S_NAMESPACE) + else: # loadbalancer + upstream_port = _k8s_extract_service_port(svc) + upstream_host = payload.upstream_host or _k8s_extract_loadbalancer_host(svc) + if not upstream_host: + upstream_host = await _k8s_wait_for_loadbalancer_host( + service_name, timeout_s=payload.wait_timeout_seconds, interval_s=payload.wait_interval_seconds, + ) + + LOGGER.info("Expose '%s' (%s): upstream %s:%d", app_id, payload.expose_type, upstream_host, upstream_port) + try: + exapp_meta = await nc_get_exapp(app_id) + if not exapp_meta: + LOGGER.error("No ExApp metadata for '%s' in Nextcloud.", app_id) + raise web.HTTPNotFound(text=f"No ExApp metadata for '{app_id}'") + except web.HTTPException: + raise + except Exception as e: + LOGGER.exception("Failed to fetch ExApp metadata for '%s'", app_id) + raise web.HTTPServiceUnavailable(text=f"Failed to fetch metadata for '{app_id}'") from e + + exapp_meta.host = upstream_host + exapp_meta.port = int(upstream_port) + exapp_meta.resolved_host = "" + + EXAPP_CACHE[app_id] = exapp_meta + _EXAPP_NEGATIVE_CACHE.pop(app_id, None) + + return web.json_response( + { + "appId": app_id, + "host": upstream_host, + "port": int(upstream_port), + "exposeType": payload.expose_type, + "serviceName": service_name, + "namespace": K8S_NAMESPACE, + } + ) + + ############################################################################### # HTTP Server Setup ############################################################################### @@ -1688,6 +2882,17 @@ def create_web_app() -> web.Application: app.router.add_post("/docker/exapp/wait_for_start", docker_exapp_wait_for_start) app.router.add_post("/docker/exapp/remove", docker_exapp_remove) app.router.add_post("/docker/exapp/install_certificates", docker_exapp_install_certificates) + + # Kubernetes APIs wrappers + app.router.add_post("/k8s/exapp/exists", k8s_exapp_exists) + 
app.router.add_post("/k8s/exapp/create", k8s_exapp_create) + app.router.add_post("/k8s/exapp/start", k8s_exapp_start) + app.router.add_post("/k8s/exapp/stop", k8s_exapp_stop) + app.router.add_post("/k8s/exapp/wait_for_start", k8s_exapp_wait_for_start) + app.router.add_post("/k8s/exapp/remove", k8s_exapp_remove) + app.router.add_post("/k8s/exapp/install_certificates", k8s_exapp_install_certificates) + app.router.add_post("/k8s/exapp/expose", k8s_exapp_expose) + app.on_shutdown.append(_close_k8s_session) return app @@ -1708,6 +2913,9 @@ async def run_http_server(host="127.0.0.1", port=8200): async def main(): + # Ensure cluster-wide DNS for host aliases before starting servers. + await _k8s_ensure_coredns_host_aliases() + spoa_task = asyncio.create_task(SPOA_AGENT._run(host=SPOA_HOST, port=SPOA_PORT)) # noqa http_task = asyncio.create_task(run_http_server(host="127.0.0.1", port=8200))