Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md))
- **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))
- **Task-model join handler**: New `JoinBuilder::on_triggers(FnOnce(JoinEventRx, Producer) -> impl Future)` API replaces the previous callback model. Eliminates per-event heap allocation and lets handler state borrow across `.await` points. **Breaking change** vs. the old `with_state().on_trigger(...)` form — see [aimdb-core](aimdb-core/CHANGELOG.md).
- **Weather-mesh `DewPoint` demo**: All three weather stations (alpha, beta, gamma) now derive a `DewPoint` record from `Temperature` and `Humidity` via `transform_join`, demonstrating the API end-to-end on Tokio and Embassy.
Expand Down
1 change: 1 addition & 0 deletions aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted.
Comment thread
lxsaah marked this conversation as resolved.
- **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters.
- **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders.
- **`transform_join` as an inherent method on `RecordRegistrar`** (gated `feature = "alloc"`, `R: JoinFanInRuntime`). Previously only exposed via the `impl_record_registrar_ext!` macro under `feature = "std"`.
Expand Down
142 changes: 142 additions & 0 deletions aimdb-core/src/typed_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,8 @@ where
/// - If no deserializer is provided
/// - If no connector is registered for the URL scheme
/// - If the URL is invalid
/// - If the record already has a `.source()` or `.transform()`
/// (local producer + inbound connector would race as last-writer-wins)
pub fn finish(self) -> &'a mut RecordRegistrar<'a, T, R> {
use crate::connector::{ConnectorUrl, DeserializerKind, InboundConnectorLink};

Expand All @@ -921,6 +923,23 @@ where
);
}

// Mutual exclusion with local producers — both write to the same
// buffer and would race as last-writer-wins. Builder-level check
// surfaces the URL in the message; `add_inbound_connector` enforces
// the same invariant from the other direction.
if self.registrar.rec.has_transform() {
panic!(
"Record already has a .transform(); cannot also have a .link_from() for {}",
self.url
);
}
if self.registrar.rec.has_producer_service() {
panic!(
"Record already has a .source(); cannot also have a .link_from() for {}",
self.url
);
}

// Resolve deserializer variant (mutually exclusive)
let deser_kind = if let Some(ctx_deser) = self.context_deserializer {
DeserializerKind::Context(ctx_deser)
Expand Down Expand Up @@ -1445,4 +1464,127 @@ mod tests {
// No serializer set — should panic
reg.link_to("mqtt://broker/topic").finish();
}

// ====================================================================
// Writer-exclusivity tests (.source / .transform / .link_from)
// ====================================================================

/// Helper: build a `TransformDescriptor` with a no-op spawn function.
fn dummy_transform_descriptor() -> crate::transform::TransformDescriptor<TestRecord, MockRuntime>
{
crate::transform::TransformDescriptor::<TestRecord, MockRuntime> {
input_keys: vec![],
spawn_fn: Box::new(|_p, _db, _ctx| Box::pin(async {})),
}
}

#[test]
#[should_panic(
expected = "Record already has a .source(); cannot also have a .link_from() for mqtt://broker/topic"
)]
fn link_from_after_source_panics() {
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
rec.set_buffer(Box::new(MockBuffer));
rec.set_producer_service(|_p, _ctx| async move {});

let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
vec![Box::new(MockConnectorBuilder {
scheme: "mqtt".to_string(),
})];
let extensions = crate::extensions::Extensions::new();

let mut reg = make_registrar(&mut rec, &builders, &extensions);
reg.link_from("mqtt://broker/topic")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish();
}

#[test]
#[should_panic(
expected = "Record already has a .transform(); cannot also have a .link_from() for mqtt://broker/topic"
)]
fn link_from_after_transform_panics() {
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
rec.set_buffer(Box::new(MockBuffer));
rec.set_transform(dummy_transform_descriptor());

let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
vec![Box::new(MockConnectorBuilder {
scheme: "mqtt".to_string(),
})];
let extensions = crate::extensions::Extensions::new();

let mut reg = make_registrar(&mut rec, &builders, &extensions);
reg.link_from("mqtt://broker/topic")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish();
}

#[test]
#[should_panic(expected = "Record already has a .link_from(); cannot also have a .source().")]
fn source_after_link_from_panics() {
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
rec.set_buffer(Box::new(MockBuffer));

let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
vec![Box::new(MockConnectorBuilder {
scheme: "mqtt".to_string(),
})];
let extensions = crate::extensions::Extensions::new();
{
let mut reg = make_registrar(&mut rec, &builders, &extensions);
reg.link_from("mqtt://broker/topic")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish();
}

rec.set_producer_service(|_p, _ctx| async move {});
}

#[test]
#[should_panic(
expected = "Record already has a .link_from(); cannot also have a .transform()."
)]
fn transform_after_link_from_panics() {
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
rec.set_buffer(Box::new(MockBuffer));

let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
vec![Box::new(MockConnectorBuilder {
scheme: "mqtt".to_string(),
})];
let extensions = crate::extensions::Extensions::new();
{
let mut reg = make_registrar(&mut rec, &builders, &extensions);
reg.link_from("mqtt://broker/topic")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish();
}

rec.set_transform(dummy_transform_descriptor());
}

#[test]
fn multiple_link_from_allowed() {
let mut rec = crate::typed_record::TypedRecord::<TestRecord, MockRuntime>::new();
rec.set_buffer(Box::new(MockBuffer));

let builders: Vec<Box<dyn crate::connector::ConnectorBuilder<MockRuntime>>> =
vec![Box::new(MockConnectorBuilder {
scheme: "mqtt".to_string(),
})];
let extensions = crate::extensions::Extensions::new();
let mut reg = make_registrar(&mut rec, &builders, &extensions);

// Chain via finish() → &mut RecordRegistrar — the registrar's
// lifetime only permits one borrow chain at a time.
reg.link_from("mqtt://broker/topic-a")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish()
.link_from("mqtt://broker/topic-b")
.with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 }))
.finish();

assert_eq!(rec.inbound_connectors().len(), 2);
}
}
40 changes: 37 additions & 3 deletions aimdb-core/src/typed_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
/// Long-running task that generates data via `producer.produce()`. Auto-spawned during `build()`.
///
/// # Panics
/// Panics if producer already set (one producer per record).
/// Panics if producer already set (one producer per record), if a transform is registered,
/// or if a `.link_from()` inbound connector is registered (all three would race on the buffer).
pub fn set_producer_service<F, Fut>(&mut self, f: F)
where
F: FnOnce(crate::Producer<T, R>, Arc<dyn Any + Send + Sync>) -> Fut + Send + Sync + 'static,
Expand All @@ -639,6 +640,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
panic!("Record already has a .transform(); cannot also have a .source().");
}

if !self.inbound_connectors.is_empty() {
panic!("Record already has a .link_from(); cannot also have a .source().");
}

// Check if already set
#[cfg(feature = "std")]
let already_set = self.producer_service.lock().unwrap().is_some();
Expand Down Expand Up @@ -711,8 +716,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type

/// Sets the transform descriptor for this record.
///
/// A transform is mutually exclusive with a `.source()` — a record cannot
/// have both. Panics if a source or transform is already registered.
/// A transform is mutually exclusive with `.source()` and with any
/// `.link_from()` inbound connector — all three write to the same buffer
/// and would race as last-writer-wins. Panics if any of those are
/// already registered, or if a transform is already set.
pub(crate) fn set_transform(
&mut self,
descriptor: crate::transform::TransformDescriptor<T, R>,
Expand All @@ -727,6 +734,10 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
panic!("Record already has a .source(); cannot also have a .transform().");
}

if !self.inbound_connectors.is_empty() {
panic!("Record already has a .link_from(); cannot also have a .transform().");
}

#[cfg(feature = "std")]
let mut slot = self.transform.lock().unwrap();
#[cfg(not(feature = "std"))]
Expand Down Expand Up @@ -1061,7 +1072,30 @@ impl<T: Send + 'static + Debug + Clone, R: aimdb_executor::Spawn + 'static> Type
/// Adds an inbound connector link (External → AimDB)
///
/// Called by `.link_from()` builder API during record configuration.
///
/// # Panics
/// Panics if a `.source()` or `.transform()` is already registered.
/// All three write to the same buffer and would race as last-writer-wins.
/// Multiple inbound connectors on the same record are permitted (fan-in).
pub fn add_inbound_connector(&mut self, link: crate::connector::InboundConnectorLink) {
#[cfg(feature = "std")]
let has_source = self.producer_service.lock().unwrap().is_some();
#[cfg(not(feature = "std"))]
let has_source = self.producer_service.lock().is_some();

if has_source {
panic!("Record already has a .source(); cannot also have a .link_from().");
}

#[cfg(feature = "std")]
let has_transform = self.transform.lock().unwrap().is_some();
#[cfg(not(feature = "std"))]
let has_transform = self.transform.lock().is_some();

if has_transform {
panic!("Record already has a .transform(); cannot also have a .link_from().");
}

self.inbound_connectors.push(link);
}

Expand Down