refit latest update based on ahmads refactor #1993
shanmugamr1992 wants to merge 10 commits into ahmadki/mcore_main
Conversation
Signed-off-by: Yi-Fu Wu <yifu.wu@gmail.com>
Signed-off-by: Yi-Fu Wu <yifu.wu@gmail.com>
… into ahmadki/mcore_main
Signed-off-by: Ahmad Kiswani <kiswani.ahmad@gmail.com>
📝 Walkthrough

This PR introduces a coordinator-based persistent inference engine for the Megatron backend in GRPO, refactors MegatronPolicyWorker to support dynamic inference with async context management, adds a MegatronGeneration wrapper class, updates CUDA graph and RNG tracker configurations, and implements parallel initialization for Megatron generation workers.

Changes
Sequence Diagram(s)

sequenceDiagram
participant GRPO as GRPO Orchestrator
participant MG as MegatronGeneration
participant MPW as MegatronPolicyWorker
participant Coordinator as Inference Coordinator
participant Engine as DynamicInferenceEngine
GRPO->>+MG: __init__(cluster, config, tokenizer)
MG->>+MPW: initialize on workers
MPW->>MPW: create Policy (inference mode)
deactivate MPW
deactivate MG
GRPO->>+MG: generate(data, greedy)
MG->>+MPW: generate(data)
MPW->>MPW: inference_mode context enter
MPW->>MPW: _initialize_inference_engine (on first call)
MPW->>+Coordinator: _start_inference_coordinator()
Coordinator->>+Engine: setup DynamicInferenceEngine
Engine-->>-Coordinator: ready
Coordinator-->>-MPW: coordinator ready
MPW->>+Coordinator: submit prompts via InferenceClient
Coordinator->>+Engine: process tokens
Engine-->>-Coordinator: generated tokens
Coordinator-->>-MPW: results (rank 0 only)
MPW->>MPW: broadcast results to all ranks
MPW->>MPW: inference_mode context exit
MPW-->>-MG: generated output
MG-->>-GRPO: generated tokens
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~70 minutes

Possibly Related PRs
Suggested Labels
Suggested Reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
Actionable comments posted: 20
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
nemo_rl/models/policy/workers/megatron_policy_worker.py (1)
115-233: ⚠️ Potential issue | 🔴 Critical

Missing `__init__` initialization for all new inference-engine state — `AttributeError` at runtime

None of the instance variables introduced by the new methods are initialized in the constructor. The first call to `generate()`/`inference_mode()` will immediately raise `AttributeError`:

| Variable | First accessed at |
| --- | --- |
| `_inference_engine_initialized` | Line 680, Line 912 |
| `_inference_engine_alseep` | Line 755, Line 920 |
| `_inference_loop` | Line 1136, Line 1148, Line 1171 |
| `inference_context` | Line 736 |
| `inference_wrapped_model` | Line 739 |
| `dynamic_inference_engine` | Line 749 |
| `inference_client` | Line 773, Line 1217 |

🐛 Proposed fix — add to the end of `__init__`

         ## used for streaming update inference engine weights
         self._held_gather_buffer = None
+
+        # Inference engine state (coordinator-based persistent inference)
+        self._inference_engine_initialized = False
+        self._inference_engine_asleep = True
+        self._inference_loop = None
+        self.inference_context = None
+        self.inference_wrapped_model = None
+        self.dynamic_inference_engine = None
+        self.inference_client = None

As per coding guidelines: "Initialize all externally visible members of a class in the constructor."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` around lines 115 - 233, The constructor for MegatronPolicyWorker fails to initialize several new inference-engine instance attributes, causing AttributeError when methods like generate()/inference_mode() access them; fix by adding initialization of all new fields at the end of __init__ — set _inference_engine_initialized=False, _inference_engine_alseep=False (or False spelled as in codebase), _inference_loop=None, inference_context=None, inference_wrapped_model=None, dynamic_inference_engine=None, inference_client=None, and any other inference-related placeholders (e.g., _held_gather_buffer already present) so methods like generate, inference_mode, and related functions find defined attributes on self.
🧹 Nitpick comments (2)
nemo_rl/models/policy/lm_policy.py (1)
646-661: Comment indentation and mixed quote styles in the `megatron` `elif` branch

The comments at lines 647–653 are indented to the same level as the `elif` keyword (8 spaces), not to the body of the branch (12 spaces). This makes the block visually ambiguous. Additionally, `self.cfg["generation"]['backend']` mixes double and single quotes for the same dict access on lines 638 and 646.

♻️ Proposed fix

-        elif self.cfg["generation"]['backend'] == "megatron":
-        # For coordinator-based inference: send ALL data to DP rank 0 only.
-        # Other DP ranks are called with data=None but still participate in the
-        # inference engine loop. The coordinator handles load balancing across DP ranks.
-        #
-        # With in_sharded_axes=[] and data_parallel not in replicate_on_axes,
-        # data_parallel becomes a "free axis". Only workers at DP coord 0 receive data,
-        # while workers at other DP coords get None (via make_dummy_calls_to_free_axes).
-            in_sharded_axes = []
+        elif self.cfg["generation"]["backend"] == "megatron":
+            # For coordinator-based inference: send ALL data to DP rank 0 only.
+            # Other DP ranks are called with data=None but still participate in the
+            # inference engine loop. The coordinator handles load balancing across DP ranks.
+            #
+            # With in_sharded_axes=[] and data_parallel not in replicate_on_axes,
+            # data_parallel becomes a "free axis". Only workers at DP coord 0 receive data,
+            # while workers at other DP coords get None (via make_dummy_calls_to_free_axes).
+            in_sharded_axes = []

Apply the same quote normalization to line 638:
self.cfg["generation"]["backend"].🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nemo_rl/models/policy/lm_policy.py` around lines 646 - 661, Indent the comment block inside the megatron branch to match the branch body (so the comments are under the elif: branch, not aligned with the elif keyword) and normalize dictionary access quotes to use the same style (change self.cfg['generation']['backend'] to self.cfg["generation"]["backend"] wherever used in this snippet, including the f-string in the ValueError and any other references); ensure the variables in this branch (in_sharded_axes, output_is_replicated) remain unchanged.

nemo_rl/models/policy/workers/megatron_policy_worker.py (1)
1412-1432: `update_weights_from_collective` is an unimplemented stub — will break any caller

Any code path that calls `update_weights_from_collective` on a `MegatronPolicyWorker` (e.g., from `MegatronGeneration`) will immediately raise `NotImplementedError`, making the full non-colocated weight-refit flow non-functional.

Would you like me to open a tracking issue or generate a skeleton implementation using `packed_broadcast_consumer` to receive and apply weights from the training side?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` around lines 1412 - 1432, The method update_weights_from_collective in MegatronPolicyWorker is a placeholder that raises NotImplementedError and must be implemented to receive and apply weights from training workers; implement it to iterate over self.state_dict_info, call packed_broadcast_consumer (or the project's consumer API) for each packed chunk to receive tensors, deserialize/reshape those buffers into parameter tensors, and copy them into the Megatron model parameters (use .data.copy_ or load_state_dict-like updates) so the local model matches the broadcasted weights; ensure torch.no_grad() is kept, properly handle device placement and dtype conversion, and return True on success or False on failure, matching the contract of broadcast_weights_for_collective.
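For illustration only, here is a minimal sketch of the general "receive broadcast weights and copy into local parameters" pattern. It uses plain `torch.distributed.broadcast` as a stand-in for the project's `packed_broadcast_consumer` helper (whose real signature is not shown in this review); the function name and arguments are assumptions, not the repository's API.

```python
# Sketch only: generic receive-and-apply pattern for collective weight refit.
# `packed_broadcast_consumer` from the comment above is replaced here by
# torch.distributed.broadcast because its actual interface is not verified.
import torch
import torch.distributed as dist


def update_weights_from_collective(
    model: torch.nn.Module, src_rank: int, group=None
) -> bool:
    try:
        with torch.no_grad():
            for _, param in model.named_parameters():
                # Receive the new value into a staging buffer of matching
                # shape/dtype/device, then copy it into the live parameter.
                staging = torch.empty_like(param.data)
                dist.broadcast(staging, src=src_rank, group=group)
                param.data.copy_(staging)
        return True
    except RuntimeError:
        return False
```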
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@migration_notes_delete.md`:
- Around line 1-120: Add a backward-compatibility shim module by creating
megatron/core/datasets/megatron_tokenizer.py that defines the
MegatronLegacyTokenizer class (with methods/properties shown) and then sets
MegatronTokenizer = MegatronLegacyTokenizer so imports of MegatronTokenizer
succeed; also update the migration_notes_delete.md to fix markdownlint warnings
by incrementing heading levels appropriately (MD001) and adding language
identifiers to all fenced code blocks (e.g., ```python) and ensure closing
fences are present, and check the DistributedDataParallel mention (symbols:
DistributedDataParallel, param_sync, use_forward_hook) is inside a properly
formatted fenced block.
In `@nemo_rl/algorithms/grpo.py`:
- Around line 619-651: The new non-colocated Megatron initialization branch
(uses colocated_inference, init_megatron_generation, init_policy, and writes to
worker_init_timing_metrics) is unreachable because an earlier guard still
asserts backend != "megatron"; remove or relax that guard so backend ==
"megatron" is allowed for non-colocated clusters (or change the assertion to
validate only incompatible combos), ensuring the code path that submits
init_megatron_generation and init_policy via ThreadPoolExecutor can execute.
In `@nemo_rl/models/generation/megatron/__init__.py`:
- Around line 1-13: The copyright header in the
nemo_rl.models.generation.megatron.__init__ module still shows 2025; update the
year to 2026 in the file header so it matches repo policy. Edit the top-of-file
license block in __init__.py within the megatron package to replace "2025" with
"2026" and ensure the rest of the Apache-2.0 header text remains unchanged.
In `@nemo_rl/models/generation/megatron/megatron_generation.py`:
- Around line 1-13: Update the copyright header in the megatron_generation.py
module to use the current year 2026 (replace "2025" with "2026") so the
top-of-file license block in
nemo_rl.models.generation.megatron.megatron_generation.py reflects the repo
rule; ensure the full NVIDIA copyright header and Apache License block remain
intact except for the year change.
In `@nemo_rl/models/megatron/setup.py`:
- Around line 321-323: The conditional is using a stray f-string key access
config["generation"][f"backend"] which triggers Ruff F541; replace the f-string
with a plain string key so the check reads config["generation"]["backend"] ==
"megatron" (the conditional guarding the call to
_apply_cuda_graph_and_rng_tracker_config(model_cfg, config)). Ensure you update
only the key access and keep the rest of the conditional and the call to
_apply_cuda_graph_and_rng_tracker_config unchanged.
- Around line 374-380: Add a Google-style docstring to the helper
_apply_cuda_graph_and_rng_tracker_config to document its purpose and parameters;
include an Args section documenting model_cfg (Any) and config (PolicyConfig)
and a Returns section indicating None, plus a short one-line description of the
function’s behavior (it applies CUDA graph and RNG tracker settings from config
into model_cfg) so it conforms to the repo’s Sphinx-parsable docstring
guidelines.
- Around line 414-416: Add the missing keys to the MegatronConfig TypedDict
(symbol: MegatronConfig) with proper type annotations and short docstrings
describing each field's purpose: cuda_graph_impl, cuda_graph_scope,
inference_rng_tracker, layernorm_epsilon,
moe_pad_experts_for_cuda_graph_inference, moe_permute_fusion,
moe_router_bias_update_rate, moe_router_dtype, train_iters, and
use_te_rng_tracker; ensure the default values described in the docstrings match
the defaults used in setup logic (e.g., where model_cfg.* are set) and update
the example YAMLs under examples/configs/*.yaml to include these keys with the
corresponding default values so the TypedDict, setup.py usage, and examples stay
consistent.
In `@nemo_rl/models/policy/lm_policy.py`:
- Around line 663-670: The generate call to
worker_group.run_all_workers_sharded_data is missing "context_parallel" in the
replicate_on_axes list, causing context-parallel ranks to be treated as free
axes and receive None; update the replicate_on_axes argument in the generate
invocation inside lm_policy.py (the call to run_all_workers_sharded_data within
the generate method) to include "context_parallel" alongside "tensor_parallel"
and "pipeline_parallel" so CP ranks are replicated like the other dispatch
methods.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py`:
- Line 1008: The with-statement currently binds the DynamicInferenceEngine
returned by self.inference_mode(mcore_generation_config) to the unused name
inference_engine; remove the unused alias by changing the context manager to use
self.inference_mode(mcore_generation_config) without assignment (i.e., drop "=
inference_engine") so the code reads with torch.no_grad(),
self.inference_mode(mcore_generation_config): and leave the rest of the block
unchanged; reference symbols: inference_mode, DynamicInferenceEngine,
mcore_generation_config, and torch.no_grad().
- Around line 1231-1232: In the for-loop inside megatron_policy_worker.py where
you call enumerate(zip(prompt_tokens_tensor, prompt_lengths_tensor,
strict=True)), the enumerated variable request_id is never used; rename it to
_request_id to satisfy the B007 lint rule and indicate the variable is
intentionally unused (leave the rest of the loop unchanged, e.g., the tuple
unpacking of (prompt_tokens, prompt_len) and loop body).
- Line 755: Rename the misspelled instance attribute _inference_engine_alseep to
_inference_engine_asleep everywhere in the MegatronPolicyWorker class (and any
methods referencing it) so the initializer and all uses match; update the
declaration currently set to True and every reference (including the occurrences
you noted) to the corrected name to avoid inconsistencies when adding the proper
__init__ initialization.
- Line 756: Remove the raw print() calls in megatron_policy_worker.py and
replace them with structured logging (e.g., self.logger.info/debug) including
the rank context; specifically replace occurrences like print(f"[Rank
{self.rank}] Initialized persistent inference engine") and the other prints at
the noted locations with calls to the worker's logger (use self.logger and
include self.rank in the message) so logs are consistent across distributed runs
and not interleaved as raw stdout.
- Around line 1099-1104: The assignment to logprobs_padded is off-by-one and
silently drops the final logprob when logprob_len equals max_seq_len; update the
buffer or indexing in megatron_policy_worker.py so the full range is written.
Specifically, adjust the allocation or shape of logprobs_padded (or the slice
indices around the assignment to logprobs_padded[i, 1 : logprob_len + 1]) so
there is room for logprob_len values starting at index 1 (e.g., expand
logprobs_padded by one element or shift the start index to 0), ensuring the
entire torch.tensor(logprobs, ...) is stored without clipping.
- Line 1067: When computing max_gen_seq_len from the generator output, guard
against an empty result to avoid ValueError: replace the direct max([...]) call
with a check (e.g., if not result: set max_gen_seq_len = 0 or an appropriate
default) before computing max(len(x.generated_tokens) for x in result); the
relevant symbols are result, max_gen_seq_len, and the generator call
_generate_with_persistent_engine (and the input prompt_tokens_tensor which can
be zero-rows), so update the code around the max_gen_seq_len assignment in
megatron_policy_worker.py to handle the empty list case safely.
- Around line 977-978: The code calls no_grad = torch.no_grad();
no_grad.__enter__() without a matching __exit__, so non-rank-0 early returns
leave gradients disabled; change this to use a proper context manager by
wrapping the function body (the train/worker execution block that currently
calls no_grad.__enter__()) with "with torch.no_grad():" (or ensure a try/finally
that calls no_grad.__exit__() on all exit paths), remove the now-redundant inner
"with torch.no_grad()" at the inner inference block (the one noted around the
former line 1008), and ensure any other manual enter/exit usages (the no_grad
variable and its manual calls) are eliminated so every return path exits the
no-grad context.
- Around line 1117-1137: The busy-wait in _start_inference_loop_thread can hang
forever if the background thread fails before assigning self._inference_loop;
replace the spin-wait with a threading.Event (e.g., loop_ready_event) that the
thread sets after creating the loop in run_loop, wait on that event with a
reasonable timeout, and after the wait check thread.is_alive() and/or an
exception container set by the thread (store any exception in a shared
attribute) to raise a clear error instead of looping; update references to
self._inference_loop, self._inference_thread and run_loop to use the event and
the exception propagation so startup either succeeds or fails fast.
- Line 696: In _initialize_inference_engine in megatron_policy_worker.py the
local assignment model_cfg = self.megatron_cfg.model is unused (F841); remove
that assignment line to eliminate the unused variable warning (or if you
intended to use it, replace direct references to self.megatron_cfg.model with
model_cfg where needed), referencing the _initialize_inference_engine function
and the model_cfg symbol to locate the change.
- Line 1145: The import concurrent.futures at the top of
megatron_policy_worker.py is unused; remove the dead import statement so there’s
no unused dependency. Specifically delete the import line referencing
concurrent.futures (it was intended for use in _run_async_coordinator_start but
that function body does not reference it), leaving other imports intact.
- Around line 806-819: In both async methods _sleep_engine and _wake_engine,
await the coroutine calls to the inference client so the pause/resume signals
actually execute: change the calls to self.inference_client.suspend_engines()
and self.inference_client.resume_engines() to be awaited (i.e., await
self.inference_client.suspend_engines() in _sleep_engine and await
self.inference_client.resume_engines() in _wake_engine), preserving the existing
rank-0 conditional and the subsequent waits
(dynamic_inference_engine.paused.wait() and
dynamic_inference_engine.suspend()/resume() calls) so the barrier and engine
suspend/resume ordering remain intact.
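For the `_sleep_engine`/`_wake_engine` item above, a hedged sketch of the awaited-call pattern; the class and attribute names mirror the comment and are not verified against the repository, and the surrounding barrier / `paused.wait()` logic is omitted.

```python
# Sketch only — attribute names (rank, inference_client) come from the review
# comment above; this is not the repository's actual implementation.
class _MegatronPolicyWorkerSketch:
    async def _sleep_engine(self) -> None:
        if self.rank == 0:
            # Without `await`, calling the coroutine only creates a coroutine
            # object and the suspend signal is never sent to the coordinator.
            await self.inference_client.suspend_engines()

    async def _wake_engine(self) -> None:
        if self.rank == 0:
            await self.inference_client.resume_engines()
```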
---
Outside diff comments:
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py`:
- Around line 115-233: The constructor for MegatronPolicyWorker fails to
initialize several new inference-engine instance attributes, causing
AttributeError when methods like generate()/inference_mode() access them; fix by
adding initialization of all new fields at the end of __init__ — set
_inference_engine_initialized=False, _inference_engine_alseep=False (or False
spelled as in codebase), _inference_loop=None, inference_context=None,
inference_wrapped_model=None, dynamic_inference_engine=None,
inference_client=None, and any other inference-related placeholders (e.g.,
_held_gather_buffer already present) so methods like generate, inference_mode,
and related functions find defined attributes on self.
---
Nitpick comments:
In `@nemo_rl/models/policy/lm_policy.py`:
- Around line 646-661: Indent the comment block inside the megatron branch to
match the branch body (so the comments are under the elif: branch, not aligned
with the elif keyword) and normalize dictionary access quotes to use the same
style (change self.cfg['generation']['backend'] to
self.cfg["generation"]["backend"] wherever used in this snippet, including the
f-string in the ValueError and any other references); ensure the variables in
this branch (in_sharded_axes, output_is_replicated) remain unchanged.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py`:
- Around line 1412-1432: The method update_weights_from_collective in
MegatronPolicyWorker is a placeholder that raises NotImplementedError and must
be implemented to receive and apply weights from training workers; implement it
to iterate over self.state_dict_info, call packed_broadcast_consumer (or the
project's consumer API) for each packed chunk to receive tensors,
deserialize/reshape those buffers into parameter tensors, and copy them into the
Megatron model parameters (use .data.copy_ or load_state_dict-like updates) so
the local model matches the broadcasted weights; ensure torch.no_grad() is kept,
properly handle device placement and dtype conversion, and return True on
success or False on failure, matching the contract of
broadcast_weights_for_collective.
# NOTES
### MegatronTokenizer Issue.
```
from megatron.core.datasets.megatron_tokenizer import MegatronTokenizer as MegatronTokenizer
ModuleNotFoundError: No module named 'megatron.core.datasets.megatron_tokenizer'
```
Fix :
Create this file in megatron/core/datasets/megatron_tokenizer.py
```
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Backward-compatibility shim. The legacy tokenizer base class was moved out of
# megatron.core.datasets in newer Megatron-LM versions. Megatron-Bridge still
# imports from this path, so we keep a thin re-export here.

import json
import logging
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Any

import numpy

logger = logging.getLogger(__name__)


class MegatronLegacyTokenizer(ABC):
    """Abstract class for tokenizer

    Absent a config or class-specific tracking of which objects are uniquely identifying, we must
    include all key word arguments as unique identifiers

    Args:
        tokenizer_paths (Tuple[str]): All tokenizer source paths or prefixes

        tokenizer_options (Dict[str, Any]): All tokenizer options
    """

    def __init__(self, *tokenizer_paths: str, **tokenizer_options: Any):
        logger.warning(
            "You're using the legacy tokenizer system, which is deprecated "
            "and will be removed in a future release. Please migrate to the new tokenizer system "
            "(`megatron.core.tokenizers.MegatronTokenizer`)."
        )
        self.unique_identifiers = OrderedDict()
        self.unique_identifiers["class"] = type(self).__name__
        self.unique_identifiers["tokenizer_path"] = list(tokenizer_paths)
        for option in tokenizer_options:
            self.unique_identifiers[option] = str(tokenizer_options[option])

        self.unique_description = json.dumps(self.unique_identifiers, indent=4)

        super().__init__()

    @abstractmethod
    def tokenize(self, text: str) -> numpy.ndarray:
        pass

    def detokenize(self, ids: numpy.ndarray) -> str:
        raise NotImplementedError("{} has no method 'detokenize'".format(type(self).__name__))

    def offsets(self, ids: list[int], text: str) -> list[int]:
        raise NotImplementedError("{} has no method 'offsets'".format(type(self).__name__))

    @property
    @abstractmethod
    def vocab(self):
        pass

    @property
    @abstractmethod
    def inv_vocab(self):
        pass

    @property
    @abstractmethod
    def vocab_size(self):
        pass

    @property
    def cls(self):
        raise NotImplementedError("{} has no attribute 'cls'".format(type(self).__name__))

    @property
    def sep(self):
        raise NotImplementedError("{} has no attribute 'sep'".format(type(self).__name__))

    @property
    def pad(self):
        raise NotImplementedError("{} has no attribute 'pad'".format(type(self).__name__))

    @property
    def eod(self):
        raise NotImplementedError("{} has no attribute 'eod'".format(type(self).__name__))

    @property
    def bos(self):
        raise NotImplementedError("{} has no attribute 'bos'".format(type(self).__name__))

    @property
    def eos(self):
        raise NotImplementedError("{} has no attribute 'eos'".format(type(self).__name__))

    @property
    def mask(self):
        raise NotImplementedError("{} has no attribute 'mask'".format(type(self).__name__))


# Older code imported this class under the name ``MegatronTokenizer``.
MegatronTokenizer = MegatronLegacyTokenizer
```

### DDP ISSUE
```
@@ -369,12 +369,15 @@ class DistributedDataParallel(_BaseDataParallel):
        Skip synchronous param all-gather if `param_sync` is False.
        """
        assert self.use_forward_hook
```
Fix markdownlint warnings (heading levels + code fence language).
MD001: Heading levels should increment by one.
MD040: Add language identifiers to fenced code blocks.
Example fixes
-### MegatronTokenizer Issue.
+## MegatronTokenizer Issue.
-```
+```python
...
-```
+```

🧰 Tools
🪛 markdownlint-cli2 (0.21.0)
[warning] 2-2: Heading levels should only increment by one level at a time
Expected: h2; Actual: h3
(MD001, heading-increment)
[warning] 3-3: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
[warning] 9-9: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
[warning] 116-116: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@migration_notes_delete.md` around lines 1 - 120, Add a backward-compatibility
shim module by creating megatron/core/datasets/megatron_tokenizer.py that
defines the MegatronLegacyTokenizer class (with methods/properties shown) and
then sets MegatronTokenizer = MegatronLegacyTokenizer so imports of
MegatronTokenizer succeed; also update the migration_notes_delete.md to fix
markdownlint warnings by incrementing heading levels appropriately (MD001) and
adding language identifiers to all fenced code blocks (e.g., ```python) and
ensure closing fences are present, and check the DistributedDataParallel mention
(symbols: DistributedDataParallel, param_sync, use_forward_hook) is inside a
properly formatted fenced block.
    if colocated_inference:
        policy_generation = None
        print(
            f" ✓ Using {backend} backend for generation with {policy_config['model_name']}",
            flush=True,
        )

        policy, policy_time = init_policy()
        worker_init_timing_metrics["policy_init_time_s"] = policy_time
    else:
        # Non-colocated Megatron backend: separate inference workers
        print(
            " ⚡ Using parallel worker initialization (non-colocated Megatron mode)",
            flush=True,
        )

        # Execute both initializations in parallel
        parallel_start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=2) as executor:
            megatron_gen_future = executor.submit(init_megatron_generation)
            policy_future = executor.submit(init_policy)
            policy_generation, megatron_gen_time = megatron_gen_future.result()
            policy, policy_time = policy_future.result()
        parallel_wall_time = time.perf_counter() - parallel_start_time

        # Store timing metrics
        worker_init_timing_metrics["megatron_generation_init_time_s"] = (
            megatron_gen_time
        )
        worker_init_timing_metrics["policy_init_time_s"] = policy_time
        worker_init_timing_metrics["parallel_wall_time_s"] = parallel_wall_time
        worker_init_timing_metrics["parallel_init_enabled"] = True
Non‑colocated Megatron path is blocked by an earlier assert.
You added a non‑colocated Megatron initialization path here, but the earlier guard in the non‑colocated cluster setup still asserts backend != "megatron", so this branch is unreachable. Please remove or relax that guard to enable the new feature.
Proposed fix (remove legacy guard)
- else:
- assert generation_config["backend"] != "megatron", (
- "Non-colocated inference is not supported for Megatron generation backends. "
- "Please use vLLM backend for generation."
- )
+ else:
+ # Non-colocated inference for Megatron is now supported via MegatronGeneration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/algorithms/grpo.py` around lines 619 - 651, The new non-colocated
Megatron initialization branch (uses colocated_inference,
init_megatron_generation, init_policy, and writes to worker_init_timing_metrics)
is unreachable because an earlier guard still asserts backend != "megatron";
remove or relax that guard so backend == "megatron" is allowed for non-colocated
clusters (or change the assertion to validate only incompatible combos),
ensuring the code path that submits init_megatron_generation and init_policy via
ThreadPoolExecutor can execute.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Update copyright year to 2026.
This new file should use the current year per repo rules.
Proposed fix
-# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.

As per coding guidelines: "{src/,examples/,nemo_rl/**}/*.{py,sh}: Add the NVIDIA copyright header (with current year) to all Python files and shell scripts, excluding tests".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/generation/megatron/__init__.py` around lines 1 - 13, The
copyright header in the nemo_rl.models.generation.megatron.__init__ module still
shows 2025; update the year to 2026 in the file header so it matches repo
policy. Edit the top-of-file license block in __init__.py within the megatron
package to replace "2025" with "2026" and ensure the rest of the Apache-2.0
header text remains unchanged.
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Update copyright year to 2026.
This new file should use the current year per repo rules.
Proposed fix
-# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.

As per coding guidelines: "{src/,examples/,nemo_rl/**}/*.{py,sh}: Add the NVIDIA copyright header (with current year) to all Python files and shell scripts, excluding tests".
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/generation/megatron/megatron_generation.py` around lines 1 -
13, Update the copyright header in the megatron_generation.py module to use the
current year 2026 (replace "2025" with "2026") so the top-of-file license block
in nemo_rl.models.generation.megatron.megatron_generation.py reflects the repo
rule; ensure the full NVIDIA copyright header and Apache License block remain
intact except for the year change.
    # Apply generation settings
    if config["generation"][f"backend"] == "megatron":
        _apply_cuda_graph_and_rng_tracker_config(model_cfg, config)
Fix stray f-string key access.
config["generation"][f"backend"] triggers Ruff F541 and is unnecessary. Use a plain string key.
Proposed fix
- if config["generation"][f"backend"] == "megatron":
+ if config["generation"]["backend"] == "megatron":🧰 Tools
🪛 Ruff (0.15.1)
[error] 322-322: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/megatron/setup.py` around lines 321 - 323, The conditional is
using a stray f-string key access config["generation"][f"backend"] which
triggers Ruff F541; replace the f-string with a plain string key so the check
reads config["generation"]["backend"] == "megatron" (the conditional guarding
the call to _apply_cuda_graph_and_rng_tracker_config(model_cfg, config)). Ensure
you update only the key access and keep the rest of the conditional and the call
to _apply_cuda_graph_and_rng_tracker_config unchanged.
                logprob_len = len(logprobs)
                logprobs_padded[i, 1 : logprob_len + 1] = torch.tensor(
-                   out["logprobs"][i],
+                   logprobs,
                    dtype=torch.float,
                    device=data["input_ids"].device,
                )
Off-by-one: last logprob silently dropped when sequence fills the padded buffer
logprobs_padded has shape (batch_size, max_seq_len) with max_seq_len = padded_input_length + max_gen_seq_len. When a prompt has no padding and generation reaches max_gen_seq_len, logprob_len == max_seq_len. The slice logprobs_padded[i, 1 : logprob_len + 1] tries to write max_seq_len values starting at index 1 into a tensor of size max_seq_len. PyTorch clips the slice silently to [1:max_seq_len], dropping the last logprob without any error.
🐛 Proposed fix — expand the logprobs buffer by 1
logprobs_padded = torch.zeros(
- (batch_size, max_seq_len),
+ (batch_size, max_seq_len + 1),
dtype=torch.float,
device=data["input_ids"].device,
)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` around lines 1099 -
1104, The assignment to logprobs_padded is off-by-one and silently drops the
final logprob when logprob_len equals max_seq_len; update the buffer or indexing
in megatron_policy_worker.py so the full range is written. Specifically, adjust
the allocation or shape of logprobs_padded (or the slice indices around the
assignment to logprobs_padded[i, 1 : logprob_len + 1]) so there is room for
logprob_len values starting at index 1 (e.g., expand logprobs_padded by one
element or shift the start index to 0), ensuring the entire
torch.tensor(logprobs, ...) is stored without clipping.
    def _start_inference_loop_thread(self):
        """Start a background thread with a persistent event loop for inference.

        This thread runs the event loop that hosts the engine loop task.
        The loop runs forever until explicitly stopped.
        """
        import threading

        def run_loop():
            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
            self._inference_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._inference_loop)
            # Run forever - the engine loop task will run in this loop
            self._inference_loop.run_forever()

        self._inference_thread = threading.Thread(target=run_loop, daemon=True)
        self._inference_thread.start()

        # Wait for the loop to be created
        while self._inference_loop is None:
            time.sleep(0.001)
Busy-wait loop in _start_inference_loop_thread has no timeout — hangs forever on thread failure
If the background thread raises any exception before self._inference_loop = asyncio.new_event_loop() is assigned (e.g., policy or resource error), the main thread loops forever at line 1136 with no escape path.
🐛 Proposed fix — use an Event for safe synchronization
def _start_inference_loop_thread(self):
import threading
+ loop_ready = threading.Event()
+
def run_loop():
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
self._inference_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._inference_loop)
+ loop_ready.set()
self._inference_loop.run_forever()
self._inference_thread = threading.Thread(target=run_loop, daemon=True)
self._inference_thread.start()
- # Wait for the loop to be created
- while self._inference_loop is None:
- time.sleep(0.001)
+ if not loop_ready.wait(timeout=30):
+ raise RuntimeError("Inference event loop did not start within 30 seconds")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` around lines 1117 -
1137, The busy-wait in _start_inference_loop_thread can hang forever if the
background thread fails before assigning self._inference_loop; replace the
spin-wait with a threading.Event (e.g., loop_ready_event) that the thread sets
after creating the loop in run_loop, wait on that event with a reasonable
timeout, and after the wait check thread.is_alive() and/or an exception
container set by the thread (store any exception in a shared attribute) to raise
a clear error instead of looping; update references to self._inference_loop,
self._inference_thread and run_loop to use the event and the exception
propagation so startup either succeeds or fails fast.
        This is called once during the first generate() call to initialize
        the persistent inference infrastructure.
        """
        import concurrent.futures
Remove unused import concurrent.futures
concurrent.futures is imported inside _run_async_coordinator_start but is never referenced in the method body. This is a dead import.
🐛 Proposed fix
def _run_async_coordinator_start(self, coordinator_port: int):
- import concurrent.futures
-
     # Start the background thread with the event loop if not already running

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    def _run_async_coordinator_start(self, coordinator_port: int):
        # Start the background thread with the event loop if not already running
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` at line 1145, The
import concurrent.futures at the top of megatron_policy_worker.py is unused;
remove the dead import statement so there’s no unused dependency. Specifically
delete the import line referencing concurrent.futures (it was intended for use
in _run_async_coordinator_start but that function body does not reference it),
leaving other imports intact.
        for request_id, (prompt_tokens, prompt_len) in enumerate(
            zip(prompt_tokens_tensor, prompt_lengths_tensor, strict=True)
Rename unused loop variable request_id to _request_id (B007)
request_id from enumerate() is never used in the loop body.
🐛 Proposed fix
- for request_id, (prompt_tokens, prompt_len) in enumerate(
+ for _request_id, (prompt_tokens, prompt_len) in enumerate(
zip(prompt_tokens_tensor, prompt_lengths_tensor, strict=True)
):📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
        for _request_id, (prompt_tokens, prompt_len) in enumerate(
            zip(prompt_tokens_tensor, prompt_lengths_tensor, strict=True)
        ):
🧰 Tools
🪛 Ruff (0.15.1)
[warning] 1231-1231: Loop control variable request_id not used within loop body
Rename unused request_id to _request_id
(B007)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@nemo_rl/models/policy/workers/megatron_policy_worker.py` around lines 1231 -
1232, In the for-loop inside megatron_policy_worker.py where you call
enumerate(zip(prompt_tokens_tensor, prompt_lengths_tensor, strict=True)), the
enumerated variable request_id is never used; rename it to _request_id to
satisfy the B007 lint rule and indicate the variable is intentionally unused
(leave the rest of the loop unchanged, e.g., the tuple unpacking of
(prompt_tokens, prompt_len) and loop body).
        from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
        from vllm.distributed.utils import StatelessProcessGroup

        # Offset rank by train_world_size so inference workers get unique global ranks
        rank = train_world_size + self.rank
        pg = StatelessProcessGroup.create(
            host=ip, port=port, rank=rank, world_size=world_size
        )
        device = torch.cuda.current_device()
        self.model_update_group = PyNcclCommunicator(pg, device=device)
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check dependency declarations
echo "=== Checking for dependency files ==="
find . -maxdepth 3 -type f \( -name "setup.py" -o -name "pyproject.toml" -o -name "requirements*.txt" \) | head -20
echo ""
echo "=== Searching for vllm mentions in dependency files ==="
fd -e toml -e py -e txt "setup|pyproject|requirements" | xargs grep -l "vllm" 2>/dev/null || echo "No direct vllm dependency found in standard files"
echo ""
echo "=== All vllm imports in Python files ==="
rg "from vllm|import vllm" --type py -n | head -30Repository: NVIDIA-NeMo/RL
Length of output: 3695
🏁 Script executed:
#!/bin/bash
echo "=== Check vllm in pyproject.toml ==="
rg -A 2 -B 2 "vllm" pyproject.toml
echo ""
echo "=== Check context around lines 1440-1470 in megatron_policy_worker.py ==="
sed -n '1440,1470p' nemo_rl/models/policy/workers/megatron_policy_worker.py
echo ""
echo "=== Check if init_collective_as_inference has try-except guards ==="
sed -n '1430,1460p' nemo_rl/models/policy/workers/megatron_policy_worker.py
Repository: NVIDIA-NeMo/RL
Length of output: 4610
🏁 Script executed:
#!/bin/bash
echo "=== Check where init_collective_as_inference is called ==="
rg "init_collective_as_inference" --type py -B 3 -A 3
echo ""
echo "=== Check class definition and context in megatron_policy_worker.py ==="
head -100 nemo_rl/models/policy/workers/megatron_policy_worker.py | grep -A 10 "^class"
echo ""
echo "=== Check if megatron_policy_worker is used in vllm context ==="
rg "MegatronPolicyWorker|megatron_policy_worker" --type py -l | head -10
Repository: NVIDIA-NeMo/RL
Length of output: 2853
🏁 Script executed:
#!/bin/bash
echo "=== Check megatron_generation.py context where init_collective_as_inference is called ==="
rg -B 15 "init_collective_as_inference" nemo_rl/models/generation/megatron/megatron_generation.py | head -50
echo ""
echo "=== Check class definition of MegatronPolicyWorker ==="
sed -n '1,100p' nemo_rl/models/policy/workers/megatron_policy_worker.py | tail -50
echo ""
echo "=== Check if there are any conditional checks or guards before calling init_collective_as_inference ==="
rg -B 10 "init_collective_as_inference" nemo_rl/models/generation/megatron/megatron_generation.py | grep -E "try|except|if|vllm" -iRepository: NVIDIA-NeMo/RL
Length of output: 3386
🏁 Script executed:
#!/bin/bash
echo "=== Check when init_collective is called in megatron_generation.py ==="
rg -B 5 -A 20 "def init_collective" nemo_rl/models/generation/megatron/megatron_generation.py
echo ""
echo "=== Check if there's any conditional logic around weight synchronization setup ==="
rg "init_collective|weight.*sync|collective" nemo_rl/models/generation/megatron/megatron_generation.py -n | head -20
echo ""
echo "=== Check if vllm is guarded with try-except in vllm_worker.py for comparison ==="
sed -n '325,335p' nemo_rl/models/generation/vllm/vllm_worker.py
sed -n '560,570p' nemo_rl/models/generation/vllm/vllm_worker.py
Repository: NVIDIA-NeMo/RL
Length of output: 3114
🏁 Script executed:
#!/bin/bash
echo "=== Check if MegatronGeneration is only used with vllm or also standalone ==="
rg "MegatronGeneration|megatron_generation" --type py -l | grep -v "\.pyc" | head -15
echo ""
echo "=== Check megatron_generation.py class definition and imports ==="
head -50 nemo_rl/models/generation/megatron/megatron_generation.py
echo ""
echo "=== Verify if init_collective is conditionally guarded in megatron_generation.py ==="
rg -B 3 "def init_collective" nemo_rl/models/generation/megatron/megatron_generation.pyRepository: NVIDIA-NeMo/RL
Length of output: 2651
🏁 Script executed:
#!/bin/bash
echo "=== Check how MegatronGeneration is instantiated and when ==="
rg "MegatronGeneration\(" --type py -B 5 -A 5
echo ""
echo "=== Check if there's documentation about using Megatron without vllm ==="
find . -name "*.md" -o -name "*.rst" | xargs grep -l "megatron\|generation" 2>/dev/null | head -5
echo ""
echo "=== Check if init_collective_as_inference is expected to be optional ==="
sed -n '1420,1465p' nemo_rl/models/policy/workers/megatron_policy_worker.py
Repository: NVIDIA-NeMo/RL
Length of output: 4563
🏁 Script executed:
#!/bin/bash
echo "=== Check what the actual workflow is - when is init_collective called? ==="
rg "init_collective" nemo_rl/algorithms/grpo.py -B 5 -A 5
echo ""
echo "=== Check if there's a path where MegatronGeneration is used WITHOUT calling init_collective ==="
sed -n '1,150p' nemo_rl/models/generation/megatron/megatron_generation.py | tail -80
echo ""
echo "=== Look at the full imports in megatron_policy_worker.py to see if vllm is imported elsewhere ==="
head -80 nemo_rl/models/policy/workers/megatron_policy_worker.py | grep -E "^import|^from"
Repository: NVIDIA-NeMo/RL
Length of output: 6007
🏁 Script executed:
#!/bin/bash
echo "=== Confirm: are these vllm utilities or Megatron utilities? Check if Megatron has native equivalents ==="
rg "StatelessProcessGroup|PyNcclCommunicator" --type py
echo ""
echo "=== Check if there's a Megatron native NCCL group setup pattern elsewhere ==="
rg "process.*group|NCCL|collective" nemo_rl/models/policy/workers/megatron_policy_worker.py -n | head -15Repository: NVIDIA-NeMo/RL
Length of output: 2707
🏁 Script executed:
#!/bin/bash
echo "=== Check base_policy_worker.py to see the correct pattern ==="
rg -B 10 -A 10 "init_collective" nemo_rl/models/policy/workers/base_policy_worker.py | head -50
echo ""
echo "=== Check the native StatelessProcessGroup implementation ==="
head -50 nemo_rl/distributed/stateless_process_group.py
echo ""
echo "=== Check if MegatronPolicyWorker extends base_policy_worker ==="
rg "class MegatronPolicyWorker" nemo_rl/models/policy/workers/megatron_policy_worker.py -A 3Repository: NVIDIA-NeMo/RL
Length of output: 3043
🏁 Script executed:
#!/bin/bash
echo "=== See full init_collective in base_policy_worker.py ==="
rg -A 25 "def init_collective" nemo_rl/models/policy/workers/base_policy_worker.py
echo ""
echo "=== Check what StatelessProcessGroup is imported/used in base_policy_worker ==="
rg "StatelessProcessGroup" nemo_rl/models/policy/workers/base_policy_worker.py -B 2 -A 2
echo ""
echo "=== See full init_collective_as_inference in base_policy_worker (if it exists) ==="
rg -A 25 "def init_collective_as_inference" nemo_rl/models/policy/workers/base_policy_worker.pyRepository: NVIDIA-NeMo/RL
Length of output: 1651
Replace vllm distributed imports with native NCCL utilities
init_collective_as_inference imports PyNcclCommunicator and StatelessProcessGroup from vllm, causing ModuleNotFoundError in Megatron-only deployments without vllm. The codebase already provides native equivalents: use nemo_rl.distributed.stateless_process_group.StatelessProcessGroup and the native NCCL communicator setup (matching the pattern in base_policy_worker.py's init_collective). Alternatively, guard the vllm imports with try-except if vllm-specific behavior is intentional.
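As a sketch of the guarded-import alternative mentioned above (the native `nemo_rl.distributed.stateless_process_group` path is what the review recommends, but its exact interface is not shown here), the vllm-specific imports could be wrapped so Megatron-only deployments fail with a clear message instead of a raw `ModuleNotFoundError`:

```python
# Sketch only: guard the vllm-specific imports for Megatron-only environments.
try:
    from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
    from vllm.distributed.utils import StatelessProcessGroup
except ImportError:  # vllm not installed in a Megatron-only deployment
    PyNcclCommunicator = None
    StatelessProcessGroup = None


def _require_vllm_collective_utils() -> None:
    """Raise a clear error if the vllm NCCL helpers are unavailable."""
    if PyNcclCommunicator is None or StatelessProcessGroup is None:
        raise RuntimeError(
            "This code path needs vllm's PyNcclCommunicator/StatelessProcessGroup; "
            "install vllm or switch to the repository's native process-group utilities."
        )
```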
891b3c2 to c19ee75
What does this PR do ?
Add a one line overview of what this PR aims to accomplish.
Issues
List issues that this PR closes (syntax):
Usage
# Add a code snippet demonstrating how to use this

Before your PR is "Ready for review"
Pre checks:
Additional Information
Summary by CodeRabbit
Release Notes
New Features
Improvements
Backward Compatibility