diff --git a/cnn_comparator/README.md b/cnn_comparator/README.md
new file mode 100644
index 0000000..d19ef29
--- /dev/null
+++ b/cnn_comparator/README.md
@@ -0,0 +1,17 @@
+Both of these scripts run in parallel on Perlmutter and use the GPUs on one node, but do not work across nodes.
+
+pytorch_cnn_NCCL_parallel.py: Parallelize with torch.distributed.all_reduce
+pytorch_cnn_NCCL_with_DDP.py: Parallelize with torch.nn.parallel.DistributedDataParallel
+
+**pytorch_cnn_NCCL_parallel.py**
+With libEnsemble, each sim would send its gradients to a generator, which would do the all reduce (a hedged gen sketch follows this diff) in place of
+dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
+
+**pytorch_cnn_NCCL_with_DDP.py**
+Implicit parallelism: DDP averages the gradients across ranks during the backward pass.
+
+**simf_cnn_NCCL_parallel.py**
+Attempt to integrate the above into a libEnsemble sim_f.
+
+Needed for libEnsemble:
+An alloc that starts by calling sim and fills in H from sim (a hedged skeleton follows this diff).
diff --git a/cnn_comparator/pytorch_cnn_NCCL_parallel.py b/cnn_comparator/pytorch_cnn_NCCL_parallel.py
new file mode 100644
index 0000000..64c5165
--- /dev/null
+++ b/cnn_comparator/pytorch_cnn_NCCL_parallel.py
@@ -0,0 +1,83 @@
+import os
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader, Subset
+from torchvision import datasets, transforms
+
+# Define a Convolutional Neural Network
+class MNISTCNN(nn.Module):
+    def __init__(self):
+        super(MNISTCNN, self).__init__()
+        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)  # 32 filters, 3x3 kernel
+        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)  # 64 filters, 3x3 kernel
+        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
+        self.fc1 = nn.Linear(64 * 12 * 12, 128)  # Flattened features to 128 units
+        self.fc2 = nn.Linear(128, 10)  # Output layer for 10 classes
+
+    def forward(self, x):
+        x = torch.relu(self.conv1(x))  # First convolution + activation
+        x = torch.relu(self.conv2(x))  # Second convolution + activation
+        x = self.pool(x)  # Pooling
+        x = x.view(-1, 64 * 12 * 12)  # Flatten the features
+        x = torch.relu(self.fc1(x))  # Fully connected layer
+        return self.fc2(x)  # Output layer
+
+def train(rank, world_size, epochs, batch_size, use_gpu):
+    # Redirect stdout to a log file for each rank
+    log_file = f"rank_{rank}_output.log"
+    with open(log_file, "w") as f:
+        os.dup2(f.fileno(), 1)  # Redirect stdout
+        os.dup2(f.fileno(), 2)  # Redirect stderr
+    # Initialize process group
+    dist.init_process_group("nccl")
+
+    # Toggle device based on use_gpu flag
+    device = torch.device(f"cuda:{rank % torch.cuda.device_count()}" if use_gpu else "cpu")
+
+    # Prepare dataset and DataLoader
+    transform = transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.5,), (0.5,))
+    ])
+    dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=False)
+    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=8)
+    # Initialize model and optimizer
+    model = MNISTCNN().to(device)
+    optimizer = optim.SGD(model.parameters(), lr=0.01)
+
+    for epoch in range(epochs):
+        for batch_idx, (data, target) in enumerate(train_loader):
+            data, target = data.to(device), target.to(device)
+
+            optimizer.zero_grad()
+            output = model(data)
+            loss = nn.CrossEntropyLoss()(output, target)
+            loss.backward()
+
+            # Synchronize gradients across processes
+            for param in model.parameters():
+                dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
+                param.grad /= world_size
+
+            optimizer.step()
+
+        print(f"Rank {rank}, Epoch {epoch}, Loss: {loss.item()}", flush=True)
+
+    # Clean up process group
+    dist.destroy_process_group()
+
+def main():
+    # Use environment variables set by torchrun
+    rank = int(os.environ["RANK"])
+    world_size = int(os.environ["WORLD_SIZE"])
+
+    epochs = 3
+    batch_size = 32
+    use_gpu = torch.cuda.is_available()
+
+    train(rank, world_size, epochs, batch_size, use_gpu)
+
+if __name__ == "__main__":
+    main()
diff --git a/cnn_comparator/pytorch_cnn_NCCL_with_DDP.py b/cnn_comparator/pytorch_cnn_NCCL_with_DDP.py
new file mode 100644
index 0000000..009d3a9
--- /dev/null
+++ b/cnn_comparator/pytorch_cnn_NCCL_with_DDP.py
@@ -0,0 +1,77 @@
+import os
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader, Subset
+from torchvision import datasets, transforms
+from torch.nn.parallel import DistributedDataParallel as DDP
+
+# Define a Convolutional Neural Network
+class MNISTCNN(nn.Module):
+    def __init__(self):
+        super(MNISTCNN, self).__init__()
+        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)  # 32 filters, 3x3 kernel
+        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)  # 64 filters, 3x3 kernel
+        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
+        self.fc1 = nn.Linear(64 * 12 * 12, 128)  # Flattened features to 128 units
+        self.fc2 = nn.Linear(128, 10)  # Output layer for 10 classes
+
+    def forward(self, x):
+        x = torch.relu(self.conv1(x))  # First convolution + activation
+        x = torch.relu(self.conv2(x))  # Second convolution + activation
+        x = self.pool(x)  # Pooling
+        x = x.view(-1, 64 * 12 * 12)  # Flatten the features
+        x = torch.relu(self.fc1(x))  # Fully connected layer
+        return self.fc2(x)  # Output layer
+
+def train(rank, world_size, epochs, batch_size, use_gpu):
+    # Initialize process group
+    dist.init_process_group("nccl")
+
+    # Toggle device based on use_gpu flag
+    device = torch.device(f"cuda:{rank % torch.cuda.device_count()}" if use_gpu else "cpu")
+
+    # Prepare dataset and DataLoader
+    transform = transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.5,), (0.5,))
+    ])
+    dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=False)
+    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=8)
+
+    # Initialize model, wrap with DDP, and optimizer
+    model = MNISTCNN().to(device)
+    model = DDP(model, device_ids=[rank % torch.cuda.device_count()]) if use_gpu else DDP(model)
+    optimizer = optim.SGD(model.parameters(), lr=0.01)
+
+    criterion = nn.CrossEntropyLoss()
+
+    for epoch in range(epochs):
+        for batch_idx, (data, target) in enumerate(train_loader):
+            data, target = data.to(device), target.to(device)
+
+            optimizer.zero_grad()
+            output = model(data)
+            loss = criterion(output, target)
+            loss.backward()
+            optimizer.step()
+
+        print(f"Rank {rank}, Epoch {epoch}, Loss: {loss.item()}", flush=True)
+
+    # Clean up process group
+    dist.destroy_process_group()
+
+def main():
+    # Use environment variables set by torchrun
+    rank = int(os.environ["RANK"])
+    world_size = int(os.environ["WORLD_SIZE"])
+
+    epochs = 3
+    batch_size = 32
+    use_gpu = torch.cuda.is_available()
+
+    train(rank, world_size, epochs, batch_size, use_gpu)
+
+if __name__ == "__main__":
+    main()
diff --git a/cnn_comparator/simf_cnn_NCCL_parallel.py b/cnn_comparator/simf_cnn_NCCL_parallel.py
new file mode 100644
index 0000000..4c58148
--- /dev/null
+++ b/cnn_comparator/simf_cnn_NCCL_parallel.py
@@ -0,0 +1,126 @@
+import os
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader, Subset
+from torchvision import datasets, transforms
+from libensemble.message_numbers import PERSIS_STOP, STOP_TAG, EVAL_SIM_TAG, WORKER_DONE
+from libensemble.tools.persistent_support import PersistentSupport
+from multiprocessing import Process
+import numpy as np
+
+# Define a Convolutional Neural Network
+class MNISTCNN(nn.Module):
+    def __init__(self):
+        super(MNISTCNN, self).__init__()
+        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)  # 32 filters, 3x3 kernel
+        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)  # 64 filters, 3x3 kernel
+        self.pool = nn.MaxPool2d(2)  # 2x2 pooling
+        self.fc1 = nn.Linear(64 * 12 * 12, 128)  # Flattened features to 128 units
+        self.fc2 = nn.Linear(128, 10)  # Output layer for 10 classes
+
+    def forward(self, x):
+        x = torch.relu(self.conv1(x))  # First convolution + activation
+        x = torch.relu(self.conv2(x))  # Second convolution + activation
+        x = self.pool(x)  # Pooling
+        x = x.view(-1, 64 * 12 * 12)  # Flatten the features
+        x = torch.relu(self.fc1(x))  # Fully connected layer
+        return self.fc2(x)  # Output layer
+
+
+def train(rank, group_size, epochs, batch_size, use_gpu, ps, sim_specs):
+    # Redirect stdout to a log file for each rank
+    log_file = f"rank_{rank}_output.log"
+    with open(log_file, "w") as f:
+        os.dup2(f.fileno(), 1)  # Redirect stdout
+        os.dup2(f.fileno(), 2)  # Redirect stderr
+    # Initialize process group (rank and world size passed explicitly; no torchrun here)
+    dist.init_process_group("nccl", rank=rank, world_size=group_size)
+
+    # Toggle device based on use_gpu flag
+    device = torch.device(f"cuda:{rank % torch.cuda.device_count()}" if use_gpu else "cpu")
+
+    # Prepare dataset and DataLoader
+    transform = transforms.Compose([
+        transforms.ToTensor(),
+        transforms.Normalize((0.5,), (0.5,))
+    ])
+    dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=False)
+    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=8)
+    # Initialize model and optimizer
+    model = MNISTCNN().to(device)
+    optimizer = optim.SGD(model.parameters(), lr=0.01)
+
+    for epoch in range(epochs):
+        for batch_idx, (data, target) in enumerate(train_loader):
+            data, target = data.to(device), target.to(device)
+
+            optimizer.zero_grad()
+            output = model(data)
+            loss = nn.CrossEntropyLoss()(output, target)
+            loss.backward()
+
+            # Synchronize gradients across processes
+            for param in model.parameters():
+                dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
+                param.grad /= group_size  # SH averaging?
+
+            # SH TODO - consider inheriting dist (torch.distributed)
+            # and overriding dist.all_reduce() to do libE stuff
+            all_grads = []
+            for param in model.parameters():
+                all_grads.append(param.grad.detach().cpu().numpy().ravel())
+
+            # libE lines (see the specs sketch after this diff for the assumed "grads"/"grads_in" fields)
+            H_o = np.zeros(1, dtype=sim_specs["out"])
+            H_o["grads"] = np.concatenate(all_grads)
+            tag, Work, calc_in = ps.send_recv(H_o)
+            # SH TODO do we want final step - keep going till the sim is done!!!
+            if tag not in [STOP_TAG, PERSIS_STOP]:
+                offset = 0
+                for param in model.parameters():
+                    grad_shape = param.grad.shape
+                    grad_size = param.grad.numel()
+
+                    # SH - speculatively called "grads_in"
+                    # Extract the corresponding section and reshape
+                    param.grad.data.copy_(
+                        torch.tensor(calc_in["grads_in"][offset:offset + grad_size].reshape(grad_shape),
+                                     device=param.grad.device)
+                    )
+                    offset += grad_size
+
+                # Now run optimizer on combined data
+                optimizer.step()
+
+        print(f"Rank {rank}, Epoch {epoch}, Loss: {loss.item()}", flush=True)
+
+    # Clean up process group
+    dist.destroy_process_group()
+
+
+def persis_cnn(H, persis_info, sim_specs, libE_info):
+
+    # def main():
+    # Use environment variables set by torchrun
+    # rank = int(os.environ["RANK"])
+    # group_size = int(os.environ["WORLD_SIZE"])
+    group_size = 4
+    epochs = 3
+    batch_size = 32
+    # use_gpu = torch.cuda.is_available()
+    use_gpu = True
+
+    ps = PersistentSupport(libE_info, EVAL_SIM_TAG)
+    processes = []
+
+    # NCCL rendezvous for the spawned processes (no torchrun here; single node assumed)
+    os.environ.setdefault("MASTER_ADDR", "localhost")
+    os.environ.setdefault("MASTER_PORT", "29500")
+
+    # Run parallel on this node
+    for rank in range(group_size):
+        p = Process(target=train, args=(rank, group_size, epochs, batch_size, use_gpu, ps, sim_specs))
+        p.start()
+        processes.append(p)
+
+    for p in processes:
+        p.join()
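
Notes (hedged sketches, referenced from the README above; not part of the diff):

The README describes moving the `dist.all_reduce` into a libEnsemble generator: each sim sends its gradient vector to a persistent gen, which combines them and sends the result back. Below is a minimal sketch of what that gen could look like, using `PersistentSupport` just as `simf_cnn_NCCL_parallel.py` does. The function name `persistent_grad_sum_gen`, the `num_sims` user option, and the `grads`/`grads_in` field names are assumptions for illustration (the sim only speculatively calls its incoming field `grads_in`), and it assumes the alloc forwards one sim's gradients per message.

```python
import numpy as np

from libensemble.message_numbers import EVAL_GEN_TAG, FINISHED_PERSISTENT_GEN_TAG, PERSIS_STOP, STOP_TAG
from libensemble.tools.persistent_support import PersistentSupport


def persistent_grad_sum_gen(H, persis_info, gen_specs, libE_info):
    """Persistent gen doing the libEnsemble-side "all reduce": sum gradients from the sims."""
    ps = PersistentSupport(libE_info, EVAL_GEN_TAG)
    num_sims = gen_specs["user"]["num_sims"]  # sims contributing per round (assumed option)

    while True:
        summed = None
        # Collect one gradient vector from each sim for this round
        for _ in range(num_sims):
            tag, Work, calc_in = ps.recv()
            if tag in [STOP_TAG, PERSIS_STOP]:
                return None, persis_info, FINISHED_PERSISTENT_GEN_TAG
            grads = calc_in["grads"][0]
            summed = grads.copy() if summed is None else summed + grads

        # Send the combined gradients back; the alloc would route one row to each waiting sim
        H_o = np.zeros(num_sims, dtype=gen_specs["out"])
        H_o["grads_in"] = summed  # equivalent of dist.ReduceOp.SUM
        ps.send(H_o)
```

Whether the gen sums or averages (and where any division by the number of contributors happens) is the same open question as the "SH averaging?" comment in the sim code.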
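The sim builds `H_o` from `sim_specs["out"]` and reads `calc_in["grads_in"]`, so the specs handed to libEnsemble need matching fields. A possible wiring, again only a hedged sketch: the import path, the `persistent_grad_sum_gen` reference, and the field names are assumed, and `num_sims` matches the hard-coded `group_size = 4` in `persis_cnn`.

```python
from simf_cnn_NCCL_parallel import MNISTCNN, persis_cnn  # assumed import path

# Total number of CNN parameters = length of the flattened gradient vector
n_grads = sum(p.numel() for p in MNISTCNN().parameters())  # 1,199,882 for the model above

sim_specs = {
    "sim_f": persis_cnn,
    "in": [],                    # the sims generate their own training data
    "persis_in": ["grads_in"],   # combined gradients streamed back to the persistent sims
    "out": [("grads", float, n_grads)],
}

gen_specs = {
    "gen_f": persistent_grad_sum_gen,  # the gen sketched in these notes (assumed)
    "persis_in": ["grads"],
    "out": [("grads_in", float, n_grads)],
    "user": {"num_sims": 4},
}
```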
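The remaining piece the README asks for is the alloc: one that starts by calling sim and fills in H from the sims. Below is only an untested skeleton of how such an alloc_f could begin, written in the style of libEnsemble's bundled allocation functions that use `AllocSupport`. The function name is made up, the helper keyword usage is an assumption, and the actual row-shuttling between the persistent sims and the gen is left as a TODO.

```python
from libensemble.tools.alloc_support import AllocSupport


def start_sims_first_alloc(W, H, sim_specs, gen_specs, alloc_specs, persis_info, libE_info):
    """Skeleton alloc: launch persistent CNN sims first, plus one persistent gradient-summing gen."""
    support = AllocSupport(W, True, persis_info, libE_info)
    Work = {}

    idle = support.avail_worker_ids(persistent=False)

    # One persistent gen (the gradient summer) on the first idle worker, if not already running
    if support.count_persis_gens() == 0 and idle:
        gen_wid = idle.pop(0)
        Work[gen_wid] = support.gen_work(
            gen_wid, gen_specs.get("persis_in", []), [], persis_info.get(gen_wid), persistent=True
        )

    # A persistent sim on every remaining idle worker; no input rows are needed up front,
    # since the sims train locally and stream gradients back, which is what fills in H
    for wid in idle:
        Work[wid] = support.sim_work(
            wid, H, sim_specs.get("in", []), [], persis_info.get(wid), persistent=True
        )

    # TODO: forward new "grads" rows from the sims to the gen, and the gen's "grads_in"
    # rows back to the waiting sims, until a stop condition is reached.

    return Work, persis_info, 0
```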