chore: moved init stuff to DI #113
base: main
Conversation
📝 Walkthrough
Comprehensive refactor moving service lifecycle management from the LifecycleEnabled base class and Kafka consumers into Dishka DI providers, replacing in-memory state with Redis-backed repositories, and converting services to stateless event handlers that accept dependencies via constructors.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
3366a1f to 9271f12 (Compare)
14 issues found across 59 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/event_bus.py">
<violation number="1" location="backend/app/services/event_bus.py:138">
P2: Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.</violation>
</file>
<file name="backend/workers/run_pod_monitor.py">
<violation number="1" location="backend/workers/run_pod_monitor.py:41">
P2: Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.</violation>
</file>
<file name="backend/app/db/repositories/resource_repository.py">
<violation number="1" location="backend/app/db/repositories/resource_repository.py:145">
P1: Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.</violation>
</file>
<file name="backend/workers/run_coordinator.py">
<violation number="1" location="backend/workers/run_coordinator.py:42">
P2: Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving `container.close()` into the finally block.</violation>
</file>
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:41">
P2: Wrap the consume loop in `try/finally` so the DI container is closed even when message handling raises, otherwise resources remain open and the worker can leak connections on failures.</violation>
</file>
<file name="backend/app/services/pod_monitor/monitor.py">
<violation number="1" location="backend/app/services/pod_monitor/monitor.py:187">
P2: Pod events are no longer persisted to the EventRepository before producing to Kafka. This removes the audit copy previously stored via KafkaEventService and can break consumers that rely on stored events.</violation>
</file>
<file name="backend/workers/run_event_replay.py">
<violation number="1" location="backend/workers/run_event_replay.py:33">
P2: On shutdown (cancellation), `await asyncio.Event().wait()` raises `CancelledError` and the container is never closed, leaking resources. Wrap the wait in a try/finally so `container.close()` always runs.</violation>
</file>
<file name="backend/app/events/core/consumer.py">
<violation number="1" location="backend/app/events/core/consumer.py:47">
P2: Guard against missing Kafka headers. This line will crash on messages produced without headers (producer sends `headers=None`). Use a fallback for `msg.headers` and decode only when a value is present.</violation>
</file>
<file name="backend/app/db/repositories/execution_queue_repository.py">
<violation number="1" location="backend/app/db/repositories/execution_queue_repository.py:72">
P2: Queue size and per-user limits are checked outside an atomic transaction, so concurrent enqueues can exceed the limits despite the checks.</violation>
<violation number="2" location="backend/app/db/repositories/execution_queue_repository.py:130">
P2: When queue data is missing, the user count is never decremented, which can permanently inflate per-user limits and block future enqueues.</violation>
<violation number="3" location="backend/app/db/repositories/execution_queue_repository.py:221">
P1: cleanup_stale will delete non-stale entries for priorities 0–9 because the threshold uses BACKGROUND’s priority offset, making the score range include all higher-priority items regardless of timestamp.</violation>
</file>
<file name="backend/workers/run_saga_orchestrator.py">
<violation number="1" location="backend/workers/run_saga_orchestrator.py:41">
P2: Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.</violation>
</file>
<file name="backend/app/services/coordinator/coordinator.py">
<violation number="1" location="backend/app/services/coordinator/coordinator.py:115">
P2: After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling `_try_schedule_next()` after cancellation cleanup.</violation>
</file>
<file name="backend/tests/e2e/test_k8s_worker_create_pod.py">
<violation number="1" location="backend/tests/e2e/test_k8s_worker_create_pod.py:23">
P2: Using the DI-managed worker ignores `test_settings.K8S_NAMESPACE`; the worker’s `K8sWorkerConfig` namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
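The consumer.py headers issue flagged above has no suggested fix anywhere in this thread. A minimal sketch of the fallback the reviewer describes, assuming an aiokafka-style record where msg.headers may be None or a sequence of (key, value) pairs with bytes values; the helper name and placement are illustrative, not the repository's actual code:

```python
from typing import Any


def decode_headers(msg: Any) -> dict[str, str]:
    """Defensively decode Kafka message headers.

    Producers may send headers=None, so fall back to an empty sequence and
    decode a value only when one is actually present.
    """
    headers = msg.headers or ()
    decoded: dict[str, str] = {}
    for key, value in headers:
        decoded[key] = value.decode("utf-8") if value is not None else ""
    return decoded
```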
requested_memory_mb = requested_memory_mb or default_memory

# Apply limits
requested_cpu = min(requested_cpu, self._max_cpu_per_exec)
P1: Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/resource_repository.py, line 145:
<comment>Validate resource requests are non-negative before allocation. Negative values would increase the available pool, allowing callers to mint resources.</comment>
<file context>
@@ -0,0 +1,300 @@
+ requested_memory_mb = requested_memory_mb or default_memory
+
+ # Apply limits
+ requested_cpu = min(requested_cpu, self._max_cpu_per_exec)
+ requested_memory_mb = min(requested_memory_mb, self._max_memory_per_exec)
+
</file context>
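There is no suggested fix attached to this comment; a small, hypothetical helper showing the kind of guard it asks for (the names and the ValueError choice are assumptions, the min() capping mirrors the context above):

```python
def validate_and_cap(requested_cpu: float, requested_memory_mb: int,
                     max_cpu: float, max_memory_mb: int) -> tuple[float, int]:
    """Reject negative requests before capping, so callers cannot 'mint' resources."""
    if requested_cpu < 0 or requested_memory_mb < 0:
        raise ValueError(
            f"Resource requests must be non-negative: "
            f"cpu={requested_cpu}, memory_mb={requested_memory_mb}"
        )
    # Apply the same per-execution limits as the snippet above
    return min(requested_cpu, max_cpu), min(requested_memory_mb, max_memory_mb)
```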
async def cleanup_stale(self) -> int:
    """Remove stale entries. Returns count removed. Call periodically."""
    removed = 0
    threshold_score = QueuePriority.BACKGROUND.value * 1e12 + (time.time() - self.stale_timeout_seconds)
P1: cleanup_stale will delete non-stale entries for priorities 0–9 because the threshold uses BACKGROUND’s priority offset, making the score range include all higher-priority items regardless of timestamp.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/execution_queue_repository.py, line 221:
<comment>cleanup_stale will delete non-stale entries for priorities 0–9 because the threshold uses BACKGROUND’s priority offset, making the score range include all higher-priority items regardless of timestamp.</comment>
<file context>
@@ -0,0 +1,234 @@
+ async def cleanup_stale(self) -> int:
+ """Remove stale entries. Returns count removed. Call periodically."""
+ removed = 0
+ threshold_score = QueuePriority.BACKGROUND.value * 1e12 + (time.time() - self.stale_timeout_seconds)
+
+ # Get entries older than threshold
</file context>
if asyncio.iscoroutinefunction(handler):
    await handler(event)
else:
    handler(event)  # type: ignore[operator]
P2: Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/event_bus.py, line 138:
<comment>Synchronous handlers run directly on the event loop, which can block async processing. Offload sync handlers to a thread executor to avoid stalling Kafka message handling.</comment>
<file context>
@@ -35,313 +39,102 @@ class Subscription:
+ if asyncio.iscoroutinefunction(handler):
+ await handler(event)
+ else:
+ handler(event) # type: ignore[operator]
+ except Exception as e:
+ self._logger.error(f"Handler failed for {event.event_type}: {e}")
</file context>
-    handler(event)  # type: ignore[operator]
+    await asyncio.to_thread(handler, event)
async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

logger.info("PodMonitor shutdown complete")

await container.close()
P2: Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_pod_monitor.py, line 41:
<comment>Ensure cleanup runs even when the consumer loop is cancelled or errors. Wrap the loop in a try/finally so container.close executes on shutdown.</comment>
<file context>
@@ -29,27 +33,18 @@ async def run_pod_monitor(settings: Settings) -> None:
+
+ logger.info("PodMonitor started, consuming events...")
+
+ async for msg in kafka_consumer:
+ await handler.handle(msg)
+ await kafka_consumer.commit()
</file context>
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("PodMonitor shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("PodMonitor shutdown complete")
+        await container.close()
async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

logger.info("ExecutionCoordinator shutdown complete")

await container.close()
P2: Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving container.close() into the finally block.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_coordinator.py, line 42:
<comment>Ensure container cleanup runs even if the consumer loop raises by wrapping the loop in a try/finally and moving `container.close()` into the finally block.</comment>
<file context>
@@ -27,27 +34,18 @@ async def run_coordinator(settings: Settings) -> None:
+
+ logger.info("ExecutionCoordinator started, consuming events...")
+
+ async for msg in kafka_consumer:
+ await handler.handle(msg)
+ await kafka_consumer.commit()
</file context>
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("ExecutionCoordinator shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("ExecutionCoordinator shutdown complete")
+        await container.close()
) -> tuple[bool, int | None, str | None]:
    """Add execution to queue. Returns (success, position, error)."""
    # Check queue size
    queue_size = await self._redis.zcard(self.QUEUE_KEY)
P2: Queue size and per-user limits are checked outside an atomic transaction, so concurrent enqueues can exceed the limits despite the checks.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/execution_queue_repository.py, line 72:
<comment>Queue size and per-user limits are checked outside an atomic transaction, so concurrent enqueues can exceed the limits despite the checks.</comment>
<file context>
@@ -0,0 +1,234 @@
+ ) -> tuple[bool, int | None, str | None]:
+ """Add execution to queue. Returns (success, position, error)."""
+ # Check queue size
+ queue_size = await self._redis.zcard(self.QUEUE_KEY)
+ if queue_size >= self.max_queue_size:
+ return False, None, "Queue is full"
</file context>
data_key = f"{self.DATA_KEY_PREFIX}:{execution_id}"
raw_data = await self._redis.hgetall(data_key)  # type: ignore[misc]

if not raw_data:
P2: When queue data is missing, the user count is never decremented, which can permanently inflate per-user limits and block future enqueues.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/db/repositories/execution_queue_repository.py, line 130:
<comment>When queue data is missing, the user count is never decremented, which can permanently inflate per-user limits and block future enqueues.</comment>
<file context>
@@ -0,0 +1,234 @@
+ data_key = f"{self.DATA_KEY_PREFIX}:{execution_id}"
+ raw_data = await self._redis.hgetall(data_key) # type: ignore[misc]
+
+ if not raw_data:
+ # Data expired or missing, skip this entry
+ self._logger.warning(f"Queue entry {execution_id} has no data, skipping")
</file context>
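The comment explains the leak but no fix is posted. One hedged way to make removal possible even after the data hash expires is a longer-lived execution-to-user mapping; the owner_key hash below is an assumption for illustration, not something the repository currently keeps:

```python
async def drop_orphaned_entry(redis, queue_key: str, user_count_key: str,
                              owner_key: str, execution_id: str) -> None:
    """Remove a queue entry whose data hash expired and release the user's slot."""
    await redis.zrem(queue_key, execution_id)
    user_id = await redis.hget(owner_key, execution_id)
    if user_id is not None:
        remaining = await redis.hincrby(user_count_key, user_id, -1)
        if remaining <= 0:
            # Keep the hash tidy and never let counts go negative
            await redis.hdel(user_count_key, user_id)
        await redis.hdel(owner_key, execution_id)
```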
| logger.info("SagaOrchestrator started, consuming events...") | ||
|
|
||
| logger.info("Saga orchestrator started and running") | ||
| async for msg in kafka_consumer: |
P2: Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_saga_orchestrator.py, line 41:
<comment>Cleanup is not guaranteed if message handling raises or the task is cancelled. Wrap the consumer loop in try/finally so container.close() always runs.</comment>
<file context>
@@ -27,27 +33,18 @@ async def run_saga_orchestrator(settings: Settings) -> None:
+ logger.info("SagaOrchestrator started, consuming events...")
- logger.info("Saga orchestrator started and running")
+ async for msg in kafka_consumer:
+ await handler.handle(msg)
+ await kafka_consumer.commit()
</file context>
del self._execution_resources[execution_id]
# Update metrics
count = await self._state_repo.get_active_count()
self._metrics.update_coordinator_active_executions(count)
P2: After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling _try_schedule_next() after cancellation cleanup.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/coordinator/coordinator.py, line 115:
<comment>After cancellation, the coordinator doesn’t attempt to schedule the next queued execution. With the scheduling loop removed, this can leave the queue stalled until another event triggers scheduling. Consider calling `_try_schedule_next()` after cancellation cleanup.</comment>
<file context>
@@ -20,391 +29,219 @@
- del self._execution_resources[execution_id]
+ # Update metrics
+ count = await self._state_repo.get_active_count()
+ self._metrics.update_coordinator_active_executions(count)
- self._active_executions.discard(execution_id)
</file context>
-    self._metrics.update_coordinator_active_executions(count)
+    self._metrics.update_coordinator_active_executions(count)
+    await self._try_schedule_next()
idem: IdempotencyManager = await scope.get(IdempotencyManager)
event_metrics: EventMetrics = await scope.get(EventMetrics)
# Get worker from DI (already configured with dependencies)
worker: KubernetesWorker = await scope.get(KubernetesWorker)
P2: Using the DI-managed worker ignores test_settings.K8S_NAMESPACE; the worker’s K8sWorkerConfig namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/e2e/test_k8s_worker_create_pod.py, line 23:
<comment>Using the DI-managed worker ignores `test_settings.K8S_NAMESPACE`; the worker’s `K8sWorkerConfig` namespace comes from the environment, so the test may create resources in a different namespace than the one it asserts/cleans up. This can lead to flaky failures or leaked resources when test settings differ from the env.</comment>
<file context>
@@ -25,27 +19,10 @@ async def test_worker_creates_configmap_and_pod(
- idem: IdempotencyManager = await scope.get(IdempotencyManager)
- event_metrics: EventMetrics = await scope.get(EventMetrics)
+ # Get worker from DI (already configured with dependencies)
+ worker: KubernetesWorker = await scope.get(KubernetesWorker)
- cfg = K8sWorkerConfig(namespace=ns, max_concurrent_pods=1)
</file context>
Actionable comments posted: 17
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
backend/app/services/idempotency/middleware.py (1)
50-56: Type mismatch: key_strategy is str but IdempotencyManager expects the KeyStrategy enum.
The pipeline failures confirm that IdempotencyManager.check_and_reserve(), mark_completed(), and mark_failed() expect the KeyStrategy enum, not str. Either:
- Update IdempotencyManager to accept str, or
- Keep the KeyStrategy enum in this module
🔧 Option 1: Convert string to enum at call sites
+from app.domain.idempotency import KeyStrategy
+
 # Check idempotency
 idempotency_result = await self.idempotency_manager.check_and_reserve(
     event=event,
-    key_strategy=self.key_strategy,
+    key_strategy=KeyStrategy(self.key_strategy),
     custom_key=custom_key,
     ttl_seconds=self.ttl_seconds,
     fields=self.fields,
 )
Apply similar changes to mark_completed() (line 81) and mark_failed() (line 87).
backend/app/services/notification_service.py (1)
350-369: Throttle exemption is ineffective and non-exempt notifications get double-counted.
_create_system_for_user checks throttling and then calls create_notification, which throttles again. Also, throttle_exempt=True still gets throttled in create_notification.
🛠️ Suggested fix
     async def create_notification(
         self,
         user_id: str,
         subject: str,
         body: str,
         tags: list[str],
         severity: NotificationSeverity = NotificationSeverity.MEDIUM,
         channel: NotificationChannel = NotificationChannel.IN_APP,
         scheduled_for: datetime | None = None,
         action_url: str | None = None,
-        metadata: dict[str, object] | None = None,
+        metadata: dict[str, object] | None = None,
+        skip_throttle: bool = False,
     ) -> DomainNotification:
         """Create a new notification."""
         if not tags:
             raise NotificationValidationError("tags must be a non-empty list")
@@
-        if await self._throttle_cache.check_throttle(
+        if not skip_throttle and await self._throttle_cache.check_throttle(
             user_id,
             severity,
             window_hours=self._settings.NOTIF_THROTTLE_WINDOW_HOURS,
             max_per_hour=self._settings.NOTIF_THROTTLE_MAX_PER_HOUR,
         ):
@@
-        if not cfg.throttle_exempt:
-            throttled = await self._throttle_cache.check_throttle(
-                user_id,
-                cfg.severity,
-                window_hours=self._settings.NOTIF_THROTTLE_WINDOW_HOURS,
-                max_per_hour=self._settings.NOTIF_THROTTLE_MAX_PER_HOUR,
-            )
-            if throttled:
-                return "throttled"
-
         await self.create_notification(
             user_id=user_id,
             subject=title,
             body=str(base_context.get("message", "Alert")),
             severity=cfg.severity,
             tags=tags,
             channel=NotificationChannel.IN_APP,
             metadata=base_context,
+            skip_throttle=cfg.throttle_exempt,
         )
         return "created"
+    except NotificationThrottledError:
+        return "throttled"
backend/tests/e2e/test_k8s_worker_create_pod.py (1)
47-71: Ensure cleanup runs even if the test fails.
If pod creation or assertions raise, cleanup won't run and resources can leak.
🧹 Suggested fix
-    pod = worker._pod_builder.build_pod_manifest(cmd)  # noqa: SLF001
-    await worker._create_pod(pod)  # noqa: SLF001
-
-    # Verify resources exist
-    got_cm = worker._v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
-    assert got_cm is not None
-    got_pod = worker._v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
-    assert got_pod is not None
-
-    # Cleanup
-    worker._v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
-    worker._v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+    pod = worker._pod_builder.build_pod_manifest(cmd)  # noqa: SLF001
+    try:
+        await worker._create_pod(pod)  # noqa: SLF001
+
+        # Verify resources exist
+        got_cm = worker._v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+        assert got_cm is not None
+        got_pod = worker._v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
+        assert got_pod is not None
+    finally:
+        if worker._v1 is not None:  # noqa: SLF001
+            try:
+                worker._v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns)  # noqa: SLF001
+            except ApiException as e:
+                if e.status != 404:
+                    raise
+            try:
+                worker._v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)  # noqa: SLF001
+            except ApiException as e:
+                if e.status != 404:
+                    raise
backend/app/services/result_processor/processor.py (1)
96-104: Fix ExecutionResultDomain metadata type (MyPy failure).
metadata expects EventMetadata | None, but model_dump() produces a dict. Either pass the EventMetadata instance directly or widen the domain type.
🔧 Suggested fix
-    metadata=event.metadata.model_dump(),
+    metadata=event.metadata,
@@
-    metadata=event.metadata.model_dump(),
+    metadata=event.metadata,
@@
-    metadata=event.metadata.model_dump(),
+    metadata=event.metadata,
Also applies to: 123-131, 154-162
🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 131-170: The provider currently hardcodes limits in
RedisRepositoryProvider; update get_execution_queue_repository and
get_resource_repository to read max_queue_size, max_executions_per_user,
total_cpu_cores, and total_memory_mb from the Settings argument instead of
hardcoded literals; add those fields with sensible defaults to the Settings
class (e.g., max_queue_size, max_executions_per_user, total_cpu_cores,
total_memory_mb), then pass settings.max_queue_size and
settings.max_executions_per_user into ExecutionQueueRepository and
settings.total_cpu_cores and settings.total_memory_mb into ResourceRepository
and keep existing initialize() call.
In `@backend/app/db/repositories/execution_queue_repository.py`:
- Around line 218-229: The cleanup_stale method computes a single huge
threshold_score that causes all non-BACKGROUND priorities to be considered
stale; update cleanup_stale to iterate over each QueuePriority and for each
priority compute min_score = priority.value * 1e12 and max_score =
priority.value * 1e12 + (time.time() - self.stale_timeout_seconds) (or
alternatively fetch and check the stored _enqueue_timestamp in the entry hash)
then call zrangebyscore with that per-priority range, decode entries and call
self.remove(execution_id) as before; ensure you still limit batch size (num=100)
and aggregate removed count.
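A hedged sketch of the per-priority scan this prompt describes. The score encoding (priority.value * 1e12 + timestamp) and QueuePriority come from the snippets quoted in this review; the remove() return value and the batch size are assumptions:

```python
import time


async def cleanup_stale(self) -> int:
    """Remove stale entries per priority band instead of one global threshold."""
    removed = 0
    cutoff = time.time() - self.stale_timeout_seconds
    for priority in QueuePriority:
        # Each priority occupies its own score band, so the stale window has to be
        # computed inside that band rather than from BACKGROUND's offset.
        min_score = priority.value * 1e12
        max_score = priority.value * 1e12 + cutoff
        stale = await self._redis.zrangebyscore(
            self.QUEUE_KEY, min_score, max_score, start=0, num=100
        )
        for raw_id in stale:
            execution_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
            if await self.remove(execution_id):
                removed += 1
    return removed
```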
- Around line 71-101: The current enqueue logic performs zcard and hincrby(...,
0) checks outside the pipeline which allows race conditions; change the
implementation in the enqueue routine to perform the queue-size and
per-user-count checks and the zadd + data hset/hincrby/expire atomically by
moving them into a single Redis Lua script (or by using WATCH on QUEUE_KEY and
USER_COUNT_KEY then MULTI/EXEC). Specifically, include the zcard(QUEUE_KEY) and
hincrby(USER_COUNT_KEY, user_id, 0) checks, the zadd(QUEUE_KEY, execution_id,
score), the creation of data_key and its hset/expire, and the
hincrby(USER_COUNT_KEY, user_id, 1) increment inside the atomic operation so the
old pipeline usage (pipe.zadd, pipe.hset, pipe.expire, pipe.hincrby) is executed
only from within the Lua script or guarded MULTI after WATCH; ensure the script
returns appropriate failure/success values (matching the existing
False/None/"Queue is full" or limit-exceeded responses) so the calling code can
handle errors the same way.
In `@backend/app/db/repositories/resource_repository.py`:
- Around line 216-236: The release method has a race where it reads the
allocation then updates the pool separately; change release to perform the read,
increment and delete atomically via a Redis Lua script (similar to allocate()):
create a script that takes KEYS[1]=self.POOL_KEY and KEYS[2]=alloc_key, reads
'cpu','memory','gpu' from the alloc hash (coerce to numbers, default 0), returns
0 if all zero, otherwise calls HINCRBYFLOAT/HINCRBY on the pool fields and DEL
on the alloc key and returns 1; in release() call that script with the two keys
(using self._redis.eval or the client script API), interpret the numeric return
as the boolean result and remove the existing pipeline-based logic and prior
hgetall read. Ensure you reference ALLOC_KEY_PREFIX when building alloc_key and
keep behavior of logging/returning False when script returns 0.
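A sketch of the atomic release described above, assuming redis.asyncio's eval() and the pool/alloc hash fields named in this review (cpu stored as a float, memory and gpu as integers); the field names and return-code handling are assumptions, not the repository's actual code:

```python
RELEASE_LUA = """
local cpu = tonumber(redis.call('HGET', KEYS[2], 'cpu') or 0)
local mem = tonumber(redis.call('HGET', KEYS[2], 'memory') or 0)
local gpu = tonumber(redis.call('HGET', KEYS[2], 'gpu') or 0)
if cpu == 0 and mem == 0 and gpu == 0 then
    return 0
end
redis.call('HINCRBYFLOAT', KEYS[1], 'cpu', cpu)
redis.call('HINCRBY', KEYS[1], 'memory', mem)
redis.call('HINCRBY', KEYS[1], 'gpu', gpu)
redis.call('DEL', KEYS[2])
return 1
"""


async def release(self, execution_id: str) -> bool:
    """Read, restore and delete the allocation in one server-side step."""
    alloc_key = f"{self.ALLOC_KEY_PREFIX}:{execution_id}"
    result = await self._redis.eval(RELEASE_LUA, 2, self.POOL_KEY, alloc_key)
    return bool(result)
```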
- Around line 117-193: The allocate method currently does a non-atomic
Python-side existence check before running the Lua script; move that existence
check into the Lua script by testing if alloc_key exists (redis.call('EXISTS',
alloc_key)) at the top of lua_script and return a distinct code (e.g., 2) when
it does, keep returning 0 for insufficient resources and 1 for successful
allocation; then update the Python allocate method (the result handling after
self._redis.eval) to treat result == 2 by reading the existing allocation from
alloc_key and returning a ResourceAllocation built from those fields (same
parsing logic used earlier), otherwise proceed as before for success/failure.
- Around line 174-176: alloc_key entries are being set with a 7200s TTL but pool
resource counters (updated in the allocation code around alloc_key and the pool
keys) are never restored if release() isn't called, causing permanent capacity
leaks; add a reconciliation background task (e.g., reclaim_expired_allocations
or reconcile_expired_allocations) that periodically scans redis for keys
matching "alloc_key:*", detects expired or missing allocation metadata and
atomically restores the pool counters (cpu, memory, gpu) by incrementing the
corresponding pool hash fields, using Redis transactions or a Lua script to
avoid races, and mark or remove reclaimed alloc_key entries to make reclamation
idempotent; alternatively add TTLs to pool keys and implement a periodic
reconcile_pools task that verifies sums against active allocations and fixes
discrepancies—update resource_repository functions (alloc_key handling and
release()) to cooperate with the new reconciler.
In `@backend/app/services/coordinator/coordinator.py`:
- Around line 194-204: The current flow dequeues an execution and, on allocation
failure, removes its claim and requeues it via self._state_repo.remove and
self._queue_repo.enqueue, which loses original queue position and can cause
busy-loops; change this by checking resource availability before dequeuing (use
or add a can_allocate or resources_available check in the coordinator) or move
the execution into a "waiting for resources" state instead of re-enqueueing at
the tail, and if you must requeue implement exponential/backoff delay before
calling self._queue_repo.enqueue or mark the item with a waiting flag so dequeue
logic preserves position and avoids tight requeue cycles (update the code paths
around dequeue, allocation, allocation, _state_repo.remove, and
_queue_repo.enqueue accordingly).
- Around line 97-100: handle_execution_requested currently logs exceptions and
records metrics but does not notify downstream systems; update the except block
in handle_execution_requested to call self._publish_scheduling_failed(event,
str(e)) before/after recording the metric so an ExecutionFailedEvent is
published (mirror the behavior in _schedule_execution), keeping the existing
self._logger.error and
self._metrics.record_coordinator_execution_scheduled("error") calls.
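A short sketch of that change, reusing the names quoted in this prompt; the surrounding handler body is elided and only illustrative:

```python
async def handle_execution_requested(self, event) -> None:
    try:
        await self._schedule_execution(event)
    except Exception as e:  # noqa: BLE001 - mirrors the existing broad catch
        self._logger.error(f"Failed to handle execution request: {e}")
        # New: publish an ExecutionFailedEvent so downstream consumers learn about
        # the failure, as _schedule_execution already does on its internal paths.
        await self._publish_scheduling_failed(event, str(e))
        self._metrics.record_coordinator_execution_scheduled("error")
```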
In `@backend/app/services/event_bus.py`:
- Around line 74-82: The current try/except around self._producer.send_and_wait
in event_bus (the block catching Exception as e) only logs failures and swallows
the exception; update the error-handling so callers receive failure info —
either re-raise the caught exception after logging or change the method
signature to return a boolean and return False on failure (and True on success);
locate the send_and_wait call and its except Exception as e handler in the
publish/publish_event method, add the chosen behavior (re-raise e after
self._logger.error(...) or return False), and ensure successful paths return
True if using the success indicator approach.
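A sketch of the success-indicator option (the second alternative in this prompt), assuming an aiokafka producer on self._producer; topic selection and serialization are elided:

```python
async def publish(self, topic: str, event) -> bool:
    """Publish to Kafka and report the outcome instead of swallowing it."""
    try:
        await self._producer.send_and_wait(topic, value=event)
        return True
    except Exception as e:  # noqa: BLE001
        self._logger.error(f"Failed to publish {getattr(event, 'event_type', event)}: {e}")
        return False
```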
In `@backend/app/services/notification_service.py`:
- Around line 170-174: The truncation code assumes event_data["stdout"] and
["stderr"] are strings but they can be None; update the logic around event_data
(produced by event.model_dump) to guard against None before slicing—e.g.,
replace direct slicing with a safe truncation that uses a default empty string
when the key is None or missing, and apply the same guard for both "stdout" and
"stderr" to prevent TypeError in NotificationService (look for event_data,
"stdout", and "stderr" usage in notification_service.py).
In `@backend/tests/unit/services/pod_monitor/test_monitor.py`:
- Line 15: Replace the incorrect import of ResourceUsageDomain in
test_monitor.py: locate the module that actually defines/exports
ResourceUsageDomain (instead of app.domain.execution.models) and import
ResourceUsageDomain from that module (update the import statement that currently
references ResourceUsageDomain). Ensure the import uses the exact exported
symbol name ResourceUsageDomain so MyPy recognizes it.
In `@backend/tests/unit/services/sse/test_sse_service.py`:
- Around line 13-14: The import for ResourceUsageDomain is incorrect: update the
import line that currently pulls DomainExecution and ResourceUsageDomain from
app.domain.execution so that DomainExecution remains imported from
app.domain.execution and ResourceUsageDomain is imported from
app.domain.events.typed; specifically adjust the import statement referencing
DomainExecution and ResourceUsageDomain in test_sse_service.py so
DomainExecution stays from app.domain.execution and ResourceUsageDomain is
imported from app.domain.events.typed (keeping SSEExecutionStatusDomain and
ShutdownStatus as-is).
In `@backend/workers/run_coordinator.py`:
- Around line 37-48: The async consumer loop in ExecutionCoordinator using
kafka_consumer and handler can raise exceptions which skip cleanup; wrap the
async for msg in kafka_consumer loop in a try/finally so that await
container.close() (and any kafka_consumer commit/close if needed) always runs,
move the "ExecutionCoordinator shutdown complete" log into the finally block,
and ensure resource-specific cleanup for AIOKafkaConsumer and UnifiedConsumer
happens before closing the container (referencing kafka_consumer, handler,
container, AIOKafkaConsumer, UnifiedConsumer).
In `@backend/workers/run_event_replay.py`:
- Around line 32-35: The forever-blocking await asyncio.Event().wait() prevents
container.close() from ever running; replace the anonymous Event with a named
asyncio.Event (e.g., shutdown_event), register signal handlers for
SIGINT/SIGTERM that set shutdown_event (using loop.add_signal_handler or
signal.signal fallback), await shutdown_event.wait() in run_event_replay.py,
then call await container.close() after the wait so the container is closed
during graceful shutdown; reference the existing asyncio.Event().wait() and
container.close() calls when making the change.
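A minimal sketch of that wiring, assuming the container is already built earlier in run_event_replay (make_container is a stand-in for the existing DI setup):

```python
import asyncio
import signal


async def run_event_replay(settings) -> None:
    container = make_container(settings)  # placeholder for the existing DI wiring
    shutdown_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, shutdown_event.set)
        except NotImplementedError:
            # Some platforms (e.g. Windows loops) don't support add_signal_handler
            signal.signal(sig, lambda *_: shutdown_event.set())

    try:
        await shutdown_event.wait()
    finally:
        await container.close()
```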
In `@backend/workers/run_k8s_worker.py`:
- Around line 36-47: The consumer loop can raise in handler.handle() or
kafka_consumer.commit() and skip cleanup; wrap the async for msg in
kafka_consumer loop in a try/finally so that await container.close() (and any
other shutdown steps) always run. Specifically, enclose the async for ...: await
handler.handle(msg); await kafka_consumer.commit() inside try: ... finally:
await container.close() and keep the logger.info("KubernetesWorker shutdown
complete") inside the finally; apply the same pattern to the other worker
modules (run_result_processor.py, run_saga_orchestrator.py, run_coordinator.py,
run_pod_monitor.py) referencing kafka_consumer, UnifiedConsumer.handle,
kafka_consumer.commit, and container.close where present.
In `@backend/workers/run_result_processor.py`:
- Around line 36-47: Wrap the consumer loop in a try/finally so cleanup always
runs: start by acquiring kafka_consumer (AIOKafkaConsumer) and handler
(UnifiedConsumer) as before, then run the async for msg in kafka_consumer /
await handler.handle(msg) loop inside a try block; in the finally block ensure
you stop/close the kafka_consumer (call kafka_consumer.stop() or equivalent) and
then await container.close(); re-raise cancellation errors if needed so signal
handling still works. This guarantees the consumer and container are closed even
if handler.handle() raises or the task is cancelled.
In `@backend/workers/run_saga_orchestrator.py`:
- Around line 41-47: Add graceful shutdown and per-message error resilience
around the kafka_consumer loop: install asyncio signal handlers for
SIGINT/SIGTERM that set an asyncio.Event (e.g., shutdown_event) and make the
async for msg in kafka_consumer loop respect that event so it stops cleanly;
wrap message processing so each iteration calls await handler.handle(msg) inside
a try/except that logs the exception via logger.error (including exception
details) and continues (optionally still commit offsets on successful processing
via kafka_consumer.commit), and wrap the whole consumer loop in try/finally to
ensure await container.close() and any kafka_consumer stop/close are always
awaited on shutdown. Reference symbols: kafka_consumer, handler.handle,
kafka_consumer.commit, container.close, logger.
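Putting the pieces of this prompt together, a hedged sketch of the loop body inside the existing run_saga_orchestrator coroutine. kafka_consumer, handler, container and logger come from the current worker code; shutdown_event is the same asyncio.Event pattern sketched for run_event_replay above:

```python
try:
    async for msg in kafka_consumer:
        if shutdown_event.is_set():
            break
        try:
            await handler.handle(msg)
            await kafka_consumer.commit()  # commit only after successful handling
        except Exception as e:  # noqa: BLE001 - one bad message shouldn't kill the worker
            logger.error(f"Failed to process message at offset {msg.offset}: {e}", exc_info=True)
finally:
    logger.info("SagaOrchestrator shutdown complete")
    await kafka_consumer.stop()
    await container.close()
```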
🟡 Minor comments (9)
backend/tests/e2e/idempotency/test_consumer_idempotent.py-81-97 (1)
81-97: Test bypasses IdempotentConsumerWrapper - idempotency not exercised.
The IdempotentConsumerWrapper is instantiated but the consume loop calls handler.handle(msg) directly (line 94), bypassing the wrapper entirely. The test verifies that seen["n"] >= 1 but doesn't verify duplicate blocking.
The wrapper's _wrap_handlers() wraps handlers in the dispatcher, and handler.handle() dispatches through the dispatcher, which should work. However, the assertion seen["n"] >= 1 doesn't verify that duplicates were blocked (it should be exactly 1 if idempotency works).
💡 Suggested assertion fix to verify idempotency
-    assert seen["n"] >= 1
+    # Both messages produced were duplicates (same event_id), so idempotency should block one
+    assert seen["n"] == 1, f"Expected exactly 1 (duplicates blocked), got {seen['n']}"
backend/workers/run_pod_monitor.py-36-47 (1)
36-47: Guarantee cleanup on handler failures.
If handler.handle() or kafka_consumer.commit() raises an exception, execution skips the container.close() call. Wrap the loop with try/finally to ensure shutdown always runs.
🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("PodMonitor shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        logger.info("PodMonitor shutdown complete")
+        await container.close()
backend/tests/e2e/events/test_consumer_lifecycle.py-21-45 (1)
21-45: Ensure partitions are assigned before calling seek operations.
After await kafka_consumer.start() (line 37), assignment() may return empty because partition assignment happens asynchronously via group coordination. This causes the test to become a no-op: the if assignment: guard prevents the seek calls from executing.
Add a call to trigger assignment and assert it's non-empty:
🔧 Suggested fix
     await kafka_consumer.start()
     try:
         # Exercise seek functions on AIOKafkaConsumer directly
+        await kafka_consumer.getmany(timeout_ms=1000)
         assignment = kafka_consumer.assignment()
-        if assignment:
-            await kafka_consumer.seek_to_beginning(*assignment)
-            await kafka_consumer.seek_to_end(*assignment)
+        assert assignment, "Expected partition assignment"
+        await kafka_consumer.seek_to_beginning(*assignment)
+        await kafka_consumer.seek_to_end(*assignment)
backend/app/db/repositories/pod_state_repository.py-138-145 (1)
138-145: Add error handling for malformed datetime strings.
If Redis contains corrupted data, datetime.fromisoformat() will raise a ValueError. Consider adding defensive handling.
Proposed fix
+    try:
+        created_at = datetime.fromisoformat(get_str("created_at"))
+        updated_at = datetime.fromisoformat(get_str("updated_at"))
+    except ValueError as e:
+        self._logger.warning(f"Invalid datetime in pod state for {pod_name}: {e}")
+        return None
+
     return PodState(
         pod_name=get_str("pod_name"),
         execution_id=get_str("execution_id"),
         status=get_str("status"),
-        created_at=datetime.fromisoformat(get_str("created_at")),
-        updated_at=datetime.fromisoformat(get_str("updated_at")),
+        created_at=created_at,
+        updated_at=updated_at,
         metadata=metadata,
     )
backend/app/services/event_bus.py-111-120 (1)
111-120: Local subscribers don't receive events published by the same instance.
handle_kafka_message filters out messages from its own instance (source_instance == self._instance_id), which prevents feedback loops in cross-instance pub/sub. However, this means the publish method only distributes to Kafka: local subscribers never receive events published within the same instance.
If local subscribers should receive locally-published events, add local distribution in the publish method before sending to Kafka.
backend/app/events/core/producer.py-54-54 (1)
54-54: Use the get_topic_for_event() helper or handle missing mappings defensively.
Direct dict access at line 54 will raise KeyError for unmapped EventType values. NOTIFICATION_ALL_READ is currently unmapped and defined in the EventType enum but will fail if produced. A defensive helper function get_topic_for_event() already exists in the mappings module with a fallback to the SYSTEM_EVENTS topic: use it instead of direct dict access, or add explicit error handling for missing types.
Recommended fix
-from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC
+from app.infrastructure.kafka.mappings import get_topic_for_event
Then at line 54:
-        topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}"
+        topic = f"{self._topic_prefix}{get_topic_for_event(event_to_produce.event_type)}"
backend/app/services/pod_monitor/monitor.py-94-115 (1)
94-115: Harden malformed event handling.
If raw_event["object"] is None or not pod-like, AttributeError/TypeError can escape and break the worker loop. Consider widening the catch.
🩹 Suggested tweak
-        except (KeyError, ValueError) as e:
+        except (KeyError, ValueError, TypeError, AttributeError) as e:
             self._logger.error(f"Invalid event format: {e}")
backend/app/services/saga/saga_orchestrator.py-391-402 (1)
391-402: Same silent exception swallowing issue.
This has the same issue as in _start_saga: the bare except Exception: pass at lines 401-402 silently ignores errors from bind_dependencies. Apply the same fix to log the warning.
🐛 Proposed fix
         try:
             saga.bind_dependencies(
                 producer=self._producer,
                 alloc_repo=self._alloc_repo,
                 publish_commands=bool(getattr(self._config, "publish_commands", False)),
             )
-        except Exception:
-            pass
+        except Exception as e:
+            self._logger.warning(f"Failed to bind dependencies for saga cancellation {saga_instance.saga_name}: {e}")
backend/app/services/saga/saga_orchestrator.py-204-212 (1)
204-212: Silent exception swallowing hides potential configuration errors.
The bare except Exception: pass at lines 211-212 silently ignores all errors from bind_dependencies. This could mask important configuration issues or bugs. At minimum, log the exception for debugging purposes.
🐛 Proposed fix
         try:
             saga.bind_dependencies(
                 producer=self._producer,
                 alloc_repo=self._alloc_repo,
                 publish_commands=bool(getattr(self._config, "publish_commands", False)),
             )
-        except Exception:
-            pass
+        except Exception as e:
+            self._logger.warning(f"Failed to bind dependencies for saga {saga_name}: {e}")
🧹 Nitpick comments (14)
backend/tests/unit/services/sse/test_shutdown_manager.py (1)
67-80: Consider using small non-zero timeouts to reduce test flakiness.Setting
notification_timeout=0.0andforce_close_timeout=0.0can lead to unpredictable async behavior. The permissive assertion on line 80 (phase in ("draining", "complete", "closing", "notifying")) compensates for this but doesn't verify a specific transition path.If the intent is to test rapid transitions, consider using small non-zero values (e.g.,
0.001) to ensure the async machinery has minimal but consistent time to execute.♻️ Suggested improvement
m = SSEShutdownManager( drain_timeout=0.01, - notification_timeout=0.0, - force_close_timeout=0.0, + notification_timeout=0.001, + force_close_timeout=0.001, logger=_test_logger, connection_metrics=connection_metrics, )backend/app/services/kafka_event_service.py (1)
4-4: Prefer lowercasedictfor type hints in Python 3.9+.The codebase uses modern Python syntax (e.g.,
str | Noneon line 44), so importingDictfromtypingis unnecessary. Consider usingdict[str, Any]directly for consistency.♻️ Proposed fix
-from typing import Any, Dict +from typing import AnyThen update all
Dict[...]annotations todict[...]throughout the file.backend/app/services/idempotency/middleware.py (2)
21-23: Inconsistent typing: UsingSet[str]butstrfor key_strategy.If the intent is to accept string-based strategies, consider also using
KeyStrategyenum with string conversion, or document the valid string values as constants. The current approach loses type safety without clear documentation of valid values.
5-5: Prefer lowercasesetanddictfor type hints.Similar to the kafka_event_service.py feedback, Python 3.9+ supports lowercase generics directly.
♻️ Proposed fix
-from typing import Any, Awaitable, Callable, Dict, Set +from typing import Any, Awaitable, CallableThen use
set[str],dict[...]throughout.backend/tests/e2e/events/test_consume_roundtrip.py (1)
47-76: Align auto-commit with manual commits in the test loop.
Line 53 enables auto-commit while Line 76 commits manually; this can advance offsets beforehandler.handlecompletes. Consider disabling auto-commit when you explicitly commit.🔧 Suggested fix
- enable_auto_commit=True, + enable_auto_commit=False,backend/app/services/sse/kafka_redis_bridge.py (1)
51-84: Return a copy of relevant event types and keep status ordering deterministic.This prevents accidental mutation of the module-level set and avoids flaky status output ordering.
♻️ Proposed change
`@staticmethod` def get_relevant_event_types() -> set[EventType]: """Get event types that should be routed to SSE. Helper for worker entrypoint to know which topics to subscribe to. """ - return RELEVANT_EVENT_TYPES + return set(RELEVANT_EVENT_TYPES) @@ async def get_status(self) -> dict[str, list[str]]: """Get bridge status.""" return { - "relevant_event_types": [str(et) for et in RELEVANT_EVENT_TYPES], + "relevant_event_types": sorted(str(et) for et in RELEVANT_EVENT_TYPES), }backend/tests/e2e/events/test_event_dispatcher.py (1)
47-83: Disable auto-commit when manually committing offsets.With
enable_auto_commit=Trueand explicitcommit()calls, auto-commit runs in the background every ~5000ms and may commit offsets beforehandler.handle()completes. This creates a race condition where offset commits become non-deterministic, potentially introducing flakiness in this test. Official aiokafka documentation recommends disabling auto-commit when using manual commit.♻️ Proposed change
kafka_consumer = AIOKafkaConsumer( topic, bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, group_id=group_id, - enable_auto_commit=True, + enable_auto_commit=False, auto_offset_reset="earliest", )backend/app/events/core/producer.py (1)
76-76: Consider lazy logging to avoid string formatting overhead.Using f-strings in logging statements causes the string to be formatted even when the log level is disabled. For debug/error logs in hot paths, prefer lazy formatting.
Example using lazy formatting
- self._logger.debug(f"Message [{event_to_produce}] sent to topic: {topic}") + self._logger.debug("Message [%s] sent to topic: %s", event_to_produce, topic)- self._logger.error(f"Failed to produce message: {e}") + self._logger.error("Failed to produce message: %s", e)Also applies to: 83-83
backend/app/db/repositories/pod_state_repository.py (1)
77-100:track_podoverwritescreated_aton re-tracking.When tracking a pod that was already tracked, this will overwrite the original
created_attimestamp. If preserving the original creation time is important, consider checking existence first or usingHSETNXfor thecreated_atfield.Alternative: preserve original created_at
async def track_pod( self, pod_name: str, execution_id: str, status: str, metadata: dict[str, object] | None = None, ttl_seconds: int = 7200, ) -> None: """Track a pod's state.""" key = f"{self.TRACKED_KEY_PREFIX}:{pod_name}" now = datetime.now(timezone.utc).isoformat() + # Check if already tracked to preserve original created_at + existing = await self._redis.hget(key, "created_at") + created_at = existing.decode() if existing else now + data = { "pod_name": pod_name, "execution_id": execution_id, "status": status, - "created_at": now, + "created_at": created_at, "updated_at": now, "metadata": json.dumps(metadata) if metadata else "{}", }backend/app/services/k8s_worker/worker.py (1)
283-283: Logging level should be DEBUG, not INFO.The log message showing image name sanitization (
before: {image_ref} -> {sanitized_image_ref}) is implementation detail useful for debugging, not operational info. Consider changing to DEBUG level.Suggested change
- self._logger.info(f"DAEMONSET: before: {image_ref} -> {sanitized_image_ref}") + self._logger.debug(f"Image name sanitized: {image_ref} -> {sanitized_image_ref}")backend/app/services/event_bus.py (1)
42-42: Handler type annotation is too loose.Using
objectfor the handler type loses all type safety. Consider using a properCallabletype hint for better IDE support and type checking.Suggested typing improvement
+from collections.abc import Awaitable, Callable +from typing import Union + +EventHandler = Callable[[EventBusEvent], Union[None, Awaitable[None]]] + `@dataclass` class Subscription: """Represents a single event subscription.""" id: str = field(default_factory=lambda: str(uuid4())) pattern: str = "" - handler: object = field(default=None) + handler: EventHandler | None = field(default=None)And update the subscribe signature:
- async def subscribe(self, pattern: str, handler: object) -> str: + async def subscribe(self, pattern: str, handler: EventHandler) -> str:Also applies to: 84-84
backend/app/services/saga/saga_orchestrator.py (1)
69-106: Consider extracting completion events to a class constant to avoid duplication.The
completion_eventsset is defined identically in bothget_trigger_event_types(lines 81-85) andhandle_event(lines 99-103). Extracting this to a class-level constant would prevent potential inconsistencies if one is updated without the other.♻️ Suggested refactor
class SagaOrchestrator: """Stateless saga orchestrator - pure event handler. No lifecycle methods (start/stop) - receives ready-to-use dependencies from DI. All state stored in SagaRepository. Worker entrypoint handles the consume loop. """ + + _COMPLETION_EVENTS: frozenset[EventType] = frozenset({ + EventType.EXECUTION_COMPLETED, + EventType.EXECUTION_FAILED, + EventType.EXECUTION_TIMEOUT, + })Then use
self._COMPLETION_EVENTSin both methods.backend/app/services/coordinator/coordinator.py (1)
279-292: Hardcodedestimated_wait_seconds=None- consider calculating or documenting.The
estimated_wait_secondsfield is set toNone(line 286). If this is intentional for now, consider adding a TODO comment. Otherwise, this could be calculated from queue stats and average execution time to provide useful feedback to users.backend/app/core/providers.py (1)
644-703: Config objects created inline with defaults - consider DI consistency.
K8sWorkerConfig(line 658) andPodMonitorConfig(line 694) are instantiated inline with default values. For consistency with other configuration patterns (e.g.,SagaConfigwhich uses a factory function), consider extracting these to factory functions or providing them via the Settings object if configuration is needed.This is a minor consistency suggestion - the current approach works if defaults are sufficient.
class RedisRepositoryProvider(Provider):
    """Provides Redis-backed state repositories for stateless services."""

    scope = Scope.APP

    @provide
    def get_execution_state_repository(
        self, redis_client: redis.Redis, logger: logging.Logger
    ) -> ExecutionStateRepository:
        return ExecutionStateRepository(redis_client, logger)

    @provide
    def get_execution_queue_repository(
        self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
    ) -> ExecutionQueueRepository:
        return ExecutionQueueRepository(
            redis_client,
            logger,
            max_queue_size=10000,
            max_executions_per_user=100,
        )

    @provide
    async def get_resource_repository(
        self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
    ) -> ResourceRepository:
        repo = ResourceRepository(
            redis_client,
            logger,
            total_cpu_cores=32.0,
            total_memory_mb=65536,
        )
        await repo.initialize()
        return repo

    @provide
    def get_pod_state_repository(
        self, redis_client: redis.Redis, logger: logging.Logger
    ) -> PodStateRepository:
        return PodStateRepository(redis_client, logger)
🛠️ Refactor suggestion | 🟠 Major
Hardcoded resource limits should be configurable via Settings.
The RedisRepositoryProvider has hardcoded values for queue and resource limits:
- max_queue_size=10000 (line 149)
- max_executions_per_user=100 (line 150)
- total_cpu_cores=32.0 (line 160)
- total_memory_mb=65536 (line 161)
These should be pulled from Settings to allow configuration per environment without code changes.
♻️ Suggested refactor
`@provide`
def get_execution_queue_repository(
self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
) -> ExecutionQueueRepository:
return ExecutionQueueRepository(
redis_client,
logger,
- max_queue_size=10000,
- max_executions_per_user=100,
+ max_queue_size=settings.QUEUE_MAX_SIZE,
+ max_executions_per_user=settings.QUEUE_MAX_PER_USER,
)
`@provide`
async def get_resource_repository(
self, redis_client: redis.Redis, logger: logging.Logger, settings: Settings
) -> ResourceRepository:
repo = ResourceRepository(
redis_client,
logger,
- total_cpu_cores=32.0,
- total_memory_mb=65536,
+ total_cpu_cores=settings.RESOURCE_TOTAL_CPU_CORES,
+ total_memory_mb=settings.RESOURCE_TOTAL_MEMORY_MB,
)
await repo.initialize()
    return repo
Add corresponding fields to your Settings class with sensible defaults.
🤖 Prompt for AI Agents
In `@backend/app/core/providers.py` around lines 131 - 170, The provider currently
hardcodes limits in RedisRepositoryProvider; update
get_execution_queue_repository and get_resource_repository to read
max_queue_size, max_executions_per_user, total_cpu_cores, and total_memory_mb
from the Settings argument instead of hardcoded literals; add those fields with
sensible defaults to the Settings class (e.g., max_queue_size,
max_executions_per_user, total_cpu_cores, total_memory_mb), then pass
settings.max_queue_size and settings.max_executions_per_user into
ExecutionQueueRepository and settings.total_cpu_cores and
settings.total_memory_mb into ResourceRepository and keep existing initialize()
call.
# Check queue size
queue_size = await self._redis.zcard(self.QUEUE_KEY)
if queue_size >= self.max_queue_size:
    return False, None, "Queue is full"

# Check user limit
user_count = await self._redis.hincrby(self.USER_COUNT_KEY, user_id, 0)  # type: ignore[misc]
if user_count >= self.max_executions_per_user:
    return False, None, f"User execution limit exceeded ({self.max_executions_per_user})"

# Score: priority * 1e12 + timestamp (lower = higher priority, earlier = higher priority)
timestamp = time.time()
score = priority.value * 1e12 + timestamp

# Use pipeline for atomicity
pipe = self._redis.pipeline()

# Add to sorted set
pipe.zadd(self.QUEUE_KEY, {execution_id: score})

# Store event data
data_key = f"{self.DATA_KEY_PREFIX}:{execution_id}"
event_data["_enqueue_timestamp"] = timestamp
event_data["_priority"] = priority.value
event_data["_user_id"] = user_id
pipe.hset(data_key, mapping={k: json.dumps(v) if not isinstance(v, str) else v for k, v in event_data.items()})
pipe.expire(data_key, self.stale_timeout_seconds + 60)

# Increment user count
pipe.hincrby(self.USER_COUNT_KEY, user_id, 1)
🧩 Analysis chain
🏁 Script executed:
find . -name "execution_queue_repository.py" -type f
Repository: HardMax71/Integr8sCode
Length of output: 125
🏁 Script executed:
cat -n ./backend/app/db/repositories/execution_queue_repository.py
Repository: HardMax71/Integr8sCode
Length of output: 10578
🏁 Script executed:
grep -r "WATCH\|EVAL\|SCRIPT\|lua" ./backend/app/db/repositories/ --include="*.py" -i
Repository: HardMax71/Integr8sCode
Length of output: 8538
🏁 Script executed:
sed -n '1,100p' ./backend/app/db/repositories/resource_repository.py
Repository: HardMax71/Integr8sCode
Length of output: 2822
🏁 Script executed:
sed -n '100,200p' ./backend/app/db/repositories/resource_repository.py
Repository: HardMax71/Integr8sCode
Length of output: 4451
🌐 Web query:
Redis pipeline atomicity vs WATCH MULTI checks race conditions
💡 Result:
- A Redis pipeline is not atomic by default: it is just a client-side batching technique to reduce round trips. Redis still executes the individual commands one by one, and commands from other clients can interleave between them. [1] Some client libraries offer a "pipeline that is atomic" by wrapping the queued commands in `MULTI`/`EXEC`; that is no longer "just pipelining", it is a transaction sent efficiently.
- `MULTI`/`EXEC` provides atomic group execution: once `EXEC` starts, no other client's commands can run in the middle of the transaction's command sequence. But Redis transactions do not provide rollback: if a command errors at runtime (e.g., wrong type), Redis still continues executing the remaining queued commands. [2]
- Where race conditions actually happen (and how `WATCH` fixes them): the classic race is a read-compute-write done from the client (e.g., `GET` then `SET`) while another client modifies the key in between. [2] `WATCH` turns `EXEC` into a CAS (check-and-set): if any watched key changes after you `WATCH` it and before `EXEC`, Redis aborts the transaction (`EXEC` returns null) and you retry. [2][3] Key rule: you must `WATCH` before the `GET` you will base your update on; otherwise you still have a race window between the read and the watch. The canonical pattern is `WATCH k` → `GET k` → `MULTI` → writes → `EXEC` (retry if aborted). [2][3]
- Alternative for "read + conditional logic + write": a Lua script (`EVAL`) runs without other commands interleaving during its execution, giving you the same "single, isolated operation" feel as `MULTI`/`EXEC` for server-side logic. [4]
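To make the WATCH-based CAS pattern concrete, here is a minimal sketch with redis-py's asyncio client; the key name and limit are made up for illustration, and the recommendation below also allows the Lua-script route instead:

```python
import redis.asyncio as redis
from redis.exceptions import WatchError


async def incr_if_below(r: redis.Redis, key: str, limit: int) -> bool:
    """Increment `key` only if its current value is below `limit`, retrying on races."""
    while True:
        async with r.pipeline(transaction=True) as pipe:
            try:
                await pipe.watch(key)                    # WATCH before reading
                current = int(await pipe.get(key) or 0)  # read in immediate mode
                if current >= limit:
                    await pipe.unwatch()
                    return False
                pipe.multi()                             # queue the write
                pipe.incr(key)
                await pipe.execute()                     # aborts if the key changed
                return True
            except WatchError:
                continue                                 # another client won the race; retry
```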
Move queue/user limit checks into a Lua script or WATCH transaction for atomicity.
The zcard and hincrby(..., 0) checks on lines 72 and 77 occur outside the pipeline, allowing concurrent requests to bypass limits. Between the check and the zadd (line 89), another client can pass its own limit checks and execute its zadd, violating global and per-user quotas.
Wrap both checks and the enqueue operation in a single atomic Lua script (or WATCH the keys, read, then MULTI/EXEC) to prevent this race condition. The codebase already uses Lua scripts in resource_repository.py for similar atomic check-and-write patterns.
🤖 Prompt for AI Agents
In `@backend/app/db/repositories/execution_queue_repository.py` around lines 71 -
101, The current enqueue logic performs zcard and hincrby(..., 0) checks outside
the pipeline which allows race conditions; change the implementation in the
enqueue routine to perform the queue-size and per-user-count checks and the zadd
+ data hset/hincrby/expire atomically by moving them into a single Redis Lua
script (or by using WATCH on QUEUE_KEY and USER_COUNT_KEY then MULTI/EXEC).
Specifically, include the zcard(QUEUE_KEY) and hincrby(USER_COUNT_KEY, user_id,
0) checks, the zadd(QUEUE_KEY, execution_id, score), the creation of data_key
and its hset/expire, and the hincrby(USER_COUNT_KEY, user_id, 1) increment
inside the atomic operation so the old pipeline usage (pipe.zadd, pipe.hset,
pipe.expire, pipe.hincrby) is executed only from within the Lua script or
guarded MULTI after WATCH; ensure the script returns appropriate failure/success
values (matching the existing False/None/"Queue is full" or limit-exceeded
responses) so the calling code can handle errors the same way.
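A rough sketch of what a single-script enqueue could look like, assuming the existing key names and the (ok / error-message) conventions above; storing the payload as one JSON field is a simplification for brevity, not the author's implementation:

```python
# Hypothetical atomic enqueue: all limit checks and writes happen inside one EVAL.
ENQUEUE_SCRIPT = """
local queue_key, user_count_key, data_key = KEYS[1], KEYS[2], KEYS[3]
local execution_id, user_id = ARGV[1], ARGV[2]
local score, max_queue, max_per_user, ttl = tonumber(ARGV[3]), tonumber(ARGV[4]), tonumber(ARGV[5]), tonumber(ARGV[6])
local payload = ARGV[7]

if redis.call('ZCARD', queue_key) >= max_queue then
    return 'QUEUE_FULL'
end
if tonumber(redis.call('HGET', user_count_key, user_id) or '0') >= max_per_user then
    return 'USER_LIMIT'
end
redis.call('ZADD', queue_key, score, execution_id)
redis.call('HSET', data_key, 'payload', payload)
redis.call('EXPIRE', data_key, ttl)
redis.call('HINCRBY', user_count_key, user_id, 1)
return 'OK'
"""
```

The Python side would eval this with the three keys and map 'QUEUE_FULL' / 'USER_LIMIT' back to the existing (False, None, message) return values, so callers see no behavioural change.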
async def cleanup_stale(self) -> int:
    """Remove stale entries. Returns count removed. Call periodically."""
    removed = 0
    threshold_score = QueuePriority.BACKGROUND.value * 1e12 + (time.time() - self.stale_timeout_seconds)

    # Get entries older than threshold
    stale_entries = await self._redis.zrangebyscore(self.QUEUE_KEY, "-inf", threshold_score, start=0, num=100)

    for entry in stale_entries:
        execution_id = entry.decode() if isinstance(entry, bytes) else entry
        if await self.remove(execution_id):
            removed += 1
🧩 Analysis chain
🏁 Scripts executed against HardMax71/Integr8sCode (outputs collapsed):

# Find and examine the execution_queue_repository.py file
fd -t f "execution_queue_repository.py" --exec wc -l {}
# Search for the QueuePriority enum definition
rg -t py "class QueuePriority\|enum.*QueuePriority" -A 10
# Search for the cleanup_stale method in full context
rg -t py "def cleanup_stale" -A 20
# Search for the enqueue method to understand score calculation
rg -t py "def.*enqueue" -A 15
# Search for QueuePriority import or definition
rg -t py "QueuePriority" -B 3 -A 3 | head -50
# Read the start of execution_queue_repository.py for context
cat -n backend/app/db/repositories/execution_queue_repository.py | head -100
# Also search for QueuePriority definition across the codebase
rg -t py "class QueuePriority|enum.*Priority" -B 2 -A 15
# Check the remove method for additional safeguards
rg -t py "async def remove" -A 20 backend/app/db/repositories/execution_queue_repository.py
cleanup_stale deletes all non-BACKGROUND priority items regardless of age.
The threshold calculation uses QueuePriority.BACKGROUND.value * 1e12 (10 * 10^12), which is vastly larger than typical enqueued scores. For example, a CRITICAL item has score ≈ 0 * 10^12 + 1.7×10^9, which always falls below the threshold. This causes zrangebyscore to evict all CRITICAL, HIGH, NORMAL, and LOW priority entries immediately, independent of their age, while only BACKGROUND items are checked for staleness.
Use per-priority cleanup ranges: for each priority level, set min_score = priority.value * 1e12 and max_score = priority.value * 1e12 + (now - stale_timeout_seconds). Alternatively, check _enqueue_timestamp stored in the data hash before removal.
🤖 Prompt for AI Agents
In `@backend/app/db/repositories/execution_queue_repository.py` around lines 218 -
229, The cleanup_stale method computes a single huge threshold_score that causes
all non-BACKGROUND priorities to be considered stale; update cleanup_stale to
iterate over each QueuePriority and for each priority compute min_score =
priority.value * 1e12 and max_score = priority.value * 1e12 + (time.time() -
self.stale_timeout_seconds) (or alternatively fetch and check the stored
_enqueue_timestamp in the entry hash) then call zrangebyscore with that
per-priority range, decode entries and call self.remove(execution_id) as before;
ensure you still limit batch size (num=100) and aggregate removed count.
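A sketch of the per-priority variant described above, assuming QueuePriority is an IntEnum and remove() keeps its current semantics (not the author's code):

```python
async def cleanup_stale(self) -> int:
    """Remove entries older than stale_timeout_seconds, one priority band at a time."""
    removed = 0
    cutoff = time.time() - self.stale_timeout_seconds
    for priority in QueuePriority:
        min_score = priority.value * 1e12            # start of this priority band
        max_score = priority.value * 1e12 + cutoff   # only genuinely old entries in the band
        stale_entries = await self._redis.zrangebyscore(
            self.QUEUE_KEY, min_score, max_score, start=0, num=100
        )
        for entry in stale_entries:
            execution_id = entry.decode() if isinstance(entry, bytes) else entry
            if await self.remove(execution_id):
                removed += 1
    return removed
```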
async def allocate(
    self,
    execution_id: str,
    language: str,
    requested_cpu: float | None = None,
    requested_memory_mb: int | None = None,
    requested_gpu: int = 0,
) -> ResourceAllocation | None:
    """Allocate resources for execution. Returns allocation or None if insufficient."""
    # Check if already allocated
    alloc_key = f"{self.ALLOC_KEY_PREFIX}:{execution_id}"
    existing = await self._redis.hgetall(alloc_key)  # type: ignore[misc]
    if existing:
        self._logger.warning(f"Execution {execution_id} already has allocation")
        return ResourceAllocation(
            execution_id=execution_id,
            cpu_cores=float(existing.get(b"cpu", existing.get("cpu", 0))),
            memory_mb=int(existing.get(b"memory", existing.get("memory", 0))),
            gpu_count=int(existing.get(b"gpu", existing.get("gpu", 0))),
        )

    # Determine requested resources
    if requested_cpu is None or requested_memory_mb is None:
        default_cpu, default_memory = self.DEFAULT_ALLOCATIONS.get(language, (0.5, 512))
        requested_cpu = requested_cpu or default_cpu
        requested_memory_mb = requested_memory_mb or default_memory

    # Apply limits
    requested_cpu = min(requested_cpu, self._max_cpu_per_exec)
    requested_memory_mb = min(requested_memory_mb, self._max_memory_per_exec)

    # Atomic allocation using Lua script
    lua_script = """
    local pool_key = KEYS[1]
    local alloc_key = KEYS[2]
    local req_cpu = tonumber(ARGV[1])
    local req_memory = tonumber(ARGV[2])
    local req_gpu = tonumber(ARGV[3])
    local min_cpu = tonumber(ARGV[4])
    local min_memory = tonumber(ARGV[5])
    local avail_cpu = tonumber(redis.call('HGET', pool_key, 'available_cpu') or '0')
    local avail_memory = tonumber(redis.call('HGET', pool_key, 'available_memory') or '0')
    local avail_gpu = tonumber(redis.call('HGET', pool_key, 'available_gpu') or '0')
    local cpu_after = avail_cpu - req_cpu
    local memory_after = avail_memory - req_memory
    local gpu_after = avail_gpu - req_gpu
    if cpu_after < min_cpu or memory_after < min_memory or gpu_after < 0 then
        return 0
    end
    redis.call('HSET', pool_key, 'available_cpu', tostring(cpu_after))
    redis.call('HSET', pool_key, 'available_memory', tostring(memory_after))
    redis.call('HSET', pool_key, 'available_gpu', tostring(gpu_after))
    redis.call('HSET', alloc_key, 'cpu', tostring(req_cpu), 'memory', tostring(req_memory),
               'gpu', tostring(req_gpu))
    redis.call('EXPIRE', alloc_key, 7200)
    return 1
    """

    result = await self._redis.eval(  # type: ignore[misc]
        lua_script,
        2,
        self.POOL_KEY,
        alloc_key,
        str(requested_cpu),
        str(requested_memory_mb),
        str(requested_gpu),
        str(self._min_reserve_cpu),
        str(self._min_reserve_memory),
    )

    if not result:
🧩 Analysis chain
🏁 Script executed against HardMax71/Integr8sCode (output collapsed):

cat -n backend/app/db/repositories/resource_repository.py | head -250
Move the allocation existence check inside the Lua script to prevent concurrent reservation for the same execution_id.
The current implementation checks for an existing allocation on line 128 before executing the Lua script, creating a race condition. Two concurrent callers can both pass the Python check (when no allocation exists yet) and then both execute the Lua script, resulting in duplicate allocations for the same execution_id. The Lua script at lines 149–179 lacks an existence check and only validates resource availability, making the check non-atomic.
Add an EXISTS check at the start of the Lua script and return a distinct code (e.g., 2) when allocation already exists. Then handle that result in Python to fetch and return the existing allocation:
🛠️ Suggested fix (atomic existence check in Lua)
@@
-    local req_gpu = tonumber(ARGV[3])
+    local req_gpu = tonumber(ARGV[3])
+    if redis.call('EXISTS', alloc_key) == 1 then
+        return 2
+    end
@@
-    if not result:
+    if result == 2:
+        existing = await self._redis.hgetall(alloc_key)  # type: ignore[misc]
+        self._logger.warning(f"Execution {execution_id} already has allocation")
+        return ResourceAllocation(
+            execution_id=execution_id,
+            cpu_cores=float(existing.get(b"cpu", existing.get("cpu", 0))),
+            memory_mb=int(existing.get(b"memory", existing.get("memory", 0))),
+            gpu_count=int(existing.get(b"gpu", existing.get("gpu", 0))),
+        )
+    if not result:
         pool = await self._redis.hgetall(self.POOL_KEY)  # type: ignore[misc]
🤖 Prompt for AI Agents
In `@backend/app/db/repositories/resource_repository.py` around lines 117 - 193,
The allocate method currently does a non-atomic Python-side existence check
before running the Lua script; move that existence check into the Lua script by
testing if alloc_key exists (redis.call('EXISTS', alloc_key)) at the top of
lua_script and return a distinct code (e.g., 2) when it does, keep returning 0
for insufficient resources and 1 for successful allocation; then update the
Python allocate method (the result handling after self._redis.eval) to treat
result == 2 by reading the existing allocation from alloc_key and returning a
ResourceAllocation built from those fields (same parsing logic used earlier),
otherwise proceed as before for success/failure.
redis.call('HSET', alloc_key, 'cpu', tostring(req_cpu), 'memory', tostring(req_memory),
           'gpu', tostring(req_gpu))
redis.call('EXPIRE', alloc_key, 7200)
🧩 Analysis chain (verification scripts; outputs collapsed). The scripts read resource_repository.py in full and searched the repositories, pod monitor, coordinator, result processor (resource_cleaner.py), dishka_lifespan.py, and main.py for any release, cleanup, sweeper, or reconciliation logic around allocations, and traced where ExecutionCompleted/Failed/Cancelled events are produced and consumed.
Allocation TTL creates permanent pool capacity leak without recovery mechanism.
The 7200-second TTL on alloc_key (line 176) will automatically expire the allocation record, but the decremented pool resources (lines 170-172) are never restored if release() is not called. While normal completions are handled via pod monitor events → coordinator release, any failure path that prevents event publication (pod undetected crash, lost event, coordinator crash before release) will permanently leak capacity.
No sweeper or reconciliation job exists to reclaim expired allocations. Either implement a background task to scan for expired alloc_key:* entries and restore pool resources, or add a TTL to the pool keys themselves with periodic reconciliation.
🤖 Prompt for AI Agents
In `@backend/app/db/repositories/resource_repository.py` around lines 174 - 176,
alloc_key entries are being set with a 7200s TTL but pool resource counters
(updated in the allocation code around alloc_key and the pool keys) are never
restored if release() isn't called, causing permanent capacity leaks; add a
reconciliation background task (e.g., reclaim_expired_allocations or
reconcile_expired_allocations) that periodically scans redis for keys matching
"alloc_key:*", detects expired or missing allocation metadata and atomically
restores the pool counters (cpu, memory, gpu) by incrementing the corresponding
pool hash fields, using Redis transactions or a Lua script to avoid races, and
mark or remove reclaimed alloc_key entries to make reclamation idempotent;
alternatively add TTLs to pool keys and implement a periodic reconcile_pools
task that verifies sums against active allocations and fixes
discrepancies—update resource_repository functions (alloc_key handling and
release()) to cooperate with the new reconciler.
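For illustration, one low-tech reconciler would periodically recompute the pool from the allocation keys that still exist, so capacity held by keys that expired without a release() drifts back into the pool. The attribute names below (total_cpu_cores, total_memory_mb, total_gpu) are assumptions about the repository's internals, and the scan-then-rewrite is not race-free, so it would need to run under a lock or tolerate brief skew:

```python
async def reconcile_pool(self) -> None:
    """Recompute available_* from live allocation keys; run periodically."""
    used_cpu, used_memory, used_gpu = 0.0, 0, 0
    async for key in self._redis.scan_iter(match=f"{self.ALLOC_KEY_PREFIX}:*"):
        alloc = await self._redis.hgetall(key)
        used_cpu += float(alloc.get(b"cpu", alloc.get("cpu", 0)))
        used_memory += int(alloc.get(b"memory", alloc.get("memory", 0)))
        used_gpu += int(alloc.get(b"gpu", alloc.get("gpu", 0)))
    await self._redis.hset(
        self.POOL_KEY,
        mapping={
            "available_cpu": str(self.total_cpu_cores - used_cpu),
            "available_memory": str(self.total_memory_mb - used_memory),
            "available_gpu": str(self.total_gpu - used_gpu),
        },
    )
```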
kafka_consumer = await container.get(AIOKafkaConsumer)
handler = await container.get(UnifiedConsumer)

logger.info("ExecutionCoordinator started, consuming events...")

async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

logger.info("ExecutionCoordinator shutdown complete")

await container.close()
🧩 Analysis chain (verification scripts; outputs collapsed). The scripts read backend/workers/run_coordinator.py, backend/app/core/container.py, and backend/app/core/providers.py, and searched the codebase for where AIOKafkaConsumer and UnifiedConsumer are provided and whether any provider stops the consumer when the container closes.
Wrap the consumer loop in try/finally to ensure cleanup on exceptions.
Any exception in the async for loop (lines 42-44) skips await container.close(), leaving the Kafka consumer and other resources open. The codebase establishes a clear pattern—resources like AIOKafkaProducer, K8sClients, and IdempotencyManager all use try/finally for cleanup. Apply the same pattern here:
Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("ExecutionCoordinator shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("ExecutionCoordinator shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_coordinator.py` around lines 37 - 48, The async consumer
loop in ExecutionCoordinator using kafka_consumer and handler can raise
exceptions which skip cleanup; wrap the async for msg in kafka_consumer loop in
a try/finally so that await container.close() (and any kafka_consumer
commit/close if needed) always runs, move the "ExecutionCoordinator shutdown
complete" log into the finally block, and ensure resource-specific cleanup for
AIOKafkaConsumer and UnifiedConsumer happens before closing the container
(referencing kafka_consumer, handler, container, AIOKafkaConsumer,
UnifiedConsumer).
# Service is HTTP-driven, wait for external shutdown
await asyncio.Event().wait()

await asyncio.Event().wait()
await container.close()
Unreachable container.close() - missing signal handling.
asyncio.Event().wait() blocks forever since the event is never set. Line 35 await container.close() will never execute, causing potential resource leaks when the process terminates.
Consider adding signal handling for graceful shutdown:
🔧 Proposed fix with signal handling
+import signal
+
 async def run_replay_service(settings: Settings) -> None:
     """Run the event replay service."""
     container = create_event_replay_container(settings)
-
     logger = await container.get(logging.Logger)
     logger.info("Starting EventReplayService with DI container...")
     db = await container.get(Database)
     await init_beanie(database=db, document_models=ALL_DOCUMENTS)
     logger.info("Event replay service initialized and ready")
-    # Service is HTTP-driven, wait for external shutdown
-    await asyncio.Event().wait()
-
-    await container.close()
+    # Wait for shutdown signal
+    shutdown_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+    for sig in (signal.SIGTERM, signal.SIGINT):
+        loop.add_signal_handler(sig, shutdown_event.set)
+
+    await shutdown_event.wait()
+    logger.info("Shutdown signal received, closing container...")
+    await container.close()
🤖 Prompt for AI Agents
In `@backend/workers/run_event_replay.py` around lines 32 - 35, The
forever-blocking await asyncio.Event().wait() prevents container.close() from
ever running; replace the anonymous Event with a named asyncio.Event (e.g.,
shutdown_event), register signal handlers for SIGINT/SIGTERM that set
shutdown_event (using loop.add_signal_handler or signal.signal fallback), await
shutdown_event.wait() in run_event_replay.py, then call await container.close()
after the wait so the container is closed during graceful shutdown; reference
the existing asyncio.Event().wait() and container.close() calls when making the
change.
kafka_consumer = await container.get(AIOKafkaConsumer)
handler = await container.get(UnifiedConsumer)

logger.info("KubernetesWorker started, consuming events...")

async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

logger.info("KubernetesWorker shutdown complete")

await container.close()
🧩 Analysis chain (verification scripts; outputs collapsed). The scripts read backend/workers/run_k8s_worker.py and the other worker entrypoints, backend/app/core/container.py, providers.py, dishka_lifespan.py, startup.py, and backend/app/events/core/consumer.py, searching for where AIOKafkaConsumer and UnifiedConsumer are provided and whether consumer shutdown is handled anywhere.
Wrap the consumer loop in try/finally to guarantee cleanup on exceptions.
An exception in handle() or commit() will skip await container.close(), leaving resources open. The async for loop (lines 41-43) should be wrapped to ensure cleanup always executes.
🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("KubernetesWorker shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("KubernetesWorker shutdown complete")

This pattern affects all worker files (run_result_processor.py, run_saga_orchestrator.py, run_coordinator.py, run_pod_monitor.py).
🤖 Prompt for AI Agents
In `@backend/workers/run_k8s_worker.py` around lines 36 - 47, The consumer loop
can raise in handler.handle() or kafka_consumer.commit() and skip cleanup; wrap
the async for msg in kafka_consumer loop in a try/finally so that await
container.close() (and any other shutdown steps) always run. Specifically,
enclose the async for ...: await handler.handle(msg); await
kafka_consumer.commit() inside try: ... finally: await container.close() and
keep the logger.info("KubernetesWorker shutdown complete") inside the finally;
apply the same pattern to the other worker modules (run_result_processor.py,
run_saga_orchestrator.py, run_coordinator.py, run_pod_monitor.py) referencing
kafka_consumer, UnifiedConsumer.handle, kafka_consumer.commit, and
container.close where present.
kafka_consumer = await container.get(AIOKafkaConsumer)
handler = await container.get(UnifiedConsumer)

logger.info("ResultProcessor started, consuming events...")

async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

logger.info("ResultProcessor shutdown complete")

await container.close()
🧩 Analysis chain (verification scripts; outputs collapsed). The scripts read backend/workers/run_result_processor.py, backend/app/core/container.py, and backend/app/core/providers.py to check how the Kafka consumer is provided and whether container.close() stops it.
Wrap the consumer loop in try/finally to guarantee cleanup on failure and cancellation.
The consumer loop lacks exception/cancellation handling. If handle() raises or the consumer task is cancelled (e.g., via SIGTERM), the async for loop exits abnormally and await container.close() is never executed, leaving the consumer open. This pattern affects all worker files (run_pod_monitor.py, run_saga_orchestrator.py, run_k8s_worker.py, run_coordinator.py).
The producer already demonstrates correct cleanup with explicit stop() in a finally block. Apply the same pattern to the consumer:
🔧 Suggested fix
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("ResultProcessor shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            await handler.handle(msg)
+            await kafka_consumer.commit()
+    finally:
+        await kafka_consumer.stop()
+        await container.close()
+        logger.info("ResultProcessor shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_result_processor.py` around lines 36 - 47, Wrap the
consumer loop in a try/finally so cleanup always runs: start by acquiring
kafka_consumer (AIOKafkaConsumer) and handler (UnifiedConsumer) as before, then
run the async for msg in kafka_consumer / await handler.handle(msg) loop inside
a try block; in the finally block ensure you stop/close the kafka_consumer (call
kafka_consumer.stop() or equivalent) and then await container.close(); re-raise
cancellation errors if needed so signal handling still works. This guarantees
the consumer and container are closed even if handler.handle() raises or the
task is cancelled.
async for msg in kafka_consumer:
    await handler.handle(msg)
    await kafka_consumer.commit()

try:
    # Wait for shutdown signal or service to stop
    while orchestrator.is_running and not shutdown_event.is_set():
        await asyncio.sleep(1)
finally:
    # Container cleanup stops everything
    logger.info("Initiating graceful shutdown...")
    await container.close()
    logger.info("SagaOrchestrator shutdown complete")

logger.warning("Saga orchestrator stopped")
await container.close()
Missing graceful shutdown handling and error resilience.
Several issues with the main loop:
- No graceful shutdown: signal handling (SIGTERM/SIGINT) was removed. In containerized environments, workers need to handle shutdown signals to stop consuming and exit cleanly.
- No error handling: if `handler.handle(msg)` raises an exception (which `UnifiedConsumer.handle` does after logging), the worker crashes. A single poison message could take down the worker.
- Container cleanup not guaranteed: `container.close()` won't be called if an exception occurs in the loop.
Proposed fix with shutdown handling and error resilience
+import signal
+from contextlib import suppress
+
 async def run_saga_orchestrator(settings: Settings) -> None:
     """Run the saga orchestrator service."""
     container = create_saga_orchestrator_container(settings)
     logger = await container.get(logging.Logger)
     logger.info("Starting SagaOrchestrator with DI container...")
     db = await container.get(Database)
     await init_beanie(database=db, document_models=ALL_DOCUMENTS)
     schema_registry = await container.get(SchemaRegistryManager)
     await initialize_event_schemas(schema_registry)
     kafka_consumer = await container.get(AIOKafkaConsumer)
     handler = await container.get(UnifiedConsumer)
+    shutdown_event = asyncio.Event()
+    loop = asyncio.get_running_loop()
+    for sig in (signal.SIGTERM, signal.SIGINT):
+        loop.add_signal_handler(sig, shutdown_event.set)
+
     logger.info("SagaOrchestrator started, consuming events...")
-    async for msg in kafka_consumer:
-        await handler.handle(msg)
-        await kafka_consumer.commit()
-
-    logger.info("SagaOrchestrator shutdown complete")
-
-    await container.close()
+    try:
+        async for msg in kafka_consumer:
+            if shutdown_event.is_set():
+                break
+            try:
+                await handler.handle(msg)
+                await kafka_consumer.commit()
+            except Exception as e:
+                logger.exception(f"Error processing message: {e}")
+                # Message not committed - will be redelivered
+    finally:
+        logger.info("SagaOrchestrator shutting down...")
+        with suppress(Exception):
+            await kafka_consumer.stop()
+        await container.close()
+        logger.info("SagaOrchestrator shutdown complete")
🤖 Prompt for AI Agents
In `@backend/workers/run_saga_orchestrator.py` around lines 41 - 47, Add graceful
shutdown and per-message error resilience around the kafka_consumer loop:
install asyncio signal handlers for SIGINT/SIGTERM that set an asyncio.Event
(e.g., shutdown_event) and make the async for msg in kafka_consumer loop respect
that event so it stops cleanly; wrap message processing so each iteration calls
await handler.handle(msg) inside a try/except that logs the exception via
logger.error (including exception details) and continues (optionally still
commit offsets on successful processing via kafka_consumer.commit), and wrap the
whole consumer loop in try/finally to ensure await container.close() and any
kafka_consumer stop/close are always awaited on shutdown. Reference symbols:
kafka_consumer, handler.handle, kafka_consumer.commit, container.close, logger.



Summary by cubic
Moved service startup and lifecycle management into DI and refactored workers/services into stateless handlers. Added Redis-backed repositories to replace in-memory managers, enabling horizontal scaling.
Refactors
New Features
Written for commit 9271f12. Summary will update on new commits.
Summary by CodeRabbit
Release Notes
Refactor
Chores
✏️ Tip: You can customize this high-level summary in your review settings.