diff --git a/cycling_utils/saving.py b/cycling_utils/saving.py index 93e0c28..d852749 100644 --- a/cycling_utils/saving.py +++ b/cycling_utils/saving.py @@ -61,6 +61,13 @@ class AtomicDirectory: - `strategy = "sync_any"` (default) will `force_save` the checkpoint if ANY process passes `force_save = True` - `strategy = "sync_all"` will `force_save` the checkpoint if and only if ALL processes pass `force_save = True` - `strategy = "async"` will `force_save` the checkpoint if the saving process passes `force_save = True` + - `strategy = "mono"` will `force_save` the checkpoint if the saving process passes `force_save = True` + + Each of the strategies "sync_any", "sync_all", and "async" + + Further, the `strategy = "mono"` argument should be passed if the AtomicDirectory saver is intended for use outside of a + torchrun distributed process group. In such cases, the user must ensure that all instances of the AtomicDirectory saver are + initialized with a unique 'name'. Example usage of AtomicDirectory in synchronous mode on the Strong Compute ISC launching with torchrun as follows. @@ -118,33 +125,42 @@ def __init__( self.output_directory = output_directory self.is_master = is_master self.keep_last = keep_last - self.rank = os.environ["RANK"] + self.strategy = strategy + self.rank = os.getenv("RANK", "NONE") + self.world_size = os.getenv("WORLD_SIZE", "NONE") # make sure all processes have been initialized with the same - strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2} + strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2, "mono": 3} if strategy in strategy_map: strategy_int = strategy_map[strategy] else: - raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', or 'async' but rank \ - {os.environ['RANK']} was passed '{strategy}'." + raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', 'async', or 'mono' but rank \ + {self.rank} was passed '{strategy}'." - local_strategy_tensor = torch.tensor( - strategy_int, dtype=torch.int64, requires_grad=False, device="cuda" - ) - global_strategy_list = [ - torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda") - for _ in range(int(os.environ["WORLD_SIZE"])) - ] - all_gather(global_strategy_list, local_strategy_tensor) - unique_global_strategy_ints = set([t.item() for t in global_strategy_list]) - assert ( - len(unique_global_strategy_ints) == 1 - ), "ERROR: AtomicDirectory savers initialized with different strategies." + if strategy != "mono": - self.strategy = strategy + assert ( + self.rank != "NONE" + ), "ERROR: AtomicDirectory requires RANK environment variable set if strategy is not 'mono'." + assert ( + self.world_size != "NONE" + ), "ERROR: AtomicDirectory requires WORLD_SIZE environment variable set if strategy is not 'mono'." - if self.strategy == "async": - self.name = name + f"_rank_{os.environ['RANK']}" + local_strategy_tensor = torch.tensor( + strategy_int, dtype=torch.int64, requires_grad=False, device="cuda" + ) + global_strategy_list = [ + torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda") + for _ in range(int(self.world_size)) + ] + all_gather(global_strategy_list, local_strategy_tensor) + unique_global_strategy_ints = set([t.item() for t in global_strategy_list]) + assert ( + len(unique_global_strategy_ints) == 1 + ), "ERROR: AtomicDirectory savers initialized with different strategies." + + if strategy == "async": + self.name = name + f"_rank_{self.rank}" else: self.name = name @@ -159,7 +175,7 @@ def __init__( raise e def is_checkpoint_directory(self, path_str): - pattern = r"checkpoint_(\d+)(_force)?$" + pattern = r"_checkpoint_(\d+)(_force)?$" path = Path(path_str) match = re.search(pattern, path.name) if ( @@ -264,12 +280,12 @@ def prepare_checkpoint_directory(self, force_save=False): effective_force_save = False if (global_force.item() > 0 and self.strategy == "sync_any") or ( - global_force.item() == int(os.environ["WORLD_SIZE"]) + global_force.item() == int(self.world_size) and self.strategy == "sync_all" ): effective_force_save = True - elif self.strategy == "async": + else: # self.strategy in ["async", "mono"] effective_force_save = force_save if effective_force_save: diff --git a/pyproject.toml b/pyproject.toml index cd14135..c71059d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cycling_utils" -version = "0.0.2" +version = "0.0.3" description = "Utilities to assist with distributed deep learning on Strong Compute." readme = "README.md" requires-python = ">= 3.10"