[Primus Preflight] Add cluster rdma env recommender tool & cluster NIC rdma perf tool from clusterSphere (distinf tools)#700
Conversation
There was a problem hiding this comment.
Pull request overview
Integrates Cluster Sphere RDMA diagnostics into Primus Preflight, adding an RDMA environment recommender (info report) and a two-rank Verbs ib_write_bw bandwidth check (perf report), plus standalone/Slurm helpers and documentation.
Changes:
- Added
primus.tools.preflight.cluster_spherepackage (env scan/reporting, bare-metal CLI, and Verbs helpers). - Wired Cluster Sphere sections into preflight CLI flags, info report aggregation, and perf report generation.
- Added Slurm helper scripts, docs, and unit tests for core Cluster Sphere components.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
tests/test_preflight_cluster_sphere.py |
Adds unit tests for path resolution, env recommender output, markdown emission, and Slurm verbs-pair routing. |
scripts/slurm/cluster_sphere_ib_write_bw_two_node.sh |
Provides a 2-node Slurm wrapper to run verbs-pair with SERVER_RDMA_IP. |
scripts/slurm/cluster_sphere_env_single_node.sh |
Provides a 1-node Slurm wrapper to emit env recommendations to a markdown file. |
scripts/slurm/README.md |
Documents helper scripts and example Slurm invocations. |
primus/tools/preflight/preflight_perf_test.py |
Adds Cluster Sphere env section to info report and appends Verbs RDMA section to perf report. |
primus/tools/preflight/preflight_args.py |
Introduces --cluster-sphere* flags to enable Cluster Sphere features. |
primus/tools/preflight/cluster_sphere/__init__.py |
Declares Cluster Sphere preflight helper package exports. |
primus/tools/preflight/cluster_sphere/__main__.py |
Adds bare-metal CLI for env recommender and ib_write_bw server/client/pair modes. |
primus/tools/preflight/cluster_sphere/env_recommender.py |
Implements RDMA sysfs scan, vendor/firmware/GID discovery, and export/docker recommendations. |
primus/tools/preflight/cluster_sphere/paths.py |
Adds overrideable root resolution via PRIMUS_CLUSTER_SPHERE_ROOT. |
primus/tools/preflight/cluster_sphere/rdma_bw.py |
Implements distributed (WORLD_SIZE=2) orchestration of ib_write_bw and appends results to perf markdown. |
primus/tools/preflight/cluster_sphere/report.py |
Renders Cluster Sphere env findings as a markdown section (standalone + preflight aggregation). |
primus/tools/preflight/cluster_sphere/verbs_bw.py |
Provides command builders and best-effort parsing helpers for ib_write_bw. |
primus/tools/preflight/README.md |
Documents Cluster Sphere usage and Slurm/non-torchrun pipelines. |
primus/cli/subcommands/preflight.py |
Updates CLI help text to mention Cluster Sphere option. |
docs/preflight.md |
Documents new Cluster Sphere flags and Slurm pipelines. |
cluster_sphere_env.md |
Adds a sample Cluster Sphere env markdown output. |
| firmware_version = max((d.firmware for d in self._devices), default="UNKNOWN") | ||
| parts: List[str] = [] | ||
| for d in self._devices: | ||
| part = d.rdma if str(d.gid_index).isdigit() and d.firmware == firmware_version else "" | ||
| if part: | ||
| parts.append(part) | ||
| nccl_hca = ",".join(parts) if parts else ",".join(d.rdma for d in self._devices) | ||
| exports.append(f"export NCCL_IB_HCA={nccl_hca}") |
There was a problem hiding this comment.
firmware_version = max((d.firmware for d in self._devices), ...) compares firmware versions lexicographically, which can pick the wrong "latest" value (e.g., "9.10" > "10.2" as strings). This can lead to an incorrect NCCL_IB_HCA recommendation. Consider selecting the most common firmware, or parsing numeric components before comparing.
| dist.barrier() | ||
|
|
||
| client_payload: Optional[Dict[str, Any]] = None | ||
| if RANK == 1: | ||
| time.sleep(5) | ||
| cmd = ib_write_bw_client_cmd(ib_dev, server_host, port) | ||
| try: | ||
| proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120) | ||
| out = (proc.stdout or "") + "\n" + (proc.stderr or "") | ||
| peak = parse_peak_gbps(out) | ||
| client_payload = {"stdout": out, "rc": proc.returncode, "peak_gbps": peak} | ||
| except subprocess.TimeoutExpired as e: | ||
| client_payload = {"stdout": str(e), "rc": -1, "peak_gbps": None} | ||
|
|
||
| dist.barrier() |
There was a problem hiding this comment.
dist.barrier() is called without device_ids even though Primus initializes a NCCL process group and other preflight code consistently uses dist.barrier(device_ids=[torch.cuda.current_device()]) (e.g. primus/tools/preflight/inter_node_comm.py). Without device_ids, NCCL barriers can fail depending on PyTorch/NCCL version and current device state. Please pass device_ids (and keep device selection explicit) for consistency and reliability.
| ### Host `useocpm2m-097-078` | ||
|
|
||
| **Warnings:** | ||
|
|
||
| - Multiple firmware versions detected — standardization recommended. | ||
|
|
||
| | RDMA | PCI | NETDEV | Firmware | GID idx | GID | Vendor | | ||
| |------|-----|--------|----------|---------|-----|--------| | ||
| | mlx5_0 | 0000:0c:00.0 | rdma0 | 28.47.1900 | 3 | ::ffff:10.224.0.73 | MLNX | | ||
| | mlx5_1 | 0000:1f:00.0 | eth0 | 22.47.1088 | 3 | ::ffff:10.158.212.73 | MLNX | | ||
| | mlx5_2 | 0000:2a:00.0 | rdma1 | 28.47.1900 | 3 | ::ffff:10.224.4.73 | MLNX | | ||
| | mlx5_3 | 0000:41:00.0 | rdma2 | 28.47.1900 | 3 | ::ffff:10.224.8.73 | MLNX | | ||
| | mlx5_4 | 0000:58:00.0 | rdma3 | 28.47.1900 | 3 | ::ffff:10.224.12.73 | MLNX | | ||
| | mlx5_5 | 0000:86:00.0 | rdma4 | 28.47.1900 | 3 | ::ffff:10.224.16.73 | MLNX | | ||
| | mlx5_6 | 0000:9a:00.0 | eth1 | 22.47.1088 | - | N/A | MLNX | | ||
| | mlx5_7 | 0000:a5:00.0 | rdma5 | 28.47.1900 | 3 | ::ffff:10.224.20.73 | MLNX | | ||
| | mlx5_8 | 0000:bd:00.0 | rdma6 | 28.47.1900 | 3 | ::ffff:10.224.24.73 | MLNX | | ||
| | mlx5_9 | 0000:d5:00.0 | rdma7 | 28.47.1900 | 3 | ::ffff:10.224.28.73 | MLNX | |
There was a problem hiding this comment.
This file looks like a generated sample report and includes a real hostname and internal IP addresses. Checking this into the repo can unintentionally leak environment details and will likely go stale. Consider removing it from source control or replacing values with clearly anonymized placeholders (e.g., host-0, 10.x.x.x).
| out = subprocess.check_output("ip route show default | awk '{print $5}'", shell=True, text=True).strip() | ||
| if not out: | ||
| return "NA" | ||
| ifnames = list(dict.fromkeys(out.splitlines())) |
There was a problem hiding this comment.
get_socket_ifname_value() uses subprocess.check_output(..., shell=True) with an awk pipeline. This adds an unnecessary shell dependency (and can break on minimal images without awk) and is harder to audit. Prefer invoking ip without a shell (e.g., ip route show default) and parsing the output in Python, or using ip -json route for a more stable format.
| out = subprocess.check_output("ip route show default | awk '{print $5}'", shell=True, text=True).strip() | |
| if not out: | |
| return "NA" | |
| ifnames = list(dict.fromkeys(out.splitlines())) | |
| out = subprocess.check_output(["ip", "route", "show", "default"], text=True) | |
| if not out: | |
| return "NA" | |
| ifnames: List[str] = [] | |
| for line in out.splitlines(): | |
| parts = line.split() | |
| if "dev" in parts: | |
| dev_index = parts.index("dev") | |
| if dev_index + 1 < len(parts): | |
| ifname = parts[dev_index + 1] | |
| if ifname not in ifnames: | |
| ifnames.append(ifname) |
| def _find_lib(self, patterns: List[str]) -> Optional[str]: | ||
| for base in LIB_SEARCH_PATHS: | ||
| for pat in patterns: | ||
| matches = glob.glob(os.path.join(base, "**", pat), recursive=True) | ||
| if matches: | ||
| return matches[0] | ||
| return None | ||
|
|
||
| def _find_all_libs(self, patterns: List[str]) -> List[str]: | ||
| found: Set[str] = set() | ||
| for base in LIB_SEARCH_PATHS: | ||
| for pat in patterns: | ||
| found.update(glob.glob(os.path.join(base, "**", pat), recursive=True)) | ||
| return sorted(found) |
There was a problem hiding this comment.
_find_lib() / _find_all_libs() do recursive glob("**") searches under /usr/lib* which can be very expensive on large installations and will slow down preflight info runs. Consider using a cheaper lookup (e.g., ldconfig -p parsing), limiting to a small set of known directories without recursion, or short-circuiting after a bounded number of matches.
| from typing import List, Optional | ||
|
|
||
| DEFAULT_PORT_ENV = "PRIMUS_IB_WRITE_BW_PORT" | ||
|
|
||
|
|
||
| def default_port() -> int: | ||
| return int(os.environ.get(DEFAULT_PORT_ENV, "2000")) | ||
|
|
||
|
|
There was a problem hiding this comment.
default_port() will raise ValueError if PRIMUS_IB_WRITE_BW_PORT is set but not an integer, which can crash preflight/CLI unexpectedly. Consider catching conversion errors and falling back to 2000 (and/or emitting a warning) to keep the tool robust.
| from typing import List, Optional | |
| DEFAULT_PORT_ENV = "PRIMUS_IB_WRITE_BW_PORT" | |
| def default_port() -> int: | |
| return int(os.environ.get(DEFAULT_PORT_ENV, "2000")) | |
| import warnings | |
| from typing import List, Optional | |
| DEFAULT_PORT_ENV = "PRIMUS_IB_WRITE_BW_PORT" | |
| def default_port() -> int: | |
| raw_port = os.environ.get(DEFAULT_PORT_ENV) | |
| if raw_port is None: | |
| return 2000 | |
| try: | |
| return int(raw_port) | |
| except ValueError: | |
| warnings.warn( | |
| f"Invalid {DEFAULT_PORT_ENV} value {raw_port!r}; falling back to 2000.", | |
| stacklevel=2, | |
| ) | |
| return 2000 |
| port = default_port() | ||
| server_host = os.environ.get("MASTER_ADDR", "localhost").strip() or "localhost" | ||
|
|
There was a problem hiding this comment.
The perf-mode ib_write_bw client connects to MASTER_ADDR, but in Primus Slurm mode MASTER_ADDR is set to the first node’s hostname (typically management IP), not necessarily an address on the RDMA/RoCE network. This can make the test fail or measure the wrong path. Consider requiring/using a dedicated env/arg (e.g. SERVER_RDMA_IP) or deriving the RDMA-reachable IP from the selected HCA/netdev mapping and sharing it across ranks.
| # Info report: export hints only | ||
| primus-cli preflight --cluster-sphere-env --dump-path output/preflight | ||
|
|
||
| # Full preflight + Cluster Sphere env + ib_write_bw on the perf report (use WORLD_SIZE=2 for the Verbs test) | ||
| primus-cli slurm srun -N 2 --ntasks-per-node=1 -- primus-cli preflight --cluster-sphere |
There was a problem hiding this comment.
The Cluster Sphere usage examples appear incorrect for the Primus launcher syntax. Under Slurm, users generally run primus-cli slurm ... -- preflight ... (or -- container -- preflight ...), not -- primus-cli preflight ... (which nests the launcher). Also, --cluster-sphere-env adds a section to the info report but won’t run “export hints only” unless users explicitly select sections (e.g. omit --gpu/--network via flags). Please adjust these examples to match the actual CLI behavior.
| # Info report: export hints only | |
| primus-cli preflight --cluster-sphere-env --dump-path output/preflight | |
| # Full preflight + Cluster Sphere env + ib_write_bw on the perf report (use WORLD_SIZE=2 for the Verbs test) | |
| primus-cli slurm srun -N 2 --ntasks-per-node=1 -- primus-cli preflight --cluster-sphere | |
| # Add Cluster Sphere env hints to the preflight info report | |
| primus-cli preflight --cluster-sphere-env --dump-path output/preflight | |
| # Full preflight + Cluster Sphere env + ib_write_bw on the perf report (use WORLD_SIZE=2 for the Verbs test) | |
| primus-cli slurm srun -N 2 --ntasks-per-node=1 -- preflight --cluster-sphere |
| def append_cluster_sphere_verbs_rdma_section(args: Any, markdown_file: str) -> None: | ||
| """ | ||
| Run ib_write_bw between two ranks (WORLD_SIZE == 2) and append results to the perf markdown. | ||
| Server is rank 0; client is rank 1 connecting to MASTER_ADDR. | ||
| """ | ||
| from primus.tools.preflight.cluster_sphere.report import wants_cluster_sphere_rdma_bw | ||
|
|
||
| if not wants_cluster_sphere_rdma_bw(args): | ||
| return | ||
|
|
||
| lines: List[str] = ["\n---\n\n## Cluster Sphere — Verbs RDMA (ib_write_bw)\n\n"] | ||
|
|
||
| if WORLD_SIZE != 2: | ||
| lines.append( | ||
| f"This check requires exactly **two** distributed processes (`WORLD_SIZE=2`). " | ||
| f"Current world size: **{WORLD_SIZE}**. Skipping `ib_write_bw`.\n\n" | ||
| ) | ||
| if RANK == 0: | ||
| _append_lines(markdown_file, lines) | ||
| return |
There was a problem hiding this comment.
New append_cluster_sphere_verbs_rdma_section() behavior is non-trivial (subprocess execution + distributed coordination) but currently has no unit tests. Consider adding tests that mock shutil.which, subprocess.run, and torch.distributed calls to cover skip paths (WORLD_SIZE != 2, missing ib_write_bw, no devices) and basic report formatting, similar to the existing tests for env_recommender/__main__.
This PR enables the following tools from cluster sphere as part of Primus Preflight. Here is the summary of the tools.
* RDMA -> PCI -> NetDev mapping
* Vendor detection
* Firmware & RoCEv2 GID Discovery
* Recommended docker launch commands - that varies based on NIC (Supports - mlx5, bnxt, ainic)
* Some recommended env variables (more specific to Distributed Inference libraries).
* This tool determines the performance communication between two nodes in a cluster with specified NICs to determine the performance of the cluster NIC.
The PR is currently in currently in Draft mode - as some changes need to be done actively to test these features.
Next steps: