Review notes (text before the "diff --git" header is ignored by patch tools):
- Line structure of this patch was restored; indentation of context lines is
  inferred from Python syntax and the hunk line counts.
- _extract_manual_upstream now rejects upstream-port annotations outside
  1-65535. Hunk 3 grew by 10 lines and later hunk offsets were shifted to
  match, so the "index" blob ids no longer describe the post-image.

diff --git a/haproxy_agent.py b/haproxy_agent.py
index 9ea2636..d3acc82 100644
--- a/haproxy_agent.py
+++ b/haproxy_agent.py
@@ -213,7 +213,7 @@ class InstallCertificatesPayload(ExAppName):
 
 class ExposeExAppPayload(ExAppName):
     port: int = Field(..., ge=1, le=65535, description="Port on which the ExApp listens inside the Pod/container.")
-    expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("nodeport")
+    expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("clusterip")
     upstream_host: str | None = Field(None, description="Host override. Required for expose_type=manual.")
     upstream_port: int | None = Field(None, ge=1, le=65535, description="Port override (manual only).")
     service_port: int | None = Field(None, ge=1, le=65535, description="Service port (defaults to payload.port).")
@@ -2227,6 +2227,7 @@ def _k8s_build_service_manifest(
         "app.kubernetes.io/name": service_name,
         "app.kubernetes.io/part-of": base_exapp_name,
         "app.kubernetes.io/component": "exapp",
+        "app.kubernetes.io/managed-by": "harp",
         **(payload.service_labels or {}),
     }
     if payload.instance_id:
@@ -2267,6 +2268,88 @@ def _k8s_build_service_manifest(
     }
 
 
+def _extract_manual_upstream(annotations: dict[str, str], deploy_name: str) -> tuple[str, int] | None:
+    """Extract manual upstream host/port from Deployment annotations.
+
+    Returns (host, port) only when the expose-type annotation is "manual" and
+    both upstream annotations are present with a port in the 1-65535 range;
+    otherwise returns None (logging a warning for malformed annotations).
+    """
+    if annotations.get("appapi.nextcloud.com/expose-type") != "manual":
+        return None
+
+    host = annotations.get("appapi.nextcloud.com/upstream-host")
+    port_str = annotations.get("appapi.nextcloud.com/upstream-port")
+    if not host or not port_str:
+        LOGGER.warning(
+            "Deployment '%s' has manual expose-type annotation but missing upstream-host/port.",
+            deploy_name,
+        )
+        return None
+
+    try:
+        port = int(port_str)
+    except ValueError:
+        LOGGER.warning("Invalid upstream-port annotation on Deployment '%s': %s", deploy_name, port_str)
+        return None
+    # Annotations can be edited out-of-band, so enforce the same 1-65535
+    # bound that ExposeExAppPayload applies to ports.
+    if not 1 <= port <= 65535:
+        LOGGER.warning("Out-of-range upstream-port annotation on Deployment '%s': %s", deploy_name, port_str)
+        return None
+    return (host, port)
+
+
+async def _k8s_resolve_manual_upstream(deployment_name: str, app_name: str) -> tuple[str, int] | None:
+    """Read manual-expose upstream config from Deployment annotations.
+
+    For expose_type=manual, no K8s Service is created. Instead, the upstream
+    host and port are persisted as annotations on the Deployment during the
+    expose call. This function reads those annotations to re-discover the
+    upstream after a cache eviction or HaRP restart.
+
+    Returns (host, port) if annotations are present, None otherwise (e.g. a
+    Docker-only ExApp that has no K8s Deployment at all).
+    """
+    # Try the base Deployment name first (single-role ExApps).
+    status, data, _ = await _k8s_request(
+        "GET",
+        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}",
+    )
+    if status == 200 and isinstance(data, dict):
+        annotations = (data.get("metadata") or {}).get("annotations") or {}
+        result = _extract_manual_upstream(annotations, deployment_name)
+        if result:
+            LOGGER.info("Resolved manual upstream for '%s' from Deployment annotations: %s:%d", app_name, *result)
+            return result
+    elif status != 404:
+        LOGGER.warning(
+            "K8s API error fetching Deployment '%s' for manual upstream (status %s)",
+            deployment_name, status,
+        )
+        return None
+
+    # Fallback: search for multi-role Deployments by label (the exposed role
+    # carries the manual annotations).
+    base_k8s_name = _sanitize_k8s_name(f"nc_app_{app_name}")
+    label_selector = f"app.kubernetes.io/part-of={base_k8s_name}"
+    status, deploy_list, _ = await _k8s_request(
+        "GET",
+        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments",
+        query={"labelSelector": label_selector},
+    )
+    if status == 200 and isinstance(deploy_list, dict):
+        for item in deploy_list.get("items") or []:
+            annotations = (item.get("metadata") or {}).get("annotations") or {}
+            d_name = (item.get("metadata") or {}).get("name", "")
+            result = _extract_manual_upstream(annotations, d_name)
+            if result:
+                LOGGER.info("Resolved manual upstream for '%s' from role Deployment '%s': %s:%d", app_name, d_name, *result)
+                return result
+
+    return None
+
+
 async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
     """Look up K8s Service for an ExApp, return (host, port) or None.
 
@@ -2325,7 +2408,10 @@ async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
             raise web.HTTPServiceUnavailable(
                 text=f"K8s API error during Service lookup for '{app_name}'"
             )
-        return None
+
+        # No Service found — check if this is a manual-expose ExApp by reading
+        # the upstream config stored as annotations on the Deployment.
+        return await _k8s_resolve_manual_upstream(exapp.exapp_k8s_name, app_name)
 
     svc_type = (svc.get("spec") or {}).get("type", "ClusterIP")
     try:
@@ -2335,7 +2421,15 @@ async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
             return (host, port)
         if svc_type == "ClusterIP":
             port = _k8s_extract_service_port(svc)
-            host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
+            # Use the actual ClusterIP address instead of DNS name. DNS names
+            # like "svc.namespace.svc" only resolve inside the cluster, but HaRP
+            # may run outside (e.g. Docker on the host). ClusterIPs are routable
+            # via iptables/ipvs from anywhere with kube-proxy access.
+            cluster_ip = (svc.get("spec") or {}).get("clusterIP")
+            if cluster_ip and cluster_ip != "None":
+                host = cluster_ip
+            else:
+                host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
             return (host, port)
         if svc_type == "LoadBalancer":
             port = _k8s_extract_service_port(svc)
@@ -2665,7 +2759,24 @@ async def k8s_exapp_wait_for_start(request: web.Request):
                     }
                 )
 
-            if phase in ("Failed", "Unknown", "Succeeded"):
+            # Succeeded means the pod ran and exited 0 (e.g. init-only apps
+            # like app-skeleton-python). Treat as successful start.
+            if phase == "Succeeded":
+                LOGGER.info(
+                    "Deployment '%s' pod completed successfully (phase=Succeeded).",
+                    deployment_name,
+                )
+                return web.json_response(
+                    {
+                        "started": True,
+                        "status": "succeeded",
+                        "health": "ready",
+                        "reason": last_reason,
+                        "message": last_message,
+                    }
+                )
+
+            if phase in ("Failed", "Unknown"):
                 LOGGER.warning(
                     "Deployment '%s' pod is in phase '%s', treating as not successfully started.",
                     deployment_name,
@@ -2758,12 +2869,21 @@ async def k8s_exapp_remove(request: web.Request):
             LOGGER.error("Error removing PVC '%s' (status %s): %s", pvc_name, status, _k8s_error_msg(data, text))
             raise web.HTTPServiceUnavailable(text=f"Error removing PVC '{pvc_name}': Status {status}")
 
-    status, data, text = await _k8s_request(
-        "DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}",
-    )
-    if status not in (200, 202, 404):
-        LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text))
-        raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {status}")
+    # Only delete Services that HaRP created (labelled managed-by=harp).
+    # For manual expose, the operator manages the Service — don't delete it.
+    svc_path = f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}"
+    status, svc_data, text = await _k8s_request("GET", svc_path)
+    if status == 200 and isinstance(svc_data, dict):
+        svc_labels = (svc_data.get("metadata") or {}).get("labels") or {}
+        if svc_labels.get("app.kubernetes.io/managed-by") == "harp":
+            status, data, text = await _k8s_request("DELETE", svc_path)
+            if status not in (200, 202, 404):
+                LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text))
+                raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {status}")
+        else:
+            LOGGER.info("Skipping deletion of externally-managed Service '%s'.", deployment_name)
+    elif status != 404:
+        LOGGER.warning("Error checking Service '%s' (status %s), skipping deletion.", deployment_name, status)
     return web.HTTPNoContent()
 
 
@@ -2781,6 +2901,37 @@ async def k8s_exapp_expose(request: web.Request):
     if payload.expose_type == "manual":
         upstream_host = payload.upstream_host
         upstream_port = int(payload.upstream_port or payload.port)
+
+        # Persist the manual upstream config as annotations on the Deployment so
+        # that _k8s_resolve_exapp_upstream() can re-discover it after a cache
+        # eviction or HaRP restart (no K8s Service exists for manual type).
+        # This is best-effort: if K8s is not configured or the PATCH fails, the
+        # expose still succeeds (upstream is cached in memory).
+        if K8S_ENABLED and K8S_API_SERVER and _get_k8s_token():
+            try:
+                deploy_name = payload.exapp_k8s_name
+                annotation_patch = {
+                    "metadata": {
+                        "annotations": {
+                            "appapi.nextcloud.com/expose-type": "manual",
+                            "appapi.nextcloud.com/upstream-host": upstream_host,
+                            "appapi.nextcloud.com/upstream-port": str(upstream_port),
+                        },
+                    },
+                }
+                status, _, text = await _k8s_request(
+                    "PATCH",
+                    f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deploy_name}",
+                    json_body=annotation_patch,
+                    content_type="application/strategic-merge-patch+json",
+                )
+                if status not in (200, 201):
+                    LOGGER.warning(
+                        "Failed to annotate Deployment '%s' with manual upstream (status %s): %s",
+                        deploy_name, status, text[:200] if text else "",
+                    )
+            except Exception as e:
+                LOGGER.warning("Best-effort annotation PATCH failed for '%s': %s", payload.name, e)
     else:
         _ensure_k8s_configured()
 
@@ -2817,7 +2968,18 @@ async def k8s_exapp_expose(request: web.Request):
             )
         elif payload.expose_type == "clusterip":
             upstream_port = _k8s_extract_service_port(svc)
-            upstream_host = payload.upstream_host or _k8s_service_dns_name(service_name, K8S_NAMESPACE)
+            # Use the actual ClusterIP address instead of DNS name. DNS names
+            # like "svc.namespace.svc" only resolve inside the cluster, but HaRP
+            # may run outside (e.g. Docker on the host). ClusterIPs are routable
+            # via iptables/ipvs from anywhere with kube-proxy access.
+            if not payload.upstream_host:
+                cluster_ip = (svc.get("spec") or {}).get("clusterIP")
+                if cluster_ip and cluster_ip != "None":
+                    upstream_host = cluster_ip
+                else:
+                    upstream_host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
+            else:
+                upstream_host = payload.upstream_host
         else:  # loadbalancer
             upstream_port = _k8s_extract_service_port(svc)
             upstream_host = payload.upstream_host or _k8s_extract_loadbalancer_host(svc)