29 changes: 1 addition & 28 deletions backend/app/api/routes/sse.py
@@ -3,13 +3,7 @@
 from fastapi import APIRouter, Request
 from sse_starlette.sse import EventSourceResponse
 
-from app.domain.sse import SSEHealthDomain
-from app.schemas_pydantic.sse import (
-    ShutdownStatusResponse,
-    SSEExecutionEventData,
-    SSEHealthResponse,
-    SSENotificationEventData,
-)
+from app.schemas_pydantic.sse import SSEExecutionEventData, SSENotificationEventData
 from app.services.auth_service import AuthService
 from app.services.sse.sse_service import SSEService
 
@@ -38,24 +32,3 @@ async def execution_events(
     return EventSourceResponse(
         sse_service.create_execution_stream(execution_id=execution_id, user_id=current_user.user_id)
     )
-
-
-@router.get("/health", response_model=SSEHealthResponse)
-async def sse_health(
-    request: Request,
-    sse_service: FromDishka[SSEService],
-    auth_service: FromDishka[AuthService],
-) -> SSEHealthResponse:
-    """Get SSE service health status."""
-    _ = await auth_service.get_current_user(request)
-    domain: SSEHealthDomain = await sse_service.get_health_status()
-    return SSEHealthResponse(
-        status=domain.status,
-        kafka_enabled=domain.kafka_enabled,
-        active_connections=domain.active_connections,
-        active_executions=domain.active_executions,
-        active_consumers=domain.active_consumers,
-        max_connections_per_user=domain.max_connections_per_user,
-        shutdown=ShutdownStatusResponse(**vars(domain.shutdown)),
-        timestamp=domain.timestamp,
-    )
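
For context on the endpoint that remains: `EventSourceResponse` from sse_starlette consumes any async iterator of events, so `create_execution_stream` only needs to yield serialized event payloads. A minimal sketch of that pattern, using a hypothetical stand-in generator rather than the real `SSEService` method:

```python
# Sketch only: fake_execution_stream is a hypothetical stand-in for
# SSEService.create_execution_stream; the real service reads from Redis/Kafka.
import asyncio
import json
from typing import AsyncIterator

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()


async def fake_execution_stream(execution_id: str) -> AsyncIterator[dict]:
    # Yield dicts that sse_starlette serializes as SSE frames (event/data fields).
    for step in range(3):
        await asyncio.sleep(1)
        yield {
            "event": "execution_update",
            "data": json.dumps({"execution_id": execution_id, "step": step}),
        }


@app.get("/events/{execution_id}")
async def execution_events(execution_id: str) -> EventSourceResponse:
    # The response streams until the generator is exhausted or the client disconnects.
    return EventSourceResponse(fake_execution_stream(execution_id))
```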
33 changes: 6 additions & 27 deletions backend/app/core/dishka_lifespan.py
@@ -2,7 +2,7 @@
 
 import asyncio
 import logging
-from contextlib import AsyncExitStack, asynccontextmanager
+from contextlib import asynccontextmanager
 from typing import AsyncGenerator
 
 import redis.asyncio as redis
@@ -15,7 +15,6 @@
 from app.core.startup import initialize_rate_limits
 from app.core.tracing import init_tracing
 from app.db.docs import ALL_DOCUMENTS
-from app.events.event_store_consumer import EventStoreConsumer
 from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
 from app.services.notification_service import NotificationService
 from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
@@ -76,43 +75,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
         extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
     )
 
-    # Phase 1: Resolve all DI dependencies in parallel
-    (
-        schema_registry,
-        database,
-        redis_client,
-        rate_limit_metrics,
-        sse_bridge,
-        event_store_consumer,
-        notification_service,
-    ) = await asyncio.gather(
+    # Resolve DI dependencies in parallel (fail fast on config issues)
+    schema_registry, database, redis_client, rate_limit_metrics, _, _ = await asyncio.gather(
         container.get(SchemaRegistryManager),
         container.get(Database),
         container.get(redis.Redis),
         container.get(RateLimitMetrics),
         container.get(SSEKafkaRedisBridge),
-        container.get(EventStoreConsumer),
         container.get(NotificationService),
     )
 
-    # Phase 2: Initialize infrastructure in parallel (independent subsystems)
+    # Initialize infrastructure in parallel
     await asyncio.gather(
         initialize_event_schemas(schema_registry),
         init_beanie(database=database, document_models=ALL_DOCUMENTS),
         initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
     )
     logger.info("Infrastructure initialized (schemas, beanie, rate limits)")
 
-    # Phase 3: Start Kafka consumers in parallel (providers already started them via async with,
-    # but __aenter__ is idempotent so this is safe and explicit)
-    async with AsyncExitStack() as stack:
-        stack.push_async_callback(sse_bridge.aclose)
-        stack.push_async_callback(event_store_consumer.aclose)
-        stack.push_async_callback(notification_service.aclose)
-        await asyncio.gather(
-            sse_bridge.__aenter__(),
-            event_store_consumer.__aenter__(),
-            notification_service.__aenter__(),
-        )
-        logger.info("SSE bridge, EventStoreConsumer, and NotificationService started")
-        yield
+    yield
+    # Container close handles all cleanup automatically
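
The closing comment leans on Dishka's resource management: when a provider is written as an async generator, everything after its `yield` runs when the container is closed, so consumers started by their providers are also stopped there without explicit teardown in the lifespan. A minimal sketch of that pattern, assuming a hypothetical consumer class rather than the project's real providers:

```python
# Sketch only: illustrates Dishka's generator-based finalization, not this
# project's actual provider wiring. KafkaConsumerLike is a hypothetical stand-in.
import asyncio
from typing import AsyncIterator

from dishka import Provider, Scope, make_async_container, provide


class KafkaConsumerLike:
    async def start(self) -> None:
        print("consumer started")

    async def stop(self) -> None:
        print("consumer stopped")


class ConsumerProvider(Provider):
    scope = Scope.APP

    @provide
    async def consumer(self) -> AsyncIterator[KafkaConsumerLike]:
        consumer = KafkaConsumerLike()
        await consumer.start()
        yield consumer          # handed to anything that requests KafkaConsumerLike
        await consumer.stop()   # runs on container.close(), i.e. at app shutdown


async def main() -> None:
    container = make_async_container(ConsumerProvider())
    await container.get(KafkaConsumerLike)  # resolving it starts the consumer
    await container.close()                 # finalizes the generator and stops it


if __name__ == "__main__":
    asyncio.run(main())
```

With this arrangement, resolving the dependency once during startup is enough; the container owns the shutdown path.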
62 changes: 0 additions & 62 deletions backend/app/core/lifecycle.py

This file was deleted.
