Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9a06387
feat(callable): per-callable_id orch SO dispatch on AICPU (a2a3 trb)
poursoul Apr 29, 2026
d931ec2
Add: prepare_callable / run_prepared / unregister_callable C ABI
poursoul Apr 29, 2026
97e56d1
feat(callable): expose prepare/run_prepared/unregister via ChipWorker…
poursoul Apr 29, 2026
6621829
feat(callable): port Stage 1 ABI surface to remaining variants
poursoul Apr 29, 2026
8690d32
feat(callable): Stage 3 — mailbox cid protocol + L3+ lazy-register + …
poursoul May 6, 2026
da1a0eb
feat(callable): Stage 4 — unify L2 API on register + run(cid)
poursoul May 6, 2026
67e12e7
Add: aicpu_dlopen_count getter for callable registration verification
poursoul May 6, 2026
ee90d6a
fix(pr): address review and CI issues for #710
poursoul May 6, 2026
9a80d71
refactor(callable): unify MAX_REGISTERED_CALLABLE_IDS source of truth
poursoul May 6, 2026
8667074
fix(callable): chip_process_loop falls back to legacy run when varian…
poursoul May 6, 2026
642e95a
fix(callable): scope orch SO file name by callable_id (a2a3 507018)
poursoul May 6, 2026
5f264ef
fix(pr): resolve CI failures for #710
poursoul May 7, 2026
80f2be5
feat(callable): Phase 0 — add active_callable_id_ to all runtimes
poursoul May 7, 2026
22ea75d
feat(callable): Phase 1 — port a5/trb to per-cid orch SO table
poursoul May 7, 2026
e5f6656
feat(callable): Phase 1 — mirror prepared_callable ST test to a5/trb
poursoul May 7, 2026
3bedebc
feat(callable): Phase 2 — host_build_graph prepare/run_prepared with …
poursoul May 8, 2026
b89c391
feat(callable): Phase 2 — add prepared_callable ST tests for hbg vari…
poursoul May 8, 2026
c0ff9f2
refactor(callable): Phase 3 — drop RUNTIME_HAS_CALLABLE_ID and RUNTIM…
poursoul May 8, 2026
e15489d
refactor(callable): Phase 3 — drop Python fallback to legacy run()
poursoul May 8, 2026
b976a90
refactor(callable): Phase 4 — drop run_runtime / init_runtime_impl le…
poursoul May 8, 2026
8e7b291
refactor(callable): Phase 4 — drop has_new_orch_so_ and AICPU legacy …
poursoul May 8, 2026
1293581
fix(pr): migrate vector_add and child_memory examples to register/cid…
poursoul May 8, 2026
a1bd0ff
fix(pr): silence ruff PLR0915 on _chip_process_loop_with_bootstrap
poursoul May 8, 2026
55a8c7e
fix(pr): make a5 prepared_callable test mirror a2a3, add a5 kernel-bi…
poursoul May 8, 2026
f5911a7
fix(callable): preserve child_memory flag in mailbox args deserializa…
poursoul May 8, 2026
faeedd4
refactor(callable): chip child loops use raw blob path; consolidate a…
poursoul May 9, 2026
afae373
fix(pr): plug callable SO leaks and drop in-comment doc anchors
poursoul May 9, 2026
aa38eb7
fix(pr): align stragglers with cid API contract for #710
poursoul May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,11 +920,22 @@ def st_worker(request, st_platform, device_pool, _l2_worker_pool):

# Register SubCallable entries from cls.CALLABLE
sub_ids = {}
chip_cids = {}
for entry in cls.CALLABLE.get("callables", []):
if "callable" in entry:
cid = w.register(entry["callable"])
sub_ids[entry["name"]] = cid
elif "orchestration" in entry:
from simpler_setup.scene_test import _compile_chip_callable_from_spec # noqa: PLC0415

name = entry["name"]
cache_key = (cls.__qualname__, name, st_platform, runtime)
chip = _compile_chip_callable_from_spec(entry, st_platform, runtime, cache_key)
cid = w.register(chip)
chip_cids[name] = cid
chip_cids[f"{name}_sig"] = entry["orchestration"].get("signature", [])
cls._st_sub_ids = sub_ids
cls._st_chip_cids = chip_cids

w.init()
yield w
Expand Down
11 changes: 9 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,21 @@ worker.init(host_path=str(binaries.host_path),
aicore_path=str(binaries.aicore_path))
worker.set_device(device_id=0)

# Execute callable on device
worker.run(chip_callable, orch_args, block_dim=24)
# Register the ChipCallable to obtain a callable_id
cid = worker.register(chip_callable)

# Execute the registered callable on device
worker.run(cid, orch_args, block_dim=24)

# Cleanup
worker.reset_device()
worker.finalize()
```

`ChipWorker` follows the same `register → run(cid)` contract as
`Worker(level=2)`; reach for the high-level `Worker` first and use
`ChipWorker` only when a low-level handle is required.

## Configuration

### Compile-time Configuration (Runtime Limits)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def run(
chip_bootstrap_configs=cfgs,
build=build,
)
chip_cid = worker.register(chip_callable)
try:
worker.init()
contexts: list[ChipContext] = worker.chip_contexts
Expand All @@ -157,7 +158,7 @@ def orch_fn(orch, _args, cfg):
TensorArgType.INPUT,
)
args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, args, cfg, worker=rank)
orch.submit_next_level(chip_cid, args, cfg, worker=rank)

worker.run(orch_fn, args=None, config=CallConfig())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def run(
chip_bootstrap_configs=cfgs,
build=build,
)
chip_cid = worker.register(chip_callable)
try:
worker.init()
contexts: list[ChipContext] = worker.chip_contexts
Expand Down Expand Up @@ -187,7 +188,7 @@ def orch_fn(orch, _args, cfg):
TensorArgType.INPUT,
)
args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, args, cfg, worker=rank)
orch.submit_next_level(chip_cid, args, cfg, worker=rank)

worker.run(orch_fn, args=None, config=CallConfig())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def run(
chip_bootstrap_configs=cfgs,
build=build,
)
chip_cid = worker.register(chip_callable)
try:
worker.init()
contexts: list[ChipContext] = worker.chip_contexts
Expand All @@ -191,7 +192,7 @@ def orch_fn(orch, _args, cfg):
args.add_tensor(make_tensor_arg(out[rank]), TensorArgType.OUTPUT_EXISTING)
args.add_tensor(make_tensor_arg(result[rank]), TensorArgType.OUTPUT_EXISTING)
args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, args, cfg, worker=rank)
orch.submit_next_level(chip_cid, args, cfg, worker=rank)

worker.run(orch_fn, args=None, config=CallConfig())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def run(platform: str = "a5", device_ids: list[int] | None = None, pto_isa_commi
num_sub_workers=0,
chip_bootstrap_configs=cfgs,
)
chip_cid = worker.register(chip_callable)
try:
worker.init()
contexts: list[ChipContext] = worker.chip_contexts
Expand All @@ -151,7 +152,7 @@ def orch_fn(orch, _args, cfg):
TensorArgType.INPUT,
)
args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, args, cfg, worker=rank)
orch.submit_next_level(chip_cid, args, cfg, worker=rank)

worker.run(orch_fn, args=None, config=CallConfig())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def run(
chip_bootstrap_configs=cfgs,
build=build,
)
chip_cid = worker.register(chip_callable)
try:
worker.init()
contexts: list[ChipContext] = worker.chip_contexts
Expand Down Expand Up @@ -187,7 +188,7 @@ def orch_fn(orch, _args, cfg):
TensorArgType.INPUT,
)
args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, args, cfg, worker=rank)
orch.submit_next_level(chip_cid, args, cfg, worker=rank)

worker.run(orch_fn, args=None, config=CallConfig())

Expand Down
11 changes: 9 additions & 2 deletions examples/workers/l2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ worker = Worker(
)
worker.init() # load host.so + aicpu.so + aicore.o, set device
try:
# ... allocate device buffers, build ChipCallable, run ...
worker.run(chip_callable, task_args, call_config)
# ... allocate device buffers, build ChipCallable ...
    cid = worker.register(chip_callable)  # register once; the cid stays valid and is reused across runs
worker.run(cid, task_args, call_config)
finally:
worker.close() # release ACL resources and device
```

`register()` is the only way to obtain a `cid`; `worker.run` always takes
that int, never the raw `ChipCallable`. A cid stays valid for the
lifetime of the worker, so you register once and reuse it across runs —
this is also why ST cases cache the cid on the test class (see
`_st_l2_cid` in `simpler_setup/scene_test.py`).

The `try/finally` is important — if anything between `init()` and `close()`
raises, you still want the device released. The
[L2 conftest leak issue](https://github.com/hw-native-sys/simpler/issues/604)
Expand Down
2 changes: 1 addition & 1 deletion examples/workers/l2/vector_add/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ args.add_tensor(ContinuousTensor.make(dev_a, shape, DataType.FLOAT32))
args.add_tensor(ContinuousTensor.make(dev_b, shape, DataType.FLOAT32))
args.add_tensor(ContinuousTensor.make(dev_out, shape, DataType.FLOAT32))

worker.run(chip_callable, args, CallConfig())
worker.run(chip_cid, args, CallConfig()) # chip_cid = worker.register(chip_callable) before init()
```

The tensor order must match `signature` order on the `ChipCallable`. `run()`
Expand Down
11 changes: 7 additions & 4 deletions examples/workers/l2/vector_add/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
host arrays ──[worker.malloc + copy_to]──► device buffers
worker.run(chip_callable, task_args, cfg)
chip_cid = worker.register(chip_callable) # before init()
worker.run(chip_cid, task_args, cfg)
device result ──[worker.copy_from]──► host array ──[torch compare]

Expand Down Expand Up @@ -126,7 +127,7 @@ def build_chip_callable(platform: str) -> ChipCallable:
)


def _run(worker: Worker, chip_callable: ChipCallable) -> None:
def _run(worker: Worker, chip_cid: int) -> None:
"""Allocate device memory, copy inputs, execute, copy outputs back, verify."""
# --- 1. Prepare host arrays ---
torch.manual_seed(42)
Expand Down Expand Up @@ -154,7 +155,7 @@ def _run(worker: Worker, chip_callable: ChipCallable) -> None:
# --- 4. Run. CallConfig() defaults are fine for this kernel. ---
config = CallConfig()
print("[vector_add] running on device...")
worker.run(chip_callable, args, config)
worker.run(chip_cid, args, config)

# --- 5. D2H copy back + verify ---
worker.copy_from(host_out.data_ptr(), dev_out, NBYTES)
Expand Down Expand Up @@ -183,10 +184,12 @@ def run(platform: str, device_id: int) -> int:
chip_callable = build_chip_callable(platform)
print(f"[vector_add] compiled. binary_size={chip_callable.binary_size} bytes")

chip_cid = worker.register(chip_callable)

print(f"[vector_add] init worker (device={device_id})...")
worker.init()
try:
_run(worker, chip_callable)
_run(worker, chip_cid)
finally:
worker.close()
return 0
Expand Down
3 changes: 2 additions & 1 deletion examples/workers/l3/allreduce_distributed/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def run(device_ids: list[int]) -> int:
num_sub_workers=0,
chip_bootstrap_configs=cfgs,
)
chip_cid = worker.register(chip_callable)

try:
print("[allreduce] init worker (forks chip children + bootstraps HCCL)...")
Expand Down Expand Up @@ -227,7 +228,7 @@ def orch_fn(orch, _args, cfg):
)
chip_args.add_scalar(ctx.nranks)
chip_args.add_scalar(ctx.device_ctx)
orch.submit_next_level(chip_callable, chip_args, cfg, worker=i)
orch.submit_next_level(chip_cid, chip_args, cfg, worker=i)

print("[allreduce] running 2-chip allreduce DAG...")
worker.run(orch_fn, args=None, config=CallConfig())
Expand Down
3 changes: 2 additions & 1 deletion examples/workers/l3/child_memory/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def run(platform: str, device_id: int) -> int:

print(f"[child_memory] compiling kernels for {platform}...")
chip_callable = build_chip_callable(platform)
chip_cid = worker.register(chip_callable)

print("[child_memory] init worker...")
worker.init()
Expand All @@ -172,7 +173,7 @@ def orch_fn(orch, _args, cfg):
a.add_tensor(make_tensor_arg(host_a), TensorArgType.INPUT)
a.add_tensor(w_dev, TensorArgType.INPUT)
a.add_tensor(make_tensor_arg(out), TensorArgType.OUTPUT_EXISTING)
orch.submit_next_level(chip_callable, a, cfg, worker=0)
orch.submit_next_level(chip_cid, a, cfg, worker=0)

# dev_w is reclaimed by DeviceRunner::finalize on worker.close() —
# we don't orch.free it here, that's the whole point of child_memory.
Expand Down
6 changes: 4 additions & 2 deletions examples/workers/l3/ffn_tp_parallel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ def run(device_ids: list[int]) -> int:
num_sub_workers=0,
chip_bootstrap_configs=cfgs,
)
ffn_cid = worker.register(ffn_local_cc)
allreduce_cid = worker.register(allreduce_cc)

try:
print("[ffn_tp_parallel] init worker (forks chip children + bootstraps HCCL)...")
Expand All @@ -231,7 +233,7 @@ def orch_fn(orch, _args, cfg):
a1.add_tensor(make_tensor_arg(host_x_shards[i]), TensorArgType.INPUT)
a1.add_tensor(make_tensor_arg(host_w_shards[i]), TensorArgType.INPUT)
a1.add_tensor(make_tensor_arg(host_partial[i]), TensorArgType.OUTPUT_EXISTING)
orch.submit_next_level(ffn_local_cc, a1, cfg, worker=i)
orch.submit_next_level(ffn_cid, a1, cfg, worker=i)

# Stage 2: AIV cross-rank sum. Tagging partial_local INPUT
# with the same buffer.addr makes TensorMap auto-link this
Expand All @@ -250,7 +252,7 @@ def orch_fn(orch, _args, cfg):
)
a2.add_scalar(ctx.nranks)
a2.add_scalar(ctx.device_ctx)
orch.submit_next_level(allreduce_cc, a2, cfg, worker=i)
orch.submit_next_level(allreduce_cid, a2, cfg, worker=i)

print("[ffn_tp_parallel] running 2-chip 2-stage DAG...")
worker.run(orch_fn, args=None, config=CallConfig())
Expand Down
18 changes: 11 additions & 7 deletions examples/workers/l3/multi_chip_dispatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ chip outputs. The smallest correct L3 program.
| ------- | ------------------------------ |
| Shared-memory tensors | `torch.randn(...).share_memory_()` — chip children see the same storage |
| `TensorArgType` tags | `INPUT` / `OUTPUT_EXISTING` drive DAG dependency tracking |
| Python SubWorker | `worker.register(fn)` **before** `init()` |
| ChipCallable id | `chip_cid = worker.register(chip_callable)` **before** `init()` |
| Python SubWorker | `sub_cid = worker.register(fn)` **before** `init()` |
| `Worker(level=3)` config | `device_ids=[0, 1]`, `num_sub_workers=1` |
| Orchestration | `orch.submit_next_level(...)` per chip + `orch.submit_sub(cid, args)` |
| Orchestration | `orch.submit_next_level(chip_cid, ...)` per chip + `orch.submit_sub(sub_cid, args)` |

## Layout

Expand Down Expand Up @@ -66,17 +67,20 @@ host_b = [torch.randn(...).share_memory_() for _ in device_ids]
host_out = [torch.zeros(...).share_memory_() for _ in device_ids]

def subworker(sub_args): ...
sub_cid = worker.register(subworker) # BEFORE init() — see below
chip_cid = worker.register(chip_callable) # ChipCallable: BEFORE init()
sub_cid = worker.register(subworker) # Python SubWorker: BEFORE init()
```

`share_memory_()` moves the tensor's storage to a `mmap` region. After
`fork()`, the chip child process has that region mapped at the same virtual
address, so when the kernel writes to `host_out[i]`, the parent's tensor sees
it immediately. No explicit copy back.

**`register()` MUST come before `init()`**. `init()` forks child processes;
the registry is captured by copy-on-write. Anything registered after `init()`
is invisible to the forked children.
**`register()` MUST come before `init()`** for *every* callable — both
the `ChipCallable` dispatched to chips and the Python sub functions.
`init()` forks child processes; the registry is captured by copy-on-write.
Anything registered after `init()` is invisible to the forked children,
and `Worker.register()` at L≥3 raises if called post-init.

### 2. `init()` — fork + C++ scheduler

Expand All @@ -93,7 +97,7 @@ def orch_fn(orch, _args, cfg):
chip_args.add_tensor(make_tensor_arg(host_a[i]), TensorArgType.INPUT)
chip_args.add_tensor(make_tensor_arg(host_b[i]), TensorArgType.INPUT)
chip_args.add_tensor(make_tensor_arg(host_out[i]), TensorArgType.OUTPUT_EXISTING)
orch.submit_next_level(chip_callable, chip_args, cfg, worker=i)
orch.submit_next_level(chip_cid, chip_args, cfg, worker=i)

sub_args = TaskArgs()
for i in range(len(device_ids)):
Expand Down
5 changes: 4 additions & 1 deletion examples/workers/l3/multi_chip_dispatch/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ def subworker(sub_args: TaskArgs) -> None:
print(f"[multi_chip_dispatch] compiling kernels for {platform}...")
chip_callable = build_chip_callable(platform)

# Register the ChipCallable so submit_next_level takes a cid.
chip_cid = worker.register(chip_callable)

# --- 5. init() forks chip + sub child processes, starts C++ scheduler.
print("[multi_chip_dispatch] init worker...")
worker.init()
Expand All @@ -165,7 +168,7 @@ def orch_fn(orch, _args, cfg):
chip_args.add_tensor(make_tensor_arg(host_a[i]), TensorArgType.INPUT)
chip_args.add_tensor(make_tensor_arg(host_b[i]), TensorArgType.INPUT)
chip_args.add_tensor(make_tensor_arg(host_out[i]), TensorArgType.OUTPUT_EXISTING)
orch.submit_next_level(chip_callable, chip_args, cfg, worker=i)
orch.submit_next_level(chip_cid, chip_args, cfg, worker=i)

# Sub task that depends on both chip outputs. Tagging the two
# host_out[i] tensors INPUT tells the scheduler to wait for
Expand Down
Loading