Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions horizon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ def __new__(cls, *, prefix=None, is_model=True): # noqa: ARG004
description="Filename for offline mode's policy backup (OPAL's offline mode backup)",
)

CONTROL_PLANE_CONNECTIVITY_DISABLED = confi.bool(
"CONTROL_PLANE_CONNECTIVITY_DISABLED",
False,
description="When true (and ENABLE_OFFLINE_MODE is true), the PDP starts disconnected from the control plane "
"and serves from a local backup. Can be toggled at runtime via the /control-plane/connectivity endpoints.",
)

CONFIG_FETCH_MAX_RETRIES = confi.int(
"CONFIG_FETCH_MAX_RETRIES",
6,
Expand Down
Empty file.
115 changes: 115 additions & 0 deletions horizon/connectivity/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Literal

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel

from horizon.authentication import enforce_pdp_token

if TYPE_CHECKING:
from opal_client.client import OpalClient

logger = logging.getLogger(__name__)


class ConnectivityStatus(BaseModel):
control_plane_connectivity_disabled: bool
Comment thread
omer9564 marked this conversation as resolved.
offline_mode_enabled: bool


class ConnectivityActionResult(BaseModel):
status: Literal[
"enabled",
"disabled",
"already_enabled",
"already_disabled",
]


def init_connectivity_router(opal_client: OpalClient):
router = APIRouter(
prefix="/control-plane",
dependencies=[Depends(enforce_pdp_token)],
Comment thread
omer9564 marked this conversation as resolved.
Comment thread
omer9564 marked this conversation as resolved.
)
_lock = asyncio.Lock()

@router.get(
"/connectivity",
status_code=status.HTTP_200_OK,
response_model=ConnectivityStatus,
summary="Get control plane connectivity status",
description="Returns the current connectivity state to the control plane and whether offline mode is enabled.",
)
async def get_connectivity_status():
return ConnectivityStatus(
control_plane_connectivity_disabled=opal_client.opal_server_connectivity_disabled,
offline_mode_enabled=opal_client.offline_mode_enabled,
)

@router.post(
Comment thread
omer9564 marked this conversation as resolved.
"/connectivity/enable",
status_code=status.HTTP_200_OK,
response_model=ConnectivityActionResult,
responses={
400: {"description": "Offline mode is not enabled"},
500: {"description": "Failed to enable control plane connectivity"},
},
summary="Enable control plane connectivity",
description="Starts the policy and data updaters, reconnecting to the control plane. "
"Triggers a full rehydration (policy refetch + data refetch). "
"Requires offline mode to be enabled.",
)
async def enable_connectivity():
if not opal_client.offline_mode_enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot enable control plane connectivity: offline mode is not enabled",
)
Comment thread
omer9564 marked this conversation as resolved.
async with _lock:
if not opal_client.opal_server_connectivity_disabled:
return ConnectivityActionResult(status="already_enabled")
try:
await opal_client.enable_opal_server_connectivity()
except Exception:
logger.exception("Failed to enable control plane connectivity")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to enable control plane connectivity",
) from None
return ConnectivityActionResult(status="enabled")

@router.post(
"/connectivity/disable",
status_code=status.HTTP_200_OK,
response_model=ConnectivityActionResult,
responses={
400: {"description": "Offline mode is not enabled"},
500: {"description": "Failed to disable control plane connectivity"},
},
summary="Disable control plane connectivity",
description="Stops the policy and data updaters, disconnecting from the control plane. "
"Requires offline mode to be enabled. The policy store continues serving from its current state.",
)
async def disable_connectivity():
if not opal_client.offline_mode_enabled:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot disable control plane connectivity: offline mode is not enabled",
)
async with _lock:
if opal_client.opal_server_connectivity_disabled:
return ConnectivityActionResult(status="already_disabled")
try:
await opal_client.disable_opal_server_connectivity()
except Exception:
logger.exception("Failed to disable control plane connectivity")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to disable control plane connectivity",
) from None
return ConnectivityActionResult(status="disabled")

return router
Comment thread
omer9564 marked this conversation as resolved.
19 changes: 19 additions & 0 deletions horizon/pdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from horizon.authentication import enforce_pdp_token
from horizon.config import MOCK_API_KEY, sidecar_config
from horizon.connectivity.api import init_connectivity_router
from horizon.enforcer.api import init_enforcer_api_router, stats_manager
from horizon.enforcer.opa.config_maker import (
get_opa_authz_policy_file_path,
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(self):

self._configure_opal_data_updater()
self._configure_opal_offline_mode()
self._configure_opal_server_connectivity()

if sidecar_config.PRINT_CONFIG_ON_STARTUP:
logger.info(
Expand Down Expand Up @@ -326,6 +328,16 @@ def _configure_opal_offline_mode(self):
Path(sidecar_config.OFFLINE_MODE_BACKUP_DIR) / sidecar_config.OFFLINE_MODE_POLICY_BACKUP_FILENAME
)

def _configure_opal_server_connectivity(self):
"""
configure control plane connectivity when offline mode is enabled.
When both offline mode and connectivity disabled are set, the PDP starts
disconnected from the control plane and serves from a local backup.
"""
opal_client_config.DEFAULT_OPAL_SERVER_CONNECTIVITY_DISABLED = (
sidecar_config.ENABLE_OFFLINE_MODE and sidecar_config.CONTROL_PLANE_CONNECTIVITY_DISABLED
)

def _fix_data_topics(self) -> list[str]:
"""
This is a worksaround for the following issue:
Expand Down Expand Up @@ -411,6 +423,13 @@ def _configure_api_routes(self, app: FastAPI):
include_in_schema=False,
dependencies=[Depends(enforce_pdp_token)],
)
if sidecar_config.ENABLE_OFFLINE_MODE:
connectivity_router = init_connectivity_router(self._opal)
app.include_router(
connectivity_router,
tags=["Control Plane Connectivity"],
dependencies=[Depends(enforce_pdp_token)],
)

# TODO: remove this when clients update sdk version (legacy routes)
@app.post(
Expand Down
119 changes: 119 additions & 0 deletions horizon/tests/test_connectivity_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from unittest.mock import AsyncMock, PropertyMock

from fastapi import FastAPI
from fastapi.testclient import TestClient
from horizon.authentication import enforce_pdp_token
from horizon.connectivity.api import init_connectivity_router


def _noop_auth():
pass


def _create_test_app(opal_client_mock):
"""Create a test FastAPI app with the connectivity router (no auth)."""
app = FastAPI()
router = init_connectivity_router(opal_client_mock)
app.include_router(router)
app.dependency_overrides[enforce_pdp_token] = _noop_auth
return app


def _make_opal_mock(*, offline_mode_enabled=True, connectivity_disabled=False):
mock = AsyncMock()
type(mock).offline_mode_enabled = PropertyMock(return_value=offline_mode_enabled)
type(mock).opal_server_connectivity_disabled = PropertyMock(return_value=connectivity_disabled)
return mock


class TestGetConnectivityStatus:
def test_returns_status(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
client = TestClient(_create_test_app(mock))

resp = client.get("/control-plane/connectivity")
assert resp.status_code == 200
data = resp.json()
assert data["control_plane_connectivity_disabled"] is True
assert data["offline_mode_enabled"] is True

def test_returns_status_when_connected(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
client = TestClient(_create_test_app(mock))

resp = client.get("/control-plane/connectivity")
assert resp.status_code == 200
data = resp.json()
assert data["control_plane_connectivity_disabled"] is False


class TestEnableConnectivity:
def test_enable_success(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/enable")
assert resp.status_code == 200
assert resp.json()["status"] == "enabled"
mock.enable_opal_server_connectivity.assert_awaited_once()

def test_enable_already_enabled(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/enable")
assert resp.status_code == 200
assert resp.json()["status"] == "already_enabled"
mock.enable_opal_server_connectivity.assert_not_awaited()

def test_enable_returns_400_when_offline_mode_disabled(self):
mock = _make_opal_mock(offline_mode_enabled=False)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/enable")
assert resp.status_code == 400

def test_enable_returns_500_on_opal_error(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
mock.enable_opal_server_connectivity.side_effect = RuntimeError("boom")
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/enable")
assert resp.status_code == 500
assert "Failed to enable" in resp.json()["detail"]


class TestDisableConnectivity:
def test_disable_success(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/disable")
assert resp.status_code == 200
assert resp.json()["status"] == "disabled"
mock.disable_opal_server_connectivity.assert_awaited_once()

def test_disable_already_disabled(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/disable")
assert resp.status_code == 200
assert resp.json()["status"] == "already_disabled"
mock.disable_opal_server_connectivity.assert_not_awaited()

def test_disable_returns_400_when_offline_mode_disabled(self):
mock = _make_opal_mock(offline_mode_enabled=False)
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/disable")
assert resp.status_code == 400

def test_disable_returns_500_on_opal_error(self):
mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
mock.disable_opal_server_connectivity.side_effect = RuntimeError("boom")
client = TestClient(_create_test_app(mock))

resp = client.post("/control-plane/connectivity/disable")
assert resp.status_code == 500
assert "Failed to disable" in resp.json()["detail"]
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ httpx>=0.27.0,<1
# google-re2 # use re2 instead of re for regex matching because it's simiplier and safer for user inputted regexes
protobuf>=6.33.5 # pinned to avoid CVE-2026-0994
cryptography>=46.0.5,<47 # pinned to avoid CVE-2026-26007
opal-common==0.8.3
opal-client==0.8.3
opal-common==0.9.4
opal-client==0.9.4
Loading