diff --git a/examples/workers/l3/moe_multi_chip_experts/README.md b/examples/workers/l3/moe_multi_chip_experts/README.md new file mode 100644 index 000000000..bfd9c2749 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/README.md @@ -0,0 +1,128 @@ +# `moe_multi_chip_experts/` — one expert per chip + +Runs a small distributed Mixture-of-Experts pipeline across multiple chips. +Each rank owns one expert, exchanges token slices through HCCL window buffers, +applies a simple per-expert compute kernel, and gathers the processed expert +results back to the source ranks. + +This example is intentionally tiny: `NUM_TOKENS = 10`, `HIDDEN_DIM = 16`, and +only the first `COUNT = 4` tokens are processed. The small shape makes the +data movement easy to inspect while still exercising cross-chip dispatch, +compute, and combine. + +## What This Demonstrates + +| Concept | Where it shows up | +| ------- | ----------------- | +| L3 multi-chip worker | `Worker(level=3, device_ids=[...])` in `main.py` | +| HCCL bootstrap buffers | `ChipBootstrapConfig` with `scratch1` and `scratch2` | +| Cross-rank dispatch | `kernels/aiv/moe_dispatch_alltoall.cpp` | +| Per-rank expert compute | `kernels/aiv/moe_simple_compute.cpp` | +| Cross-rank combine | `kernels/aiv/moe_combine_alltoall.cpp` | +| Device orchestration | `kernels/orchestration/moe_end2end_orch.cpp` | +| Pytest integration | `test_moe_multi_chip_experts.py` calls `main.run(...)` | + +## Layout + +```text +moe_multi_chip_experts/ + main.py # CLI demo and reusable run() entry + test_moe_multi_chip_experts.py # pytest wrapper, matching other L3 examples + kernels/ + aiv/ + moe_dispatch_alltoall.cpp # publish each rank's expert input + moe_simple_compute.cpp # add 1.0 to dispatched token slices + moe_combine_alltoall.cpp # gather processed expert outputs + orchestration/ + moe_end2end_orch.cpp # submit dispatch -> compute -> combine + README.md +``` + +## Pipeline + +For `N` chips, each chip owns one expert and starts with: 
+ +```text +send[expert_id][token][hidden] +recv[source_rank][token][hidden] +output[expert_id][token][hidden] +``` + +The orchestration submits three AIV kernels: + +```text +┌──────────┐ ┌─────────┐ ┌─────────┐ +│ Dispatch │ ───▶ │ Compute │ ───▶ │ Combine │ +└──────────┘ └─────────┘ └─────────┘ +``` + +1. Dispatch writes each rank's expert slice into the owner rank's `recv`. +2. Compute adds `1.0` to the first `COUNT` tokens in `recv`. +3. Combine copies each expert's processed slice into the source rank's + `output[expert_id]` row. + +`scratch1` is the HCCL window used by dispatch. `scratch2` is the HCCL window +used by combine. Compute only updates `recv`; it does not use either scratch +window. + +The two communication phases use independent windows mainly because each +kernel places its barrier signal slots at the tail of its scratch buffer and +does not reset those slots before use. Dispatch leaves its signal slots +incremented after its cross-rank barrier. If combine reused the same window, +its `TWAIT` could observe the old dispatch signals and pass before combine has +staged its own data. A separate `scratch2` gives combine independent data +storage and independent signal slots. + +## Data Pattern + +Inputs are initialized with unique values: + +```text +value = card_id * 1_000_000 + expert_id * 10_000 + token * 100 + dim +``` + +After compute, every checked output value should be the corresponding input +value plus `1.0`. `main.py` computes the golden reference in Python and checks +every `output[expert_id][token][hidden]` element for the processed token +range. 
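The golden reference follows directly from the formula above: every checked element is its initial value plus `1.0`, because card `c`'s slice for expert `e` is processed (`+1.0`) on rank `e` and combined back into `output[e]` on card `c`. A minimal Python sketch of that check (helper names here are illustrative, not the actual functions in `main.py`):

```python
# Constants mirror the example configuration.
NUM_TOKENS = 10
HIDDEN_DIM = 16
COUNT = 4  # only the first COUNT tokens are processed


def init_value(card_id: int, expert_id: int, token: int, dim: int) -> float:
    """Unique initial value of send[expert_id][token][dim] on a given card."""
    return float(card_id * 1_000_000 + expert_id * 10_000 + token * 100 + dim)


def golden_output(card_id: int, num_cards: int) -> list:
    """Expected output[expert][token][dim] on `card_id` after the pipeline.

    Each processed element is simply its input value + 1.0; unprocessed
    tokens (t >= COUNT) are not checked, so they are omitted here.
    """
    return [
        [
            [init_value(card_id, expert, token, dim) + 1.0
             for dim in range(HIDDEN_DIM)]
            for token in range(COUNT)
        ]
        for expert in range(num_cards)
    ]
```

For the two-chip run, each card checks `2 × 4 × 16 = 128` elements, i.e. 256 across both cards, which matches the `256/256 correct` line in the expected output below.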
+ +## Run + +Hardware: + +```bash +python examples/workers/l3/moe_multi_chip_experts/main.py -p a2a3 -d 0-1 +``` + +Simulation: + +```bash +python examples/workers/l3/moe_multi_chip_experts/main.py -p a2a3sim -d 0-1 +``` + +The pytest wrapper follows the same style as the other L3 examples: + +```bash +python -m pytest examples/workers/l3/moe_multi_chip_experts --platform a2a3 --device 0-1 +``` + +For the CLI, device ids can be written as a range (`-d 0-1`) or a +comma-separated list (`-d 0,1`). For pytest, pass the same device spec to +`--device`. The examples use ranges because that matches the other L3 docs. + +Expected successful output for the two-chip commands above includes: + +```text +[End2End] End-to-end pipeline completed! + Total: 256/256 correct +[End2End] All values correct! End-to-end pipeline works perfectly. +``` + +## Notes + +- `test_moe_multi_chip_experts.py` is a thin pytest wrapper around + `main.run(...)`. +- The pytest case runs on `a2a3` hardware and requires two available device + ids. +- Each rank allocates independent `scratch1` and `scratch2` HCCL windows + during worker bootstrap. diff --git a/examples/workers/l3/moe_multi_chip_experts/__init__.py b/examples/workers/l3/moe_multi_chip_experts/__init__.py new file mode 100644 index 000000000..febbca099 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. 
+# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Multi-chip MoE example package.""" diff --git a/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_combine_alltoall.cpp b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_combine_alltoall.cpp new file mode 100644 index 000000000..99b816f69 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_combine_alltoall.cpp @@ -0,0 +1,217 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * MoE Combine All-to-All Kernel (Direct Store Version) + * + * This kernel implements the combine phase of distributed MoE: + * Each card i sends recv[i][card_j] (expert_i's result for card_j) to card j, + * then directly stores all received results to output (one expert per output row). 
+ * + * Data flow: + * Phase 1 (stage-in): recv[:][:][:COUNT][:] → scratch[my_rank][:][:][:] + * Phase 2 (barrier): signal matrix + TWAIT cross-rank sync + * Phase 3 (store): for expert_i in num_cards: copy scratch[expert_i][my_rank][:][:] to output[expert_i][:][:] + * + * Output layout: + * output[expert_i][token_t][:] = data from expert_i for this card, token t + * + * args layout: + * tensor(0) = recv_local [num_cards][num_tokens][hidden_dim] + * tensor(1) = output_local [num_cards][count][hidden_dim] - stores all experts' data + * tensor(2) = scratch HCCL window buffer + * tensor(3) = scratch_print Debug output buffer (Phase 1 stage-in mirror) + * scalar(0) = card_id which card this is + * scalar(1) = num_cards total number of cards + * scalar(2) = CommContext device pointer for cross-card communication + */ + +#include +#include +#include "pto/comm/comm_types.hpp" +#include "pto/comm/pto_comm_inst.hpp" +#include "platform_comm/comm_context.h" +#include "tensor.h" + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +// Configuration matching the in-test golden references +static constexpr size_t NUM_TOKENS = 10; +static constexpr size_t HIDDEN_DIM = 16; +static constexpr size_t COUNT = 4; // tokens to process per (card, expert) pair +static constexpr int kMaxSupportedCards = 16; + +template +AICORE inline __gm__ T *CommRemotePtr(__gm__ CommContext *ctx, __gm__ T *localPtr, int pe) { + uint64_t localBase = ctx->windowsIn[ctx->rankId]; + uint64_t offset = (uint64_t)localPtr - localBase; + return (__gm__ T *)(ctx->windowsIn[pe] + offset); +} + +extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { + // Unpack tensors + __gm__ Tensor *recv_tensor = reinterpret_cast<__gm__ Tensor *>(args[0]); + __gm__ Tensor *output_tensor = reinterpret_cast<__gm__ Tensor *>(args[1]); + __gm__ Tensor *scratch_tensor = reinterpret_cast<__gm__ Tensor *>(args[2]); + __gm__ Tensor 
*scratch_print_tensor = reinterpret_cast<__gm__ Tensor *>(args[3]); + + // Unpack scalars + int64_t card_id = static_cast(args[4]); + int num_cards = static_cast(args[5]); + __gm__ CommContext *commCtx = reinterpret_cast<__gm__ CommContext *>(args[6]); + + // Get base pointers + __gm__ float *recv = reinterpret_cast<__gm__ float *>(recv_tensor->buffer.addr) + recv_tensor->start_offset; + __gm__ float *output = reinterpret_cast<__gm__ float *>(output_tensor->buffer.addr) + output_tensor->start_offset; + __gm__ float *scratch = + reinterpret_cast<__gm__ float *>(scratch_tensor->buffer.addr) + scratch_tensor->start_offset; + __gm__ float *scratch_print = + reinterpret_cast<__gm__ float *>(scratch_print_tensor->buffer.addr) + scratch_print_tensor->start_offset; + + // Signal area at tail of scratch: num_cards int32 slots + // Must be placed AFTER all data slots to avoid corruption + size_t total_data_size = num_cards * num_cards * NUM_TOKENS * HIDDEN_DIM; + __gm__ int32_t *signal_base = reinterpret_cast<__gm__ int32_t *>(scratch + total_data_size); + + using ShapeDyn = pto::Shape; + using StrideDyn = pto::Stride; + using Global = pto::GlobalTensor; + + int my_rank = static_cast(commCtx->rankId); + + if (num_cards <= 0 || num_cards > kMaxSupportedCards) { + pipe_barrier(PIPE_ALL); + return; + } + + // ------------------------------------------------------------------ + // Phase 1: stage-in — copy recv to scratch + // This card's expert result for all cards (as destination) + // + // + // For card_i with expert_id, copy recv[card_j][:][:] to scratch[expert_id][card_j][:][:] + // ------------------------------------------------------------------ + for (int card_j = 0; card_j < num_cards; ++card_j) { + for (size_t t = 0; t < COUNT; ++t) { + // Source: recv[card_j][t][:HIDDEN_DIM] (expert_id's processed data from card_j) + // recv layout: [num_cards][NUM_TOKENS][HIDDEN_DIM] + // Base points to current (card_j, t), stride should keep access within current token + ShapeDyn 
src_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn src_stride( + NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, HIDDEN_DIM, 1 + ); + Global srcG(recv + card_j * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM, src_shape, src_stride); + + // Destination: scratch[my_rank][card_j][t][:HIDDEN_DIM] + // Offset = my_rank * (num_cards * NUM_TOKENS * HIDDEN_DIM) + // + card_j * (NUM_TOKENS * HIDDEN_DIM) + // + t * HIDDEN_DIM + size_t dst_offset = + my_rank * num_cards * NUM_TOKENS * HIDDEN_DIM + card_j * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM; + + ShapeDyn dst_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn dst_stride( + num_cards * NUM_TOKENS * HIDDEN_DIM, num_cards * NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, + HIDDEN_DIM, 1 + ); + Global dstG(scratch + dst_offset, dst_shape, dst_stride); + Global dstG_print(scratch_print + dst_offset, dst_shape, dst_stride); + + using TileType = pto::Tile; + TileType tile(1, HIDDEN_DIM); + TASSIGN(tile, 0); + + TLOAD(tile, srcG); + set_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + TSTORE(dstG, tile); + TSTORE(dstG_print, tile); + set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + } + } + pipe_barrier(PIPE_ALL); + + // ------------------------------------------------------------------ + // Phase 2: device barrier — each card notifies peers that its + // recv[:][my_card] data is visible in scratch, then waits for all peers. 
+ // ------------------------------------------------------------------ + for (int peer = 0; peer < num_cards; ++peer) { + if (peer == my_rank) continue; + __gm__ int32_t *remote_signal = CommRemotePtr(commCtx, signal_base + my_rank, peer); + pto::comm::Signal sig(remote_signal); + pto::comm::TNOTIFY(sig, (int32_t)1, pto::comm::NotifyOp::AtomicAdd); + } + for (int peer = 0; peer < num_cards; ++peer) { + if (peer == my_rank) continue; + pto::comm::Signal sig(signal_base + peer); + pto::comm::TWAIT(sig, (int32_t)1, pto::comm::WaitCmp::GE); + } + pipe_barrier(PIPE_ALL); + + // ------------------------------------------------------------------ + // Phase 3: direct store — copy each expert's data to output + // Read scratch[expert_i][my_rank][t][:HIDDEN_DIM] from each expert i + // and store to output[expert_i][t][:HIDDEN_DIM] + // + // For card_id with my_rank: + // output[expert_0][t][:] = scratch[expert_0][my_rank][t][:] + // output[expert_1][t][:] = scratch[expert_1][my_rank][t][:] + // etc. + // ------------------------------------------------------------------ + for (int expert_i = 0; expert_i < num_cards; ++expert_i) { + for (size_t t = 0; t < COUNT; ++t) { + // Source: scratch[expert_i][my_rank][t][:HIDDEN_DIM] + // Offset = expert_i * (num_cards * NUM_TOKENS * HIDDEN_DIM) + // + my_rank * (NUM_TOKENS * HIDDEN_DIM) + // + t * HIDDEN_DIM + __gm__ float *src_base = (expert_i == my_rank) ? 
scratch : CommRemotePtr(commCtx, scratch, expert_i); + size_t src_offset = + expert_i * num_cards * NUM_TOKENS * HIDDEN_DIM + my_rank * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM; + + ShapeDyn src_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn src_stride( + num_cards * NUM_TOKENS * HIDDEN_DIM, num_cards * NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, + HIDDEN_DIM, 1 + ); + Global srcG(src_base + src_offset, src_shape, src_stride); + + // Destination: output[expert_i][t][:HIDDEN_DIM] + // Offset = expert_i * (COUNT * HIDDEN_DIM) + t * HIDDEN_DIM + size_t dst_offset = expert_i * COUNT * HIDDEN_DIM + t * HIDDEN_DIM; + + ShapeDyn dst_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn dst_stride(COUNT * HIDDEN_DIM, HIDDEN_DIM, HIDDEN_DIM, HIDDEN_DIM, 1); + Global dstG(output + dst_offset, dst_shape, dst_stride); + + using TileType = pto::Tile; + TileType tile(1, HIDDEN_DIM); + TASSIGN(tile, 0); + + // Load from scratch + TLOAD(tile, srcG); + set_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + + // Store to output + TSTORE(dstG, tile); + set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + } + } + + pipe_barrier(PIPE_ALL); +} diff --git a/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_dispatch_alltoall.cpp b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_dispatch_alltoall.cpp new file mode 100644 index 000000000..1e424aa49 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_dispatch_alltoall.cpp @@ -0,0 +1,201 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. 
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * MoE Dispatch All-to-All Kernel + * + * This kernel implements the dispatch phase of distributed MoE: + * Each card i sends send[i][expert_i] to all other cards, and receives + * send[j][expert_i] from card j. + * + * Data flow: + * Phase 1 (stage-in): send[expert_i][:][:] → my scratch slot + * Phase 2 (barrier): signal matrix + TWAIT cross-rank sync + * Phase 3 (gather): for card_j in num_cards: TLOAD(card_j_scratch), TSTORE(recv[card_j][:][:]) + * + * args layout: + * tensor(0) = send_local [num_experts][num_tokens][hidden_dim] + * tensor(1) = recv_local [num_cards][num_tokens][hidden_dim] + * tensor(2) = scratch HCCL window buffer + * scalar(0) = expert_id which expert this card processes + * scalar(1) = num_cards total number of cards + * scalar(2) = CommContext device pointer for cross-card communication + */ + +#include +#include +#include "pto/comm/comm_types.hpp" +#include "pto/comm/pto_comm_inst.hpp" +#include "platform_comm/comm_context.h" +#include "tensor.h" + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +// Configuration matching the in-test golden references +static constexpr size_t NUM_TOKENS = 10; +static constexpr size_t HIDDEN_DIM = 16; +static constexpr size_t COUNT = 4; // tokens to process per (card, expert) pair +static constexpr int kMaxSupportedCards = 16; + +template +AICORE inline __gm__ T *CommRemotePtr(__gm__ CommContext *ctx, __gm__ T *localPtr, int pe) { + uint64_t localBase = ctx->windowsIn[ctx->rankId]; + uint64_t offset = (uint64_t)localPtr - localBase; + 
return (__gm__ T *)(ctx->windowsIn[pe] + offset); +} + +extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { + // Unpack tensors + __gm__ Tensor *send_tensor = reinterpret_cast<__gm__ Tensor *>(args[0]); + __gm__ Tensor *recv_tensor = reinterpret_cast<__gm__ Tensor *>(args[1]); + __gm__ Tensor *scratch_tensor = reinterpret_cast<__gm__ Tensor *>(args[2]); + + // Unpack scalars + int64_t expert_id = static_cast(args[3]); + int num_cards = static_cast(args[4]); + __gm__ CommContext *commCtx = reinterpret_cast<__gm__ CommContext *>(args[5]); + + // Get base pointers + __gm__ float *send = reinterpret_cast<__gm__ float *>(send_tensor->buffer.addr) + send_tensor->start_offset; + __gm__ float *recv = reinterpret_cast<__gm__ float *>(recv_tensor->buffer.addr) + recv_tensor->start_offset; + __gm__ float *scratch = + reinterpret_cast<__gm__ float *>(scratch_tensor->buffer.addr) + scratch_tensor->start_offset; + + // Signal area at tail of scratch: num_cards int32 slots + // Must be placed AFTER all data slots to avoid corruption + size_t total_data_size = num_cards * num_cards * NUM_TOKENS * HIDDEN_DIM; + __gm__ int32_t *signal_base = reinterpret_cast<__gm__ int32_t *>(scratch + total_data_size); + + using ShapeDyn = pto::Shape; + using StrideDyn = pto::Stride; + using Global = pto::GlobalTensor; + + int my_rank = static_cast(commCtx->rankId); + + if (num_cards <= 0 || num_cards > kMaxSupportedCards) { + pipe_barrier(PIPE_ALL); + return; + } + + // ------------------------------------------------------------------ + // Phase 1: stage-in — copy ALL experts' data to my scratch slot + // Each card contributes ALL of its send[:] (all experts) to enable all-to-all + // + // Data layout in scratch: scratch[card_j][expert_i][:][:] + // where card_j = my_rank (the card sending the data) + // expert_i = expert index (0..num_cards-1) + // t = token index (0..COUNT-1) + // + // This allows combine phase to access: + // "expert_i's data from 
card_j" at scratch[card_j][expert_i] + // ------------------------------------------------------------------ + for (int expert_i = 0; expert_i < num_cards; ++expert_i) { + for (size_t t = 0; t < COUNT; ++t) { + // Load from send[expert_i][t][:HIDDEN_DIM] (ALL experts, not just expert_id) + ShapeDyn send_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn send_stride(NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, HIDDEN_DIM, HIDDEN_DIM, 1); + Global sendG(send + expert_i * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM, send_shape, send_stride); + + // Store to scratch[my_rank][expert_i][t][:HIDDEN_DIM] + // Index = my_rank * (num_cards * NUM_TOKENS * HIDDEN_DIM) + // + expert_i * (NUM_TOKENS * HIDDEN_DIM) + // + t * HIDDEN_DIM + size_t scratch_offset = + my_rank * num_cards * NUM_TOKENS * HIDDEN_DIM + expert_i * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM; + + ShapeDyn scratch_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn scratch_stride( + num_cards * NUM_TOKENS * HIDDEN_DIM, num_cards * NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, + HIDDEN_DIM, 1 + ); + Global scratchG(scratch + scratch_offset, scratch_shape, scratch_stride); + + // Use tile for data movement + using TileType = pto::Tile; + TileType tile(1, HIDDEN_DIM); + TASSIGN(tile, 0); + + TLOAD(tile, sendG); + set_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + TSTORE(scratchG, tile); + set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + } + } + pipe_barrier(PIPE_ALL); + + // ------------------------------------------------------------------ + // Phase 2: device barrier — each card notifies peers that its + // send[expert_i] data is visible in scratch, then waits for all peers. 
+ // ------------------------------------------------------------------ + for (int peer = 0; peer < num_cards; ++peer) { + if (peer == my_rank) continue; + __gm__ int32_t *remote_signal = CommRemotePtr(commCtx, signal_base + my_rank, peer); + pto::comm::Signal sig(remote_signal); + pto::comm::TNOTIFY(sig, (int32_t)1, pto::comm::NotifyOp::AtomicAdd); + } + for (int peer = 0; peer < num_cards; ++peer) { + if (peer == my_rank) continue; + pto::comm::Signal sig(signal_base + peer); + pto::comm::TWAIT(sig, (int32_t)1, pto::comm::WaitCmp::GE); + } + pipe_barrier(PIPE_ALL); + + // ------------------------------------------------------------------ + // Phase 3: gather — read send[j][expert_id] from each card j's scratch + // and store to recv[card_j][:COUNT][:HIDDEN_DIM] + // + // For expert_id on this card, gather data from ALL cards: + // recv[card_j][:][:] = scratch[card_j][expert_id][:][:] + // ------------------------------------------------------------------ + for (int card_j = 0; card_j < num_cards; ++card_j) { + for (size_t t = 0; t < COUNT; ++t) { + // Source: scratch[card_j][expert_id][t][:HIDDEN_DIM] + // Offset = card_j * (num_cards * NUM_TOKENS * HIDDEN_DIM) + // + expert_id * (NUM_TOKENS * HIDDEN_DIM) + // + t * HIDDEN_DIM + __gm__ float *src_base = (card_j == my_rank) ? 
scratch : CommRemotePtr(commCtx, scratch, card_j); + size_t src_offset = + card_j * num_cards * NUM_TOKENS * HIDDEN_DIM + expert_id * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM; + + ShapeDyn src_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn src_stride( + num_cards * NUM_TOKENS * HIDDEN_DIM, num_cards * NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, + HIDDEN_DIM, 1 + ); + Global srcG(src_base + src_offset, src_shape, src_stride); + + // Destination: recv[card_j][t][:HIDDEN_DIM] + ShapeDyn dst_shape(1, 1, 1, 1, HIDDEN_DIM); + StrideDyn dst_stride(NUM_TOKENS * HIDDEN_DIM, NUM_TOKENS * HIDDEN_DIM, HIDDEN_DIM, HIDDEN_DIM, 1); + Global dstG(recv + card_j * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM, dst_shape, dst_stride); + + using TileType = pto::Tile; + TileType tile(1, HIDDEN_DIM); + TASSIGN(tile, 0); + + TLOAD(tile, srcG); + set_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0); + TSTORE(dstG, tile); + set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0); + } + } + + pipe_barrier(PIPE_ALL); +} diff --git a/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_simple_compute.cpp b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_simple_compute.cpp new file mode 100644 index 000000000..c7e04d621 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/kernels/aiv/moe_simple_compute.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. 
+ * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/* + * Simple Compute Kernel for MoE + * + * Adds 1.0 to all elements in recv[:][:4][:] + * + * args layout: + * tensor(0) = recv [num_cards][NUM_TOKENS][HIDDEN_DIM] + * scalar(0) = num_cards + * scalar(1) = unused (for compatibility) + * scalar(2) = unused (for compatibility) + */ + +#include +#include +#include "tensor.h" + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +static constexpr size_t NUM_TOKENS = 10; +static constexpr size_t HIDDEN_DIM = 16; +static constexpr size_t COUNT = 4; +static constexpr int kMaxSupportedCards = 16; + +extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t *args) { + __gm__ Tensor *recv_tensor = reinterpret_cast<__gm__ Tensor *>(args[0]); + __gm__ float *recv = reinterpret_cast<__gm__ float *>(recv_tensor->buffer.addr) + recv_tensor->start_offset; + int num_cards = static_cast(args[1]); + + if (num_cards <= 0 || num_cards > kMaxSupportedCards) { + pipe_barrier(PIPE_ALL); + return; + } + + // Add 1.0 to first COUNT tokens for all cards + // recv layout: [num_cards][NUM_TOKENS][HIDDEN_DIM] + for (int card = 0; card < num_cards; ++card) { + for (size_t t = 0; t < COUNT; ++t) { + for (size_t d = 0; d < HIDDEN_DIM; ++d) { + size_t offset = card * NUM_TOKENS * HIDDEN_DIM + t * HIDDEN_DIM + d; + recv[offset] += 1.0f; + } + } + } + + pipe_barrier(PIPE_ALL); +} diff --git a/examples/workers/l3/moe_multi_chip_experts/kernels/orchestration/moe_end2end_orch.cpp b/examples/workers/l3/moe_multi_chip_experts/kernels/orchestration/moe_end2end_orch.cpp new file mode 100644 index 000000000..b01237072 --- /dev/null +++ b/examples/workers/l3/moe_multi_chip_experts/kernels/orchestration/moe_end2end_orch.cpp @@ -0,0 +1,119 @@ +/* + * Copyright (c) PyPTO 
Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +// Orchestration Function: End-to-End MoE Pipeline +// +// This orchestration runs the complete MoE pipeline: +// 1. Dispatch: distribute tokens to expert cards +// 2. Compute: process tokens on each expert card +// 3. Combine: gather results back to source cards +// +// Uses independent dispatch and combine scratch buffers to avoid reuse hazards. 
+ +#include "runtime.h" +#include + +#include +#include +#include + +#include "pto_orchestration_api.h" + +// Must match the in-test golden references and kernel configurations +static constexpr int64_t COUNT = 4; // Number of tokens to process per (card, expert) pair +static constexpr int64_t NUM_TOKENS = 10; // Total number of tokens +static constexpr int64_t HIDDEN_DIM = 16; // Hidden dimension + +extern "C" { + +__attribute__((visibility("default"))) PTO2OrchestrationConfig +aicpu_orchestration_config(const ChipStorageTaskArgs &orch_args) { + return PTO2OrchestrationConfig{ + .expected_arg_count = + 10, // send, recv, output, scratch1, scratch2, scratch_print, expert_id, card_id, num_cards, commCtx + }; +} + +__attribute__((visibility("default"))) void aicpu_orchestration_entry(const ChipStorageTaskArgs &orch_args) { + // External tensors + Tensor ext_send = from_tensor_arg(orch_args.tensor(0)); // [num_experts][tokens][hidden] + Tensor ext_recv = from_tensor_arg(orch_args.tensor(1)); // [num_cards][tokens][hidden] + Tensor ext_output = from_tensor_arg(orch_args.tensor(2)); // [num_cards][count][hidden] + Tensor ext_scratch1 = from_tensor_arg(orch_args.tensor(3)); // HCCL scratch buffer for dispatch + Tensor ext_scratch2 = from_tensor_arg(orch_args.tensor(4)); // HCCL scratch buffer for combine + Tensor ext_scratch_print = from_tensor_arg(orch_args.tensor(5)); // Scratch print buffer + + // Scalar arguments + int64_t expert_id = static_cast(orch_args.scalar(0)); // Which expert this card processes + int64_t card_id = static_cast(orch_args.scalar(1)); // Which card this is + int64_t num_cards = static_cast(orch_args.scalar(2)); // Total number of cards + uint64_t comm_ctx_ptr = static_cast(orch_args.scalar(3)); // CommContext* + + printf("[End2End Orch] card_id=%ld expert_id=%ld num_cards=%ld\n", card_id, expert_id, num_cards); + fflush(stdout); + + PTO2_SCOPE() { + // ========== PART 1: Full Pipeline ========== + printf("[End2End Orch] Part 1: Full Pipeline 
(Dispatch + Compute + Combine) - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        // === Phase 1: Dispatch ===
+        printf("[End2End Orch] Phase 1: Dispatch - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        Arg params_dispatch;
+        params_dispatch.add_input(ext_send);
+        params_dispatch.add_output(ext_recv);
+        params_dispatch.add_inout(ext_scratch1);
+        params_dispatch.add_scalar(expert_id);
+        params_dispatch.add_scalar(num_cards);
+        params_dispatch.add_scalar(comm_ctx_ptr);
+        pto2_rt_submit_aiv_task(0, params_dispatch); // moe_dispatch_alltoall
+
+        printf("[End2End Orch] Dispatch submitted - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        // === Phase 2: Compute ===
+        printf("[End2End Orch] Phase 2: Compute - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        Arg params_compute;
+        params_compute.add_inout(ext_recv);
+        params_compute.add_scalar(num_cards);
+        params_compute.add_scalar(0); // unused
+        params_compute.add_scalar(0); // unused
+        pto2_rt_submit_aiv_task(1, params_compute); // moe_simple_compute
+
+        printf("[End2End Orch] Compute submitted - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        // === Phase 3: Combine (Full Pipeline) ===
+        printf("[End2End Orch] Phase 3: Combine (full pipeline) - card_id=%ld\n", card_id);
+        fflush(stdout);
+
+        Arg params_combine;
+        params_combine.add_input(ext_recv);
+        params_combine.add_output(ext_output);
+        params_combine.add_inout(ext_scratch2);
+        params_combine.add_output(ext_scratch_print);
+        params_combine.add_scalar(card_id);
+        params_combine.add_scalar(num_cards);
+        params_combine.add_scalar(comm_ctx_ptr);
+        pto2_rt_submit_aiv_task(2, params_combine); // moe_combine_alltoall
+
+        printf("[End2End Orch] Combine (full pipeline) submitted - card_id=%ld\n", card_id);
+        fflush(stdout);
+    }
+
+    printf("[End2End Orch] card_id=%ld completed\n", card_id);
+    fflush(stdout);
+}
+
+} // extern "C"
diff --git a/examples/workers/l3/moe_multi_chip_experts/main.py b/examples/workers/l3/moe_multi_chip_experts/main.py
new file mode 100644
index 000000000..a763ec61e
--- /dev/null
+++ b/examples/workers/l3/moe_multi_chip_experts/main.py
@@ -0,0 +1,422 @@
+#!/usr/bin/env python3
+# Copyright (c) PyPTO Contributors.
+# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
+# CANN Open Software License Agreement Version 2.0 (the "License").
+# Please refer to the License for details. You may not use this file except in compliance with the License.
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+# See LICENSE in the root of the software repository for the full text of the License.
+# -----------------------------------------------------------------------------------------------------------
+"""End-to-end distributed MoE demo.
+
+Runs dispatch, per-expert compute, and combine across one expert per chip.
+
+Run:
+    python examples/workers/l3/moe_multi_chip_experts/main.py -p a2a3 -d 0-3
+"""
+
+import argparse
+import os
+import sys
+import traceback
+
+os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
+
+import torch
+from simpler.task_interface import (
+    ArgDirection,
+    CallConfig,
+    ChipBootstrapConfig,
+    ChipBufferSpec,
+    ChipCallable,
+    ChipCommBootstrapConfig,
+    ContinuousTensor,
+    CoreCallable,
+    DataType,
+    TaskArgs,
+    TensorArgType,
+)
+from simpler.worker import Worker
+
+from simpler_setup.elf_parser import extract_text_section
+from simpler_setup.kernel_compiler import KernelCompiler
+from simpler_setup.pto_isa import ensure_pto_isa_root
+from simpler_setup.torch_interop import make_tensor_arg
+
+HERE = os.path.dirname(os.path.abspath(__file__))
+
+# MoE configuration
+NUM_TOKENS = 10
+HIDDEN_DIM = 16
+COUNT = 4
+
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="Test complete MoE pipeline (Dispatch + Compute + Combine)")
+    parser.add_argument("-p", "--platform", required=True, choices=["a2a3sim", "a2a3", "a5sim", "a5"])
+    parser.add_argument("-d", "--device", default="0-3", help="Device range")
+    return parser.parse_args()
+
+
+def parse_device_range(spec: str) -> list[int]:
+    if "-" in spec:
+        lo, hi = (int(x) for x in spec.split("-"))
+        return list(range(lo, hi + 1))
+    elif "," in spec:
+        return [int(x) for x in spec.split(",")]
+    else:
+        return [int(spec)]
+
+
+def build_end2end_callable(platform: str) -> ChipCallable:
+    """Build callable with dispatch + compute + combine kernels."""
+    print("[End2End] Compiling kernels...", flush=True)
+    kc = KernelCompiler(platform=platform)
+    runtime = "tensormap_and_ringbuffer"
+    pto_isa_root = ensure_pto_isa_root(clone_protocol="https")
+    include_dirs = kc.get_orchestration_include_dirs(runtime)
+    kernel_include_dirs = list(include_dirs) + [str(kc.project_root / "src" / "common")]
+
+    # Compile dispatch kernel
+    dispatch_bytes = kc.compile_incore(
+        source_path=os.path.join(HERE, "kernels/aiv/moe_dispatch_alltoall.cpp"),
+        core_type="aiv",
+        pto_isa_root=pto_isa_root,
+        extra_include_dirs=kernel_include_dirs,
+    )
+    print("[End2End] Dispatch kernel compiled", flush=True)
+
+    # Compile compute kernel
+    compute_bytes = kc.compile_incore(
+        source_path=os.path.join(HERE, "kernels/aiv/moe_simple_compute.cpp"),
+        core_type="aiv",
+        pto_isa_root=pto_isa_root,
+        extra_include_dirs=include_dirs,
+    )
+    print("[End2End] Compute kernel compiled", flush=True)
+
+    # Compile combine kernel
+    combine_bytes = kc.compile_incore(
+        source_path=os.path.join(HERE, "kernels/aiv/moe_combine_alltoall.cpp"),
+        core_type="aiv",
+        pto_isa_root=pto_isa_root,
+        extra_include_dirs=kernel_include_dirs,
+    )
+    print("[End2End] Combine kernel compiled", flush=True)
+
+    if not platform.endswith("sim"):
+        dispatch_bytes = extract_text_section(dispatch_bytes)
+        compute_bytes = extract_text_section(compute_bytes)
+        combine_bytes = extract_text_section(combine_bytes)
+        print("[End2End] Text sections extracted", flush=True)
+
+    # Compile orchestration
+    print("[End2End] Compiling orchestration...", flush=True)
+    orch_bytes = kc.compile_orchestration(
+        runtime_name=runtime,
+        source_path=os.path.join(HERE, "kernels/orchestration/moe_end2end_orch.cpp"),
+    )
+    print("[End2End] Orchestration compiled", flush=True)
+
+    # Build core callables
+    dispatch_cc = CoreCallable.build(
+        signature=[
+            ArgDirection.IN,
+            ArgDirection.OUT,
+            ArgDirection.INOUT,
+            ArgDirection.IN,
+            ArgDirection.IN,
+            ArgDirection.IN,
+        ],
+        binary=dispatch_bytes,
+    )
+
+    compute_cc = CoreCallable.build(
+        signature=[ArgDirection.INOUT, ArgDirection.IN, ArgDirection.IN, ArgDirection.IN],
+        binary=compute_bytes,
+    )
+
+    combine_cc = CoreCallable.build(
+        signature=[
+            ArgDirection.IN,
+            ArgDirection.OUT,
+            ArgDirection.INOUT,
+            ArgDirection.OUT,
+            ArgDirection.IN,
+            ArgDirection.IN,
+            ArgDirection.IN,
+        ],
+        binary=combine_bytes,
+    )
+
+    return ChipCallable.build(
+        signature=[
+            ArgDirection.IN,  # send
+            ArgDirection.OUT,  # recv
+            ArgDirection.OUT,  # output
+            ArgDirection.INOUT,  # scratch1: dispatch HCCL window
+            ArgDirection.INOUT,  # scratch2: combine HCCL window
+            ArgDirection.OUT,  # scratch_print
+            ArgDirection.IN,  # expert_id
+            ArgDirection.IN,  # card_id
+            ArgDirection.IN,  # num_cards
+            ArgDirection.IN,  # CommContext*
+        ],
+        func_name="aicpu_orchestration_entry",
+        binary=orch_bytes,
+        children=[(0, dispatch_cc), (1, compute_cc), (2, combine_cc)],  # All three phases
+    )
+
+
+def compute_golden_end2end(num_cards: int, host_send: list[torch.Tensor]) -> list[torch.Tensor]:
+    """
+    Compute golden output for end-to-end pipeline:
+    1. Dispatch: send[card_j][expert_i][:COUNT][:] -> recv[card_i][card_j][:COUNT][:]
+    2. Compute: recv[card_i][card_j][:COUNT][:] += 1.0
+    3. Combine: recv[expert_j][card_i][:COUNT][:] -> output[card_i][expert_j][:COUNT][:]
+
+    Send initialization: unique values using (card * 1000000 + expert * 10000 + token * 100 + dim)
+    """
+    golden_outputs = []
+    for cardi in range(num_cards):
+        output = torch.zeros(num_cards, COUNT, HIDDEN_DIM, dtype=torch.float32)
+        for expertj in range(num_cards):
+            for t in range(COUNT):
+                for d in range(HIDDEN_DIM):
+                    # After dispatch: recv[cardi][expertj][:][:] = send[expertj][cardi][:][:]
+                    # Round trip: output[cardi][expertj] traces back to cardi's own send[expertj][t][d]
+                    send_value = host_send[cardi][expertj, t, d].item()
+                    # After compute: recv += 1.0
+                    recv_value = send_value + 1.0
+                    # After combine: output[cardi][expertj][t][d] = recv[expertj][cardi][t][d]
+                    output[expertj, t, d] = recv_value
+        golden_outputs.append(output)
+
+    return golden_outputs
+
+
+def make_host_tensors(num_cards: int, num_experts: int):
+    host_send = []
+    for i in range(num_cards):
+        send = torch.zeros(num_experts, NUM_TOKENS, HIDDEN_DIM, dtype=torch.float32).share_memory_()
+        for expert_j in range(num_experts):
+            for t in range(NUM_TOKENS):
+                for d in range(HIDDEN_DIM):
+                    value = float(i * 1000000 + expert_j * 10000 + t * 100 + d)
+                    send[expert_j, t, d] = value
+        host_send.append(send)
+
+    host_recv = [
+        torch.zeros(num_cards, NUM_TOKENS, HIDDEN_DIM, dtype=torch.float32).share_memory_() for _ in range(num_cards)
+    ]
+    host_output = [
+        torch.zeros(num_cards, COUNT, HIDDEN_DIM, dtype=torch.float32).share_memory_() for _ in range(num_cards)
+    ]
+    return host_send, host_recv, host_output
+
+
+def make_bootstrap_configs(num_cards: int, rootinfo_path: str, window_size: int, scratch_buffer_count: int):
+    total_scratch_nbytes = scratch_buffer_count * 4
+    return [
+        ChipBootstrapConfig(
+            comm=ChipCommBootstrapConfig(
+                rank=rank,
+                nranks=num_cards,
+                rootinfo_path=rootinfo_path,
+                window_size=window_size,
+            ),
+            buffers=[
+                ChipBufferSpec(
+                    name="scratch1",
+                    dtype="float32",
+                    count=scratch_buffer_count,
+                    nbytes=total_scratch_nbytes,
+                ),
+                ChipBufferSpec(
+                    name="scratch2",
+                    dtype="float32",
+                    count=scratch_buffer_count,
+                    nbytes=total_scratch_nbytes,
+                ),
+            ],
+        )
+        for rank in range(num_cards)
+    ]
+
+
+def print_output_samples(num_cards: int, host_output: list[torch.Tensor], golden_outputs: list[torch.Tensor]) -> None:
+    print("\n" + "=" * 80)
+    print("[End2End] OUTPUT DATA:")
+    print("=" * 80)
+
+    for i in range(num_cards):
+        print(f"\n[End2End] Card {i} output data:")
+        print("  Expected: Each value = send_value + 1.0")
+        print("  Sample data (up to 2 experts, first 2 tokens, first 3 dims):")
+
+        for expert_j in range(min(2, num_cards)):
+            print(f"  Expert {expert_j}:")
+            for t in range(min(COUNT, 2)):
+                vals = host_output[i][expert_j, t, :3].tolist()
+                golden_vals = golden_outputs[i][expert_j, t, :3].tolist()
+                print(f"    Token {t}: Output={vals}, Golden={golden_vals}")
+
+
+def verify_outputs(num_cards: int, host_output: list[torch.Tensor], golden_outputs: list[torch.Tensor]) -> bool:
+    print("\n" + "=" * 80)
+    print("[End2End] VERIFICATION:")
+    print("=" * 80)
+
+    all_correct = True
+    error_count = 0
+    total_checked = 0
+
+    for i in range(num_cards):
+        print(f"\n[End2End] Card {i}:")
+        card_errors = 0
+        for expert_j in range(num_cards):
+            for t in range(COUNT):
+                for d in range(HIDDEN_DIM):
+                    expected = golden_outputs[i][expert_j, t, d].item()
+                    actual = host_output[i][expert_j, t, d].item()
+                    total_checked += 1
+                    if abs(actual - expected) > 1e-3:
+                        card_errors += 1
+                        error_count += 1
+                        all_correct = False
+
+        if card_errors == 0:
+            print(f"  ✓ All {num_cards * COUNT * HIDDEN_DIM} values correct")
+        else:
+            print(f"  ✗ {card_errors} / {num_cards * COUNT * HIDDEN_DIM} values incorrect")
+
+    print(f"\n  Total: {total_checked - error_count}/{total_checked} correct")
+    return all_correct
+
+
+def make_scratch_arg(contexts, rank: int, name: str, scratch_buffer_count: int):
+    return ContinuousTensor.make(
+        data=contexts[rank].buffer_ptrs[name],
+        shapes=(scratch_buffer_count,),
+        dtype=DataType.FLOAT32,
+        child_memory=True,
+    )
+
+
+def run(platform: str, device_ids: list[int]) -> int:
+    print(f"[End2End] Testing complete MoE pipeline on devices {device_ids}", flush=True)
+    num_cards = len(device_ids)
+    num_experts = num_cards
+    scratch_count = num_cards * num_cards * NUM_TOKENS * HIDDEN_DIM
+    signal_count = num_cards
+    scratch_buffer_count = scratch_count + signal_count
+    total_scratch_nbytes = scratch_buffer_count * 4
+    window_size = max(total_scratch_nbytes * 2, 4 * 1024)
+
+    print("\n[End2End] Test Configuration:")
+    print(f"  Platform: {platform}")
+    print(f"  Number of cards: {num_cards}")
+    print(f"  Device IDs: {device_ids}")
+    print(f"  NUM_TOKENS: {NUM_TOKENS}")
+    print(f"  HIDDEN_DIM: {HIDDEN_DIM}")
+    print(f"  COUNT (tokens processed): {COUNT}")
+
+    rootinfo_path = f"/tmp/pto_end2end_{os.getpid()}.bin"
+    try:
+        os.unlink(rootinfo_path)
+    except FileNotFoundError:
+        pass
+
+    torch.manual_seed(42)
+    host_send, host_recv, host_output = make_host_tensors(num_cards, num_experts)
+    host_scratch_print = [torch.zeros(scratch_count, dtype=torch.float32).share_memory_() for _ in device_ids]
+
+    print("\n[End2End] Allocated tensors:")
+    print("  send=unique_values, recv=0.0, output=0.0")
+    print("  Value encoding: (card_id * 1000000) + (expert_id * 10000) + (token * 100) + dim", flush=True)
+
+    print("\n[End2End] Computing golden output...")
+    golden_outputs = compute_golden_end2end(num_cards, host_send)
+    print("[End2End] Golden output computed", flush=True)
+
+    cfgs = make_bootstrap_configs(num_cards, rootinfo_path, window_size, scratch_buffer_count)
+    worker = Worker(
+        level=3,
+        platform=platform,
+        runtime="tensormap_and_ringbuffer",
+        device_ids=device_ids,
+        num_sub_workers=0,
+        chip_bootstrap_configs=cfgs,
+    )
+
+    print(f"\n[End2End] Compiling kernels for {platform}...", flush=True)
+    end2end_cc = build_end2end_callable(platform)
+    print("[End2End] All kernels compiled successfully", flush=True)
+
+    print("[End2End] Initializing worker...", flush=True)
+    worker.init()
+    contexts = worker.chip_contexts
+    print(f"[End2End] Worker initialized with {len(contexts)} contexts", flush=True)
+
+    try:
+
+        def orch_fn(orch, _args, cfg):
+            print(f"[End2End] Submitting tasks for {num_cards} cards", flush=True)
+            for i in range(num_cards):
+                args = TaskArgs()
+                args.add_tensor(make_tensor_arg(host_send[i]), TensorArgType.INPUT)
+                args.add_tensor(make_tensor_arg(host_recv[i]), TensorArgType.OUTPUT_EXISTING)
+                args.add_tensor(make_tensor_arg(host_output[i]), TensorArgType.OUTPUT_EXISTING)
+
+                args.add_tensor(make_scratch_arg(contexts, i, "scratch1", scratch_buffer_count), TensorArgType.INOUT)
+                args.add_tensor(make_scratch_arg(contexts, i, "scratch2", scratch_buffer_count), TensorArgType.INOUT)
+                args.add_tensor(make_tensor_arg(host_scratch_print[i]), TensorArgType.OUTPUT_EXISTING)
+
+                args.add_scalar(i)  # expert_id
+                args.add_scalar(i)  # card_id
+                args.add_scalar(num_cards)
+                args.add_scalar(contexts[i].device_ctx)
+
+                orch.submit_next_level(end2end_cc, args, cfg, worker=i)
+                print(f"[End2End] Submitted task for card {i}", flush=True)
+
+        print("\n[End2End] Running end-to-end test...", flush=True)
+
+        worker.run(orch_fn, args=None, config=CallConfig())
+        print("\n[End2End] End-to-end pipeline completed!", flush=True)
+
+        print_output_samples(num_cards, host_output, golden_outputs)
+        all_correct = verify_outputs(num_cards, host_output, golden_outputs)
+        print("\n" + "=" * 80)
+        print("[End2End] FINAL VERDICT:")
+        print("=" * 80)
+
+        if all_correct:
+            print("\n[End2End] ✅ All values correct! End-to-end pipeline works perfectly.")
+            return 0
+        else:
+            print("\n[End2End] ❌ Some values incorrect!")
+            return 1
+
+    except Exception as e:
+        print(f"[End2End] ERROR: {e}")
+        traceback.print_exc()
+        return 1
+
+    finally:
+        print("[End2End] Shutting down worker...")
+        worker.close()
+        try:
+            os.unlink(rootinfo_path)
+        except FileNotFoundError:
+            pass
+
+
+def main() -> int:
+    args = parse_args()
+    device_ids = parse_device_range(args.device)
+    return run(args.platform, device_ids)
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/examples/workers/l3/moe_multi_chip_experts/test_moe_multi_chip_experts.py b/examples/workers/l3/moe_multi_chip_experts/test_moe_multi_chip_experts.py
new file mode 100644
index 000000000..c501d8900
--- /dev/null
+++ b/examples/workers/l3/moe_multi_chip_experts/test_moe_multi_chip_experts.py
@@ -0,0 +1,26 @@
+# Copyright (c) PyPTO Contributors.
+# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
+# CANN Open Software License Agreement Version 2.0 (the "License").
+# Please refer to the License for details. You may not use this file except in compliance with the License.
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
+# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
+# See LICENSE in the root of the software repository for the full text of the License.
+# -----------------------------------------------------------------------------------------------------------
+"""Hardware ST for examples/workers/l3/moe_multi_chip_experts."""
+
+import pytest
+
+from .main import run
+
+
+@pytest.mark.platforms(["a2a3"])
+@pytest.mark.runtime("tensormap_and_ringbuffer")
+@pytest.mark.device_count(2)
+def test_moe_multi_chip_2_experts(st_platform, st_device_ids):
+    """Test multi-chip MoE with 2 experts (1 per chip).
+
+    This should produce the SAME results as moe_single_chip with 2 experts,
+    just executed in parallel across 2 chips instead of sequentially on 1 chip.
+    """
+    rc = run(st_platform, [int(d) for d in st_device_ids])
+    assert rc == 0
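For reviewers, the dispatch → compute → combine round trip that `compute_golden_end2end` models collapses to `output[card][expert][t][d] = send[card][expert][t][d] + 1.0` for the first `COUNT` tokens: each card's send slice for expert `j` is routed out by dispatch, incremented by the compute kernel, and routed back by combine. A minimal standalone sketch of that golden model, using plain Python lists instead of torch tensors (function names here are illustrative, not part of the example):

```python
NUM_TOKENS, HIDDEN_DIM, COUNT = 10, 16, 4  # same shape as the example

def send_value(card: int, expert: int, token: int, dim: int) -> float:
    # Same unique-value encoding used by make_host_tensors
    return float(card * 1000000 + expert * 10000 + token * 100 + dim)

def golden_end2end(num_cards: int) -> list:
    # output[card][expert][token][dim] = that card's own send slice + 1.0,
    # because dispatch routes send[expert] out and combine routes it back
    return [
        [[[send_value(card, expert, t, d) + 1.0 for d in range(HIDDEN_DIM)]
          for t in range(COUNT)]
         for expert in range(num_cards)]
        for card in range(num_cards)
    ]

outputs = golden_end2end(2)
print(outputs[0][1][2][3])  # card 0, expert 1, token 2, dim 3 -> 10204.0
```

The encoding makes every value self-describing, so a single misrouted token slice is immediately visible in the printed samples.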