Add Python distributed L4 to L3 dispatch #711
PKUZHOU wants to merge 6 commits into hw-native-sys:main from …
Conversation
Code Review
This pull request implements a distributed L4 to L3 dispatch system using gRPC and protobuf, enabling cross-host task execution. It introduces a callable catalog for remote function registration, a long-running L3 daemon with a fork-safe backend process, and a mailbox shim thread to integrate remote workers into the existing C++ scheduler. Feedback highlights critical issues regarding the transmission of raw memory pointers across host boundaries, which would lead to segmentation faults. Other recommendations include removing redundant logic in the catalog registration, ensuring consistent use of cloudpickle for deserialization, and improving error handling for unexpected backend process terminations.
```python
tag = args.tag(i)
tensors.append(
    dispatch_pb2.ContinuousTensorRef(
        data=int(tensor.data),
```
Sending raw memory pointers (tensor.data) across host boundaries in a distributed system is incorrect. These addresses are local to the L4 process and will be invalid on the remote L3 node, leading to segmentation faults if accessed. A position-independent mechanism, such as handles or offsets into a shared tensor pool, should be used instead.
References
- To ensure shared memory is position-independent for future cross-process/cross-address-space communication, avoid storing absolute pointers (to stack or heap) within shared memory structures. Use relative offsets or process-local handles instead.
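For illustration, a minimal sketch of the sender side under that recommendation. `TensorPool`, its `register` method, and the reuse of the `data` field for a handle are all hypothetical, not part of this PR; `tensors`, `tensor`, and `dispatch_pb2` are taken from the hunk above:

```python
# Sketch only: TensorPool and its handle scheme are illustrative, not from this PR.
class TensorPool:
    """Process-local registry mapping stable integer handles to live tensors."""

    def __init__(self):
        self._next_handle = 0
        self._tensors = {}

    def register(self, tensor) -> int:
        handle = self._next_handle
        self._next_handle += 1
        self._tensors[handle] = tensor
        return handle


pool = TensorPool()

# publish a position-independent handle instead of the raw tensor.data address
tensors.append(
    dispatch_pb2.ContinuousTensorRef(
        data=pool.register(tensor),
    )
)
```

The receiving side would then resolve the handle or offset against its own view of the pool, as sketched under the next comment.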
```python
shape = tuple(int(x) for x in ref.shape)
dtype = DataType(int(ref.dtype))
tag = TensorArgType(int(ref.tag))
args.add_tensor(ContinuousTensor.make(int(ref.data), shape, dtype), tag)
```
Reconstructing a ContinuousTensor from a raw pointer received over the network is dangerous. In a distributed environment, ref.data is a pointer from a different address space (and likely a different host), making it invalid for local use.
References
- To ensure shared memory is position-independent for future cross-process/cross-address-space communication, avoid storing absolute pointers (to stack or heap) within shared memory structures. Use relative offsets or process-local handles instead.
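For illustration only, a sketch of position-independent resolution on the receiving side, assuming `ref.data` is reinterpreted as an offset into a shared segment. `SHARED_POOL` and the offset convention are hypothetical; `ref`, `DataType`, `TensorArgType`, `ContinuousTensor`, and `args` come from the hunk above:

```python
import ctypes
import mmap

SHARED_POOL = mmap.mmap(-1, 1 << 20)  # stand-in for a real cross-process segment

shape = tuple(int(x) for x in ref.shape)
dtype = DataType(int(ref.dtype))
tag = TensorArgType(int(ref.tag))
offset = int(ref.data)  # treated as a pool-relative offset, not an address
# translate the offset into an address that is valid in *this* process
local_addr = ctypes.addressof(ctypes.c_char.from_buffer(SHARED_POOL, offset))
args.add_tensor(ContinuousTensor.make(local_addr, shape, dtype), tag)
```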
```python
if callable_id is None:
    self._next_id = max(self._next_id, cid + 1)
else:
    self._next_id = max(self._next_id, cid + 1)
```
The self._next_id update is redundant here because self.install_from_payload(cid, version, payload) (called on line 39) already performs the exact same max(self._next_id, cid + 1) update on line 67. Additionally, the if/else branches are identical.
References
- Reuse existing helper functions or methods instead of duplicating their logic. This improves consistency, maintainability, and reduces the chance of introducing bugs.
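A sketch of the deduplicated flow, with the `register` signature guessed for illustration; only `cid`, `version`, `payload`, `_pickle_impl`, and the `_next_id` bookkeeping inside `install_from_payload` are taken from this PR:

```python
def register(self, fn, callable_id=None, version=0):
    # pick an explicit id or mint the next one; no branch-local bookkeeping needed
    cid = callable_id if callable_id is not None else self._next_id
    payload = _pickle_impl.dumps(fn)
    # install_from_payload already advances _next_id via max(self._next_id, cid + 1)
    self.install_from_payload(cid, version, payload)
    return cid
```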
```python
def _loads_with_allowlist(payload: bytes, allowed_modules: Optional[Tuple[str, ...]]) -> Callable:
    if allowed_modules is None:
        return pickle.loads(payload)
```
Since cloudpickle (aliased as _pickle_impl) is used for serialization in register, it should also be used for deserialization here to ensure compatibility, especially for lambdas and closures which standard pickle cannot handle.
```diff
-        return pickle.loads(payload)
+        return _pickle_impl.loads(payload)
```
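For grounding, a self-contained round trip showing why cloudpickle is needed for this kind of payload; nothing below is from the PR:

```python
import pickle

import cloudpickle

offset = 10
fn = lambda x: x + offset  # a closure the stdlib pickler cannot serialize by reference

try:
    pickle.dumps(fn)
except (pickle.PicklingError, AttributeError) as exc:
    print(f"stdlib pickle rejects the lambda: {exc}")

payload = cloudpickle.dumps(fn)        # serializes the function by value
restored = cloudpickle.loads(payload)  # symmetric loads keeps the round trip consistent
assert restored(5) == 15
```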
```python
with self._backend_lock:
    self._backend_conn.send(msg)
    ok, payload = self._backend_conn.recv()
```
self._backend_conn.recv() will raise an EOFError if the backend process terminates unexpectedly (e.g., due to a crash). This should be handled to provide a more descriptive error message to the RPC client rather than letting the gRPC handler thread fail with an unhandled exception.
```diff
-with self._backend_lock:
-    self._backend_conn.send(msg)
-    ok, payload = self._backend_conn.recv()
+try:
+    with self._backend_lock:
+        self._backend_conn.send(msg)
+        ok, payload = self._backend_conn.recv()
+except EOFError:
+    raise RuntimeError("L3 daemon backend process terminated unexpectedly") from None
```
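As background, a minimal standalone reproduction of that failure mode (all names here are illustrative, not from the PR): when the backend child dies without replying, the parent's `recv()` raises `EOFError` once the parent has closed its own copy of the child end of the pipe.

```python
import multiprocessing as mp


def backend(conn):
    conn.recv()           # accept one request...
    raise SystemExit(1)   # ...then die without sending a reply


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=backend, args=(child_conn,))
    proc.start()
    child_conn.close()    # parent must drop its handle, or recv() blocks forever
    parent_conn.send("work")
    try:
        parent_conn.recv()
    except EOFError:      # the pipe closed together with the child process
        print("backend process terminated unexpectedly")
    proc.join()
```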
```python
def sleep_poll_interval() -> None:
    time.sleep(0.0005)
```
The example relies on an L4-local closure (`counter`) being mutated by the dispatched function:

```python
counter = Counter()

def l3_sub(task_args):
    counter.add(task_args.scalar(0))
    ...
    print(f"remote counter={counter.value}")
    return 0 if counter.value == 7 else 1
```

But `l3_sub` is shipped to and executed in the remote L3 process, so it increments a deserialized copy of `counter`; the L4-side closure never changes. Meanwhile, the current dispatch response only reports success/failure:

```python
inner.run(orch_fn, args, cfg)
return dispatch_pb2.DispatchResp(task_id=req.task_id, error_code=0)
```

The tests (…). The example should either avoid expecting L4-local closure state to change, or explicitly use/document an external side effect until …
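One minimal adjustment, reusing only names from the example above: keep the `Counter` inside the function so all mutated state lives in the L3 process, and let the verdict travel back through the existing `error_code` plumbing.

```python
def l3_sub(task_args):
    # all mutated state is local to the L3 process; no L4 state is expected to change
    counter = Counter()
    counter.add(task_args.scalar(0))
    print(f"remote counter={counter.value}")  # observable in the L3 daemon's log
    return 0 if counter.value == 7 else 1     # surfaced to L4 via DispatchResp.error_code
```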
Summary
Tests
Notes