Change asyncio.gather to TaskGroup in the Harbor generator to correctly propagate cancellation signals. #1160
alexgshaw wants to merge 6 commits into NovaSky-AI:main
|
Thanks @alexgshaw !
No worry, I can edit the PR directly. Let me know when it's ready! |
|
Okay, I might add some more changes later, but this should be ready. Pretty much all of my logic changes are in the generate method. |
|
(By more changes later I mean additional PRs, this one is ready to merge!) |
…n propagation
Fixes laude-institute/harbor#656: when users interrupt RL with Ctrl+C during sandbox rollout, sandboxes now properly terminate instead of requiring manual cleanup. asyncio.TaskGroup propagates cancellation to all child tasks, ensuring Harbor trials (Daytona/Modal sandboxes) are cleaned up on KeyboardInterrupt.
Based on NovaSky-AI#1160 with lint reverted.
Co-Authored-By: Alex Shaw <alexgshaw64@gmail.com>
|
/gemini review |
…n propagation
Fixes laude-institute/harbor#656: when users interrupt RL with Ctrl+C during sandbox rollout, sandboxes now properly terminate instead of requiring manual cleanup. asyncio.TaskGroup propagates cancellation to all child tasks, ensuring Harbor trials (Daytona/Modal sandboxes) are cleaned up on KeyboardInterrupt.
Also wraps the TaskGroup in try/finally to ensure the progress bar is always closed, even if the TaskGroup raises an ExceptionGroup.
Ports the change to examples/train_integrations/harbor/ for the skyrl migration (NovaSky-AI#1145).
Co-Authored-By: Alex Shaw <alexgshaw64@gmail.com>
When the driver process receives KeyboardInterrupt (Ctrl+C), `ray.get()` raises
the exception but does NOT cancel the remote worker task. The worker keeps
running until Ray eventually kills it (often with SIGKILL), skipping all async
cleanup — including Harbor sandbox teardown.
Fix: catch KeyboardInterrupt at the `ray.get()` call site and explicitly invoke
`ray.cancel(ref, force=False)`, which delivers KeyboardInterrupt to the worker.
asyncio.run() has built-in handling for KeyboardInterrupt: it cancels the main
task and runs all finally blocks, allowing Trial.run() to delete sandboxes.
Additionally, add a SIGTERM→KeyboardInterrupt handler in BasePPOExp.run() as
defense-in-depth for when Ray terminates the worker with SIGTERM directly.
Without this, SIGTERM raises SystemExit which asyncio.run() does not handle
gracefully (cleanup callbacks are skipped).
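A minimal sketch of the SIGTERM→KeyboardInterrupt idea follows. It is not the code in `BasePPOExp.run()`; the handler name and `demo` function are hypothetical, and the sketch assumes it runs in the main thread of a process (Python only allows `signal.signal` there). It sends SIGTERM to itself and shows that a surrounding `finally` block still runs.

```python
import signal
import time


def _sigterm_as_keyboard_interrupt(signum, frame):
    """Hypothetical handler: turn SIGTERM into KeyboardInterrupt."""
    raise KeyboardInterrupt


def demo() -> list[str]:
    events: list[str] = []
    previous = signal.signal(signal.SIGTERM, _sigterm_as_keyboard_interrupt)
    try:
        try:
            signal.raise_signal(signal.SIGTERM)  # simulate an external SIGTERM
            time.sleep(0.1)  # give the interpreter a chance to run the handler
            events.append("not interrupted")
        finally:
            events.append("cleanup")  # stands in for sandbox teardown
    except KeyboardInterrupt:
        events.append("KeyboardInterrupt")
    finally:
        # Restore whatever handler was installed before the demo.
        signal.signal(
            signal.SIGTERM,
            previous if previous is not None else signal.SIG_DFL,
        )
    return events
```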
Tested E2E:
- KeyboardInterrupt (Ctrl+C): 0/50 sandboxes leaked (was 27/50 before fix)
- Unhandled exception: 0/64 sandboxes leaked (was 4/64 before fix,
remaining leaks fixed by harbor asyncio.shield() change in harbor#656)
Fixes NovaSky-AI#1160
See also: laude-institute/harbor#656
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…trl+C
Ray's C code sets SIGINT to SIG_IGN during initialization, which prevents Python from ever seeing KeyboardInterrupt. The previous fix (ray.cancel + raise) failed at production scale because the driver exited before the worker could finish cleanup.
Key changes:
- Reinstall SIGINT/SIGTERM handlers AFTER initialize_ray() to override Ray's SIG_IGN
- Replace blocking ray.get() with a time.sleep() + non-blocking ray.wait() polling loop; time.sleep() is interruptible by signals whereas ray.get() blocks in C code
- After first Ctrl+C: temporarily ignore signals (uv sends SIGTERM when it dies), call ray.cancel(force=False), then wait up to 2 minutes for the worker to clean up sandboxes
- Second Ctrl+C force-kills the worker
Tested: 50 Daytona sandboxes created → 0 leaked after SIGINT. Also tested: exception during generation → 0 leaked.
See harbor#656 / SkyRL#1160.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
/gemini review |
Code Review
This pull request refactors the asynchronous task handling in the Harbor generator to use asyncio.TaskGroup instead of asyncio.gather. This is a great improvement for ensuring correct cancellation signal propagation. The changes also include robust signal handling in the main entrypoint to gracefully manage ray worker lifecycle and cleanup, which is crucial for stability.
My review identifies a critical bug in the SIGTERM handling logic that could lead to a crash, and also points out opportunities for code deduplication and style improvements in the signal handling implementation. Overall, the changes are in the right direction for making the training process more robust.
signal.signal(signal.SIGINT, signal.default_int_handler)
signal.signal(signal.SIGTERM, lambda s, f: signal.default_int_handler(s, f))

try:
    # Poll with time.sleep() (interruptible by signals) + non-blocking
    # ray.wait(). We cannot use ray.get() or ray.wait(timeout=N>0)
    # because those block in C code and may re-mask SIGINT.
    while True:
        time.sleep(1)
        # Reinstall handlers in case ray.wait() overrode them
        signal.signal(signal.SIGINT, signal.default_int_handler)
        signal.signal(signal.SIGTERM, lambda s, f: signal.default_int_handler(s, f))
The signal handling logic has a few issues:
- Critical Bug: The `SIGTERM` handler `lambda s, f: signal.default_int_handler(s, f)` is incorrect. `signal.default_int_handler()` takes no arguments, so passing `s` and `f` will cause a `TypeError` when the handler is invoked. The process will crash instead of gracefully handling `SIGTERM`.
- Code Duplication: The signal handler installation logic is repeated multiple times (here, and later in the `except` block on lines 118-119 and 125). This makes the code harder to read and maintain.
I suggest refactoring this into a helper function and correcting the `SIGTERM` handler. This will fix the bug and improve code quality. You can then use this helper function in all places where signal handlers are set or reset, which will also fix an inconsistency on line 125 where only the `SIGINT` handler is re-installed.
Suggested change:
def _reinstall_signal_handlers():
    """Restores default signal handlers for SIGINT and SIGTERM."""
    def _sigterm_handler(signum, frame):
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, signal.default_int_handler)
    signal.signal(signal.SIGTERM, _sigterm_handler)

_reinstall_signal_handlers()
try:
    # Poll with time.sleep() (interruptible by signals) + non-blocking
    # ray.wait(). We cannot use ray.get() or ray.wait(timeout=N>0)
    # because those block in C code and may re-mask SIGINT.
    while True:
        time.sleep(1)
        # Reinstall handlers in case ray.wait() overrode them
        _reinstall_signal_handlers()
import signal
import time
1. Refactor duplicated signal installation into _install_interrupt_handlers() helper function (Gemini suggestion).
2. Fix SIGTERM not reinstalled in cleanup wait loop: previously only SIGINT was reinstalled after ray.wait(), leaving SIGTERM ineffective during the 120s cleanup wait (Devin review, valid).
3. Move `import signal` and `import time` to module top level (PEP 8).
4. Wrap _worker() in try/except Exception so that a single failing trajectory (e.g. tokenizer error) does not cancel the entire TaskGroup batch. CancelledError still propagates for cleanup. (Devin review, valid concern about TaskGroup semantics.)
Note: Gemini's claim that signal.default_int_handler takes no arguments is incorrect; it takes (signalnum, frame). The lambda is correct.
Tested: Ctrl+C (50 sandboxes → 0 leaked), exception (0 leaked).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
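The note about `signal.default_int_handler` in this commit can be checked with a short sketch. The wrapper below mirrors the shape of the lambda under discussion; the function names are illustrative only. Calling the handler directly with a signal number and a frame (here `None`) raises `KeyboardInterrupt` rather than `TypeError`.

```python
import signal


def forward_to_default_int_handler(signum, frame):
    """Same shape as `lambda s, f: signal.default_int_handler(s, f)`."""
    signal.default_int_handler(signum, frame)


def demo() -> str:
    try:
        # Call the handler directly; no real signal is delivered here.
        forward_to_default_int_handler(signal.SIGTERM, None)
    except KeyboardInterrupt:
        return "KeyboardInterrupt"
    return "no exception"
```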
Per user preference, one unexpected error canceling the entire TaskGroup batch is fine. The try/except in harbor_agent_loop's retry loop already handles expected failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
/gemini review |
#
# See harbor#656 / SkyRL#1160.
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
trainer = self._setup_trainer()
🟡 SIGINT masked too early makes the entire setup phase uninterruptible by Ctrl+C
In HarborExp.run(), signal.signal(signal.SIGINT, signal.SIG_IGN) is set before self._setup_trainer() is called. The SIGINT handler is only restored later inside the event loop by loop.add_signal_handler(signal.SIGINT, _cancel_main) within _run_with_graceful_shutdown. This means that during the entire _setup_trainer() phase — which includes building inference engines, creating the trainer, and calling trainer.build_models() — Ctrl+C is completely ignored.
Root cause and impact
The intent (documented in the comment at line 96-98) is to prevent asyncio.run() from installing its own _on_sigint handler. However, placing the SIG_IGN call at line 103 — before _setup_trainer() at line 104 — has the unintended side effect of also masking SIGINT during setup.
_setup_trainer() (skyrl/train/entrypoints/main_base.py:370-414) performs heavyweight operations: initializing trackers, starting inference engines (get_inference_client()), building models (trainer.build_models()), etc. This can take several minutes with large models. During this entire period, the user has no way to interrupt the process with Ctrl+C.
The fix is to move the SIG_IGN calls to just before asyncio.run(), after _setup_trainer() returns:
trainer = self._setup_trainer()
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
The same issue exists in the duplicate file at skyrl-train/examples/harbor/entrypoints/main_harbor.py:100-104.
See #1194
Fix Daytona sandbox leak on Ctrl+C and unhandled exceptions
Motivation
When a user presses Ctrl+C during Harbor RL training, Daytona sandboxes are not
cleaned up. They keep running until they are deleted manually or until a
user-specified timeout (e.g. 30 minutes) expires. The same issue occurs when an unhandled
exception kills the training loop. In a run with 50+ concurrent sandboxes, all of them leak.
This PR fixes the full chain of issues — from the asyncio task structure in the
generator, through the Ray worker's signal handling, to the driver's cleanup
logic. Each fix addresses a specific failure mode discovered through E2E testing.
What's changed
1. `harbor_generator.py`: TaskGroup instead of asyncio.gather
Symptom: When one trajectory raises an exception, `asyncio.gather` propagates the error but leaves all sibling coroutines running as orphans. Nobody awaits them, so their `finally` blocks (sandbox teardown) may never execute.
Fix: Replace `tqdm.asyncio.tqdm.gather` with `asyncio.TaskGroup`. When one task fails or the group is cancelled, TaskGroup cancels all siblings and waits for their cleanup (`finally` blocks) before propagating the exception.
2. `main_base.py`: SIGTERM → KeyboardInterrupt in the worker
Symptom: When the driver calls `ray.cancel(ref, force=False)`, Ray raises `KeyboardInterrupt` in the worker. But when Ray sends `SIGTERM` to terminate the worker (e.g. on force-cancel or cluster shutdown), the default handler raises `SystemExit`. `asyncio.run()` does not handle `SystemExit`: it exits immediately without cancelling tasks or running `finally` blocks. Sandboxes leak.
Fix: Install a SIGTERM handler in `BasePPOExp.run()` that raises `KeyboardInterrupt` instead. This lets `asyncio.run()` do its proper shutdown: cancel the main task, propagate `CancelledError` to subtasks, and run all `finally` blocks.
3. `main_harbor.py`: Driver waits for worker cleanup
Three issues were discovered in the driver (the process that calls `ray.get()`):
3a. Ray masks SIGINT during `ray.get()`
Symptom: `ray.get()` blocks in C code where SIGINT is set to `SIG_IGN`. Pressing Ctrl+C has no effect and the driver hangs forever.
Fix: Replace `ray.get(ref)` with a polling loop: `time.sleep(1)` (which is interruptible by signals) followed by a non-blocking `ray.wait([ref], timeout=0)`. Signal handlers are reinstalled after every `ray.wait()` call since Ray may re-mask them.
3b. Driver exits before worker finishes cleanup
Symptom: The old code did `ray.cancel(ref, force=False); raise`. The driver exits immediately after calling cancel. The worker starts cleanup (deleting sandboxes) but the driver is already gone. At scale (50+ sandboxes), the worker needs time to delete them all. Sandboxes leak.
Fix: After `ray.cancel()`, poll `ray.wait()` for up to 120 seconds, waiting for the worker to finish cleanup. A second Ctrl+C force-kills the worker for users who want to exit immediately.
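A rough sketch of the bounded cleanup wait in 3b, again without Ray: `is_done` and `force_kill` are hypothetical callables standing in for a non-blocking `ray.wait()` check and a forced cancel, and the returned status strings are invented for the example. What the real driver does when the deadline expires is not stated here, so the sketch simply reports it.

```python
import time
from typing import Callable


def wait_for_cleanup(
    is_done: Callable[[], bool],
    force_kill: Callable[[], None],
    timeout: float = 120.0,
    interval: float = 1.0,
) -> str:
    """Wait up to `timeout` seconds for cleanup; a KeyboardInterrupt during
    the wait (a second Ctrl+C) triggers `force_kill` instead."""
    deadline = time.monotonic() + timeout
    try:
        while time.monotonic() < deadline:
            if is_done():
                return "cleaned up"
            time.sleep(interval)
    except KeyboardInterrupt:
        force_kill()
        return "force-killed"
    return "timed out"


def demo() -> list[str]:
    outcomes: list[str] = []

    # Worker finishes on the second check.
    answers = iter([False, True])
    outcomes.append(
        wait_for_cleanup(lambda: next(answers), lambda: None, 5.0, 0.01)
    )

    # Simulate a second Ctrl+C arriving during the wait.
    def second_ctrl_c() -> bool:
        raise KeyboardInterrupt

    outcomes.append(wait_for_cleanup(second_ctrl_c, lambda: None, 5.0, 0.01))

    # Worker never finishes within the deadline.
    outcomes.append(wait_for_cleanup(lambda: False, lambda: None, 0.05, 0.01))
    return outcomes
```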
3c. uv sends SIGTERM when it dies
Symptom: When Ctrl+C sends SIGINT to the process group, `uv` (the parent process) also receives SIGINT, dies, and sends SIGTERM to the Python child. The driver receives: SIGINT → first KeyboardInterrupt → enters cleanup handler → SIGTERM from uv → second KeyboardInterrupt → force-kills worker before cleanup can start.
Fix: Temporarily set SIGINT and SIGTERM to `SIG_IGN` upon entering the cleanup handler, call `ray.cancel()`, wait 0.5s for uv's SIGTERM to be absorbed, then restore handlers so a deliberate second Ctrl+C still works.
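The "absorb, then restore" step in 3c might look roughly like the sketch below. The `signals_ignored` context manager and `demo` are hypothetical names, not taken from the PR, and the sketch assumes it runs in the main thread. A SIGTERM raised while signals are ignored is dropped; once the handlers are restored, the next SIGTERM surfaces as `KeyboardInterrupt` again.

```python
import contextlib
import signal
import time


@contextlib.contextmanager
def signals_ignored(signums=(signal.SIGINT, signal.SIGTERM)):
    """Temporarily set the given signals to SIG_IGN, then restore them."""
    previous = {s: signal.signal(s, signal.SIG_IGN) for s in signums}
    try:
        yield
    finally:
        for signum, handler in previous.items():
            signal.signal(
                signum, handler if handler is not None else signal.SIG_DFL
            )


def demo() -> list[str]:
    events: list[str] = []

    def on_term(signum, frame):
        raise KeyboardInterrupt

    old = signal.signal(signal.SIGTERM, on_term)
    try:
        with signals_ignored():
            signal.raise_signal(signal.SIGTERM)  # stray SIGTERM is dropped
            time.sleep(0.05)
            events.append("absorbed")
        try:
            signal.raise_signal(signal.SIGTERM)  # deliberate signal afterwards
            time.sleep(0.05)
        except KeyboardInterrupt:
            events.append("interrupted")
    finally:
        signal.signal(
            signal.SIGTERM, old if old is not None else signal.SIG_DFL
        )
    return events
```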
Testing
Both tests run with 2× H100 GPUs, Qwen3-1.7B, 8 prompts × 8 samples = 64
trajectories, 50 concurrent Daytona sandboxes.
Companion PR
Harbor fix (`asyncio.shield` in `_create_sandbox`): laude-institute/harbor#819