Replace Spawn trait with FuturesUnordered #88

@lxsaah

Background

Spawn is currently composed into the Runtime bundle trait:

pub trait Runtime: RuntimeAdapter + TimeOps + Logger + Spawn {}

This propagates R: Spawn through the entire type system — AimDb<R>, RuntimeContext<R>, Producer<T, R>, Consumer<T, R>, TypedRecord<T, R> — and forces each runtime adapter to implement dynamic task spawning. On Embassy this requires a non-trivial workaround: every future is heap-allocated, type-erased, and fed into a compile-time-fixed task pool with an unsafe { Pin::new_unchecked } block. On WASM it forces unsafe impl Send/Sync.

Audit finding

All spawn() calls in aimdb occur inside a single phase: database.build(). No task is ever spawned after initialization completes. Every spawn count is determined by database configuration — not by runtime events.

| Site | File | Count |
|---|---|---|
| Producer service tasks | aimdb-core/src/typed_record.rs | one per .source() |
| Consumer tasks | aimdb-core/src/typed_record.rs | one per .tap() |
| Transform forwarder tasks | aimdb-core/src/transform/join.rs | one per join input |
| Join transform tasks | aimdb-core/src/typed_record.rs | one per .transform_join() |
| Start handlers | aimdb-core/src/builder.rs | one per .on_start() |

The set of futures to run is fully known by the time build() is called. This is exactly the use case FuturesUnordered is designed for.

Proposed change

database.build() collects futures instead of spawning them. A new database.run() drives them:

let db = Database::build(config)?;
let handle = db.handle();
db.run().await;  // drives all internal futures, blocks until shutdown

Internally, run() uses FuturesUnordered<Pin<Box<dyn Future<Output = ()>>>> — the same boxing that already happens today in both the Tokio task allocator and the Embassy adapter, so there is no regression in allocation overhead.
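To make the collect-then-drive idea concrete, here is a minimal std-only sketch. It is not the proposed implementation: `TaskSet`, `YieldOnce`, `NoopWaker`, and `demo` are hypothetical names, and the naive re-poll loop is only a stand-in for FuturesUnordered, which polls just the futures that have been woken. The element type matches the one named above, with no Send bound, so the futures may capture an Rc.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed, type-erased future without a `Send` bound.
type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;

/// Waker that does nothing; enough for a loop that re-polls every task.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for the set of futures collected during build.
#[derive(Default)]
struct TaskSet {
    tasks: Vec<BoxedTask>,
}

impl TaskSet {
    /// Collect a future; nothing is polled yet.
    fn push(&mut self, fut: impl Future<Output = ()> + 'static) {
        self.tasks.push(Box::pin(fut));
    }

    /// Naive driver: poll all pending tasks until none are left.
    fn run_to_completion(mut self) {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        while !self.tasks.is_empty() {
            // Keep only the tasks that are still pending after this pass.
            self.tasks
                .retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
        }
    }
}

/// Helper future that returns `Pending` exactly once before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Runs two tasks that share a `!Send` log and returns the completion order.
fn demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut set = TaskSet::default();

    let slow = Rc::clone(&log);
    set.push(async move {
        YieldOnce(false).await;
        slow.borrow_mut().push("slow");
    });

    let fast = Rc::clone(&log);
    set.push(async move {
        fast.borrow_mut().push("fast");
    });

    set.run_to_completion();
    let order = log.borrow().clone();
    order
}
```

Calling `demo()` returns `["fast", "slow"]`: the second task finishes first because the first one yields once, which mirrors the completion-order semantics of FuturesUnordered. A real `run()` would rely on proper wakeups rather than spinning.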

Impact

  • Removes Spawn from the Runtime bundle trait: RuntimeAdapter + TimeOps + Logger is sufficient for all three runtimes
  • Deletes the Embassy task-pool workaround: generic_task_runner, BoxedFuture, TASK_POOL_SIZE feature flags, and unsafe { Pin::new_unchecked } all go away
  • Removes unsafe impl Send/Sync from EmbassyAdapter and WasmAdapter — these exist solely because Spawn requires F: Send + 'static; re-evaluate remaining unsafe impls on Producer/Consumer in typed_api.rs as part of this work
  • Uniform behaviour across all runtimes — no adapter-specific workarounds
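
As a rough illustration of the first and third points, the sketch below shows a bundle trait without Spawn being satisfied by an adapter that is not Send. The three traits are simplified stand-ins for the ones in aimdb-executor; the `runtime_name` method, `LocalAdapter`, `describe`, and `demo` are hypothetical and exist only for this example.

```rust
use std::rc::Rc;

// Hypothetical, simplified stand-ins for the aimdb-executor traits.
pub trait RuntimeAdapter {
    fn runtime_name(&self) -> &'static str;
}
pub trait TimeOps {}
pub trait Logger {}

/// The bundle as proposed, without a `Spawn` supertrait.
pub trait Runtime: RuntimeAdapter + TimeOps + Logger {}

/// Hypothetical single-threaded adapter; the `Rc` field makes it `!Send`.
pub struct LocalAdapter {
    _clock: Rc<()>,
}

impl RuntimeAdapter for LocalAdapter {
    fn runtime_name(&self) -> &'static str {
        "local"
    }
}
impl TimeOps for LocalAdapter {}
impl Logger for LocalAdapter {}
impl Runtime for LocalAdapter {}

/// Generic code needs only `R: Runtime`; no `Send` or `Spawn` bound appears.
pub fn describe<R: Runtime>(rt: &R) -> &'static str {
    rt.runtime_name()
}

pub fn demo() -> &'static str {
    describe(&LocalAdapter { _clock: Rc::new(()) })
}
```

Nothing here needs an unsafe impl: since no bound demands Send, a single-threaded adapter can hold Rc state and still be used wherever `R: Runtime` is required.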

Work items

  • Remove Spawn from the Runtime supertrait in aimdb-executor/src/lib.rs
  • Delete Spawn impl from EmbassyAdapter (aimdb-embassy-adapter/src/runtime.rs) and all associated pool machinery
  • Delete Spawn impl from WasmAdapter
  • Change database.build() to collect futures into a Vec instead of spawning them
  • Implement database.run() driving collected futures via FuturesUnordered
  • Remove R: Spawn bounds throughout aimdb-core (typed_api.rs, typed_record.rs, context.rs, database.rs)
  • Re-evaluate unsafe impl Send/Sync on Producer<T, R> and Consumer<T, R> — remove if no longer needed
  • Update all examples and aimdb-pro call sites to use db.run().await
  • Update changelog and public API docs

Breaking changes

  • database.build() no longer starts internal tasks — callers must call db.run().await
  • Spawn is removed from the Runtime supertrait — custom adapter implementations no longer need to implement it
  • EmbassyAdapter feature flags embassy-task-pool-8/16/32 are removed
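
The first breaking change is the one most likely to bite callers, so here is a small std-only sketch of the new lifecycle. It is a hypothetical miniature, not the aimdb API: this `Database` takes a source count instead of a config, its `run()` resolves when all futures finish rather than at shutdown, and `block_on` is a naive busy-polling helper added only so the example is self-contained.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;

/// Hypothetical miniature of the proposed lifecycle.
struct Database {
    tasks: Vec<BoxedTask>,
    started: Rc<Cell<usize>>,
}

impl Database {
    /// Collects one future per configured source; nothing is polled here.
    fn build(sources: usize) -> Self {
        let started = Rc::new(Cell::new(0usize));
        let tasks = (0..sources)
            .map(|_| {
                let started = Rc::clone(&started);
                Box::pin(async move { started.set(started.get() + 1) }) as BoxedTask
            })
            .collect();
        Database { tasks, started }
    }

    /// Drives the collected futures; resolves once all have finished.
    async fn run(mut self) {
        poll_fn(move |cx| {
            self.tasks.retain_mut(|t| t.as_mut().poll(cx).is_pending());
            if self.tasks.is_empty() {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref(); // naive: ask to be polled again
                Poll::Pending
            }
        })
        .await
    }
}

/// Waker that does nothing; `block_on` below simply keeps polling.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Returns how many tasks had run before and after `run()` was driven.
fn demo() -> (usize, usize) {
    let db = Database::build(3);
    let started = Rc::clone(&db.started);
    let before = started.get(); // build() only collected the futures
    block_on(db.run());
    (before, started.get())
}
```

`demo()` returns `(0, 3)`: no task has run after `build()`, and all three have run once `run()` has been driven. Code that previously relied on tasks starting inside `build()` would observe the "before" state until it awaits `run()`.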
