From ad960a1090fd102d80f5eea72f167ef8e75e8e71 Mon Sep 17 00:00:00 2001 From: lxzlxzliuxuzhao Date: Fri, 20 Mar 2026 12:31:52 +0000 Subject: [PATCH 1/2] Add CoreX BI-V150 compatibility support --- .../Hardware Support/corex_usage.md | 161 +++++++++++ .../Hardware Support/corex_usage_zh.md | 161 +++++++++++ ...gent_val_frozen_lake_multi_nodes_demo.yaml | 2 +- ...gent_val_frozen_lake_single_node_demo.yaml | 8 +- ...c_pipeline_frozen_lake_multi_nodes_demo.sh | 7 +- ...c_pipeline_frozen_lake_single_node_demo.sh | 7 +- roll/configs/base_config.py | 7 +- roll/distributed/scheduler/initialize.py | 37 ++- .../distributed/scheduler/resource_manager.py | 21 +- roll/distributed/strategy/vllm_strategy.py | 16 ++ roll/platforms/__init__.py | 41 ++- roll/platforms/corex.py | 9 + roll/platforms/platform.py | 252 +++++++++++++++++- roll/third_party/megatron/optimizer.py | 55 +++- roll/third_party/vllm/__init__.py | 28 +- .../distributed/scheduler/test_initialize.py | 24 +- .../scheduler/test_resource_manager.py | 24 ++ tests/platforms/test_platform_init.py | 47 ++++ tests/platforms/test_platform_memory.py | 95 +++++++ .../megatron/test_optimizer_compat.py | 69 +++++ 20 files changed, 1032 insertions(+), 39 deletions(-) create mode 100644 docs_roll/docs/User Guides/Hardware Support/corex_usage.md create mode 100644 docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md create mode 100644 roll/platforms/corex.py create mode 100644 tests/platforms/test_platform_init.py create mode 100644 tests/platforms/test_platform_memory.py create mode 100644 tests/third_party/megatron/test_optimizer_compat.py diff --git a/docs_roll/docs/User Guides/Hardware Support/corex_usage.md b/docs_roll/docs/User Guides/Hardware Support/corex_usage.md new file mode 100644 index 000000000..9a6196c2b --- /dev/null +++ b/docs_roll/docs/User Guides/Hardware Support/corex_usage.md @@ -0,0 +1,161 @@ +# ROLL x CoreX + +Last updated: 03/20/2026. 
+ +This document records the CoreX-specific adaptations currently integrated in the local ROLL workspace for Iluvatar devices such as `Iluvatar BI-V150`. + +## Current Scope + +The current adaptation targets a CUDA-like CoreX software stack where: + +- `torch.cuda` is available +- Ray exposes the accelerator as `GPU` +- the NVML-compatible monitoring interface is provided by `libixml.so` +- vendor-patched `torch`, `megatron-core`, and `vllm` may diverge from upstream behavior + +This is a practical compatibility layer for running ROLL on the current machine. It is not yet a full official upstream hardware support package. + +## What Was Adapted + +### 1. Platform Detection + +ROLL previously treated `Iluvatar BI-V150` as an unknown CUDA device. The platform initialization logic now detects CoreX-style device names and creates a dedicated `CorexPlatform` instead of falling back to `UnknownPlatform`. + +Current detection keywords include: + +- `ILUVATAR` +- `COREX` +- `BI-V` + +Implementation: + +- `roll/platforms/corex.py` +- `roll/platforms/__init__.py` + +### 2. Safe CUDA Platform Initialization + +On this vendor stack, subprocesses can hit a state where: + +- `torch.cuda.is_available()` is effectively usable +- but `device_count() == 0` in the current visibility scope + +Directly calling `torch.cuda.get_device_name()` in that state can raise `AssertionError: Invalid device id`. + +The platform bootstrap now checks `device_count()` first and only queries the CUDA device name when at least one visible device exists. + +Implementation: + +- `roll/platforms/__init__.py` + +### 3. Ray GPU Resource Registration + +Ray did not automatically register CoreX GPUs as `GPU` resources on this machine, even though `torch` could see the devices. That caused the scheduler to believe the cluster had zero usable GPU nodes. + +ROLL now starts Ray with explicit accelerator resources so the cluster exposes the expected `GPU` count. 
+ +Implementation: + +- `roll/distributed/scheduler/initialize.py` +- `roll/distributed/scheduler/resource_manager.py` + +### 4. NVML-Compatible Memory Monitoring Through `libixml.so` + +The vendor stack does not provide `libnvidia-ml.so.1`, so upstream `torch.cuda.device_memory_used()` fails when it tries to initialize NVML. However, CoreX exposes an NVML-compatible API through `libixml.so`. + +ROLL now: + +1. Tries the upstream `torch.cuda.device_memory_used()` +2. If that fails, tries to load an NVML-compatible library +3. Falls back in this order: + - standard NVML if present + - `libixml.so` +4. Calls: + - `nvmlInit_v2` / `nvmlInit` + - `nvmlDeviceGetHandleByIndex_v2` / `nvmlDeviceGetHandleByIndex` + - `nvmlDeviceGetMemoryInfo` +5. Maps logical device index through `CUDA_VISIBLE_DEVICES` before querying the physical handle + +An override is also supported: + +```bash +export ROLL_NVML_COMPAT_LIB=/path/to/libixml.so +``` + +Implementation: + +- `roll/platforms/platform.py` + +### 5. Megatron Optimizer Compatibility + +The installed vendor-patched `megatron-core` on this machine uses a newer `_get_param_groups_and_buffers()` signature than the previous ROLL compatibility layer expected. + +ROLL now inspects the runtime function signature and passes the new required arguments when needed, while staying compatible with older variants. + +Implementation: + +- `roll/third_party/megatron/optimizer.py` + +### 6. vLLM Sleep/Offload Compatibility + +The current CoreX stack does not expose the allocator backend required by vLLM sleep mode (`cumem`). When ROLL forced sleep mode on this machine, vLLM crashed during initialization. 
+ +ROLL now: + +- disables vLLM sleep/offload automatically when the allocator backend is unavailable +- warns explicitly when this means `actor_infer` will stay resident on GPU +- warns again if the user is still using a colocated train/infer layout + +Implementation: + +- `roll/third_party/vllm/__init__.py` +- `roll/distributed/strategy/vllm_strategy.py` + +### 7. Single-Node Demo Layout Adjustment + +On this CoreX stack, colocating `actor_train` and `actor_infer` on the same GPU was not stable for the default frozen-lake agentic demo after vLLM sleep mode became unavailable. The main failure mode was OOM during the Megatron optimizer step. + +The single-node frozen-lake demo was adjusted to a 2-GPU disaggregated layout: + +- `actor_train` on GPU 0 +- `actor_infer` on GPU 1 +- lower vLLM `gpu_memory_utilization` + +Implementation: + +- `examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml` + +## Validation Performed + +The following checks were performed on the current machine: + +- `torch.cuda.get_device_name(0)` returned `Iluvatar BI-V150` +- `ldconfig -p` exposed `libixml.so` +- direct `ctypes` calls to `libixml.so` succeeded for: + - `nvmlInit_v2` + - `nvmlDeviceGetHandleByIndex_v2` + - `nvmlDeviceGetMemoryInfo` +- `current_platform.device_memory_used()` successfully reported memory through the NVML-compatible path +- the frozen-lake single-node pipeline ran past step 0 and step 1 after the disaggregated layout change + +## Tests Added or Updated + +- `tests/platforms/test_platform_init.py` +- `tests/platforms/test_platform_memory.py` +- `tests/distributed/scheduler/test_initialize.py` +- `tests/distributed/scheduler/test_resource_manager.py` +- `tests/third_party/megatron/test_optimizer_compat.py` + +## Known Limitations + +- CoreX is currently integrated as a CUDA-like platform, not as a fully separate backend with vendor-specific kernels or scheduling behavior. 
+- vLLM sleep mode is still disabled on the current stack because the required allocator backend is unavailable. +- The current adaptation favors reliable execution over preserving the original single-GPU colocated demo topology. +- Existing long-running processes must be restarted to pick up the latest platform and monitoring changes. + +## Recommended Run Command + +```bash +conda activate ROLL +ray stop +bash examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +``` diff --git a/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md b/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md new file mode 100644 index 000000000..841a4aaba --- /dev/null +++ b/docs_roll/docs/User Guides/Hardware Support/corex_usage_zh.md @@ -0,0 +1,161 @@ +# ROLL x CoreX + +最后更新:2026年3月20日 + +本文档记录了当前本地 ROLL 工作区中为天数智芯设备(如 `Iluvatar BI-V150`)集成的 CoreX 特定适配。 + +## 当前范围 + +当前适配针对的是类似 CUDA 的 CoreX 软件栈,在该栈中: + +- `torch.cuda` 可用 +- Ray 将加速器暴露为 `GPU` +- 由 `libixml.so` 提供兼容 NVML 的监控接口 +- 供应商打过补丁的 `torch`、`megatron-core` 和 `vllm` 的行为可能与其上游版本不同 + +这是一个在当前机器上运行 ROLL 的实用兼容层。它还不是一个完整的官方上游硬件支持包。 + +## 已适配内容 + +### 1. 平台检测 + +ROLL 此前将 `Iluvatar BI-V150` 视为未知的 CUDA 设备。平台初始化逻辑现在能检测 CoreX 风格的设备名称,并创建一个专用的 `CorexPlatform`,而不是回退到 `UnknownPlatform`。 + +当前检测关键词包括: + +- `ILUVATAR` +- `COREX` +- `BI-V` + +实现: + +- `roll/platforms/corex.py` +- `roll/platforms/__init__.py` + +### 2. 安全的 CUDA 平台初始化 + +在此供应商栈上,子进程可能遇到以下状态: + +- `torch.cuda.is_available()` 实际可用 +- 但在当前可见性范围内 `device_count() == 0` + +在该状态下直接调用 `torch.cuda.get_device_name()` 可能会引发 `AssertionError: Invalid device id`。 + +平台引导程序现在会首先检查 `device_count()`,并且仅在存在至少一个可见设备时才查询 CUDA 设备名称。 + +实现: + +- `roll/platforms/__init__.py` + +### 3. 
Ray GPU 资源注册 + +即使 `torch` 可以看到设备,Ray 在此机器上也不会自动将 CoreX GPU 注册为 `GPU` 资源。这导致调度器认为集群拥有零个可用 GPU 节点。 + +ROLL 现在在启动 Ray 时会显式指定加速器资源,以便集群暴露预期的 `GPU` 计数。 + +实现: + +- `roll/distributed/scheduler/initialize.py` +- `roll/distributed/scheduler/resource_manager.py` + +### 4. 
通过 `libixml.so` 实现 NVML 兼容的内存监控 + +供应商栈未提供 `libnvidia-ml.so.1`,因此上游的 `torch.cuda.device_memory_used()` 在尝试初始化 NVML 时会失败。但是,CoreX 通过 `libixml.so` 暴露了一个兼容 NVML 的 API。 + +ROLL 现在会: + +1. 首先尝试使用上游的 `torch.cuda.device_memory_used()` +2. 如果失败,则尝试加载一个兼容 NVML 的库 +3. 按以下顺序回退: + - 标准 NVML(如果存在) + - `libixml.so` +4. 调用: + - `nvmlInit_v2` / `nvmlInit` + - `nvmlDeviceGetHandleByIndex_v2` / `nvmlDeviceGetHandleByIndex` + - `nvmlDeviceGetMemoryInfo` +5. 在查询物理句柄之前,通过 `CUDA_VISIBLE_DEVICES` 映射逻辑设备索引 + +也支持通过以下方式覆盖库路径: + +```bash +export ROLL_NVML_COMPAT_LIB=/path/to/libixml.so +``` + +实现: + +- `roll/platforms/platform.py` + +### 5. Megatron 优化器兼容性 + +此机器上安装的供应商打过补丁的 `megatron-core` 使用了比之前 ROLL 兼容层所期望的更新的 `_get_param_groups_and_buffers()` 签名。 + +ROLL 现在会检查运行时函数的签名,并在需要时传递新的必需参数,同时保持与旧版本的兼容性。 + +实现: + +- `roll/third_party/megatron/optimizer.py` + +### 6. vLLM 休眠/卸载兼容性 + +当前的 CoreX 栈未暴露 vLLM 休眠模式所需的分配器后端(`cumem`)。当 ROLL 在此机器上强制启用休眠模式时,vLLM 会在初始化期间崩溃。 + +ROLL 现在会: + +- 当分配器后端不可用时,自动禁用 vLLM 休眠/卸载功能 +- 当这意味着 `actor_infer` 将常驻 GPU 时,发出明确的警告 +- 如果用户仍在使用共置的训练/推理布局,再次发出警告 + +实现: + +- `roll/third_party/vllm/__init__.py` +- `roll/distributed/strategy/vllm_strategy.py` + +### 7. 
单节点演示布局调整 + +在此 CoreX 栈上,在 vLLM 休眠模式变得不可用之后,将 `actor_train` 和 `actor_infer` 共置于同一 GPU 上对于默认的 frozen-lake 代理演示来说不稳定。主要故障模式是在 Megatron 优化器步骤中发生 OOM。 + +frozen-lake 单节点演示调整为 2-GPU 分离布局: + +- `actor_train` 在 GPU 0 上 +- `actor_infer` 在 GPU 1 上 +- 降低 vLLM 的 `gpu_memory_utilization` + +实现: + +- `examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml` + +## 已执行的验证 + +在当前机器上执行了以下检查: + +- `torch.cuda.get_device_name(0)` 返回 `Iluvatar BI-V150` +- `ldconfig -p` 暴露了 `libixml.so` +- 对 `libixml.so` 的直接 `ctypes` 调用成功执行了: + - `nvmlInit_v2` + - `nvmlDeviceGetHandleByIndex_v2` + - `nvmlDeviceGetMemoryInfo` +- `current_platform.device_memory_used()` 成功通过 NVML 兼容路径报告了内存使用情况 +- 在更改为分离布局后,frozen-lake 单节点流水线成功运行了第 0 步和第 1 步 + +## 新增或更新的测试 + +- `tests/platforms/test_platform_init.py` +- `tests/platforms/test_platform_memory.py` +- `tests/distributed/scheduler/test_initialize.py` +- `tests/distributed/scheduler/test_resource_manager.py` +- `tests/third_party/megatron/test_optimizer_compat.py` + +## 已知限制 + +- CoreX 目前作为类似 CUDA 的平台集成,而非具有供应商特定内核或调度行为的完全独立后端。 +- vLLM 休眠模式在当前栈上仍被禁用,因为所需的分配器后端不可用。 +- 当前的适配优先保证可靠执行,而不是保留原始的单 GPU 共置演示拓扑。 +- 必须重启现有的长时间运行进程,才能应用最新的平台和监控更改。 + +## 推荐运行命令 + +```bash +conda activate ROLL +ray stop +bash examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +``` \ No newline at end of file diff --git a/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml b/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml index 5e88e736a..86f30e078 100644 --- a/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml +++ b/examples/agentic_demo/agent_val_frozen_lake_multi_nodes_demo.yaml @@ -107,7 +107,7 @@ actor_infer: strategy_args: strategy_name: vllm strategy_config: - gpu_memory_utilization: 0.8 + gpu_memory_utilization: 0.6 block_size: 16 load_format: auto device_mapping: list(range(0,4)) diff --git a/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml 
b/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml index b4435dd44..00a77c40e 100644 --- a/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml +++ b/examples/agentic_demo/agent_val_frozen_lake_single_node_demo.yaml @@ -37,7 +37,7 @@ checkpoint_config: type: file_system output_dir: ./output/render -num_gpus_per_node: 1 +num_gpus_per_node: 2 max_steps: 100 save_steps: 10000 @@ -107,10 +107,10 @@ actor_infer: strategy_args: strategy_name: vllm strategy_config: - gpu_memory_utilization: 0.8 + gpu_memory_utilization: 0.6 block_size: 16 load_format: auto - device_mapping: list(range(0,1)) + device_mapping: list(range(1,2)) infer_batch_size: 1 reference: @@ -157,4 +157,4 @@ custom_envs: SokobanDifferentGridVocab: ${custom_env.SokobanDifferentGridVocab} FrozenLake: - ${custom_env.FrozenLake} \ No newline at end of file + ${custom_env.FrozenLake} diff --git a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh index e6e1b83e4..6c14dd476 100755 --- a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh +++ b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_multi_nodes_demo.sh @@ -1,7 +1,8 @@ #!/bin/bash set +x -ROLL_PATH="/workspace/ROLL-main" -CONFIG_PATH=$(basename $(dirname $0)) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROLL_PATH="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" +CONFIG_PATH="$(basename "${SCRIPT_DIR}")" export PYTHONPATH="$ROLL_PATH:$PYTHONPATH" -python examples/start_agentic_pipeline.py --config_path $CONFIG_PATH --config_name agent_val_frozen_lake_multi_nodes_demo +python "$ROLL_PATH/examples/start_agentic_pipeline.py" --config_path "$CONFIG_PATH" --config_name agent_val_frozen_lake_multi_nodes_demo diff --git a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh index 74a4f3fdf..8800efbcc 100755 --- a/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh +++ b/examples/agentic_demo/run_agentic_pipeline_frozen_lake_single_node_demo.sh @@ -1,7 +1,8 @@ #!/bin/bash set +x -ROLL_PATH="/workspace/ROLL-main" -CONFIG_PATH=$(basename $(dirname $0)) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROLL_PATH="$(cd "${SCRIPT_DIR}/../.." && pwd)" +CONFIG_PATH="$(basename "${SCRIPT_DIR}")" export PYTHONPATH="$ROLL_PATH:$PYTHONPATH" -python examples/start_agentic_pipeline.py --config_path $CONFIG_PATH --config_name agent_val_frozen_lake_single_node_demo +python "$ROLL_PATH/examples/start_agentic_pipeline.py" --config_path "$CONFIG_PATH" --config_name agent_val_frozen_lake_single_node_demo diff --git a/roll/configs/base_config.py b/roll/configs/base_config.py index 80949874c..f5a753d46 100644 --- a/roll/configs/base_config.py +++ b/roll/configs/base_config.py @@ -320,7 +320,12 @@ def __post_init__(self): os.environ.update(self.system_envs) from ..platforms import current_platform - self.num_gpus_per_node = current_platform.device_count() + available_gpus_per_node = current_platform.device_count() + if available_gpus_per_node > 0: + assert self.num_gpus_per_node <= available_gpus_per_node, ( + f"num_gpus_per_node={self.num_gpus_per_node} exceeds visible devices on this node " + f"({available_gpus_per_node})." 
+ ) if hasattr(self, 'actor_train') and isinstance(self.actor_train, WorkerConfig): self.actor_train.system_envs.update({k: v for k, v in self.system_envs.items() if k not in self.actor_train.system_envs}) diff --git a/roll/distributed/scheduler/initialize.py b/roll/distributed/scheduler/initialize.py index 877e4ef18..5e5b5ff52 100644 --- a/roll/distributed/scheduler/initialize.py +++ b/roll/distributed/scheduler/initialize.py @@ -1,9 +1,11 @@ +import json import os import subprocess import sys import time import ray +import torch from roll.distributed.scheduler.driver_utils import ( get_driver_rank, @@ -24,6 +26,30 @@ logger = get_logger() +def _get_ray_start_resource_args() -> str: + ray_device_key = getattr(current_platform, "ray_device_key", "") + if ray_device_key in ("", "CPU"): + return "" + + device_module = getattr(torch, current_platform.device_type, None) + device_count_fn = getattr(device_module, "device_count", None) + if not callable(device_count_fn): + logger.warning( + "Current platform %s does not expose a usable device_count() method. 
Ray will rely on auto-detection.", + current_platform.device_type, + ) + return "" + + device_count = device_count_fn() + if device_count <= 0: + return "" + + if ray_device_key == "GPU": + return f" --num-gpus={device_count}" + + return f" --resources='{json.dumps({ray_device_key: device_count})}'" + + def start_ray_cluster(): rank = get_driver_rank() world_size = get_driver_world_size() @@ -36,12 +62,19 @@ def start_ray_cluster(): logger.info("Ray cluster already initialized") return False + resource_args = _get_ray_start_resource_args() if rank == 0: - cmd = f"ray start --head --port={master_port} --node-name={node_name} --dashboard-port={dashboard_port}" + cmd = ( + f"ray start --head --port={master_port} --node-name={node_name} " + f"--dashboard-port={dashboard_port}{resource_args}" + ) else: # fix: 处理大规模下可能会出现的head/worker node创建顺序不一致问题 time.sleep(5) - cmd = f"ray start --address={master_addr}:{master_port} --node-name={node_name} --dashboard-port={dashboard_port}" + cmd = ( + f"ray start --address={master_addr}:{master_port} --node-name={node_name} " + f"--dashboard-port={dashboard_port}{resource_args}" + ) logger.info(f"Starting ray cluster: {cmd}") ret = subprocess.run(cmd, shell=True, capture_output=True) diff --git a/roll/distributed/scheduler/resource_manager.py b/roll/distributed/scheduler/resource_manager.py index ee2abc93f..b6b65d01f 100644 --- a/roll/distributed/scheduler/resource_manager.py +++ b/roll/distributed/scheduler/resource_manager.py @@ -3,6 +3,7 @@ from typing import Dict, List, Tuple, Optional import ray +import torch from ray.util.placement_group import PlacementGroup from roll.platforms import current_platform @@ -31,8 +32,24 @@ def __init__(self, num_gpus_per_node, num_nodes): if num_nodes is None: num_nodes = ray_num_nodes - assert num_nodes <= ray_num_nodes, (f"The Ray clusters(ray_num_nodes: {ray_num_nodes}) cannot meet the " - f"required number of nodes (`num_nodes`{num_nodes}).") + device_module = getattr(torch, 
current_platform.device_type, None) + device_count_fn = getattr(device_module, "device_count", None) + visible_device_count = device_count_fn() if callable(device_count_fn) else 0 + accelerator_hint = "" + if current_platform.ray_device_key != "CPU" and visible_device_count > 0 and available_gpu == 0: + accelerator_hint = ( + f" torch sees {visible_device_count} {current_platform.device_name} device(s) on this node, " + f"but Ray reports 0 {current_platform.ray_device_key} resources. " + f"This usually means the existing Ray cluster was started without explicitly registering accelerator " + f"resources. Stop the current Ray cluster and restart it with explicit resources, for example " + f"`ray stop` then `ray start --head --num-gpus={visible_device_count}`." + ) + + assert num_nodes <= ray_num_nodes, ( + f"The Ray clusters(ray_num_nodes: {ray_num_nodes}) cannot meet the required number of nodes " + f"(`num_nodes`{num_nodes}). required per-node {current_platform.ray_device_key} >= {num_gpus_per_node}, " + f"ray.available_resources()={available_resources}.{accelerator_hint}" + ) self.num_nodes = num_nodes self.gpu_per_node = num_gpus_per_node self.num_gpus = self.gpu_per_node * self.num_nodes diff --git a/roll/distributed/strategy/vllm_strategy.py b/roll/distributed/strategy/vllm_strategy.py index 6ee0f9d9d..857a89d1b 100644 --- a/roll/distributed/strategy/vllm_strategy.py +++ b/roll/distributed/strategy/vllm_strategy.py @@ -113,6 +113,18 @@ async def initialize(self, model_provider): os.environ["VLLM_PORT"] = str(vllm_port) self.model = await create_async_llm(resource_placement_groups=self.worker_config.resource_placement_groups, **vllm_config) + self.sleep_mode_enabled = bool(getattr(self.model, "roll_sleep_mode_enabled", False)) + if self.sleep_level > 0 and not self.sleep_mode_enabled: + logger.warning( + "vLLM sleep/offload is disabled because the allocator backend is unavailable. " + "actor_infer will stay resident on GPU." 
+ ) + if getattr(self.worker.pipeline_config, "is_actor_infer_colocated", False): + logger.warning( + "actor_infer overlaps with training/reference GPUs while vLLM sleep/offload is unavailable. " + "This colocated layout can easily OOM on non-standard accelerator stacks; prefer disjoint " + "device_mapping for actor_train/reference and actor_infer." + ) if Version("0.15.0") <= Version(vllm.__version__): @@ -325,6 +337,10 @@ async def load_states(self, *args, **kwargs): async def offload_states(self, include=None, non_blocking=False): await self.model.reset_prefix_cache() + if not getattr(self, "sleep_mode_enabled", False): + gc.collect() + current_platform.empty_cache() + return if include is None or OffloadStateType.model_params in include: if self.is_model_in_gpu and self.worker.pipeline_config.is_actor_infer_colocated: await self.model.offload_states(self.sleep_level) diff --git a/roll/platforms/__init__.py b/roll/platforms/__init__.py index 6869621f4..561196ee5 100644 --- a/roll/platforms/__init__.py +++ b/roll/platforms/__init__.py @@ -1,6 +1,7 @@ import torch from .platform import Platform +from .corex import CorexPlatform from .cuda import CudaPlatform from .npu import NpuPlatform from .rocm import RocmPlatform @@ -13,20 +14,35 @@ logger = get_logger() +def _is_corex_device_name(device_name: str) -> bool: + normalized = device_name.upper() + return any(keyword in normalized for keyword in ("ILUVATAR", "COREX", "BI-V")) + + def _init_platform() -> Platform: """ Detect and initialize the appropriate platform based on available devices. Priority: - 1. CUDA (NVIDIA / AMD ROCm) + 1. CUDA (NVIDIA / AMD ROCm / CoreX-like stacks) 2. NPU (if torch_npu is installed) 3. CPU (fallback) Returns: An instance of a subclass of Platform corresponding to the detected hardware. 
""" - if torch.cuda.is_available(): - device_name = torch.cuda.get_device_name().upper() + cuda_device_count = 0 + try: + cuda_device_count = torch.cuda.device_count() + except Exception as exc: + logger.warning("Failed to query CUDA device count. Falling back to CPU/NPU detection. Error: %s", exc) + + if cuda_device_count > 0: + try: + device_name = torch.cuda.get_device_name(0).upper() + except Exception as exc: + logger.warning("Failed to query CUDA device name. Falling back to UnknownPlatform. Error: %s", exc) + return UnknownPlatform() logger.debug(f"Detected CUDA device: {device_name}") if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") @@ -34,17 +50,20 @@ def _init_platform() -> Platform: elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() + elif _is_corex_device_name(device_name): + logger.debug("Initializing CoreX platform.") + return CorexPlatform() logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() - else: - try: - import torch_npu # noqa: F401 - logger.debug("Detected torch_npu. Initializing NPU platform.") - return NpuPlatform() - except ImportError: - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + try: + import torch_npu # noqa: F401 + + logger.debug("Detected torch_npu. Initializing NPU platform.") + return NpuPlatform() + except ImportError: + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. 
diff --git a/roll/platforms/corex.py b/roll/platforms/corex.py new file mode 100644 index 000000000..235c9ae9a --- /dev/null +++ b/roll/platforms/corex.py @@ -0,0 +1,9 @@ +from .unknown import UnknownPlatform + + +class CorexPlatform(UnknownPlatform): + device_name: str = "COREX" + + @classmethod + def is_cuda(cls) -> bool: + return True diff --git a/roll/platforms/platform.py b/roll/platforms/platform.py index 9f9d4b105..ef8faa98a 100644 --- a/roll/platforms/platform.py +++ b/roll/platforms/platform.py @@ -1,9 +1,156 @@ -import torch +import ctypes +import ctypes.util import os +from threading import Lock + +import torch from ..utils.logging import get_logger logger = get_logger() +_device_memory_used_fallback_warned = set() +_device_memory_used_compat_warned = set() +_nvml_compat_lib = None +_nvml_compat_lib_name = None +_nvml_compat_lib_lock = Lock() + + +class _NvmlMemoryInfo(ctypes.Structure): + _fields_ = [ + ("total", ctypes.c_ulonglong), + ("free", ctypes.c_ulonglong), + ("used", ctypes.c_ulonglong), + ] + + +def _nvml_compatible_library_candidates(): + candidates = [] + + env_library = os.environ.get("ROLL_NVML_COMPAT_LIB") + if env_library: + candidates.append(env_library) + + for library_name in ( + ctypes.util.find_library("nvidia-ml"), + "libnvidia-ml.so.1", + ctypes.util.find_library("ixml"), + "libixml.so", + ): + if library_name and library_name not in candidates: + candidates.append(library_name) + + return candidates + + +def _load_nvml_compatible_library(): + global _nvml_compat_lib, _nvml_compat_lib_name + + if _nvml_compat_lib is not None: + return _nvml_compat_lib, _nvml_compat_lib_name + + with _nvml_compat_lib_lock: + if _nvml_compat_lib is not None: + return _nvml_compat_lib, _nvml_compat_lib_name + + errors = [] + for library_name in _nvml_compatible_library_candidates(): + try: + library = ctypes.CDLL(library_name) + except OSError as exc: + errors.append(f"{library_name}: {exc}") + continue + + init_fn = getattr(library, "nvmlInit_v2", 
None) or getattr(library, "nvmlInit", None) + if init_fn is None: + errors.append(f"{library_name}: missing nvmlInit_v2/nvmlInit") + continue + + init_fn.restype = ctypes.c_int + ret = init_fn() + if ret != 0: + error_string = _nvml_error_string(library, ret) + errors.append(f"{library_name}: nvmlInit failed with {ret} ({error_string})") + continue + + _nvml_compat_lib = library + _nvml_compat_lib_name = library_name + return _nvml_compat_lib, _nvml_compat_lib_name + + error_msg = "; ".join(errors) if errors else "no candidate libraries" + raise RuntimeError(f"failed to load a NVML-compatible library: {error_msg}") + + +def _nvml_error_string(library, retcode: int) -> str: + error_fn = getattr(library, "nvmlErrorString", None) + if error_fn is None: + return "unknown" + + error_fn.argtypes = [ctypes.c_int] + error_fn.restype = ctypes.c_char_p + try: + value = error_fn(retcode) + except Exception: + return "unknown" + + if not value: + return "unknown" + + try: + return value.decode("utf-8", errors="replace") + except Exception: + return str(value) + + +def _map_visible_device_index(device: int, env_var: str) -> int: + visible_devices = os.environ.get(env_var, "") + if not visible_devices: + return device + + parts = [part.strip() for part in visible_devices.split(",") if part.strip()] + if not parts or device < 0 or device >= len(parts): + return device + + try: + return int(parts[device]) + except ValueError: + return device + + +def _nvml_compatible_device_memory_used(device: int, env_var: str) -> tuple[int, str]: + library, library_name = _load_nvml_compatible_library() + physical_device = _map_visible_device_index(device, env_var) + + handle = ctypes.c_void_p() + get_handle = getattr(library, "nvmlDeviceGetHandleByIndex_v2", None) or getattr( + library, "nvmlDeviceGetHandleByIndex", None + ) + if get_handle is None: + raise RuntimeError(f"{library_name} does not export nvmlDeviceGetHandleByIndex") + + get_handle.argtypes = [ctypes.c_uint, 
ctypes.POINTER(ctypes.c_void_p)] + get_handle.restype = ctypes.c_int + ret = get_handle(physical_device, ctypes.byref(handle)) + if ret != 0: + raise RuntimeError( + f"{library_name} nvmlDeviceGetHandleByIndex({physical_device}) failed with {ret} " + f"({_nvml_error_string(library, ret)})" + ) + + memory_info = _NvmlMemoryInfo() + get_memory_info = getattr(library, "nvmlDeviceGetMemoryInfo", None) + if get_memory_info is None: + raise RuntimeError(f"{library_name} does not export nvmlDeviceGetMemoryInfo") + + get_memory_info.argtypes = [ctypes.c_void_p, ctypes.POINTER(_NvmlMemoryInfo)] + get_memory_info.restype = ctypes.c_int + ret = get_memory_info(handle, ctypes.byref(memory_info)) + if ret != 0: + raise RuntimeError( + f"{library_name} nvmlDeviceGetMemoryInfo({physical_device}) failed with {ret} " + f"({_nvml_error_string(library, ret)})" + ) + + return int(memory_info.used), library_name class Platform: @@ -89,6 +236,104 @@ def __getattr__(self, key: str): logger.warning("Current platform %s does not have '%s' attribute.", self.device_type, key) return None + @classmethod + def device_memory_used(cls, device=None) -> int: + device_module = getattr(torch, cls.device_type, None) + if device_module is None: + logger.warning("Current platform %s does not expose torch.%s.", cls.device_name, cls.device_type) + return 0 + + if device is None: + try: + device_count = device_module.device_count() + device = device_module.current_device() if device_count > 0 else 0 + except Exception: + device = 0 + + try: + return int(device_module.device_memory_used(device)) + except Exception as exc: + compat_used = cls._nvml_compatible_device_memory_used(device=device, primary_exc=exc) + if compat_used is not None: + return compat_used + return cls._fallback_device_memory_used(device_module=device_module, device=device, primary_exc=exc) + + @classmethod + def _nvml_compatible_device_memory_used(cls, device: int, primary_exc: Exception): + if cls.device_type != "cuda": + return None + 
+ try: + used, library_name = _nvml_compatible_device_memory_used(device, cls.device_control_env_var) + except Exception: + return None + + warning_key = (cls.device_name, library_name) + if warning_key not in _device_memory_used_compat_warned: + _device_memory_used_compat_warned.add(warning_key) + logger.warning( + "torch.%s.device_memory_used is unavailable on platform %s (device %s, error: %s). " + "Using NVML-compatible library %s instead.", + cls.device_type, + cls.device_name, + device, + primary_exc, + library_name, + ) + + return used + + @classmethod + def _fallback_device_memory_used(cls, device_module, device: int, primary_exc: Exception) -> int: + fallback_candidates = ( + ("mem_get_info", lambda: _mem_get_info_used(device_module, device)), + ("memory_reserved", lambda: int(device_module.memory_reserved(device))), + ("memory_allocated", lambda: int(device_module.memory_allocated(device))), + ) + + errors = [] + for fallback_name, fallback_fn in fallback_candidates: + if not hasattr(device_module, fallback_name): + continue + try: + value = fallback_fn() + except Exception as fallback_exc: + errors.append(f"{fallback_name}: {fallback_exc}") + continue + + cls._warn_device_memory_used_fallback_once( + device=device, + fallback_name=fallback_name, + primary_exc=primary_exc, + ) + return value + + logger.warning( + "Failed to query device memory usage for platform %s on device %s. Primary error: %s. " + "Fallback errors: %s", + cls.device_name, + device, + primary_exc, + "; ".join(errors) if errors else "none", + ) + raise primary_exc + + @classmethod + def _warn_device_memory_used_fallback_once(cls, device: int, fallback_name: str, primary_exc: Exception) -> None: + warning_key = (cls.device_name, fallback_name) + if warning_key in _device_memory_used_fallback_warned: + return + + _device_memory_used_fallback_warned.add(warning_key) + logger.warning( + "device_memory_used is unavailable on platform %s (device %s, error: %s). 
" + "Falling back to torch.%s.", + cls.device_name, + device, + primary_exc, + fallback_name, + ) + @classmethod def is_cuda(cls) -> bool: return False @@ -201,3 +446,8 @@ def apply_ulysses_patch(cls) -> None: provide framework- and hardware-specific Ulysses patching. """ raise NotImplementedError + + +def _mem_get_info_used(device_module, device: int) -> int: + free, total = device_module.mem_get_info(device) + return int(total - free) diff --git a/roll/third_party/megatron/optimizer.py b/roll/third_party/megatron/optimizer.py index 888dc7a87..177a5a045 100644 --- a/roll/third_party/megatron/optimizer.py +++ b/roll/third_party/megatron/optimizer.py @@ -14,6 +14,37 @@ logger = logging.getLogger(__name__) +def _build_param_groups_and_buffers_kwargs( + *, + model_chunk_offset: int, + config: OptimizerConfig, + no_weight_decay_cond: Optional[Callable], + scale_lr_cond: Optional[Callable], + lr_mult: float, + filter_fn: Callable, + buffer_name: str, +) -> Dict: + signature = inspect.signature(_get_param_groups_and_buffers).parameters + kwargs = { + "model_chunk_offset": model_chunk_offset, + "config": config, + "filter_fn": filter_fn, + "buffer_name": buffer_name, + } + if "no_weight_decay_cond" in signature: + kwargs["no_weight_decay_cond"] = no_weight_decay_cond + if "scale_lr_cond" in signature: + kwargs["scale_lr_cond"] = scale_lr_cond + if "lr_mult" in signature: + kwargs["lr_mult"] = lr_mult + if "default_skip_embedding_weight_decay" in signature: + kwargs["default_skip_embedding_weight_decay"] = False + if "config_overrides" in signature: + # config_overrides is required in some newer mcore versions. 
+ kwargs["config_overrides"] = None + return kwargs + + def get_megatron_optimizer( config: OptimizerConfig, model_chunks: List[MegatronModule], @@ -68,19 +99,20 @@ def get_megatron_optimizer( optimizers = [] model_chunk_offset = 0 - kwargs = {} - if "config_overrides" in inspect.signature(_get_param_groups_and_buffers).parameters: - # config_overrides is required in mcore-core>=0.16 - kwargs = {"config_overrides": None} for dense_model_chunks, overlap_param_gather_with_optimizer_step in zip( all_dense_model_chunks, overlap_param_gather_with_optimizer_step_flags ): - param_groups, buffers = _get_param_groups_and_buffers( - dense_model_chunks, + kwargs = _build_param_groups_and_buffers_kwargs( model_chunk_offset=model_chunk_offset, config=config, + no_weight_decay_cond=no_weight_decay_cond, + scale_lr_cond=scale_lr_cond, + lr_mult=lr_mult, filter_fn=lambda g: not g['is_expert_parallel'], buffer_name='buffers', + ) + param_groups, buffers = _get_param_groups_and_buffers( + dense_model_chunks, **kwargs, ) for model_chunk in dense_model_chunks: @@ -109,13 +141,18 @@ def get_megatron_optimizer( setattr(optimizers[-1], "model_chunks", dense_model_chunks) model_chunk_offset += 1 - moe_param_groups, moe_buffers = _get_param_groups_and_buffers( - model_chunks, + moe_kwargs = _build_param_groups_and_buffers_kwargs( model_chunk_offset=0, config=config, + no_weight_decay_cond=no_weight_decay_cond, + scale_lr_cond=scale_lr_cond, + lr_mult=lr_mult, filter_fn=lambda g: g['is_expert_parallel'], buffer_name='expert_parallel_buffers', - **kwargs, + ) + moe_param_groups, moe_buffers = _get_param_groups_and_buffers( + model_chunks, + **moe_kwargs, ) if len(moe_param_groups) > 0: model_parallel_rank = torch.distributed.get_rank( diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 77f67cbbb..8de27f0ec 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -44,8 +44,33 @@ logger.info(f"Using vllm version 
{vllm.__version__}") +def _is_cumem_available() -> bool: + try: + from vllm.device_allocator.cumem import cumem_available + + return bool(cumem_available) + except Exception as exc: + logger.warning("Failed to probe vLLM cumem allocator availability: %s", exc) + return False + + +def _resolve_enable_sleep_mode(requested: bool) -> bool: + if not requested: + return False + + if _is_cumem_available(): + return True + + logger.warning( + "vLLM sleep mode requested but cumem allocator is unavailable on this platform. " + "Disabling sleep mode and continuing without vLLM offload/reload." + ) + return False + + async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): - kwargs["enable_sleep_mode"] = True + requested_sleep_mode = kwargs.pop("enable_sleep_mode", True) + kwargs["enable_sleep_mode"] = _resolve_enable_sleep_mode(requested_sleep_mode) if "worker_extension_cls" not in kwargs: # VLLM_USE_V1 is deprecated in vllm>=0.11.1 @@ -135,6 +160,7 @@ async def create_async_llm(resource_placement_groups: List[Dict], **kwargs): stat_loggers=None, ) + setattr(async_llm, "roll_sleep_mode_enabled", kwargs["enable_sleep_mode"]) await async_llm.custom_init_worker() return async_llm diff --git a/tests/distributed/scheduler/test_initialize.py b/tests/distributed/scheduler/test_initialize.py index 6a20c5480..5d2519d23 100644 --- a/tests/distributed/scheduler/test_initialize.py +++ b/tests/distributed/scheduler/test_initialize.py @@ -1,6 +1,8 @@ +from types import SimpleNamespace + import ray -from roll.distributed.scheduler.initialize import init +from roll.distributed.scheduler.initialize import _get_ray_start_resource_args, init @ray.remote @@ -33,6 +35,26 @@ def test_ray_cluster_func(): print(hello_msg2) +def test_get_ray_start_resource_args_for_gpu(monkeypatch): + fake_platform = SimpleNamespace(device_type="cuda", ray_device_key="GPU") + fake_torch = SimpleNamespace(cuda=SimpleNamespace(device_count=lambda: 4)) + + 
monkeypatch.setattr("roll.distributed.scheduler.initialize.current_platform", fake_platform) + monkeypatch.setattr("roll.distributed.scheduler.initialize.torch", fake_torch) + + assert _get_ray_start_resource_args() == " --num-gpus=4" + + +def test_get_ray_start_resource_args_for_custom_accelerator(monkeypatch): + fake_platform = SimpleNamespace(device_type="npu", ray_device_key="NPU") + fake_torch = SimpleNamespace(npu=SimpleNamespace(device_count=lambda: 8)) + + monkeypatch.setattr("roll.distributed.scheduler.initialize.current_platform", fake_platform) + monkeypatch.setattr("roll.distributed.scheduler.initialize.torch", fake_torch) + + assert _get_ray_start_resource_args() == """ --resources='{"NPU": 8}'""" + + if __name__ == "__main__": """ RANK=0 WORLD_SIZE=2 MASTER_ADDR='33.197.137.224' MASTER_PORT=54893 python tests/distributed/scheduler/test_initialize.py diff --git a/tests/distributed/scheduler/test_resource_manager.py b/tests/distributed/scheduler/test_resource_manager.py index 96e2f9142..3ba68e1cb 100644 --- a/tests/distributed/scheduler/test_resource_manager.py +++ b/tests/distributed/scheduler/test_resource_manager.py @@ -1,8 +1,10 @@ import os +from types import SimpleNamespace from ray.runtime_env import RuntimeEnv os.environ["RAY_DEDUP_LOGS"] = "0" +import pytest import ray from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -91,6 +93,28 @@ def test_resource_manager_num_gpus_per_worker_gt_1(): print(res) +def test_resource_manager_reports_missing_ray_gpu_resources(monkeypatch): + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.ray.available_resources", + lambda: {"CPU": 8.0}, + ) + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.ray.nodes", + lambda: [{"Alive": True, "Resources": {"CPU": 8.0}}], + ) + monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.current_platform", + SimpleNamespace(ray_device_key="GPU", device_type="cuda", device_name="Iluvatar"), + ) + 
monkeypatch.setattr( + "roll.distributed.scheduler.resource_manager.torch", + SimpleNamespace(cuda=SimpleNamespace(device_count=lambda: 16)), + ) + + with pytest.raises(AssertionError, match="Ray reports 0 GPU resources"): + ResourceManager(num_gpus_per_node=1, num_nodes=1) + + if __name__ == "__main__": """ RANK=0 WORLD_SIZE=2 MASTER_ADDR='33.195.52.67' MASTER_PORT=54893 python tests/distributed/scheduler/test_resource_manager.py diff --git a/tests/platforms/test_platform_init.py b/tests/platforms/test_platform_init.py new file mode 100644 index 000000000..38d6591f8 --- /dev/null +++ b/tests/platforms/test_platform_init.py @@ -0,0 +1,47 @@ +from types import SimpleNamespace + +from roll.platforms import _init_platform + + +def test_init_platform_falls_back_to_cpu_when_no_visible_cuda_devices(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 0, + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "CPU" + + +def test_init_platform_uses_corex_platform_for_iluvatar_device(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 1, + get_device_name=lambda index=0: "Iluvatar BI-V150", + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "COREX" + + +def test_init_platform_uses_unknown_platform_for_unrecognized_cuda_device(monkeypatch): + fake_torch = SimpleNamespace( + cuda=SimpleNamespace( + device_count=lambda: 1, + get_device_name=lambda index=0: "Some Vendor Accelerator", + ) + ) + + monkeypatch.setattr("roll.platforms.torch", fake_torch) + + platform = _init_platform() + + assert platform.device_name == "UNKNOWN" diff --git a/tests/platforms/test_platform_memory.py b/tests/platforms/test_platform_memory.py new file mode 100644 index 000000000..7ae668b3f --- /dev/null +++ b/tests/platforms/test_platform_memory.py @@ -0,0 +1,95 
@@ +import ctypes + +import roll.platforms.platform as platform_module +from roll.platforms.unknown import UnknownPlatform + + +def test_device_memory_used_falls_back_to_mem_get_info(monkeypatch): + def fake_device_memory_used(device=None): + raise RuntimeError("nvml unavailable") + + def fake_mem_get_info(device=None): + return (3, 10) + + monkeypatch.setattr("torch.cuda.device_memory_used", fake_device_memory_used) + monkeypatch.setattr("torch.cuda.mem_get_info", fake_mem_get_info) + monkeypatch.setattr( + UnknownPlatform, + "_nvml_compatible_device_memory_used", + classmethod(lambda cls, device, primary_exc: None), + ) + + assert UnknownPlatform.device_memory_used() == 7 + + +def test_device_memory_used_falls_back_to_memory_reserved(monkeypatch): + def fake_device_memory_used(device=None): + raise RuntimeError("nvml unavailable") + + def fake_mem_get_info(device=None): + raise RuntimeError("mem_get_info unavailable") + + monkeypatch.setattr("torch.cuda.device_memory_used", fake_device_memory_used) + monkeypatch.setattr("torch.cuda.mem_get_info", fake_mem_get_info) + monkeypatch.setattr("torch.cuda.memory_reserved", lambda device=None: 123) + monkeypatch.setattr( + UnknownPlatform, + "_nvml_compatible_device_memory_used", + classmethod(lambda cls, device, primary_exc: None), + ) + + assert UnknownPlatform.device_memory_used() == 123 + + +def test_map_visible_device_index(monkeypatch): + monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "4,7") + + assert platform_module._map_visible_device_index(0, "CUDA_VISIBLE_DEVICES") == 4 + assert platform_module._map_visible_device_index(1, "CUDA_VISIBLE_DEVICES") == 7 + + +def test_device_memory_used_uses_nvml_compatible_library(monkeypatch): + class FakeFunction: + def __init__(self, fn): + self.fn = fn + self.argtypes = None + self.restype = None + + def __call__(self, *args, **kwargs): + return self.fn(*args, **kwargs) + + class FakeLibrary: + def __init__(self): + self.handle_index = None + self.nvmlInit_v2 = 
FakeFunction(lambda: 0) + self.nvmlErrorString = FakeFunction(lambda retcode: b"ok") + self.nvmlDeviceGetHandleByIndex_v2 = FakeFunction(self._get_handle) + self.nvmlDeviceGetMemoryInfo = FakeFunction(self._get_memory_info) + + def _get_handle(self, index, handle_ptr): + self.handle_index = index + ctypes.cast(handle_ptr, ctypes.POINTER(ctypes.c_void_p)).contents.value = 1234 + return 0 + + def _get_memory_info(self, handle, memory_info_ptr): + memory_info = ctypes.cast( + memory_info_ptr, ctypes.POINTER(platform_module._NvmlMemoryInfo) + ).contents + memory_info.total = 100 + memory_info.free = 25 + memory_info.used = 75 + return 0 + + fake_library = FakeLibrary() + + monkeypatch.setenv("CUDA_VISIBLE_DEVICES", "5") + monkeypatch.setattr( + platform_module, + "_load_nvml_compatible_library", + lambda: (fake_library, "libixml.so"), + ) + + used, library_name = platform_module._nvml_compatible_device_memory_used(0, "CUDA_VISIBLE_DEVICES") + assert used == 75 + assert library_name == "libixml.so" + assert fake_library.handle_index == 5 diff --git a/tests/third_party/megatron/test_optimizer_compat.py b/tests/third_party/megatron/test_optimizer_compat.py new file mode 100644 index 000000000..b77d0d743 --- /dev/null +++ b/tests/third_party/megatron/test_optimizer_compat.py @@ -0,0 +1,69 @@ +from roll.third_party.megatron.optimizer import _build_param_groups_and_buffers_kwargs + + +def test_build_param_groups_and_buffers_kwargs_supports_new_signature(monkeypatch): + def fake_get_param_groups_and_buffers( + model_chunks, + model_chunk_offset, + config, + no_weight_decay_cond, + scale_lr_cond, + lr_mult, + filter_fn, + buffer_name, + default_skip_embedding_weight_decay=False, + ): + return model_chunks, {} + + monkeypatch.setattr( + "roll.third_party.megatron.optimizer._get_param_groups_and_buffers", + fake_get_param_groups_and_buffers, + ) + + kwargs = _build_param_groups_and_buffers_kwargs( + model_chunk_offset=3, + config="cfg", + no_weight_decay_cond="no_wd", + 
scale_lr_cond="scale_lr", + lr_mult=0.5, + filter_fn="filter", + buffer_name="buffers", + ) + + assert kwargs == { + "model_chunk_offset": 3, + "config": "cfg", + "no_weight_decay_cond": "no_wd", + "scale_lr_cond": "scale_lr", + "lr_mult": 0.5, + "filter_fn": "filter", + "buffer_name": "buffers", + "default_skip_embedding_weight_decay": False, + } + + +def test_build_param_groups_and_buffers_kwargs_supports_old_signature(monkeypatch): + def fake_get_param_groups_and_buffers(model_chunks, model_chunk_offset, config, filter_fn, buffer_name): + return model_chunks, {} + + monkeypatch.setattr( + "roll.third_party.megatron.optimizer._get_param_groups_and_buffers", + fake_get_param_groups_and_buffers, + ) + + kwargs = _build_param_groups_and_buffers_kwargs( + model_chunk_offset=1, + config="cfg", + no_weight_decay_cond="no_wd", + scale_lr_cond="scale_lr", + lr_mult=1.0, + filter_fn="filter", + buffer_name="buffers", + ) + + assert kwargs == { + "model_chunk_offset": 1, + "config": "cfg", + "filter_fn": "filter", + "buffer_name": "buffers", + } From 5a2e836759600344de8ba913b22f58168a8f9cc6 Mon Sep 17 00:00:00 2001 From: lxzlxzliuxuzhao Date: Sat, 21 Mar 2026 11:28:38 +0000 Subject: [PATCH 2/2] fix vllm version bug --- examples/agentic_demo/agent_val_rock_swe.yaml | 20 +++++----- .../agent_val_sokoban_sandbox.yaml | 2 +- roll/third_party/vllm/__init__.py | 11 ++++-- roll/third_party/vllm/versioning.py | 27 +++++++++++++ roll/third_party/vllm/worker.py | 8 ++-- tests/third_party/vllm/test_versioning.py | 39 +++++++++++++++++++ 6 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 roll/third_party/vllm/versioning.py create mode 100644 tests/third_party/vllm/test_versioning.py diff --git a/examples/agentic_demo/agent_val_rock_swe.yaml b/examples/agentic_demo/agent_val_rock_swe.yaml index 55110b3e6..5e3898b18 100644 --- a/examples/agentic_demo/agent_val_rock_swe.yaml +++ b/examples/agentic_demo/agent_val_rock_swe.yaml @@ -75,13 +75,13 @@ actor_train: 
strategy_args: strategy_name: megatron_train strategy_config: - tensor_model_parallel_size: 1 + tensor_model_parallel_size: 2 pipeline_model_parallel_size: 1 expert_model_parallel_size: 1 context_parallel_size: 1 use_distributed_optimizer: false recompute_granularity: full - device_mapping: list(range(2,3)) + device_mapping: list(range(2,4)) infer_batch_size: 1 actor_infer: model_args: @@ -105,8 +105,8 @@ actor_infer: gpu_memory_utilization: 0.8 block_size: 16 load_format: auto - tensor_parallel_size: 1 - device_mapping: list(range(1,2)) + tensor_parallel_size: 2 + device_mapping: list(range(0,2)) reference: model_args: @@ -123,7 +123,7 @@ reference: pipeline_model_parallel_size: 1 expert_model_parallel_size: 1 context_parallel_size: 1 - device_mapping: list(range(2,3)) + device_mapping: list(range(4,5)) infer_batch_size: 1 reward_normalization: @@ -181,7 +181,7 @@ agent_config_common: custom_install_cmd: "wget --retry-connrefused --tries=10 --waitretry=2 -O ~/iflow-cli.tgz 'http://cloud.iflow.cn/iflow-cli/iflow-ai-iflow-cli-for-roll-0-4-4-v5.tgz' && npm i -g ~/iflow-cli.tgz" env: IFLOW_apiKey: "test" - IFLOW_baseUrl: "http://localhost:8080/v1" + IFLOW_baseUrl: "http://10.31.10.33:8080/v1" IFLOW_modelName: "ROME" IFLOW_searchApiKey: "88888888" IFLOW_selectedAuthType: "openai-compatible" @@ -200,11 +200,11 @@ custom_envs: agent_system_template: "agent_system_template placeholder" agent_template: "agent_template placeholder" env_config: - dataset_name: /ROLL/data/swe_bench_verified_example.jsonl + dataset_name: /data1/lxzs_workspace/ROLL/data/swe_bench_verified_example.jsonl tools: ~ max_steps: ${max_actions_per_traj} mode: "train" - sandbox_base_url: http://localhost:8080 # change to your own service address if needed + sandbox_base_url: http://10.31.10.33:8080 # change to your own service address if needed user_id: "xxx" experiment_id: "test_tb_native" test_files: ["/terminal-bench-datasets/datasets/swebench-verified"] @@ -217,11 +217,11 @@ custom_envs: 
agent_system_template: "agent_system_template placeholder" agent_template: "agent_template placeholder" env_config: - dataset_name: /ROLL/data/swe_bench_verified_example.jsonl + dataset_name: /data1/lxzs_workspace/ROLL/data/swe_bench_verified_example.jsonl tools: ~ max_steps: ${max_actions_per_traj} mode: "val" - sandbox_base_url: http://localhost:8080 # change to your own service address if needed + sandbox_base_url: http://10.31.10.33:8080 # change to your own service address if needed user_id: "xxx" experiment_id: "test_tb_native" test_files: ["/terminal-bench-datasets/datasets/swebench-verified"] diff --git a/examples/agentic_demo/agent_val_sokoban_sandbox.yaml b/examples/agentic_demo/agent_val_sokoban_sandbox.yaml index 8bb2d8042..3256558a5 100644 --- a/examples/agentic_demo/agent_val_sokoban_sandbox.yaml +++ b/examples/agentic_demo/agent_val_sokoban_sandbox.yaml @@ -162,7 +162,7 @@ custom_envs: SokobanSandbox: env_type: sokoban_sandbox env_config: - base_url: 'http://localhost:8080' # change to your own service address if needed + base_url: 'http://10.31.10.33:8080' # change to your own service address if needed max_steps: ${max_actions_per_traj} max_tokens_per_step: ${max_tokens_per_step} env_manager_cls: ${env_manager_cls} diff --git a/roll/third_party/vllm/__init__.py b/roll/third_party/vllm/__init__.py index 8de27f0ec..0678016a0 100644 --- a/roll/third_party/vllm/__init__.py +++ b/roll/third_party/vllm/__init__.py @@ -12,6 +12,7 @@ from roll.platforms import current_platform import roll.third_party.vllm.fp8 as fp8 +from roll.third_party.vllm.versioning import supports_vllm_0_11_v0_ray_executor, uses_vllm_0_11_adapter from roll.utils.import_utils import safe_import_class from roll.utils.logging import get_logger @@ -25,9 +26,13 @@ elif Version("0.10.2") == Version(vllm.__version__): ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_10_2.ray_distributed_executor.CustomRayDistributedExecutor") ray_executor_class_v1 = 
safe_import_class("roll.third_party.vllm.vllm_0_10_2.v1.ray_distributed_executor.CustomRayDistributedExecutor") -elif Version("0.11.0") == Version(vllm.__version__) or Version("0.11.1rc1") == Version(vllm.__version__) or Version("0.11.1rc2.dev0+gc3a722fcb.d20251021") == Version(vllm.__version__): - ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.ray_distributed_executor.CustomRayDistributedExecutor") - ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.v1.ray_distributed_executor.CustomRayDistributedExecutor") +elif uses_vllm_0_11_adapter(vllm.__version__): + ray_executor_class_v0 = None + if supports_vllm_0_11_v0_ray_executor(vllm.__version__): + ray_executor_class_v0 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.ray_distributed_executor.CustomRayDistributedExecutor") + ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_11_0.v1.ray_distributed_executor.CustomRayDistributedExecutor") + else: + ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_12_0.ray_distributed_executor.CustomRayDistributedExecutor") elif Version("0.12.0") == Version(vllm.__version__): ray_executor_class_v0 = None # V0 deprecated ray_executor_class_v1 = safe_import_class("roll.third_party.vllm.vllm_0_12_0.ray_distributed_executor.CustomRayDistributedExecutor") diff --git a/roll/third_party/vllm/versioning.py b/roll/third_party/vllm/versioning.py new file mode 100644 index 000000000..e7337419a --- /dev/null +++ b/roll/third_party/vllm/versioning.py @@ -0,0 +1,27 @@ +from packaging.version import Version + + +_VLLM_0_11_MIN = Version("0.11.0") +_VLLM_0_12_MIN = Version("0.12.0") +_VLLM_0_11_V0_RAY_EXECUTOR_MAX = Version("0.11.2") + + +def uses_vllm_0_11_adapter(version: str) -> bool: + parsed = Version(version) + return _VLLM_0_11_MIN <= parsed < _VLLM_0_12_MIN + + +def supports_vllm_0_11_v0_ray_executor(version: str) -> bool: + parsed = Version(version) + return _VLLM_0_11_MIN <= parsed < 
_VLLM_0_11_V0_RAY_EXECUTOR_MAX + + +def load_process_weights_after_loading_utils(): + from vllm.model_executor.model_loader.utils import process_weights_after_loading + + try: + from vllm.model_executor.model_loader.utils import set_default_torch_dtype + except ImportError: + from vllm.utils.torch_utils import set_default_torch_dtype + + return process_weights_after_loading, set_default_torch_dtype diff --git a/roll/third_party/vllm/worker.py b/roll/third_party/vllm/worker.py index ea82ceb40..f9bb5e0e1 100644 --- a/roll/third_party/vllm/worker.py +++ b/roll/third_party/vllm/worker.py @@ -7,9 +7,9 @@ import torch import vllm -from packaging.version import Version from roll.platforms import current_platform +from roll.third_party.vllm.versioning import load_process_weights_after_loading_utils, uses_vllm_0_11_adapter from roll.third_party.vllm.vllm_utils import TensorLoRARequest, patch_vllm_lora_manager from roll.utils.collective import collective from roll.utils.cuda_ipc_utils import MultiprocessingSerializer @@ -144,10 +144,8 @@ def update_parameter_in_bucket(self, serialized_named_tensors, is_lora=False): self.load_weights([(name, weight) for name, weight in named_params]) def process_weights_after_loading(self): - if (Version("0.11.0") == Version(vllm.__version__) or - Version("0.11.1rc1") == Version(vllm.__version__) or - Version("0.11.1rc2.dev0+gc3a722fcb.d20251021") == Version(vllm.__version__)): - from vllm.model_executor.model_loader.utils import process_weights_after_loading,set_default_torch_dtype + if uses_vllm_0_11_adapter(vllm.__version__): + process_weights_after_loading, set_default_torch_dtype = load_process_weights_after_loading_utils() device_config = self.device_config load_config = self.vllm_config.load_config load_device = (device_config.device if load_config.device is None else load_config.device) diff --git a/tests/third_party/vllm/test_versioning.py b/tests/third_party/vllm/test_versioning.py new file mode 100644 index 000000000..06e6ac8ee --- 
/dev/null +++ b/tests/third_party/vllm/test_versioning.py @@ -0,0 +1,39 @@ +import vllm + +import roll.third_party.vllm as roll_vllm +from roll.third_party.vllm.versioning import ( + load_process_weights_after_loading_utils, + supports_vllm_0_11_v0_ray_executor, + uses_vllm_0_11_adapter, +) + + +def test_uses_vllm_0_11_adapter(): + assert uses_vllm_0_11_adapter("0.11.0") + assert uses_vllm_0_11_adapter("0.11.1rc1") + assert uses_vllm_0_11_adapter("0.11.2") + assert not uses_vllm_0_11_adapter("0.10.2") + assert not uses_vllm_0_11_adapter("0.12.0") + + +def test_supports_vllm_0_11_v0_ray_executor(): + assert supports_vllm_0_11_v0_ray_executor("0.11.0") + assert supports_vllm_0_11_v0_ray_executor("0.11.1rc1") + assert not supports_vllm_0_11_v0_ray_executor("0.11.2") + assert not supports_vllm_0_11_v0_ray_executor("0.12.0") + + +def test_load_process_weights_after_loading_utils(): + if not uses_vllm_0_11_adapter(vllm.__version__): + return + + process_weights_after_loading, set_default_torch_dtype = load_process_weights_after_loading_utils() + assert callable(process_weights_after_loading) + assert callable(set_default_torch_dtype) + + +def test_roll_vllm_module_exposes_v1_ray_executor(): + if not uses_vllm_0_11_adapter(vllm.__version__): + return + + assert roll_vllm.ray_executor_class_v1 is not None