From 9f919ec8c95c9cce14b72ed3c30e4451fa186616 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Sun, 10 Aug 2025 17:42:24 +1000 Subject: [PATCH 1/4] feat: readiness check --- cycling_utils/__init__.py | 2 ++ cycling_utils/readiness.py | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 cycling_utils/readiness.py diff --git a/cycling_utils/__init__.py b/cycling_utils/__init__.py index f594a8a..035ab18 100644 --- a/cycling_utils/__init__.py +++ b/cycling_utils/__init__.py @@ -7,6 +7,7 @@ from .datasets import DistributedShardedDataset from .saving import AtomicDirectory, atomic_torch_save from .timing import TimestampedTimer +from .readiness import torch_distributed_readiness __all__ = [ "InterruptableDistributedSampler", @@ -17,4 +18,5 @@ "AtomicDirectory", "MetricsTracker", "TimestampedTimer", + "torch_distributed_readiness", ] diff --git a/cycling_utils/readiness.py b/cycling_utils/readiness.py new file mode 100644 index 0000000..77168ab --- /dev/null +++ b/cycling_utils/readiness.py @@ -0,0 +1,18 @@ +import os +import torch +import torch.distributed as dist + + +def torch_distributed_readiness(): + dist.init_process_group("nccl") + global_rank = int(os.environ["RANK"]) + local_rank = int(os.environ["LOCAL_RANK"]) + world_size = int(os.environ["WORLD_SIZE"]) + world_size_check = torch.tensor([global_rank + 1], device=local_rank) + dist.all_reduce(world_size_check) + all_reduce_result = world_size_check.item() + expected_result = world_size * (world_size + 1) / 2 + assert ( + all_reduce_result == expected_result + ), f"All reduce check failed, expected_result: {expected_result}, all_reduce_result: {all_reduce_result}" + dist.destroy_process_group() From 49bd8ef8f432bca7fd0713e9b85148b3be5bfc95 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Sun, 10 Aug 2025 18:06:43 +1000 Subject: [PATCH 2/4] chore: docs --- cycling_utils/readiness.py | 56 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/cycling_utils/readiness.py b/cycling_utils/readiness.py index 77168ab..467e4fe 100644 --- a/cycling_utils/readiness.py +++ b/cycling_utils/readiness.py @@ -16,3 +16,59 @@ def torch_distributed_readiness(): all_reduce_result == expected_result ), f"All reduce check failed, expected_result: {expected_result}, all_reduce_result: {all_reduce_result}" dist.destroy_process_group() + + +""" Useage: +Alongside user project files, create two additional files (examples below): + +1. dist_readiness_check.py +2. torch_dist_readiness_check.sh + +Then in the experiment launch file, after sourcing venv and before launching torchrun, insert: + +"bash /path/to/torch_dist_readiness_check.sh &&" + +The idea here is that the shell script essentially launches a mini project with torchrun before the +main project starts. The mini project initializes a process group and completes an all-reduce. +Upon failure of the mini-project, it will re-try up to 5 times before failing completely. + +The hypothesis here is that something in torch may not yet be ready when torchrun starts, so we +run the mini project to catch this failure if it's going to happen. + +# - dist_readiness_check.py - # + +from cycling_utils import torch_distributed_readiness +torch_distributed_readiness() + +# - torch_dist_readiness_check.sh - # + +#!/bin/bash + +MAX_RETRIES=5 +RETRY_COUNT=0 + +# Function to run the Python script +run_python_script() { + torchrun --nnodes=$NNODES --nproc-per-node=$N_PROC --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --node_rank=$RANK /path/to/dist_readiness_check.py + return $? +} + +# Attempt to run the Python script with retries +while [ $RETRY_COUNT -lt $MAX_RETRIES ]; do + echo "Attempt $((RETRY_COUNT + 1)) of $MAX_RETRIES" + if run_python_script; then + echo "Python script executed successfully" + exit 0 + else + RETRY_COUNT=$((RETRY_COUNT + 1)) + if [ $RETRY_COUNT -lt $MAX_RETRIES ]; then + echo "Python script failed. Retrying in 5 seconds..." + sleep 5 + fi + fi +done + +# If we reached here, all attempts failed +echo "Error: Python script failed after $MAX_RETRIES attempts" >&2 +exit 1 +""" From 1fe604192a64b2234988e81e405a59261e81f0b2 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Sun, 10 Aug 2025 19:25:23 +1000 Subject: [PATCH 3/4] chore: tidy --- cycling_utils/readiness.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cycling_utils/readiness.py b/cycling_utils/readiness.py index 467e4fe..2fddef1 100644 --- a/cycling_utils/readiness.py +++ b/cycling_utils/readiness.py @@ -4,6 +4,7 @@ def torch_distributed_readiness(): + assert torch.cuda.is_available(), "CUDA not available." dist.init_process_group("nccl") global_rank = int(os.environ["RANK"]) local_rank = int(os.environ["LOCAL_RANK"]) @@ -21,12 +22,12 @@ def torch_distributed_readiness(): """ Useage: Alongside user project files, create two additional files (examples below): -1. dist_readiness_check.py -2. torch_dist_readiness_check.sh +1. readiness_check.py +2. readiness_check.sh Then in the experiment launch file, after sourcing venv and before launching torchrun, insert: -"bash /path/to/torch_dist_readiness_check.sh &&" +"bash /path/to/readiness_check.sh &&" The idea here is that the shell script essentially launches a mini project with torchrun before the main project starts. The mini project initializes a process group and completes an all-reduce. @@ -35,12 +36,12 @@ def torch_distributed_readiness(): The hypothesis here is that something in torch may not yet be ready when torchrun starts, so we run the mini project to catch this failure if it's going to happen. -# - dist_readiness_check.py - # +# - readiness_check.py - # from cycling_utils import torch_distributed_readiness torch_distributed_readiness() -# - torch_dist_readiness_check.sh - # +# - readiness_check.sh - # #!/bin/bash @@ -49,7 +50,7 @@ def torch_distributed_readiness(): # Function to run the Python script run_python_script() { - torchrun --nnodes=$NNODES --nproc-per-node=$N_PROC --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --node_rank=$RANK /path/to/dist_readiness_check.py + torchrun --nnodes=$NNODES --nproc-per-node=$N_PROC --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --node_rank=$RANK /path/to/readiness_check.py return $? } From 115aeae11563e95c34c90b445dc529439a51a062 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Mon, 11 Aug 2025 08:05:06 +1000 Subject: [PATCH 4/4] chore: better docs --- cycling_utils/readiness.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/cycling_utils/readiness.py b/cycling_utils/readiness.py index 2fddef1..825de15 100644 --- a/cycling_utils/readiness.py +++ b/cycling_utils/readiness.py @@ -25,9 +25,24 @@ def torch_distributed_readiness(): 1. readiness_check.py 2. readiness_check.sh -Then in the experiment launch file, after sourcing venv and before launching torchrun, insert: - -"bash /path/to/readiness_check.sh &&" +Then in the experiment launch file, after sourcing venv and before launching torchrun, insert +"bash /path/to/readiness_check.sh &&" as follows + +# - fashion_mnist.isc - # + +isc_project_id = "" +experiment_name = "fashion_mnist" +gpus = 16 +compute_mode = "burst" +dataset_id_list = ["uds-decorous-field-baritone-250513"] +command = ''' +source /root/.fashion/bin/activate && +bash /path/to/readiness_check.sh && +torchrun --nnodes=$NNODES --nproc-per-node=$N_PROC +--master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --node_rank=$RANK +/root/isc-demos/fashion_mnist/train.py +--dataset-id uds-decorous-field-baritone-250513 +--lr 0.001 --batch-size 16''' The idea here is that the shell script essentially launches a mini project with torchrun before the main project starts. The mini project initializes a process group and completes an all-reduce.