From 864fac98be68fa79d4d24b0c0ccfc02f35ae3819 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 11:02:22 +1100 Subject: [PATCH 01/11] feat: offline strategy --- cycling_utils/saving.py | 56 ++++++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 93e0c28..604dd36 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -61,6 +61,13 @@ class AtomicDirectory: - `strategy = "sync_any"` (default) will `force_save` the checkpoint if ANY process passes `force_save = True` - `strategy = "sync_all"` will `force_save` the checkpoint if and only if ALL processes pass `force_save = True` - `strategy = "async"` will `force_save` the checkpoint if the saving process passes `force_save = True` + - `strategy = "offline"` will `force_save` the checkpoint if the saving process passes `force_save = True` + + Each of the strategies "sync_any", "sync_all", and "async" + + Further, the `strategy = "offline"` argument should be passed if the AtomicDirectory saver is intended for use outside of a + torchrun distributed process group. In such cases, the user must ensure that all instances of the AtomicDirectory saver are + initialized with a unique 'name'. Example usage of AtomicDirectory in synchronous mode on the Strong Compute ISC launching with torchrun as follows. @@ -118,32 +125,41 @@ def __init__( self.output_directory = output_directory self.is_master = is_master self.keep_last = keep_last - self.rank = os.environ["RANK"] + self.strategy = strategy + self.rank = os.getenv("RANK", "NONE") + self.world_size = os.getenv("WORLD_SIZE", "NONE") # make sure all processes have been initialized with the same - strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2} + strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2, "offline": 3} if strategy in strategy_map: strategy_int = strategy_map[strategy] else: - raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', or 'async' but rank \ - {os.environ['RANK']} was passed '{strategy}'." + raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', 'async', or 'offline' but rank \ + {self.rank} was passed '{strategy}'." - local_strategy_tensor = torch.tensor( - strategy_int, dtype=torch.int64, requires_grad=False, device="cuda" - ) - global_strategy_list = [ - torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda") - for _ in range(int(os.environ["WORLD_SIZE"])) - ] - all_gather(global_strategy_list, local_strategy_tensor) - unique_global_strategy_ints = set([t.item() for t in global_strategy_list]) - assert ( - len(unique_global_strategy_ints) == 1 - ), "ERROR: AtomicDirectory savers initialized with different strategies." + if strategy != "offline": - self.strategy = strategy + assert ( + self.rank != "NONE" + ), "ERROR: AtomicDirectory requires RANK environment variable set if strategy is not 'offline'." + assert ( + self.world_size != "NONE" + ), "ERROR: AtomicDirectory requires WORLD_SIZE environment variable set if strategy is not 'offline'." + + local_strategy_tensor = torch.tensor( + strategy_int, dtype=torch.int64, requires_grad=False, device="cuda" + ) + global_strategy_list = [ + torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda") + for _ in range(int(os.environ["WORLD_SIZE"])) + ] + all_gather(global_strategy_list, local_strategy_tensor) + unique_global_strategy_ints = set([t.item() for t in global_strategy_list]) + assert ( + len(unique_global_strategy_ints) == 1 + ), "ERROR: AtomicDirectory savers initialized with different strategies." - if self.strategy == "async": + if strategy == "async": self.name = name + f"_rank_{os.environ['RANK']}" else: self.name = name @@ -159,7 +175,7 @@ def __init__( raise e def is_checkpoint_directory(self, path_str): - pattern = r"checkpoint_(\d+)(_force)?$" + pattern = r"_checkpoint_(\d+)(_force)?$" path = Path(path_str) match = re.search(pattern, path.name) if ( @@ -269,7 +285,7 @@ def prepare_checkpoint_directory(self, force_save=False): ): effective_force_save = True - elif self.strategy == "async": + elif self.strategy in ["async", "offline"]: effective_force_save = force_save if effective_force_save: From a8869627034b6f340b20ce5315158ab4ca05d3b2 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 11:12:14 +1100 Subject: [PATCH 02/11] fix: editable install support --- pyproject.toml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cd14135..6b65e89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,4 +11,10 @@ requires-python = ">= 3.10" authors = [ {name = "Adam Peaston", email = "adam@strongcompute.com"}, {name = "Fennecs Swaddle", email = "fennecs@strongcompute.com"} -] \ No newline at end of file +] + +[tool.setuptools] +package-dir = {"" = "cycling_utils"} + +[tool.setuptools.packages.find] +where = ["cycling_utils"] From 7b3e49c863df89b5ac544b5b053f6f90c0c8f798 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 11:22:23 +1100 Subject: [PATCH 03/11] wip: import issues --- cycling_utils/saving.py | 14 +++++++------- pyproject.toml | 5 +---- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 604dd36..177bd99 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -2,15 +2,13 @@ import re from pathlib import Path from shutil import rmtree -import torch -from torch.distributed import barrier, all_reduce, all_gather -def atomic_torch_save(obj, f: str | Path, **kwargs): - f = str(f) - temp_f = f + ".temp" - torch.save(obj, temp_f, **kwargs) - os.replace(temp_f, f) +# def atomic_torch_save(obj, f: str | Path, **kwargs): +# f = str(f) +# temp_f = f + ".temp" +# torch.save(obj, temp_f, **kwargs) +# os.replace(temp_f, f) class AtomicDirectory: @@ -138,6 +136,8 @@ def __init__( {self.rank} was passed '{strategy}'." if strategy != "offline": + import torch + from torch.distributed import barrier, all_reduce, all_gather assert ( self.rank != "NONE" diff --git a/pyproject.toml b/pyproject.toml index 6b65e89..56e6bfc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,8 +13,5 @@ authors = [ {name = "Fennecs Swaddle", email = "fennecs@strongcompute.com"} ] -[tool.setuptools] -package-dir = {"" = "cycling_utils"} - [tool.setuptools.packages.find] -where = ["cycling_utils"] +where = ["."] From 28ca4185563de1d84151becd90182d31d345bc8b Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 11:28:01 +1100 Subject: [PATCH 04/11] fix: just import torch --- cycling_utils/saving.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 177bd99..604dd36 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -2,13 +2,15 @@ import re from pathlib import Path from shutil import rmtree +import torch +from torch.distributed import barrier, all_reduce, all_gather -# def atomic_torch_save(obj, f: str | Path, **kwargs): -# f = str(f) -# temp_f = f + ".temp" -# torch.save(obj, temp_f, **kwargs) -# os.replace(temp_f, f) +def atomic_torch_save(obj, f: str | Path, **kwargs): + f = str(f) + temp_f = f + ".temp" + torch.save(obj, temp_f, **kwargs) + os.replace(temp_f, f) class AtomicDirectory: @@ -136,8 +138,6 @@ def __init__( {self.rank} was passed '{strategy}'." if strategy != "offline": - import torch - from torch.distributed import barrier, all_reduce, all_gather assert ( self.rank != "NONE" From 97aea98a42b36867440d70cc8021f0c4c0b34115 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 12:16:28 +1100 Subject: [PATCH 05/11] fix: properly do stuff --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 56e6bfc..12f13f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,3 +15,5 @@ authors = [ [tool.setuptools.packages.find] where = ["."] + +[tool.setuptools_scm] From 52f82ed285b07ce7c7c8fb910fd54180abf5e7b6 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 12:31:19 +1100 Subject: [PATCH 06/11] fix: please work please --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 12f13f9..a0f3e3b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools", "setuptools-scm"] +requires = ["setuptools>=61", "setuptools-scm"] build-backend = "setuptools.build_meta" [project] From cd6dba2eab5fd4d1470659c9bf428fb72e0c45eb Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 12:42:47 +1100 Subject: [PATCH 07/11] fix: yeah look I give up --- pyproject.toml | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a0f3e3b..cd14135 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools>=61", "setuptools-scm"] +requires = ["setuptools", "setuptools-scm"] build-backend = "setuptools.build_meta" [project] @@ -11,9 +11,4 @@ requires-python = ">= 3.10" authors = [ {name = "Adam Peaston", email = "adam@strongcompute.com"}, {name = "Fennecs Swaddle", email = "fennecs@strongcompute.com"} -] - -[tool.setuptools.packages.find] -where = ["."] - -[tool.setuptools_scm] +] \ No newline at end of file From 5606758e3c67f0a7964bae5f4c68f36ef6a3f94c Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 12:46:42 +1100 Subject: [PATCH 08/11] fix: bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cd14135..c71059d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cycling_utils" -version = "0.0.2" +version = "0.0.3" description = "Utilities to assist with distributed deep learning on Strong Compute." readme = "README.md" requires-python = ">= 3.10" From 54572b6f7a1b11b3d81576e7096969eb825d98f9 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 13:38:28 +1100 Subject: [PATCH 09/11] fix: env vars stored --- cycling_utils/saving.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 604dd36..0efd016 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -151,7 +151,7 @@ def __init__( ) global_strategy_list = [ torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda") - for _ in range(int(os.environ["WORLD_SIZE"])) + for _ in range(int(self.world_size)) ] all_gather(global_strategy_list, local_strategy_tensor) unique_global_strategy_ints = set([t.item() for t in global_strategy_list]) @@ -160,7 +160,7 @@ def __init__( ), "ERROR: AtomicDirectory savers initialized with different strategies." if strategy == "async": - self.name = name + f"_rank_{os.environ['RANK']}" + self.name = name + f"_rank_{self.rank}" else: self.name = name @@ -280,7 +280,7 @@ def prepare_checkpoint_directory(self, force_save=False): effective_force_save = False if (global_force.item() > 0 and self.strategy == "sync_any") or ( - global_force.item() == int(os.environ["WORLD_SIZE"]) + global_force.item() == int(self.world_size) and self.strategy == "sync_all" ): effective_force_save = True From 62fcd5861bfdb5867c2d8b4621b89e9bedbeaeeb Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 13:43:24 +1100 Subject: [PATCH 10/11] fix: simplify case --- cycling_utils/saving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 0efd016..2b6a1e3 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -285,7 +285,7 @@ def prepare_checkpoint_directory(self, force_save=False): ): effective_force_save = True - elif self.strategy in ["async", "offline"]: + else: # self.strategy in ["async", "offline"] effective_force_save = force_save if effective_force_save: From 498b6285af7baf6469784788315ad482deb89674 Mon Sep 17 00:00:00 2001 From: StrongAdam Date: Wed, 15 Oct 2025 13:57:19 +1100 Subject: [PATCH 11/11] fix: mono --- cycling_utils/saving.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 2b6a1e3..d852749 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -61,11 +61,11 @@ class AtomicDirectory: - `strategy = "sync_any"` (default) will `force_save` the checkpoint if ANY process passes `force_save = True` - `strategy = "sync_all"` will `force_save` the checkpoint if and only if ALL processes pass `force_save = True` - `strategy = "async"` will `force_save` the checkpoint if the saving process passes `force_save = True` - - `strategy = "offline"` will `force_save` the checkpoint if the saving process passes `force_save = True` + - `strategy = "mono"` will `force_save` the checkpoint if the saving process passes `force_save = True` Each of the strategies "sync_any", "sync_all", and "async" - Further, the `strategy = "offline"` argument should be passed if the AtomicDirectory saver is intended for use outside of a + Further, the `strategy = "mono"` argument should be passed if the AtomicDirectory saver is intended for use outside of a torchrun distributed process group. In such cases, the user must ensure that all instances of the AtomicDirectory saver are initialized with a unique 'name'. @@ -130,21 +130,21 @@ def __init__( self.world_size = os.getenv("WORLD_SIZE", "NONE") # make sure all processes have been initialized with the same - strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2, "offline": 3} + strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2, "mono": 3} if strategy in strategy_map: strategy_int = strategy_map[strategy] else: - raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', 'async', or 'offline' but rank \ + raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', 'async', or 'mono' but rank \ {self.rank} was passed '{strategy}'." - if strategy != "offline": + if strategy != "mono": assert ( self.rank != "NONE" - ), "ERROR: AtomicDirectory requires RANK environment variable set if strategy is not 'offline'." + ), "ERROR: AtomicDirectory requires RANK environment variable set if strategy is not 'mono'." assert ( self.world_size != "NONE" - ), "ERROR: AtomicDirectory requires WORLD_SIZE environment variable set if strategy is not 'offline'." + ), "ERROR: AtomicDirectory requires WORLD_SIZE environment variable set if strategy is not 'mono'." local_strategy_tensor = torch.tensor( strategy_int, dtype=torch.int64, requires_grad=False, device="cuda" @@ -285,7 +285,7 @@ def prepare_checkpoint_directory(self, force_save=False): ): effective_force_save = True - else: # self.strategy in ["async", "offline"] + else: # self.strategy in ["async", "mono"] effective_force_save = force_save if effective_force_save: