Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions claude/lightcone/hooks.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
]
}
],
"SessionEnd": [
{
"hooks": [
{
"type": "command",
"command": "bash ${CLAUDE_PROJECT_DIR}/.claude/scripts/session-end.sh",
"timeout": 10
}
]
}
],
"PostToolUse": [
{
"matcher": "Write|Edit",
Expand Down
17 changes: 17 additions & 0 deletions claude/lightcone/scripts/session-end.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# SessionEnd hook: stop the project's session-scoped Dask scheduler.
#
# The hook payload arrives as JSON on stdin; its `.cwd` field names the
# directory to operate in.
#
# Best-effort and silent. The scheduler self-shuts on idle-timeout
# (see lightcone.engine.dask_daemon) so failure to fire here only
# delays cleanup; it does not leak resources indefinitely.

payload=$(cat)

# printf (not echo) so a payload starting with "-n"/"-e" is passed through
# verbatim. jq's stderr is discarded: a missing jq binary or a malformed
# payload must not produce output — we just fall through to the empty-cwd
# exit below.
cwd=$(printf '%s' "$payload" | jq -r '.cwd // empty' 2>/dev/null)

# Nothing to do without a usable project directory.
[ -n "$cwd" ] || exit 0
# `--` keeps a cwd that begins with "-" from being parsed as an option.
cd -- "$cwd" 2>/dev/null || exit 0
# Only act inside a project (marked by astra.yaml) with the `lc` CLI on PATH.
[ -f "astra.yaml" ] || exit 0
command -v lc >/dev/null 2>&1 || exit 0

# Any failure of `lc dask stop` is swallowed on purpose (see header).
lc dask stop >/dev/null 2>&1 || true
exit 0
37 changes: 31 additions & 6 deletions src/lightcone/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,10 @@ def run(
) -> None:
"""Materialize outputs declared in astra.yaml.

Always dispatches through a Dask cluster: a ``LocalCluster`` on a
workstation, srun-launched workers inside a SLURM allocation, or an
existing scheduler if ``DASK_SCHEDULER_ADDRESS`` is set.
Always dispatches through a Dask cluster: the session-scoped
scheduler (spawned on first run, reused thereafter), or an existing
one if ``DASK_SCHEDULER_ADDRESS`` is set. See
:mod:`lightcone.engine.dask_daemon` for the lifecycle.
"""
_abort_on_perlmutter_login()

Expand Down Expand Up @@ -474,9 +475,7 @@ def run(
except RunLockBusyError as e:
raise click.ClickException(str(e))

with cluster_for_run(
verbose=verbose, local_directory=str(rundirs.dask_local)
) as scheduler_addr:
with cluster_for_run(project_path=project, verbose=verbose) as scheduler_addr:
env = {
**os.environ,
"DASK_SCHEDULER_ADDRESS": scheduler_addr,
Expand Down Expand Up @@ -817,6 +816,32 @@ def _ensure_images(project: Path, *, runtime: str, force: bool = False) -> None:
raise click.ClickException(str(e))


# =============================================================================
# lc dask
# =============================================================================


@main.group()
def dask() -> None:
    """Manage the session-scoped Dask scheduler."""


@dask.command("stop")
def dask_stop() -> None:
    """Shut down the session-scoped Dask scheduler for this project.

    Best-effort: silent when no scheduler is running. Wired to the
    SessionEnd Claude Code hook so closing a session frees the
    scheduler's resources promptly; otherwise the scheduler self-shuts
    after its idle timeout.
    """
    # Imported lazily so the daemon module is only loaded when this
    # subcommand actually runs.
    from lightcone.engine.dask_daemon import stop as stop_scheduler

    signalled = stop_scheduler(_project_root())
    if not signalled:
        # Falsy result: stay quiet, per the best-effort contract above.
        return
    console.print("[dim]Sent SIGTERM to Dask scheduler.[/dim]")


# Register eval subgroup (requires optional 'eval' extra)
try:
from lightcone.eval.cli import eval_group
Expand Down
202 changes: 29 additions & 173 deletions src/lightcone/engine/dask_cluster.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
# mypy: disable-error-code="no-untyped-call"
"""Cluster connection point for ``lc run``.

Two branches:

- ``DASK_SCHEDULER_ADDRESS`` set → use it as-is. We don't own the
  cluster, so we don't tear it down. Kept as the escape hatch / CI
  override / "user has their own scheduler" path.
- Otherwise → :func:`lightcone.engine.dask_daemon.ensure_scheduler`
  returns the address of a session-scoped scheduler (spawning the
  daemon if needed). The daemon outlives any single ``lc run`` so
  successive runs in the same Claude session reuse it.

The node-shape helpers and resource-key constants live here because
both the daemon (when it spins up a cluster) and the executor plugin
(when it requests per-task resources) consume them.
"""

from __future__ import annotations

import logging
import os
import shutil
import socket
import subprocess
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path

# Resource keys advertised by workers and requested per-task. These strings
# form a contract between the worker bootstrap (in :mod:`dask_daemon`) and
# the executor plugin (snakemake_executor_plugin_dask.executor). Dask
# matches by string equality.
RESOURCE_CPUS = "cpus"
RESOURCE_MEMORY = "memory"
RESOURCE_GPUS = "gpus"
Expand Down Expand Up @@ -67,9 +65,9 @@ def _resource_dict(shape: _NodeShape) -> dict[str, float]:
"""Resource keys advertised by a worker for this node shape.

Single source of truth for which keys workers expose — both the
in-process LocalCluster and the srun-launched ``dask worker``s
advertise the same set so the executor's per-task requests resolve
on either path.
laptop ``LocalCluster`` and the srun-launched ``dask worker``s
advertise the same set so the executor's per-task requests
resolve on either path.
"""
res: dict[str, float] = {RESOURCE_CPUS: float(shape.cpus)}
if shape.mem_bytes:
Expand All @@ -80,173 +78,31 @@ def _resource_dict(shape: _NodeShape) -> dict[str, float]:


def _resources_arg(shape: _NodeShape) -> str:
    """Format ``--resources`` for ``dask worker``.

    Renders the mapping produced by :func:`_resource_dict` as
    space-separated ``key=value`` pairs. Each value is passed through
    ``int()``, so fractional parts are truncated.
    """
    pairs = _resource_dict(shape).items()
    return " ".join(f"{k}={int(v)}" for k, v in pairs)


@contextmanager
def cluster_for_run(
*,
project_path: Path,
verbose: bool = False,
local_directory: str | None = None,
) -> Iterator[str]:
"""Yield a Dask scheduler address valid for the duration of `lc run`.
"""Yield a Dask scheduler address for the duration of one ``lc run``.

*local_directory*, when given, is where dask workers stage their
spilled task data and internal state files. ``lc run`` resolves it
to a path under :mod:`lightcone.engine.scratch` so on NERSC the
spill lands on Lustre instead of DVS-mounted home/CFS (where small-
file I/O is slow and can pressure the gateway nodes).
The scheduler outlives the run — see :mod:`lightcone.engine.dask_daemon`.
No teardown happens here; the daemon self-shuts on idle-timeout, or
on SIGTERM from the SessionEnd hook.
"""
if addr := os.environ.get("DASK_SCHEDULER_ADDRESS"):
if verbose:
print(f"→ Using existing Dask scheduler at {addr}")
yield addr
return

if "SLURM_JOB_ID" in os.environ:
with _slurm_backed_cluster(
verbose=verbose, local_directory=local_directory
) as addr:
yield addr
return

with _local_cluster(
verbose=verbose, local_directory=local_directory
) as addr:
yield addr


@contextmanager
def _local_cluster(
*, verbose: bool, local_directory: str | None
) -> Iterator[str]:
from dask.distributed import LocalCluster

shape = _detect_node_shape()
# Workers must advertise every key the executor may request — Dask
# matches by exact key presence — or rules with ``mem_mb`` /
# ``gpus_per_task`` would never schedule on a workstation.
cluster = LocalCluster(
n_workers=1,
threads_per_worker=shape.cpus,
resources=_resource_dict(shape),
dashboard_address=":0",
local_directory=local_directory,
silence_logs=logging.INFO if verbose else logging.WARNING,
)
if verbose:
print(
f"→ Local Dask cluster ({shape.cpus} threads); "
f"scheduler at {cluster.scheduler_address}"
)
try:
yield cluster.scheduler_address
finally:
cluster.close()


@contextmanager
def _slurm_backed_cluster(
*, verbose: bool, local_directory: str | None
) -> Iterator[str]:
from dask.distributed import LocalCluster

if shutil.which("dask") is None:
raise RuntimeError(
"`dask` CLI is not on PATH inside the SLURM allocation. "
"Install lightcone-cli (and its `distributed` dep) into the "
"environment activated by your sbatch/salloc."
)

shape = _detect_node_shape()
nnodes = int(os.environ.get("SLURM_NNODES") or 1)

# Default LocalCluster binds the scheduler to 127.0.0.1, which workers
# on remote nodes cannot reach. Bind to the driver's hostname so srun-
# launched workers across the allocation can connect. SLURMD_NODENAME
# is the SLURM-canonical name; gethostname() is a sane fallback.
scheduler_host = os.environ.get("SLURMD_NODENAME") or socket.gethostname()
cluster = LocalCluster(
n_workers=0,
host=scheduler_host,
dashboard_address=":0",
local_directory=local_directory,
silence_logs=logging.INFO if verbose else logging.WARNING,
)
addr = cluster.scheduler_address
from lightcone.engine.dask_daemon import ensure_scheduler

addr = ensure_scheduler(project_path)
if verbose:
print(
f"→ SLURM allocation detected ({nnodes} node(s), "
f"{shape.cpus} cpu/node, {shape.gpus} gpu/node); "
f"launching workers via srun. Scheduler: {addr}"
)

worker_cmd = [
"srun",
f"--ntasks={nnodes}",
"--ntasks-per-node=1",
"dask",
"worker",
addr,
"--nthreads",
str(shape.cpus),
"--nworkers",
"1",
"--resources",
_resources_arg(shape),
"--no-dashboard",
# Each srun task is a single run-scoped worker; an auto-restart
# nanny adds no value (srun won't relaunch the task either) and
# logs "Worker process died unexpectedly" when retire_workers
# asks the worker to exit on shutdown.
"--no-nanny",
]
if local_directory:
worker_cmd.extend(["--local-directory", local_directory])
# Hide the worker's INFO-level connection chatter (Nanny start,
# scheduler registration, etc.) — useful only when debugging the
# cluster itself. WARNING+ still surface real issues. The newer
# `dask worker` CLI dropped `--silence-logs`, so we drive it via
# Dask's config env var instead; srun inherits env by default.
worker_env = dict(os.environ)
if not verbose:
worker_env.setdefault("DASK_LOGGING__DISTRIBUTED", "warning")
workers = subprocess.Popen(worker_cmd, env=worker_env)

try:
from dask.distributed import Client

client = Client(addr)
try:
client.wait_for_workers(n_workers=nnodes, timeout=120)
if verbose:
print(f"→ {nnodes} dask worker(s) registered.")
finally:
client.close()
yield addr
finally:
# Graceful shutdown: ask the scheduler to retire workers so each
# `dask worker` process exits on its own. srun then sees its task
# exit with code 0 and terminates silently. SIGTERM-ing srun
# directly (the prior path) prints "srun: forcing job
# termination" / "task 0: Killed" to stderr on every clean run.
try:
client = Client(addr, timeout="10s")
try:
client.retire_workers(close_workers=True, remove=True)
finally:
client.close()
except Exception:
pass
try:
workers.wait(timeout=20)
except subprocess.TimeoutExpired:
workers.terminate()
try:
workers.wait(timeout=10)
except subprocess.TimeoutExpired:
workers.kill()
workers.wait()
cluster.close()
print(f"→ Reusing session scheduler at {addr}")
yield addr
Loading
Loading