diff --git a/src/Redux.DevTools.Universal/Redux.DevTools.Universal.nuget.props b/src/Redux.DevTools.Universal/Redux.DevTools.Universal.nuget.props index f2f2db6..f9cdb94 100644 --- a/src/Redux.DevTools.Universal/Redux.DevTools.Universal.nuget.props +++ b/src/Redux.DevTools.Universal/Redux.DevTools.Universal.nuget.props @@ -3,9 +3,9 @@ True NuGet - C:\Dev\redux.net\src\Redux.DevTools.Universal\project.lock.json + C:\GH\redux.NET\src\Redux.DevTools.Universal\project.lock.json $(UserProfile)\.nuget\packages\ - C:\Users\Guillaume\.nuget\packages\ + C:\Users\Christer\.nuget\packages\ ProjectJson 4.0.0 diff --git a/src/Redux.Tests/AwaitableStoreTests.cs b/src/Redux.Tests/AwaitableStoreTests.cs new file mode 100644 index 0000000..174e4e8 --- /dev/null +++ b/src/Redux.Tests/AwaitableStoreTests.cs @@ -0,0 +1,429 @@ +namespace Redux.Tests +{ + using System; + using System.Diagnostics; + using System.Reactive.Linq; + using System.Threading.Tasks; + + using NUnit.Framework; + + [TestFixture] + [Timeout(5000)] + public class AwaitableStoreTests + { + private class StoreIncrementAction + { + } + + private class SagaIncrementAction + { + } + + private static int Reducer(int state, object action) + { + switch (action) + { + case StoreIncrementAction _: return state + 1; + default: return state; + } + } + + private static void BlockingIncrementSaga(SagaIncrementAction action, IStore store) + { + Task.Delay(100).Wait(); + store.Dispatch(new StoreIncrementAction()); + } + + private static async Task AsyncIncrementSaga(SagaIncrementAction action, IStore store) + { + await Task.Delay(100); + store.Dispatch(new StoreIncrementAction()); + } + + private static void BlockingThrowingSaga(SagaIncrementAction action, IStore store) + { + throw new Exception(); + } + + private static Task AsyncThrowingSaga(SagaIncrementAction action, IStore store) + { + throw new Exception(); + } + + #region Updating state using delayed saga + + [Test] + public async Task AsyncSaga_When_AwaitingDispatchAsync_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + await store.DispatchAsync(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public async Task AsyncSaga_When_NotAwaitingDispatchAsync_Should_GetNewStateAfterSagaCompletes() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.DispatchAsync(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(0)); + await Task.Delay(150); + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public async Task AsyncSaga_When_UsingNormalDispatch_Should_GetNewStateAfterSagaCompletes() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(0)); + await Task.Delay(150); + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public async Task AsyncSaga_When_StoreIsNonAwaitable_Should_GetNewStateAfterSagaCompletes() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(0)); + await Task.Delay(150); + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public async Task AsyncSaga_When_AwaitingFirstOfSeveralDispatches_Should_AwaitAllDispatchesAndGetFinalState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + Task firstDispatch = store.DispatchAsync(new SagaIncrementAction()); + await Task.Delay(30); + store.Dispatch(new SagaIncrementAction()); + await Task.Delay(30); + store.Dispatch(new SagaIncrementAction()); + + await firstDispatch; + + // Assert + Assert.That(store.GetState(), Is.EqualTo(3)); + } + + #endregion + + #region Updating state using blocking saga + + [Test] + public async Task BlockingSaga_When_AwaitingDispatchAsync_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingIncrementSaga); + + // Act + await store.DispatchAsync(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void BlockingSaga_When_NotAwaitingDispatchAsync_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingIncrementSaga); + + // Act + store.DispatchAsync(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void BlockingSaga_When_UsingNormalDispatch_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingIncrementSaga); + + // Act + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void BlockingSaga_When_StoreIsNonAwaitable_Should_GetNewState() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingIncrementSaga); + + // Act + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + #endregion + + #region Updating state directly (bypassing saga) + + [Test] + public async Task DirectAction_When_AwaitingDispatchAsync_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + await store.DispatchAsync(new StoreIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void DirectAction_When_NotAwaitingDispatchAsync_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.DispatchAsync(new StoreIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void DirectAction_When_UsingNormalDispatch_Should_GetNewState() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.Dispatch(new StoreIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void DirectAction_When_StoreIsNonAwaitable_Should_GetNewState() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.Dispatch(new StoreIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + #endregion + + #region Exceptions - async saga + + [Test] + public void AsyncThrowingSaga_When_AwaitingDispatchAsync_Should_BubbleUp() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncThrowingSaga); + + // Act/Assert + Assert.That(async () => await store.DispatchAsync(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public void AsyncThrowingSaga_When_UsingNormalDispatch_Should_BubbleUp() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncThrowingSaga); + + // Act/Assert + Assert.That(() => store.Dispatch(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public void AsyncThrowingSaga_When_StoreIsNonAwaitable_Should_BubbleUp() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncThrowingSaga); + + // Act/Assert + Assert.That(() => store.Dispatch(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public async Task AsyncThrowingSaga_When_ExceptionThrownAndCaught_Expect_DispatchIsStillAwaitable() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + + // Subscribe a saga that throws an exception, run it and remove it + IDisposable sub = store.Actions.OfType() + .RunsAsyncSaga(store, AsyncThrowingSaga); + Assert.That(async () => await store.DispatchAsync(new SagaIncrementAction()), Throws.Exception); + sub.Dispose(); + + // Act/Assert: The test will time out if the async counter was not decremented after the exception + await store.DispatchAsync(new StoreIncrementAction()); + } + + #endregion + + #region Exceptions - blocking saga + + [Test] + public void BlockingThrowingSaga_When_AwaitingDispatchAsync_Should_BubbleUp() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingThrowingSaga); + + // Act/Assert + Assert.That(async () => await store.DispatchAsync(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public void BlockingThrowingSaga_When_UsingNormalDispatch_Should_BubbleUp() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingThrowingSaga); + + // Act/Assert + Assert.That(() => store.Dispatch(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public void BlockingThrowingSaga_When_StoreIsNonAwaitable_Should_BubbleUp() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + store.Actions.OfType().RunsSaga(store, BlockingThrowingSaga); + + // Act/Assert + Assert.That(() => store.Dispatch(new SagaIncrementAction()), Throws.Exception); + } + + [Test] + public async Task BlockingThrowingSaga_When_ExceptionThrownAndCaught_Expect_DispatchIsStillAwaitable() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + + // Subscribe a saga that throws an exception, run it and remove it + IDisposable sub = store.Actions.OfType().RunsSaga(store, BlockingThrowingSaga); + Assert.That(async () => await store.DispatchAsync(new SagaIncrementAction()), Throws.Exception); + sub.Dispose(); + + // Act/Assert: The test will time out if the async counter was not decremented after the exception + await store.DispatchAsync(new StoreIncrementAction()); + } + + #endregion + + #region Unsubscription + + [Test] + public async Task AsyncSaga_When_Unsubscribed_Should_NotBeInvoked() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + IDisposable sub = store.Actions.OfType() + .RunsAsyncSaga(store, AsyncIncrementSaga); + + // Sanity check + await store.DispatchAsync(new SagaIncrementAction()); + Assert.That(store.GetState(), Is.EqualTo(1)); + + // Act + sub.Dispose(); + await store.DispatchAsync(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + [Test] + public void BlockingSaga_When_Unsubscribed_Should_NotBeInvoked() + { + // Arrange + var store = new ObservableActionStore(Reducer, 0); + IDisposable sub = store.Actions.OfType().RunsSaga(store, BlockingIncrementSaga); + + // Sanity check + store.Dispatch(new SagaIncrementAction()); + Assert.That(store.GetState(), Is.EqualTo(1)); + + // Act + sub.Dispose(); + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(1)); + } + + #endregion + + #region Concurrency + + [Test] + public async Task When_MultipleActionsDispatched_Should_BeProcessedConcurrentlyBySaga() + { + // Arrange + var store = new AwaitableStore(Reducer, 0); + store.Actions.OfType().RunsAsyncSaga(store, AsyncIncrementSaga); + + // Act + store.Dispatch(new SagaIncrementAction()); + await Task.Delay(50); + store.Dispatch(new SagaIncrementAction()); + + // Assert + Assert.That(store.GetState(), Is.EqualTo(0)); + await Task.Delay(150); + Assert.That(store.GetState(), Is.EqualTo(2)); + } + } + + #endregion +} \ No newline at end of file diff --git a/src/Redux.Tests/Redux.Tests.csproj b/src/Redux.Tests/Redux.Tests.csproj index 7cd1ad1..b0f933f 100644 --- a/src/Redux.Tests/Redux.Tests.csproj +++ b/src/Redux.Tests/Redux.Tests.csproj @@ -82,6 +82,7 @@ + diff --git a/src/Redux.sln.DotSettings b/src/Redux.sln.DotSettings new file mode 100644 index 0000000..4f05f4a --- /dev/null +++ b/src/Redux.sln.DotSettings @@ -0,0 +1,2 @@ + + <data><IncludeFilters /><ExcludeFilters><Filter ModuleMask="Redux.Tests" ModuleVersionMask="*" ClassMask="*" FunctionMask="*" IsEnabled="True" /></ExcludeFilters></data> \ No newline at end of file diff --git a/src/Redux/AwaitableStore.cs b/src/Redux/AwaitableStore.cs new file mode 100644 index 0000000..b14e7c5 --- /dev/null +++ b/src/Redux/AwaitableStore.cs @@ -0,0 +1,118 @@ +namespace Redux +{ + using System; + using System.Reactive; + using System.Reactive.Disposables; + using System.Reactive.Linq; + using System.Reactive.Subjects; + using System.Threading.Tasks; + + public delegate void Saga(TAction action, IStore store); + + public delegate Task AsyncSaga(TAction action, IStore store); + + public static class ObservableExtensions + { + public static IDisposable RunsSaga( + this IObservable source, + IStore store, + Saga saga) + { + return source.Subscribe(action => saga(action, store)); + } + + public static IDisposable RunsAsyncSaga( + this IObservable source, + IStore store, + AsyncSaga saga) + { + // Using SelectMany is the standard way of running async subscribers, otherwise they become + // async void and exceptions are swallowed. E.g. http://stackoverflow.com/a/24844934/2978652, + // http://stackoverflow.com/a/37412422/2978652, http://stackoverflow.com/a/23011084/2978652. + // + // Note that this will NOT block the queue while the saga runs; a new action can trigger + // the saga while the previous saga invocation still runs (which can be accomplished with + // http://stackoverflow.com/a/30030640/2978652). Note that this should be the expected + // behavior since it allows the sagas themselves to control cancellation of existing tasks + // in response to new invocations. + return source.SelectMany( + async action => + { + if (store is AwaitableStore s) + { + using (s.AsyncOperation()) + { + await saga(action, store); + } + } + else + { + await saga(action, store); + } + return Unit.Default; + }) + .Subscribe(); + } + } + + public interface IObservableActionStore + { + IObservable Actions { get; } + } + + public class ObservableActionStore : Store, IObservableActionStore + { + private readonly ISubject actionsSubject = new Subject(); + + /// + public ObservableActionStore( + Reducer reducer, + TState initialState = default(TState), + params Middleware[] middlewares) : base(reducer, initialState, middlewares) + { + } + + public IObservable Actions => this.actionsSubject; + + protected override object InnerDispatch(object action) + { + object ret = base.InnerDispatch(action); + this.actionsSubject.OnNext(action); + return ret; + } + } + + public interface IAwaitableStore : IStore + { + Task DispatchAsync(object action); + } + + public class AwaitableStore : ObservableActionStore, IAwaitableStore + { + private int numOperations; + private readonly ISubject numOperationsSubject = new BehaviorSubject(0); + + /// + public AwaitableStore( + Reducer reducer, + TState initialState = default(TState), + params Middleware[] middlewares) : base(reducer, initialState, middlewares) + { + } + + private IObservable OngoingOperations => this.numOperationsSubject; + + internal IDisposable AsyncOperation() + { + this.numOperationsSubject.OnNext(++this.numOperations); + return Disposable.Create(() => this.numOperationsSubject.OnNext(--this.numOperations)); + } + + public async Task DispatchAsync(object action) + { + object ret = this.Dispatch(action); + await this.OngoingOperations.FirstAsync(i => i == 0); + return Task.FromResult(ret); + } + } +} \ No newline at end of file diff --git a/src/Redux/Redux.csproj b/src/Redux/Redux.csproj index 8f854a4..b903979 100644 --- a/src/Redux/Redux.csproj +++ b/src/Redux/Redux.csproj @@ -35,6 +35,7 @@ 4 + diff --git a/src/Redux/Store.cs b/src/Redux/Store.cs index ebc1a46..748e746 100644 --- a/src/Redux/Store.cs +++ b/src/Redux/Store.cs @@ -32,7 +32,7 @@ public event Action StateChanged } } - public object Dispatch(object action) + public virtual object Dispatch(object action) { return _dispatcher(action); } @@ -42,7 +42,7 @@ public TState GetState() return _lastState; } - private Dispatcher ApplyMiddlewares(params Middleware[] middlewares) + protected virtual Dispatcher ApplyMiddlewares(params Middleware[] middlewares) { Dispatcher dispatcher = InnerDispatch; foreach (var middleware in middlewares) @@ -52,7 +52,7 @@ private Dispatcher ApplyMiddlewares(params Middleware[] middlewares) return dispatcher; } - private object InnerDispatch(object action) + protected virtual object InnerDispatch(object action) { lock (_syncRoot) {