Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 163 additions & 11 deletions haproxy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class InstallCertificatesPayload(ExAppName):

class ExposeExAppPayload(ExAppName):
port: int = Field(..., ge=1, le=65535, description="Port on which the ExApp listens inside the Pod/container.")
expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("nodeport")
expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field("clusterip")
upstream_host: str | None = Field(None, description="Host override. Required for expose_type=manual.")
upstream_port: int | None = Field(None, ge=1, le=65535, description="Port override (manual only).")
service_port: int | None = Field(None, ge=1, le=65535, description="Service port (defaults to payload.port).")
Expand Down Expand Up @@ -2227,6 +2227,7 @@ def _k8s_build_service_manifest(
"app.kubernetes.io/name": service_name,
"app.kubernetes.io/part-of": base_exapp_name,
"app.kubernetes.io/component": "exapp",
"app.kubernetes.io/managed-by": "harp",
**(payload.service_labels or {}),
}
if payload.instance_id:
Expand Down Expand Up @@ -2267,6 +2268,78 @@ def _k8s_build_service_manifest(
}


def _extract_manual_upstream(annotations: dict[str, str], deploy_name: str) -> tuple[str, int] | None:
"""Extract manual upstream host/port from Deployment annotations."""
if annotations.get("appapi.nextcloud.com/expose-type") != "manual":
return None

host = annotations.get("appapi.nextcloud.com/upstream-host")
port_str = annotations.get("appapi.nextcloud.com/upstream-port")
if not host or not port_str:
LOGGER.warning(
"Deployment '%s' has manual expose-type annotation but missing upstream-host/port.",
deploy_name,
)
return None

try:
port = int(port_str)
except ValueError:
LOGGER.warning("Invalid upstream-port annotation on Deployment '%s': %s", deploy_name, port_str)
return None
return (host, port)


async def _k8s_resolve_manual_upstream(deployment_name: str, app_name: str) -> tuple[str, int] | None:
    """Read manual-expose upstream config from Deployment annotations.

    For expose_type=manual, no K8s Service is created. Instead, the upstream
    host and port are persisted as annotations on the Deployment during the
    expose call. This function reads those annotations to re-discover the
    upstream after a cache eviction or HaRP restart.

    Returns (host, port) if annotations are present, None otherwise (e.g. a
    Docker-only ExApp that has no K8s Deployment at all).
    """
    # First attempt: the base Deployment name (single-role ExApps).
    status, body, _ = await _k8s_request(
        "GET",
        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}",
    )
    if status == 200 and isinstance(body, dict):
        meta = body.get("metadata") or {}
        upstream = _extract_manual_upstream(meta.get("annotations") or {}, deployment_name)
        if upstream:
            LOGGER.info("Resolved manual upstream for '%s' from Deployment annotations: %s:%d", app_name, *upstream)
            return upstream
        # 200 without manual annotations: fall through to the label search below.
    elif status != 404:
        # Unexpected API failure (auth, timeout, 5xx) — don't attempt the fallback.
        LOGGER.warning(
            "K8s API error fetching Deployment '%s' for manual upstream (status %s)",
            deployment_name, status,
        )
        return None

    # Second attempt: list multi-role Deployments via the part-of label; the
    # exposed role carries the manual annotations.
    part_of = _sanitize_k8s_name(f"nc_app_{app_name}")
    status, listing, _ = await _k8s_request(
        "GET",
        f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments",
        query={"labelSelector": f"app.kubernetes.io/part-of={part_of}"},
    )
    if status != 200 or not isinstance(listing, dict):
        return None

    for entry in listing.get("items") or []:
        meta = entry.get("metadata") or {}
        role_name = meta.get("name", "")
        upstream = _extract_manual_upstream(meta.get("annotations") or {}, role_name)
        if upstream:
            LOGGER.info("Resolved manual upstream for '%s' from role Deployment '%s': %s:%d", app_name, role_name, *upstream)
            return upstream

    return None


async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
"""Look up K8s Service for an ExApp, return (host, port) or None.

Expand Down Expand Up @@ -2325,7 +2398,10 @@ async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
raise web.HTTPServiceUnavailable(
text=f"K8s API error during Service lookup for '{app_name}'"
)
return None

# No Service found — check if this is a manual-expose ExApp by reading
# the upstream config stored as annotations on the Deployment.
return await _k8s_resolve_manual_upstream(exapp.exapp_k8s_name, app_name)

svc_type = (svc.get("spec") or {}).get("type", "ClusterIP")
try:
Expand All @@ -2335,7 +2411,15 @@ async def _k8s_resolve_exapp_upstream(app_name: str) -> tuple[str, int] | None:
return (host, port)
if svc_type == "ClusterIP":
port = _k8s_extract_service_port(svc)
host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
# Use the actual ClusterIP address instead of DNS name. DNS names
# like "svc.namespace.svc" only resolve inside the cluster, but HaRP
# may run outside (e.g. Docker on the host). ClusterIPs are routable
# via iptables/ipvs from anywhere with kube-proxy access.
cluster_ip = (svc.get("spec") or {}).get("clusterIP")
if cluster_ip and cluster_ip != "None":
host = cluster_ip
else:
host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
return (host, port)
if svc_type == "LoadBalancer":
port = _k8s_extract_service_port(svc)
Expand Down Expand Up @@ -2665,7 +2749,24 @@ async def k8s_exapp_wait_for_start(request: web.Request):
}
)

if phase in ("Failed", "Unknown", "Succeeded"):
# Succeeded means the pod ran and exited 0 (e.g. init-only apps
# like app-skeleton-python). Treat as successful start.
if phase == "Succeeded":
LOGGER.info(
"Deployment '%s' pod completed successfully (phase=Succeeded).",
deployment_name,
)
return web.json_response(
{
"started": True,
"status": "succeeded",
"health": "ready",
"reason": last_reason,
"message": last_message,
}
)

if phase in ("Failed", "Unknown"):
LOGGER.warning(
"Deployment '%s' pod is in phase '%s', treating as not successfully started.",
deployment_name,
Expand Down Expand Up @@ -2758,12 +2859,21 @@ async def k8s_exapp_remove(request: web.Request):
LOGGER.error("Error removing PVC '%s' (status %s): %s", pvc_name, status, _k8s_error_msg(data, text))
raise web.HTTPServiceUnavailable(text=f"Error removing PVC '{pvc_name}': Status {status}")

status, data, text = await _k8s_request(
"DELETE", f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}",
)
if status not in (200, 202, 404):
LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text))
raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {status}")
# Only delete Services that HaRP created (labelled managed-by=harp).
# For manual expose, the operator manages the Service — don't delete it.
svc_path = f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{deployment_name}"
status, svc_data, text = await _k8s_request("GET", svc_path)
if status == 200 and isinstance(svc_data, dict):
svc_labels = (svc_data.get("metadata") or {}).get("labels") or {}
if svc_labels.get("app.kubernetes.io/managed-by") == "harp":
status, data, text = await _k8s_request("DELETE", svc_path)
if status not in (200, 202, 404):
LOGGER.error("Error removing Service '%s' (status %s): %s", deployment_name, status, _k8s_error_msg(data, text))
raise web.HTTPServiceUnavailable(text=f"Error removing Service '{deployment_name}': Status {status}")
else:
LOGGER.info("Skipping deletion of externally-managed Service '%s'.", deployment_name)
elif status != 404:
LOGGER.warning("Error checking Service '%s' (status %s), skipping deletion.", deployment_name, status)
return web.HTTPNoContent()


Expand All @@ -2781,6 +2891,37 @@ async def k8s_exapp_expose(request: web.Request):
if payload.expose_type == "manual":
upstream_host = payload.upstream_host
upstream_port = int(payload.upstream_port or payload.port)

# Persist the manual upstream config as annotations on the Deployment so
# that _k8s_resolve_exapp_upstream() can re-discover it after a cache
# eviction or HaRP restart (no K8s Service exists for manual type).
# This is best-effort: if K8s is not configured or the PATCH fails, the
# expose still succeeds (upstream is cached in memory).
if K8S_ENABLED and K8S_API_SERVER and _get_k8s_token():
try:
deploy_name = payload.exapp_k8s_name
annotation_patch = {
"metadata": {
"annotations": {
"appapi.nextcloud.com/expose-type": "manual",
"appapi.nextcloud.com/upstream-host": upstream_host,
"appapi.nextcloud.com/upstream-port": str(upstream_port),
},
},
}
status, _, text = await _k8s_request(
"PATCH",
f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deploy_name}",
json_body=annotation_patch,
content_type="application/strategic-merge-patch+json",
)
if status not in (200, 201):
LOGGER.warning(
"Failed to annotate Deployment '%s' with manual upstream (status %s): %s",
deploy_name, status, text[:200] if text else "",
)
except Exception as e:
LOGGER.warning("Best-effort annotation PATCH failed for '%s': %s", payload.name, e)
else:
_ensure_k8s_configured()

Expand Down Expand Up @@ -2817,7 +2958,18 @@ async def k8s_exapp_expose(request: web.Request):
)
elif payload.expose_type == "clusterip":
upstream_port = _k8s_extract_service_port(svc)
upstream_host = payload.upstream_host or _k8s_service_dns_name(service_name, K8S_NAMESPACE)
# Use the actual ClusterIP address instead of DNS name. DNS names
# like "svc.namespace.svc" only resolve inside the cluster, but HaRP
# may run outside (e.g. Docker on the host). ClusterIPs are routable
# via iptables/ipvs from anywhere with kube-proxy access.
if not payload.upstream_host:
cluster_ip = (svc.get("spec") or {}).get("clusterIP")
if cluster_ip and cluster_ip != "None":
upstream_host = cluster_ip
else:
upstream_host = _k8s_service_dns_name(service_name, K8S_NAMESPACE)
else:
upstream_host = payload.upstream_host
else: # loadbalancer
upstream_port = _k8s_extract_service_port(svc)
upstream_host = payload.upstream_host or _k8s_extract_loadbalancer_host(svc)
Expand Down
Loading