
Add Python distributed L4 to L3 dispatch #711

Open
PKUZHOU wants to merge 6 commits into hw-native-sys:main from PKUZHOU:feat/l4-l3-distributed-dispatch

Conversation

@PKUZHOU (Contributor) commented May 6, 2026

Summary

  • Add a Python-first gRPC/protobuf distributed dispatch package for L4 -> remote L3
  • Integrate Worker.add_remote_worker() through a local PROCESS mailbox shim, without C++/nanobind changes
  • Add a callable catalog, an L3 daemon backend process, heartbeat, a tensor-pool surface, examples, and docs

Tests

  • python -m pytest tests/ut/py/test_distributed tests/ut/py/test_worker/test_l4_recursive.py -q
  • python -m compileall -q python/simpler/distributed tests/ut/py/test_distributed examples/distributed/l4_l3_remote
  • git diff --check

Notes

  • Current end-to-end remote dispatch covers scalar TaskArgs and callable execution
  • Full remote tensor materialization and output write-back remain future work


@gemini-code-assist (Bot) left a comment


Code Review

This pull request implements a distributed L4 to L3 dispatch system using gRPC and protobuf, enabling cross-host task execution. It introduces a callable catalog for remote function registration, a long-running L3 daemon with a fork-safe backend process, and a mailbox shim thread to integrate remote workers into the existing C++ scheduler. Feedback highlights critical issues regarding the transmission of raw memory pointers across host boundaries, which would lead to segmentation faults. Other recommendations include removing redundant logic in the catalog registration, ensuring consistent use of cloudpickle for deserialization, and improving error handling for unexpected backend process terminations.

```python
tag = args.tag(i)
tensors.append(
    dispatch_pb2.ContinuousTensorRef(
        data=int(tensor.data),
```


Severity: high

Sending raw memory pointers (tensor.data) across host boundaries in a distributed system is incorrect. These addresses are local to the L4 process and will be invalid on the remote L3 node, leading to segmentation faults if accessed. A position-independent mechanism, such as handles or offsets into a shared tensor pool, should be used instead.

References
  1. To ensure shared memory is position-independent for future cross-process/cross-address-space communication, avoid storing absolute pointers (to stack or heap) within shared memory structures. Use relative offsets or process-local handles instead.
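
To make the alternative concrete, here is a small self-contained sketch of the handle/offset idea. TensorRef and TensorPool below are illustrative stand-ins, not the ContinuousTensorRef message or the tensor-pool surface introduced in this PR:

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class TensorRef:
    """Position-independent tensor reference: no raw addresses cross the wire."""
    pool_handle: str        # identifies a pool both ends agree on (e.g. a shared segment name)
    offset: int             # byte offset of the tensor within that pool
    shape: Tuple[int, ...]
    dtype: int
    tag: int


class TensorPool:
    """Toy pool: the L4 side registers payloads, the L3 side resolves refs locally."""

    def __init__(self, handle: str) -> None:
        self.handle = handle
        self._blobs: Dict[int, bytes] = {}
        self._next_offset = 0

    def register(self, payload: bytes) -> int:
        offset = self._next_offset
        self._blobs[offset] = payload
        self._next_offset += len(payload)
        return offset

    def resolve(self, ref: TensorRef) -> bytes:
        if ref.pool_handle != self.handle:
            raise KeyError(f"unknown pool {ref.pool_handle!r}")
        return self._blobs[ref.offset]


# Sender builds a ref from (handle, offset) instead of data=int(tensor.data):
pool = TensorPool("job-42/tensors")
ref = TensorRef(pool.handle, pool.register(b"\x00" * 64), shape=(4, 4), dtype=1, tag=0)
# Receiver (holding an equivalent pool) resolves the ref without touching foreign pointers:
assert pool.resolve(ref) == b"\x00" * 64
```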

```python
shape = tuple(int(x) for x in ref.shape)
dtype = DataType(int(ref.dtype))
tag = TensorArgType(int(ref.tag))
args.add_tensor(ContinuousTensor.make(int(ref.data), shape, dtype), tag)
```


Severity: high

Reconstructing a ContinuousTensor from a raw pointer received over the network is dangerous. In a distributed environment, ref.data is a pointer from a different address space (and likely a different host), making it invalid for local use.

References
  1. To ensure shared memory is position-independent for future cross-process/cross-address-space communication, avoid storing absolute pointers (to stack or heap) within shared memory structures. Use relative offsets or process-local handles instead.
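
In the same spirit, a hypothetical rewrite of the quoted reconstruction that resolves the reference against an L3-local pool instead of trusting ref.data as an address; pool.resolve() and the pool_handle/offset fields are illustrative names, not this PR's API:

```python
# Hypothetical sketch: ref carries (pool_handle, offset), never a raw pointer.
shape = tuple(int(x) for x in ref.shape)
dtype = DataType(int(ref.dtype))
tag = TensorArgType(int(ref.tag))
local_addr = pool.resolve(ref.pool_handle, ref.offset)  # L3-side lookup into its own tensor pool
args.add_tensor(ContinuousTensor.make(local_addr, shape, dtype), tag)
```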

Comment on lines +40 to +43
```python
if callable_id is None:
    self._next_id = max(self._next_id, cid + 1)
else:
    self._next_id = max(self._next_id, cid + 1)
```


Severity: medium

The self._next_id update is redundant here because self.install_from_payload(cid, version, payload) (called on line 39) already performs the exact same max(self._next_id, cid + 1) update on line 67. Additionally, the if/else branches are identical.

References
  1. Reuse existing helper functions or methods instead of duplicating their logic. This improves consistency, maintainability, and reduces the chance of introducing bugs.


```python
def _loads_with_allowlist(payload: bytes, allowed_modules: Optional[Tuple[str, ...]]) -> Callable:
    if allowed_modules is None:
        return pickle.loads(payload)
```


Severity: medium

Since cloudpickle (aliased as _pickle_impl) is used for serialization in register, it should also be used for deserialization here to ensure compatibility, especially for lambdas and closures which standard pickle cannot handle.

Suggested change

```diff
-        return pickle.loads(payload)
+        return _pickle_impl.loads(payload)
```
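
For context, a standalone illustration of why mixing the two matters for lambdas and closures (cloudpickle is the real third-party package; the snippet itself is not part of this PR):

```python
import pickle

import cloudpickle

f = lambda x: x + 1  # the kind of callable the catalog is expected to ship around

payload = cloudpickle.dumps(f)            # cloudpickle serializes the function by value
assert cloudpickle.loads(payload)(1) == 2

try:
    pickle.dumps(f)                       # plain pickle serializes functions by reference
except pickle.PicklingError as exc:       # and fails for a lambda defined on the fly
    print(f"pickle cannot serialize the lambda: {exc}")
```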

Comment on lines +104 to +106
```python
with self._backend_lock:
    self._backend_conn.send(msg)
    ok, payload = self._backend_conn.recv()
```


Severity: medium

self._backend_conn.recv() will raise an EOFError if the backend process terminates unexpectedly (e.g., due to a crash). This should be handled to provide a more descriptive error message to the RPC client rather than letting the gRPC handler thread fail with an unhandled exception.

Suggested change

```diff
-with self._backend_lock:
-    self._backend_conn.send(msg)
-    ok, payload = self._backend_conn.recv()
+try:
+    with self._backend_lock:
+        self._backend_conn.send(msg)
+        ok, payload = self._backend_conn.recv()
+except EOFError:
+    raise RuntimeError("L3 daemon backend process terminated unexpectedly") from None
```

Comment on lines +130 to +131
```python
def sleep_poll_interval() -> None:
    time.sleep(0.0005)
```


Severity: medium

The function sleep_poll_interval appears to be unused in the current implementation.

References
  1. Remove unused exported variables to prevent them from causing errors if their values become incorrect in different environments.

@jvjhfhg (Collaborator) commented May 7, 2026

examples/distributed/l4_l3_remote/l4_master.py seems to imply that remote execution can mutate L4-local Python state:

```python
counter = Counter()

def l3_sub(task_args):
    counter.add(task_args.scalar(0))

...
print(f"remote counter={counter.value}")
return 0 if counter.value == 7 else 1
```

But l3_sub is pushed to the L3 daemon through the callable catalog, so the captured counter is serialized with the closure. The remote backend/sub-worker mutates its own deserialized/forked copy, not the original counter in the L4 process.

Meanwhile, the current dispatch response only reports success/failure:

```python
inner.run(orch_fn, args, cfg)
return dispatch_pb2.DispatchResp(task_id=req.task_id, error_code=0), inner
```

and RemoteWorkerProxy.dispatch() only checks error_code; it does not read or materialize DispatchResp.output_tensors.

The tests (test_l4_l3_remote.py) avoid this by using externally visible state such as a file/shared-memory counter, which verifies that remote execution happened, but it does not demonstrate a real distributed result-return path. For a cross-host example, this feels misleading.

The example should either avoid expecting L4-local closure state to change, or explicitly use/document an external side effect until DispatchResp.output_tensors or another result-return mechanism is implemented.
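
For instance, a minimal sketch of the externally visible pattern the tests already rely on (a file-backed counter; the path and helper names are illustrative and assume the single-host setup used in the tests):

```python
import os

COUNTER_PATH = "/tmp/l4_l3_remote_counter"  # hypothetical shared location, not from the PR


def l3_sub(task_args):
    # Runs in the L3 daemon's backend/sub-worker: record the effect somewhere
    # the L4 master can observe, instead of mutating a captured Python object.
    with open(COUNTER_PATH, "a") as f:
        f.write(f"{int(task_args.scalar(0))}\n")


def read_counter() -> int:
    # Runs on the L4 side after dispatch: sum whatever the remote side wrote.
    if not os.path.exists(COUNTER_PATH):
        return 0
    with open(COUNTER_PATH) as f:
        return sum(int(line) for line in f if line.strip())
```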
