Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions cycling_utils/saving.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ class AtomicDirectory:
- `strategy = "sync_any"` (default) will `force_save` the checkpoint if ANY process passes `force_save = True`
- `strategy = "sync_all"` will `force_save` the checkpoint if and only if ALL processes pass `force_save = True`
- `strategy = "async"` will `force_save` the checkpoint if the saving process passes `force_save = True`
- `strategy = "mono"` will `force_save` the checkpoint if the saving process passes `force_save = True`

Each of the strategies "sync_any", "sync_all", and "async"

Further, the `strategy = "mono"` argument should be passed if the AtomicDirectory saver is intended for use outside of a
torchrun distributed process group. In such cases, the user must ensure that all instances of the AtomicDirectory saver are
initialized with a unique 'name'.

Example usage of AtomicDirectory in synchronous mode on the Strong Compute ISC launching with torchrun as follows.

Expand Down Expand Up @@ -118,33 +125,42 @@ def __init__(
self.output_directory = output_directory
self.is_master = is_master
self.keep_last = keep_last
self.rank = os.environ["RANK"]
self.strategy = strategy
self.rank = os.getenv("RANK", "NONE")
self.world_size = os.getenv("WORLD_SIZE", "NONE")

# make sure all processes have been initialized with the same
strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2}
strategy_map = {"sync_any": 0, "sync_all": 1, "async": 2, "mono": 3}
if strategy in strategy_map:
strategy_int = strategy_map[strategy]
else:
raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', or 'async' but rank \
{os.environ['RANK']} was passed '{strategy}'."
raise f"ERROR: AtomicDirectory saver must be initialized with strategy = 'sync_any', 'sync_all', 'async', or 'mono' but rank \
{self.rank} was passed '{strategy}'."

local_strategy_tensor = torch.tensor(
strategy_int, dtype=torch.int64, requires_grad=False, device="cuda"
)
global_strategy_list = [
torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda")
for _ in range(int(os.environ["WORLD_SIZE"]))
]
all_gather(global_strategy_list, local_strategy_tensor)
unique_global_strategy_ints = set([t.item() for t in global_strategy_list])
assert (
len(unique_global_strategy_ints) == 1
), "ERROR: AtomicDirectory savers initialized with different strategies."
if strategy != "mono":

self.strategy = strategy
assert (
self.rank != "NONE"
), "ERROR: AtomicDirectory requires RANK environment variable set if strategy is not 'mono'."
assert (
self.world_size != "NONE"
), "ERROR: AtomicDirectory requires WORLD_SIZE environment variable set if strategy is not 'mono'."

if self.strategy == "async":
self.name = name + f"_rank_{os.environ['RANK']}"
local_strategy_tensor = torch.tensor(
strategy_int, dtype=torch.int64, requires_grad=False, device="cuda"
)
global_strategy_list = [
torch.zeros(1, dtype=torch.int64, requires_grad=False, device="cuda")
for _ in range(int(self.world_size))
]
all_gather(global_strategy_list, local_strategy_tensor)
unique_global_strategy_ints = set([t.item() for t in global_strategy_list])
assert (
len(unique_global_strategy_ints) == 1
), "ERROR: AtomicDirectory savers initialized with different strategies."

if strategy == "async":
self.name = name + f"_rank_{self.rank}"
else:
self.name = name

Expand All @@ -159,7 +175,7 @@ def __init__(
raise e

def is_checkpoint_directory(self, path_str):
pattern = r"checkpoint_(\d+)(_force)?$"
pattern = r"_checkpoint_(\d+)(_force)?$"
path = Path(path_str)
match = re.search(pattern, path.name)
if (
Expand Down Expand Up @@ -264,12 +280,12 @@ def prepare_checkpoint_directory(self, force_save=False):

effective_force_save = False
if (global_force.item() > 0 and self.strategy == "sync_any") or (
global_force.item() == int(os.environ["WORLD_SIZE"])
global_force.item() == int(self.world_size)
and self.strategy == "sync_all"
):
effective_force_save = True

elif self.strategy == "async":
else: # self.strategy in ["async", "mono"]
effective_force_save = force_save

if effective_force_save:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "cycling_utils"
version = "0.0.2"
version = "0.0.3"
description = "Utilities to assist with distributed deep learning on Strong Compute."
readme = "README.md"
requires-python = ">= 3.10"
Expand Down