From 49ab1853663d7f77b9b086e0802cbd8b96cd6d08 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 27 Apr 2026 07:04:06 -0500 Subject: [PATCH 1/4] refactor: change coroutine to use a per-message handler; rename receiver-style to *_with_receiver --- src/lib.rs | 83 +++++++++++++++++++++++++++++++++++++++----- src/rt/threadpool.rs | 35 ++++++++----------- src/rt/tokio.rs | 35 ++++++++----------- src/task.rs | 68 +++++++++++++++++++++++++++++------- 4 files changed, 158 insertions(+), 63 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b4b517b..bab3504 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ pub mod rc; use std::fmt::{Debug, Formatter}; -use futures::SinkExt; +use futures::{SinkExt, StreamExt}; use futures::channel::mpsc::{Receiver, UnboundedReceiver}; use futures::future::{AbortHandle, Aborted}; use std::future::Future; @@ -365,18 +365,19 @@ pub trait Executor { self.spawn(future); } - /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). + /// Spawns a new asynchronous task that accepts messages to the task. /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. fn spawn_coroutine(&self, f: F) -> CommunicationTask where - F: FnMut(Receiver) -> Fut, + F: FnMut(T) -> Fut + Send + 'static, Fut: Future + Send + 'static, + T: Send + 'static, { Self::spawn_coroutine_with_buffer(self, 1, f) } - /// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc). + /// Spawns a new asynchronous task that accepts messages to the task with a set buffer. /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. fn spawn_coroutine_with_buffer( @@ -384,6 +385,63 @@ pub trait Executor { buffer: usize, mut f: F, ) -> CommunicationTask + where + F: FnMut(T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + T: Send + 'static, + { + let (tx, mut rx) = futures::channel::mpsc::channel(buffer); + let _task_handle = self.spawn_abortable(async move { + while let Some(msg) = rx.next().await { + f(msg).await; + } + }); + CommunicationTask { + _task_handle, + _channel_tx: tx, + } + } + + /// Spawns a new asynchronous task that accepts unbounded messages to the task. + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + fn spawn_unbounded_coroutine(&self, mut f: F) -> UnboundedCommunicationTask + where + F: FnMut(T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + T: Send + 'static, + { + let (tx, mut rx) = futures::channel::mpsc::unbounded(); + let _task_handle = self.spawn_abortable(async move { + while let Some(msg) = rx.next().await { + f(msg).await; + } + }); + UnboundedCommunicationTask { + _task_handle, + _channel_tx: tx, + } + } + + /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + fn spawn_coroutine_with_receiver(&self, f: F) -> CommunicationTask + where + F: FnMut(Receiver) -> Fut, + Fut: Future + Send + 'static, + { + Self::spawn_coroutine_with_receiver_and_buffer(self, 1, f) + } + + /// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc). + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + fn spawn_coroutine_with_receiver_and_buffer( + &self, + buffer: usize, + mut f: F, + ) -> CommunicationTask where F: FnMut(Receiver) -> Fut, Fut: Future + Send + 'static, @@ -400,18 +458,22 @@ pub trait Executor { /// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. - fn spawn_coroutine_with_context(&self, context: C, f: F) -> CommunicationTask + fn spawn_coroutine_with_receiver_and_context( + &self, + context: C, + f: F, + ) -> CommunicationTask where F: FnMut(C, Receiver) -> Fut, Fut: Future + Send + 'static, { - Self::spawn_coroutine_with_buffer_and_context(self, context, 1, f) + Self::spawn_coroutine_with_receiver_buffer_and_context(self, context, 1, f) } /// Spawns a new asynchronous task with a set channel buffer and provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. - fn spawn_coroutine_with_buffer_and_context( + fn spawn_coroutine_with_receiver_buffer_and_context( &self, context: C, buffer: usize, @@ -433,7 +495,10 @@ pub trait Executor { /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. - fn spawn_unbounded_coroutine(&self, mut f: F) -> UnboundedCommunicationTask + fn spawn_unbounded_coroutine_with_receiver( + &self, + mut f: F, + ) -> UnboundedCommunicationTask where F: FnMut(UnboundedReceiver) -> Fut, Fut: Future + Send + 'static, @@ -450,7 +515,7 @@ pub trait Executor { /// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. - fn spawn_unbounded_coroutine_with_context( + fn spawn_unbounded_coroutine_with_receiver_and_context( &self, context: C, mut f: F, diff --git a/src/rt/threadpool.rs b/src/rt/threadpool.rs index 3069ab4..140aa23 100644 --- a/src/rt/threadpool.rs +++ b/src/rt/threadpool.rs @@ -90,19 +90,16 @@ mod tests { #[test] fn task_coroutine() { - use futures::stream::StreamExt; let executor = ThreadPoolExecutor::default(); enum Message { Send(String, futures::channel::oneshot::Sender), } - let mut task = executor.spawn_coroutine(|mut rx: Receiver| async move { - while let Some(msg) = rx.next().await { - match msg { - Message::Send(msg, sender) => { - sender.send(msg).unwrap(); - } + let mut task = executor.spawn_coroutine(|msg: Message| async move { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); } } }); @@ -117,7 +114,7 @@ mod tests { } #[test] - fn task_coroutine_with_context() { + fn task_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; let executor = ThreadPoolExecutor::default(); @@ -131,7 +128,7 @@ mod tests { Get(futures::channel::oneshot::Sender), } - let mut task = executor.spawn_coroutine_with_context( + let mut task = executor.spawn_coroutine_with_receiver_and_context( State::default(), |mut state, mut rx: Receiver| async move { while let Some(msg) = rx.next().await { @@ -161,23 +158,19 @@ mod tests { #[test] fn task_unbounded_coroutine() { - use futures::stream::StreamExt; let executor = ThreadPoolExecutor::default(); enum Message { Send(String, futures::channel::oneshot::Sender), } - let mut task = - executor.spawn_unbounded_coroutine(|mut rx: UnboundedReceiver| async move { - while let Some(msg) = rx.next().await { - match msg { - Message::Send(msg, sender) => { - sender.send(msg).unwrap(); - } - } + let mut task = executor.spawn_unbounded_coroutine(|msg: Message| async move { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); } - }); + } + }); let (tx, rx) = futures::channel::oneshot::channel::(); let msg = Message::Send("Hello".into(), tx); @@ -189,7 +182,7 @@ mod tests { } #[test] - fn task_unbounded_coroutine_with_context() { + fn task_unbounded_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; let executor = ThreadPoolExecutor::default(); @@ -203,7 +196,7 @@ mod tests { Get(futures::channel::oneshot::Sender), } - let mut task = executor.spawn_unbounded_coroutine_with_context( + let mut task = executor.spawn_unbounded_coroutine_with_receiver_and_context( State::default(), |mut state, mut rx: UnboundedReceiver| async move { while let Some(msg) = rx.next().await { diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index 0e7310b..94f7c69 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -112,19 +112,16 @@ mod tests { #[tokio::test] async fn task_coroutine() { - use futures::stream::StreamExt; let executor = TokioExecutor; enum Message { Send(String, futures::channel::oneshot::Sender), } - let mut task = executor.spawn_coroutine(|mut rx: Receiver| async move { - while let Some(msg) = rx.next().await { - match msg { - Message::Send(msg, sender) => { - sender.send(msg).unwrap(); - } + let mut task = executor.spawn_coroutine(|msg: Message| async move { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); } } }); @@ -138,7 +135,7 @@ mod tests { } #[tokio::test] - async fn task_coroutine_with_context() { + async fn task_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; let executor = TokioExecutor; @@ -152,7 +149,7 @@ mod tests { Get(futures::channel::oneshot::Sender), } - let mut task = executor.spawn_coroutine_with_context( + let mut task = executor.spawn_coroutine_with_receiver_and_context( State::default(), |mut state, mut rx: Receiver| async move { while let Some(msg) = rx.next().await { @@ -180,23 +177,19 @@ mod tests { #[tokio::test] async fn task_unbounded_coroutine() { - use futures::stream::StreamExt; let executor = TokioExecutor; enum Message { Send(String, futures::channel::oneshot::Sender), } - let mut task = - executor.spawn_unbounded_coroutine(|mut rx: UnboundedReceiver| async move { - while let Some(msg) = rx.next().await { - match msg { - Message::Send(msg, sender) => { - sender.send(msg).unwrap(); - } - } + let mut task = executor.spawn_unbounded_coroutine(|msg: Message| async move { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); } - }); + } + }); let (tx, rx) = futures::channel::oneshot::channel::(); let msg = Message::Send("Hello".into(), tx); @@ -207,7 +200,7 @@ mod tests { } #[tokio::test] - async fn task_unbounded_coroutine_with_context() { + async fn task_unbounded_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; let executor = TokioExecutor; @@ -221,7 +214,7 @@ mod tests { Get(futures::channel::oneshot::Sender), } - let mut task = executor.spawn_unbounded_coroutine_with_context( + let mut task = executor.spawn_unbounded_coroutine_with_receiver_and_context( State::default(), |mut state, mut rx: UnboundedReceiver| async move { while let Some(msg) = rx.next().await { diff --git a/src/task.rs b/src/task.rs index 7a827d6..e073f29 100644 --- a/src/task.rs +++ b/src/task.rs @@ -50,43 +50,85 @@ where EXECUTOR.dispatch(future); } -/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). +/// Spawns a new asynchronous task that accepts messages to the task. /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. pub fn spawn_coroutine(f: F) -> CommunicationTask where - F: FnMut(Receiver) -> Fut, + F: FnMut(T) -> Fut + Send + 'static, Fut: Future + Send + 'static, + T: Send + 'static, { EXECUTOR.spawn_coroutine(f) } -/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc). +/// Spawns a new asynchronous task that accepts messages to the task with a set buffer. /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. pub fn spawn_coroutine_with_buffer(buffer: usize, f: F) -> CommunicationTask where - F: FnMut(Receiver) -> Fut, + F: FnMut(T) -> Fut + Send + 'static, Fut: Future + Send + 'static, + T: Send + 'static, { EXECUTOR.spawn_coroutine_with_buffer(buffer, f) } +/// Spawns a new asynchronous task that accepts unbounded messages to the task. +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +pub fn spawn_unbounded_coroutine(f: F) -> UnboundedCommunicationTask +where + F: FnMut(T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + T: Send + 'static, +{ + EXECUTOR.spawn_unbounded_coroutine(f) +} + +/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +pub fn spawn_coroutine_with_receiver(f: F) -> CommunicationTask +where + F: FnMut(Receiver) -> Fut, + Fut: Future + Send + 'static, +{ + EXECUTOR.spawn_coroutine_with_receiver(f) +} + +/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc). +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +pub fn spawn_coroutine_with_receiver_and_buffer( + buffer: usize, + f: F, +) -> CommunicationTask +where + F: FnMut(Receiver) -> Fut, + Fut: Future + Send + 'static, +{ + EXECUTOR.spawn_coroutine_with_receiver_and_buffer(buffer, f) +} + /// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. -pub fn spawn_coroutine_with_context(context: C, f: F) -> CommunicationTask +pub fn spawn_coroutine_with_receiver_and_context( + context: C, + f: F, +) -> CommunicationTask where F: FnMut(C, Receiver) -> Fut, Fut: Future + Send + 'static, { - EXECUTOR.spawn_coroutine_with_context(context, f) + EXECUTOR.spawn_coroutine_with_receiver_and_context(context, f) } /// Spawns a new asynchronous task with a set channel buffer and provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. -pub fn spawn_coroutine_with_buffer_and_context( +pub fn spawn_coroutine_with_receiver_buffer_and_context( context: C, buffer: usize, f: F, @@ -95,24 +137,26 @@ where F: FnMut(C, Receiver) -> Fut, Fut: Future + Send + 'static, { - EXECUTOR.spawn_coroutine_with_buffer_and_context(context, buffer, f) + EXECUTOR.spawn_coroutine_with_receiver_buffer_and_context(context, buffer, f) } /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. -pub fn spawn_unbounded_coroutine(f: F) -> UnboundedCommunicationTask +pub fn spawn_unbounded_coroutine_with_receiver( + f: F, +) -> UnboundedCommunicationTask where F: FnMut(UnboundedReceiver) -> Fut, Fut: Future + Send + 'static, { - EXECUTOR.spawn_unbounded_coroutine(f) + EXECUTOR.spawn_unbounded_coroutine_with_receiver(f) } /// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. -pub fn spawn_unbounded_coroutine_with_context( +pub fn spawn_unbounded_coroutine_with_receiver_and_context( context: C, f: F, ) -> UnboundedCommunicationTask @@ -120,7 +164,7 @@ where F: FnMut(C, UnboundedReceiver) -> Fut, Fut: Future + Send + 'static, { - EXECUTOR.spawn_unbounded_coroutine_with_context(context, f) + EXECUTOR.spawn_unbounded_coroutine_with_receiver_and_context(context, f) } #[derive(Default)] From 0eae9d36ccfb94df11673f2a0dfe87056ea51a44 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Mon, 27 Apr 2026 08:51:10 -0500 Subject: [PATCH 2/4] chore: update test --- src/rt/threadpool.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++ src/rt/tokio.rs | 57 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/src/rt/threadpool.rs b/src/rt/threadpool.rs index 140aa23..a0d0966 100644 --- a/src/rt/threadpool.rs +++ b/src/rt/threadpool.rs @@ -113,6 +113,35 @@ mod tests { }); } + #[test] + fn task_coroutine_with_receiver() { + use futures::stream::StreamExt; + let executor = ThreadPoolExecutor::default(); + + enum Message { + Send(String, futures::channel::oneshot::Sender), + } + + let mut task = + executor.spawn_coroutine_with_receiver(|mut rx: Receiver| async move { + while let Some(msg) = rx.next().await { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); + } + } + } + }); + + let (tx, rx) = futures::channel::oneshot::channel::(); + let msg = Message::Send("Hello".into(), tx); + futures::executor::block_on(async move { + task.send(msg).await.unwrap(); + let resp = rx.await.unwrap(); + assert_eq!(resp, "Hello"); + }); + } + #[test] fn task_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; @@ -181,6 +210,36 @@ mod tests { }); } + #[test] + fn task_unbounded_coroutine_with_receiver() { + use futures::stream::StreamExt; + let executor = ThreadPoolExecutor::default(); + + enum Message { + Send(String, futures::channel::oneshot::Sender), + } + + let mut task = executor.spawn_unbounded_coroutine_with_receiver( + |mut rx: UnboundedReceiver| async move { + while let Some(msg) = rx.next().await { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); + } + } + } + }, + ); + + let (tx, rx) = futures::channel::oneshot::channel::(); + let msg = Message::Send("Hello".into(), tx); + futures::executor::block_on(async move { + task.send(msg).unwrap(); + let resp = rx.await.unwrap(); + assert_eq!(resp, "Hello"); + }); + } + #[test] fn task_unbounded_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index 94f7c69..543aaa6 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -134,6 +134,34 @@ mod tests { assert_eq!(resp, "Hello"); } + #[tokio::test] + async fn task_coroutine_with_receiver() { + use futures::stream::StreamExt; + let executor = TokioExecutor; + + enum Message { + Send(String, futures::channel::oneshot::Sender), + } + + let mut task = + executor.spawn_coroutine_with_receiver(|mut rx: Receiver| async move { + while let Some(msg) = rx.next().await { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); + } + } + } + }); + + let (tx, rx) = futures::channel::oneshot::channel::(); + let msg = Message::Send("Hello".into(), tx); + + task.send(msg).await.unwrap(); + let resp = rx.await.unwrap(); + assert_eq!(resp, "Hello"); + } + #[tokio::test] async fn task_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; @@ -199,6 +227,35 @@ mod tests { assert_eq!(resp, "Hello"); } + #[tokio::test] + async fn task_unbounded_coroutine_with_receiver() { + use futures::stream::StreamExt; + let executor = TokioExecutor; + + enum Message { + Send(String, futures::channel::oneshot::Sender), + } + + let mut task = executor.spawn_unbounded_coroutine_with_receiver( + |mut rx: UnboundedReceiver| async move { + while let Some(msg) = rx.next().await { + match msg { + Message::Send(msg, sender) => { + sender.send(msg).unwrap(); + } + } + } + }, + ); + + let (tx, rx) = futures::channel::oneshot::channel::(); + let msg = Message::Send("Hello".into(), tx); + + task.send(msg).unwrap(); + let resp = rx.await.unwrap(); + assert_eq!(resp, "Hello"); + } + #[tokio::test] async fn task_unbounded_coroutine_with_receiver_and_context() { use futures::stream::StreamExt; From 0a2fd7335827587749083f68d880378b33c9c55d Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Tue, 2 Jun 2026 07:47:40 -0500 Subject: [PATCH 3/4] chore: chore Add Executor::spawn_coroutine_with_context --- src/lib.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++ src/rt/threadpool.rs | 54 +++++++++++++++++++++++++++++++ src/rt/tokio.rs | 50 +++++++++++++++++++++++++++++ src/task.rs | 50 +++++++++++++++++++++++++++++ 4 files changed, 230 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index bab3504..625a4c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -423,6 +423,82 @@ pub trait Executor { } } + /// Spawns a new asynchronous task with provided context that accepts messages to the task. + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + /// + /// # Note + /// If state must be borrowed across awaits, + /// use [`Executor::spawn_coroutine_with_receiver_and_context`]. + fn spawn_coroutine_with_context( + &self, + context: C, + f: F, + ) -> CommunicationTask + where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, + { + Self::spawn_coroutine_with_buffer_and_context(self, context, 1, f) + } + + /// Spawns a new asynchronous task with provided context that accepts messages to the task with a set buffer. + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + fn spawn_coroutine_with_buffer_and_context( + &self, + context: C, + buffer: usize, + mut f: F, + ) -> CommunicationTask + where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, + { + let (tx, mut rx) = futures::channel::mpsc::channel(buffer); + let _task_handle = self.spawn_abortable(async move { + let mut context = context; + while let Some(msg) = rx.next().await { + f(&mut context, msg).await; + } + }); + CommunicationTask { + _task_handle, + _channel_tx: tx, + } + } + + /// Spawns a new asynchronous task with provided context that accepts unbounded messages to the task. + /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all + /// (in other words, all handles are dropped), the task would be aborted. + fn spawn_unbounded_coroutine_with_context( + &self, + context: C, + mut f: F, + ) -> UnboundedCommunicationTask + where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, + { + let (tx, mut rx) = futures::channel::mpsc::unbounded(); + let _task_handle = self.spawn_abortable(async move { + let mut context = context; + while let Some(msg) = rx.next().await { + f(&mut context, msg).await; + } + }); + UnboundedCommunicationTask { + _task_handle, + _channel_tx: tx, + } + } + /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. diff --git a/src/rt/threadpool.rs b/src/rt/threadpool.rs index a0d0966..c79a0f7 100644 --- a/src/rt/threadpool.rs +++ b/src/rt/threadpool.rs @@ -113,6 +113,33 @@ mod tests { }); } + #[test] + fn task_coroutine_with_context() { + let executor = ThreadPoolExecutor::default(); + + type Resp = futures::channel::oneshot::Sender; + + let mut task = executor.spawn_coroutine_with_context( + 0usize, + |counter: &mut usize, resp: Resp| { + *counter += 1; + let n = *counter; + async move { + resp.send(n).unwrap(); + } + }, + ); + + let (tx1, rx1) = futures::channel::oneshot::channel::(); + let (tx2, rx2) = futures::channel::oneshot::channel::(); + futures::executor::block_on(async move { + task.send(tx1).await.unwrap(); + task.send(tx2).await.unwrap(); + assert_eq!(rx1.await.unwrap(), 1); + assert_eq!(rx2.await.unwrap(), 2); + }); + } + #[test] fn task_coroutine_with_receiver() { use futures::stream::StreamExt; @@ -210,6 +237,33 @@ mod tests { }); } + #[test] + fn task_unbounded_coroutine_with_context() { + let executor = ThreadPoolExecutor::default(); + + type Resp = futures::channel::oneshot::Sender; + + let mut task = executor.spawn_unbounded_coroutine_with_context( + 0usize, + |counter: &mut usize, resp: Resp| { + *counter += 1; + let n = *counter; + async move { + resp.send(n).unwrap(); + } + }, + ); + + let (tx1, rx1) = futures::channel::oneshot::channel::(); + let (tx2, rx2) = futures::channel::oneshot::channel::(); + futures::executor::block_on(async move { + task.send(tx1).unwrap(); + task.send(tx2).unwrap(); + assert_eq!(rx1.await.unwrap(), 1); + assert_eq!(rx2.await.unwrap(), 2); + }); + } + #[test] fn task_unbounded_coroutine_with_receiver() { use futures::stream::StreamExt; diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index 543aaa6..94f8448 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -134,6 +134,31 @@ mod tests { assert_eq!(resp, "Hello"); } + #[tokio::test] + async fn task_coroutine_with_context() { + let executor = TokioExecutor; + + type Resp = futures::channel::oneshot::Sender; + + let mut task = executor.spawn_coroutine_with_context( + 0usize, + |counter: &mut usize, resp: Resp| { + *counter += 1; + let n = *counter; + async move { + resp.send(n).unwrap(); + } + }, + ); + + let (tx1, rx1) = futures::channel::oneshot::channel::(); + let (tx2, rx2) = futures::channel::oneshot::channel::(); + task.send(tx1).await.unwrap(); + task.send(tx2).await.unwrap(); + assert_eq!(rx1.await.unwrap(), 1); + assert_eq!(rx2.await.unwrap(), 2); + } + #[tokio::test] async fn task_coroutine_with_receiver() { use futures::stream::StreamExt; @@ -227,6 +252,31 @@ mod tests { assert_eq!(resp, "Hello"); } + #[tokio::test] + async fn task_unbounded_coroutine_with_context() { + let executor = TokioExecutor; + + type Resp = futures::channel::oneshot::Sender; + + let mut task = executor.spawn_unbounded_coroutine_with_context( + 0usize, + |counter: &mut usize, resp: Resp| { + *counter += 1; + let n = *counter; + async move { + resp.send(n).unwrap(); + } + }, + ); + + let (tx1, rx1) = futures::channel::oneshot::channel::(); + let (tx2, rx2) = futures::channel::oneshot::channel::(); + task.send(tx1).unwrap(); + task.send(tx2).unwrap(); + assert_eq!(rx1.await.unwrap(), 1); + assert_eq!(rx2.await.unwrap(), 2); + } + #[tokio::test] async fn task_unbounded_coroutine_with_receiver() { use futures::stream::StreamExt; diff --git a/src/task.rs b/src/task.rs index e073f29..240991f 100644 --- a/src/task.rs +++ b/src/task.rs @@ -86,6 +86,56 @@ where EXECUTOR.spawn_unbounded_coroutine(f) } +/// Spawns a new asynchronous task with provided context that accepts messages to the task. +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +/// +/// # Note +/// If state must be borrowed across awaits, +/// use [`spawn_coroutine_with_receiver_and_context`]. +pub fn spawn_coroutine_with_context(context: C, f: F) -> CommunicationTask +where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, +{ + EXECUTOR.spawn_coroutine_with_context(context, f) +} + +/// Spawns a new asynchronous task with provided context that accepts messages to the task with a set buffer. +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +pub fn spawn_coroutine_with_buffer_and_context( + context: C, + buffer: usize, + f: F, +) -> CommunicationTask +where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, +{ + EXECUTOR.spawn_coroutine_with_buffer_and_context(context, buffer, f) +} + +/// Spawns a new asynchronous task with provided context that accepts unbounded messages to the task. +/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all +/// (in other words, all handles are dropped), the task would be aborted. +pub fn spawn_unbounded_coroutine_with_context( + context: C, + f: F, +) -> UnboundedCommunicationTask +where + F: FnMut(&mut C, T) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + C: Send + 'static, + T: Send + 'static, +{ + EXECUTOR.spawn_unbounded_coroutine_with_context(context, f) +} + /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. From f17b41205c1976dd6a76e6102483b5e9e0148571 Mon Sep 17 00:00:00 2001 From: Darius Clark Date: Tue, 2 Jun 2026 08:16:34 -0500 Subject: [PATCH 4/4] chore: fmt --- src/lib.rs | 8 ++------ src/rt/threadpool.rs | 8 +++----- src/rt/tokio.rs | 8 +++----- src/task.rs | 4 +--- 4 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 625a4c8..6206a96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,9 +10,9 @@ pub mod rc; use std::fmt::{Debug, Formatter}; -use futures::{SinkExt, StreamExt}; use futures::channel::mpsc::{Receiver, UnboundedReceiver}; use futures::future::{AbortHandle, Aborted}; +use futures::{SinkExt, StreamExt}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -430,11 +430,7 @@ pub trait Executor { /// # Note /// If state must be borrowed across awaits, /// use [`Executor::spawn_coroutine_with_receiver_and_context`]. - fn spawn_coroutine_with_context( - &self, - context: C, - f: F, - ) -> CommunicationTask + fn spawn_coroutine_with_context(&self, context: C, f: F) -> CommunicationTask where F: FnMut(&mut C, T) -> Fut + Send + 'static, Fut: Future + Send + 'static, diff --git a/src/rt/threadpool.rs b/src/rt/threadpool.rs index c79a0f7..3e544eb 100644 --- a/src/rt/threadpool.rs +++ b/src/rt/threadpool.rs @@ -119,16 +119,14 @@ mod tests { type Resp = futures::channel::oneshot::Sender; - let mut task = executor.spawn_coroutine_with_context( - 0usize, - |counter: &mut usize, resp: Resp| { + let mut task = + executor.spawn_coroutine_with_context(0usize, |counter: &mut usize, resp: Resp| { *counter += 1; let n = *counter; async move { resp.send(n).unwrap(); } - }, - ); + }); let (tx1, rx1) = futures::channel::oneshot::channel::(); let (tx2, rx2) = futures::channel::oneshot::channel::(); diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index 94f8448..1a5d3d5 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -140,16 +140,14 @@ mod tests { type Resp = futures::channel::oneshot::Sender; - let mut task = executor.spawn_coroutine_with_context( - 0usize, - |counter: &mut usize, resp: Resp| { + let mut task = + executor.spawn_coroutine_with_context(0usize, |counter: &mut usize, resp: Resp| { *counter += 1; let n = *counter; async move { resp.send(n).unwrap(); } - }, - ); + }); let (tx1, rx1) = futures::channel::oneshot::channel::(); let (tx2, rx2) = futures::channel::oneshot::channel::(); diff --git a/src/task.rs b/src/task.rs index 240991f..3389b82 100644 --- a/src/task.rs +++ b/src/task.rs @@ -193,9 +193,7 @@ where /// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc). /// This function returns a handle that allows sending a message, or if there is no reference to the handle at all /// (in other words, all handles are dropped), the task would be aborted. -pub fn spawn_unbounded_coroutine_with_receiver( - f: F, -) -> UnboundedCommunicationTask +pub fn spawn_unbounded_coroutine_with_receiver(f: F) -> UnboundedCommunicationTask where F: FnMut(UnboundedReceiver) -> Fut, Fut: Future + Send + 'static,