Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 146 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ pub mod rc;

use std::fmt::{Debug, Formatter};

use futures::SinkExt;
use futures::channel::mpsc::{Receiver, UnboundedReceiver};
use futures::future::{AbortHandle, Aborted};
use futures::{SinkExt, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -365,25 +365,155 @@ pub trait Executor {
self.spawn(future);
}

/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// Spawns a new asynchronous task that accepts messages to the task.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine<T, F, Fut>(&self, f: F) -> CommunicationTask<T>
where
F: FnMut(Receiver<T>) -> Fut,
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
T: Send + 'static,
{
Self::spawn_coroutine_with_buffer(self, 1, f)
}

/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// Spawns a new asynchronous task that accepts messages to the task with a set buffer.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_buffer<T, F, Fut>(
&self,
buffer: usize,
mut f: F,
) -> CommunicationTask<T>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
T: Send + 'static,
{
let (tx, mut rx) = futures::channel::mpsc::channel(buffer);
let _task_handle = self.spawn_abortable(async move {
while let Some(msg) = rx.next().await {
f(msg).await;
}
});
CommunicationTask {
_task_handle,
_channel_tx: tx,
}
}

/// Spawns a new asynchronous task that accepts unbounded messages to the task.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_unbounded_coroutine<T, F, Fut>(&self, mut f: F) -> UnboundedCommunicationTask<T>
where
F: FnMut(T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
T: Send + 'static,
{
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let _task_handle = self.spawn_abortable(async move {
while let Some(msg) = rx.next().await {
f(msg).await;
}
});
UnboundedCommunicationTask {
_task_handle,
_channel_tx: tx,
}
}

/// Spawns a new asynchronous task with provided context that accepts messages to the task.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
///
/// # Note
/// If state must be borrowed across awaits,
/// use [`Executor::spawn_coroutine_with_receiver_and_context`].
fn spawn_coroutine_with_context<T, C, F, Fut>(&self, context: C, f: F) -> CommunicationTask<T>
where
F: FnMut(&mut C, T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
C: Send + 'static,
T: Send + 'static,
{
Self::spawn_coroutine_with_buffer_and_context(self, context, 1, f)
}

/// Spawns a new asynchronous task with provided context that accepts messages to the task with a set buffer.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_buffer_and_context<T, C, F, Fut>(
&self,
context: C,
buffer: usize,
mut f: F,
) -> CommunicationTask<T>
where
F: FnMut(&mut C, T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
C: Send + 'static,
T: Send + 'static,
{
let (tx, mut rx) = futures::channel::mpsc::channel(buffer);
let _task_handle = self.spawn_abortable(async move {
let mut context = context;
while let Some(msg) = rx.next().await {
f(&mut context, msg).await;
}
});
CommunicationTask {
_task_handle,
_channel_tx: tx,
}
}

/// Spawns a new asynchronous task with provided context that accepts unbounded messages to the task.
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_unbounded_coroutine_with_context<T, C, F, Fut>(
&self,
context: C,
mut f: F,
) -> UnboundedCommunicationTask<T>
where
F: FnMut(&mut C, T) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
C: Send + 'static,
T: Send + 'static,
{
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let _task_handle = self.spawn_abortable(async move {
let mut context = context;
while let Some(msg) = rx.next().await {
f(&mut context, msg).await;
}
});
UnboundedCommunicationTask {
_task_handle,
_channel_tx: tx,
}
}

/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_receiver<T, F, Fut>(&self, f: F) -> CommunicationTask<T>
where
F: FnMut(Receiver<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
Self::spawn_coroutine_with_receiver_and_buffer(self, 1, f)
}

/// Spawns a new asynchronous task with a set channel buffer that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_receiver_and_buffer<T, F, Fut>(
&self,
buffer: usize,
mut f: F,
) -> CommunicationTask<T>
where
F: FnMut(Receiver<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
Expand All @@ -400,18 +530,22 @@ pub trait Executor {
/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_context<T, F, C, Fut>(&self, context: C, f: F) -> CommunicationTask<T>
fn spawn_coroutine_with_receiver_and_context<T, F, C, Fut>(
&self,
context: C,
f: F,
) -> CommunicationTask<T>
where
F: FnMut(C, Receiver<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
Self::spawn_coroutine_with_buffer_and_context(self, context, 1, f)
Self::spawn_coroutine_with_receiver_buffer_and_context(self, context, 1, f)
}

/// Spawns a new asynchronous task with a set channel buffer and provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_coroutine_with_buffer_and_context<T, F, C, Fut>(
fn spawn_coroutine_with_receiver_buffer_and_context<T, F, C, Fut>(
&self,
context: C,
buffer: usize,
Expand All @@ -433,7 +567,10 @@ pub trait Executor {
/// Spawns a new asynchronous task that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_unbounded_coroutine<T, F, Fut>(&self, mut f: F) -> UnboundedCommunicationTask<T>
fn spawn_unbounded_coroutine_with_receiver<T, F, Fut>(
&self,
mut f: F,
) -> UnboundedCommunicationTask<T>
where
F: FnMut(UnboundedReceiver<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
Expand All @@ -450,7 +587,7 @@ pub trait Executor {
/// Spawns a new asynchronous task with provided context that accepts messages to the task using [`channels`](futures::channel::mpsc).
/// This function returns a handle that allows sending a message, or if there is no reference to the handle at all
/// (in other words, all handles are dropped), the task would be aborted.
fn spawn_unbounded_coroutine_with_context<T, F, C, Fut>(
fn spawn_unbounded_coroutine_with_receiver_and_context<T, F, C, Fut>(
&self,
context: C,
mut f: F,
Expand Down
Loading
Loading