From fb4b42e54306a34ec3195ca1137521eb04673c12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 8 May 2026 12:55:47 +0000 Subject: [PATCH 1/3] feat: enforce writer exclusivity for source, transform and link_from in records --- aimdb-core/src/typed_api.rs | 142 +++++++++++++++++++++++++++++++++ aimdb-core/src/typed_record.rs | 40 +++++++++- 2 files changed, 179 insertions(+), 3 deletions(-) diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index 0def868d..7d3ae438 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -905,6 +905,8 @@ where /// - If no deserializer is provided /// - If no connector is registered for the URL scheme /// - If the URL is invalid + /// - If the record already has a `.source()` or `.transform()` + /// (local producer + inbound connector would race as last-writer-wins) pub fn finish(self) -> &'a mut RecordRegistrar<'a, T, R> { use crate::connector::{ConnectorUrl, DeserializerKind, InboundConnectorLink}; @@ -921,6 +923,23 @@ where ); } + // Mutual exclusion with local producers — both write to the same + // buffer and would race as last-writer-wins. Builder-level check + // surfaces the URL in the message; `add_inbound_connector` enforces + // the same invariant from the other direction. + if self.registrar.rec.has_transform() { + panic!( + "Record already has a .transform(); cannot also have a .link_from() for {}", + self.url + ); + } + if self.registrar.rec.has_producer_service() { + panic!( + "Record already has a .source(); cannot also have a .link_from() for {}", + self.url + ); + } + // Resolve deserializer variant (mutually exclusive) let deser_kind = if let Some(ctx_deser) = self.context_deserializer { DeserializerKind::Context(ctx_deser) @@ -1445,4 +1464,127 @@ mod tests { // No serializer set — should panic reg.link_to("mqtt://broker/topic").finish(); } + + // ==================================================================== + // Writer-exclusivity tests (.source / .transform / .link_from) + // ==================================================================== + + /// Helper: build a `TransformDescriptor` with a no-op spawn function. + fn dummy_transform_descriptor() -> crate::transform::TransformDescriptor + { + crate::transform::TransformDescriptor:: { + input_keys: vec![], + spawn_fn: Box::new(|_p, _db, _ctx| Box::pin(async {})), + } + } + + #[test] + #[should_panic( + expected = "Record already has a .source(); cannot also have a .link_from() for mqtt://broker/topic" + )] + fn link_from_after_source_panics() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + rec.set_producer_service(|_p, _ctx| async move {}); + + let builders: Vec>> = + vec![Box::new(MockConnectorBuilder { + scheme: "mqtt".to_string(), + })]; + let extensions = crate::extensions::Extensions::new(); + + let mut reg = make_registrar(&mut rec, &builders, &extensions); + reg.link_from("mqtt://broker/topic") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + } + + #[test] + #[should_panic( + expected = "Record already has a .transform(); cannot also have a .link_from() for mqtt://broker/topic" + )] + fn link_from_after_transform_panics() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + rec.set_transform(dummy_transform_descriptor()); + + let builders: Vec>> = + vec![Box::new(MockConnectorBuilder { + scheme: "mqtt".to_string(), + })]; + let extensions = crate::extensions::Extensions::new(); + + let mut reg = make_registrar(&mut rec, &builders, &extensions); + reg.link_from("mqtt://broker/topic") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + } + + #[test] + #[should_panic(expected = "Record already has a .link_from(); cannot also have a .source().")] + fn source_after_link_from_panics() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + + let builders: Vec>> = + vec![Box::new(MockConnectorBuilder { + scheme: "mqtt".to_string(), + })]; + let extensions = crate::extensions::Extensions::new(); + { + let mut reg = make_registrar(&mut rec, &builders, &extensions); + reg.link_from("mqtt://broker/topic") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + } + + rec.set_producer_service(|_p, _ctx| async move {}); + } + + #[test] + #[should_panic( + expected = "Record already has a .link_from(); cannot also have a .transform()." + )] + fn transform_after_link_from_panics() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + + let builders: Vec>> = + vec![Box::new(MockConnectorBuilder { + scheme: "mqtt".to_string(), + })]; + let extensions = crate::extensions::Extensions::new(); + { + let mut reg = make_registrar(&mut rec, &builders, &extensions); + reg.link_from("mqtt://broker/topic") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + } + + rec.set_transform(dummy_transform_descriptor()); + } + + #[test] + fn multiple_link_from_allowed() { + let mut rec = crate::typed_record::TypedRecord::::new(); + rec.set_buffer(Box::new(MockBuffer)); + + let builders: Vec>> = + vec![Box::new(MockConnectorBuilder { + scheme: "mqtt".to_string(), + })]; + let extensions = crate::extensions::Extensions::new(); + let mut reg = make_registrar(&mut rec, &builders, &extensions); + + // Chain via finish() → &mut RecordRegistrar — the registrar's + // lifetime only permits one borrow chain at a time. + reg.link_from("mqtt://broker/topic-a") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish() + .link_from("mqtt://broker/topic-b") + .with_deserializer_raw(|_b: &[u8]| Ok(TestRecord { value: 0 })) + .finish(); + + assert_eq!(rec.inbound_connectors().len(), 2); + } } diff --git a/aimdb-core/src/typed_record.rs b/aimdb-core/src/typed_record.rs index 9aadf593..82375776 100644 --- a/aimdb-core/src/typed_record.rs +++ b/aimdb-core/src/typed_record.rs @@ -623,7 +623,8 @@ impl Type /// Long-running task that generates data via `producer.produce()`. Auto-spawned during `build()`. /// /// # Panics - /// Panics if producer already set (one producer per record). + /// Panics if producer already set (one producer per record), if a transform is registered, + /// or if a `.link_from()` inbound connector is registered (all three would race on the buffer). pub fn set_producer_service(&mut self, f: F) where F: FnOnce(crate::Producer, Arc) -> Fut + Send + Sync + 'static, @@ -639,6 +640,10 @@ impl Type panic!("Record already has a .transform(); cannot also have a .source()."); } + if !self.inbound_connectors.is_empty() { + panic!("Record already has a .link_from(); cannot also have a .source()."); + } + // Check if already set #[cfg(feature = "std")] let already_set = self.producer_service.lock().unwrap().is_some(); @@ -711,8 +716,10 @@ impl Type /// Sets the transform descriptor for this record. /// - /// A transform is mutually exclusive with a `.source()` — a record cannot - /// have both. Panics if a source or transform is already registered. + /// A transform is mutually exclusive with `.source()` and with any + /// `.link_from()` inbound connector — all three write to the same buffer + /// and would race as last-writer-wins. Panics if any of those are + /// already registered, or if a transform is already set. pub(crate) fn set_transform( &mut self, descriptor: crate::transform::TransformDescriptor, @@ -727,6 +734,10 @@ impl Type panic!("Record already has a .source(); cannot also have a .transform()."); } + if !self.inbound_connectors.is_empty() { + panic!("Record already has a .link_from(); cannot also have a .transform()."); + } + #[cfg(feature = "std")] let mut slot = self.transform.lock().unwrap(); #[cfg(not(feature = "std"))] @@ -1061,7 +1072,30 @@ impl Type /// Adds an inbound connector link (External → AimDB) /// /// Called by `.link_from()` builder API during record configuration. + /// + /// # Panics + /// Panics if a `.source()` or `.transform()` is already registered. + /// All three write to the same buffer and would race as last-writer-wins. + /// Multiple inbound connectors on the same record are permitted (fan-in). pub fn add_inbound_connector(&mut self, link: crate::connector::InboundConnectorLink) { + #[cfg(feature = "std")] + let has_source = self.producer_service.lock().unwrap().is_some(); + #[cfg(not(feature = "std"))] + let has_source = self.producer_service.lock().is_some(); + + if has_source { + panic!("Record already has a .source(); cannot also have a .link_from()."); + } + + #[cfg(feature = "std")] + let has_transform = self.transform.lock().unwrap().is_some(); + #[cfg(not(feature = "std"))] + let has_transform = self.transform.lock().is_some(); + + if has_transform { + panic!("Record already has a .transform(); cannot also have a .link_from()."); + } + self.inbound_connectors.push(link); } From 2f27e058ededc3b02506f1d76107a255672b09ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 8 May 2026 19:35:49 +0000 Subject: [PATCH 2/3] chore: update subproject commit for embassy --- _external/embassy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_external/embassy b/_external/embassy index d965ef3b..f8fd6ec9 160000 --- a/_external/embassy +++ b/_external/embassy @@ -1 +1 @@ -Subproject commit d965ef3b12c048811df2f1faed463bb70742705c +Subproject commit f8fd6ec924b7d73fcee9ce8e97e1dba47200dac2 From c4c026f44db57a4f1a50d2d224bf618207d216fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Schn=C3=B6rch?= Date: Fri, 8 May 2026 19:39:03 +0000 Subject: [PATCH 3/3] feat: add writer-exclusivity validation for `.link_from()` in records --- CHANGELOG.md | 1 + aimdb-core/CHANGELOG.md | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd175855..c3d98fd1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md)) - **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md)) - **Task-model join handler**: New `JoinBuilder::on_triggers(FnOnce(JoinEventRx, Producer) -> impl Future)` API replaces the previous callback model. Eliminates per-event heap allocation and lets handler state borrow across `.await` points. **Breaking change** vs. the old `with_state().on_trigger(...)` form — see [aimdb-core](aimdb-core/CHANGELOG.md). - **Weather-mesh `DewPoint` demo**: All three weather stations (alpha, beta, gamma) now derive a `DewPoint` record from `Temperature` and `Humidity` via `transform_join`, demonstrating the API end-to-end on Tokio and Embassy. diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index f5367f56..752ac1ec 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted. - **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters. - **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders. - **`transform_join` as an inherent method on `RecordRegistrar`** (gated `feature = "alloc"`, `R: JoinFanInRuntime`). Previously only exposed via the `impl_record_registrar_ext!` macro under `feature = "std"`.