-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_distributed.py
More file actions
190 lines (149 loc) · 7.1 KB
/
test_distributed.py
File metadata and controls
190 lines (149 loc) · 7.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Unit tests for distributed training configuration (DeepSpeed/FSDP)."""
import json
import os
import yaml
from forgelm.config import (
DistributedConfig,
ForgeConfig,
load_config,
)
# --- DistributedConfig ---
class TestDistributedConfig:
def test_defaults(self):
d = DistributedConfig()
assert d.strategy is None
assert d.deepspeed_config is None
assert d.fsdp_strategy == "full_shard"
assert d.fsdp_auto_wrap is True
assert d.fsdp_offload is False
def test_deepspeed_strategy(self):
d = DistributedConfig(strategy="deepspeed", deepspeed_config="zero2")
assert d.strategy == "deepspeed"
assert d.deepspeed_config == "zero2"
def test_fsdp_strategy(self):
d = DistributedConfig(strategy="fsdp", fsdp_strategy="shard_grad_op")
assert d.strategy == "fsdp"
assert d.fsdp_strategy == "shard_grad_op"
def test_fsdp_with_offload(self):
d = DistributedConfig(strategy="fsdp", fsdp_offload=True)
assert d.fsdp_offload is True
# --- ForgeConfig with distributed ---
class TestForgeConfigDistributed:
def test_no_distributed(self, minimal_config):
cfg = ForgeConfig(**minimal_config())
assert cfg.distributed is None
def test_deepspeed_config(self, minimal_config):
cfg = ForgeConfig(**minimal_config(distributed={"strategy": "deepspeed", "deepspeed_config": "zero3"}))
assert cfg.distributed.strategy == "deepspeed"
assert cfg.distributed.deepspeed_config == "zero3"
def test_fsdp_config(self, minimal_config):
cfg = ForgeConfig(**minimal_config(distributed={"strategy": "fsdp", "fsdp_strategy": "hybrid_shard"}))
assert cfg.distributed.strategy == "fsdp"
assert cfg.distributed.fsdp_strategy == "hybrid_shard"
def test_unsloth_distributed_raises(self, minimal_config):
"""Unsloth + distributed should raise ValueError."""
import pytest
with pytest.raises((ValueError, TypeError)):
ForgeConfig(
**minimal_config(
model={"name_or_path": "org/model", "backend": "unsloth"},
distributed={"strategy": "deepspeed"},
)
)
def test_zero3_qlora_warning(self, caplog, minimal_config):
"""QLoRA + ZeRO-3 should produce a warning."""
import logging
with caplog.at_level(logging.WARNING, logger="forgelm.config"):
ForgeConfig(
**minimal_config(
model={"name_or_path": "org/model", "load_in_4bit": True},
distributed={"strategy": "deepspeed", "deepspeed_config": "zero3"},
)
)
assert "QLoRA (4-bit) with DeepSpeed ZeRO-3" in caplog.text
def test_zero2_qlora_no_warning(self, caplog, minimal_config):
"""QLoRA + ZeRO-2 should NOT produce the ZeRO-3 warning."""
import logging
with caplog.at_level(logging.WARNING, logger="forgelm.config"):
ForgeConfig(
**minimal_config(
model={"name_or_path": "org/model", "load_in_4bit": True},
distributed={"strategy": "deepspeed", "deepspeed_config": "zero2"},
)
)
assert "QLoRA (4-bit) with DeepSpeed ZeRO-3" not in caplog.text
# --- YAML load with distributed ---
class TestLoadConfigDistributed:
def test_load_deepspeed_yaml(self, tmp_path, minimal_config):
data = minimal_config(distributed={"strategy": "deepspeed", "deepspeed_config": "zero2"})
cfg_path = str(tmp_path / "config.yaml")
with open(cfg_path, "w") as f:
yaml.dump(data, f)
cfg = load_config(cfg_path)
assert cfg.distributed.strategy == "deepspeed"
def test_load_fsdp_yaml(self, tmp_path, minimal_config):
data = minimal_config(
distributed={
"strategy": "fsdp",
"fsdp_strategy": "full_shard",
"fsdp_offload": True,
}
)
cfg_path = str(tmp_path / "config.yaml")
with open(cfg_path, "w") as f:
yaml.dump(data, f)
cfg = load_config(cfg_path)
assert cfg.distributed.strategy == "fsdp"
assert cfg.distributed.fsdp_offload is True
# --- DeepSpeed config resolution ---
class TestDeepSpeedConfigResolution:
def test_preset_files_exist(self):
"""Verify that all DeepSpeed preset JSON files are valid."""
configs_dir = os.path.join(os.path.dirname(__file__), "..", "configs", "deepspeed")
for preset in ["zero2.json", "zero3.json", "zero3_offload.json"]:
path = os.path.join(configs_dir, preset)
assert os.path.isfile(path), f"Missing preset: {path}"
with open(path) as f:
data = json.load(f)
assert "zero_optimization" in data
assert "stage" in data["zero_optimization"]
def test_zero2_is_stage_2(self):
path = os.path.join(os.path.dirname(__file__), "..", "configs", "deepspeed", "zero2.json")
with open(path) as f:
data = json.load(f)
assert data["zero_optimization"]["stage"] == 2
def test_zero3_is_stage_3(self):
path = os.path.join(os.path.dirname(__file__), "..", "configs", "deepspeed", "zero3.json")
with open(path) as f:
data = json.load(f)
assert data["zero_optimization"]["stage"] == 3
def test_zero3_offload_has_cpu_offload(self):
path = os.path.join(os.path.dirname(__file__), "..", "configs", "deepspeed", "zero3_offload.json")
with open(path) as f:
data = json.load(f)
assert data["zero_optimization"]["stage"] == 3
assert data["zero_optimization"]["offload_optimizer"]["device"] == "cpu"
assert data["zero_optimization"]["offload_param"]["device"] == "cpu"
def test_all_presets_use_auto_values(self):
"""Ensure presets use 'auto' so HF Trainer resolves values from TrainingArguments."""
configs_dir = os.path.join(os.path.dirname(__file__), "..", "configs", "deepspeed")
for preset in ["zero2.json", "zero3.json", "zero3_offload.json"]:
with open(os.path.join(configs_dir, preset)) as f:
data = json.load(f)
assert data["train_batch_size"] == "auto"
assert data["train_micro_batch_size_per_gpu"] == "auto"
assert data["gradient_accumulation_steps"] == "auto"
# --- Dry-run with distributed ---
class TestDryRunDistributed:
def test_dry_run_json_shows_distributed(self, tmp_path, capsys, minimal_config):
from forgelm.cli import _run_dry_run
cfg = ForgeConfig(**minimal_config(distributed={"strategy": "deepspeed", "deepspeed_config": "zero3"}))
_run_dry_run(cfg, "json")
result = json.loads(capsys.readouterr().out)
assert result["distributed"] == "deepspeed"
def test_dry_run_json_no_distributed(self, capsys, minimal_config):
from forgelm.cli import _run_dry_run
cfg = ForgeConfig(**minimal_config())
_run_dry_run(cfg, "json")
result = json.loads(capsys.readouterr().out)
assert result["distributed"] is None