From 257eeec2e216ea3ff772664cd20c3e8adc79a2b5 Mon Sep 17 00:00:00 2001 From: Jan Ypma Date: Tue, 18 Mar 2025 09:26:54 +0100 Subject: [PATCH] Introduce Task monad This commit introduces Task, a data structure that represents a recipe, or program, for producing a value of type T (or failing with an exception). It is similar in semantics to RunnableGraph[T], but intended as first-class building block. It has the following properties: - A task can have resources associated to it, which are guaranteed to be released if the task is cancelled or fails - Tasks can be forked so multiple ones can run concurrently - Such forked tasks can be cancelled A Task can be created from a RunnableGraph which has a KillSwitch, by connecting a Source and a Sink through a KillSwitch, or by direct lambda functions. --- .../apache/pekko/japi/function/Function.scala | 38 +- .../apache/pekko/task/javadsl/TaskTest.java | 270 ++++++++++ .../org/apache/pekko/task/javadsl/Clock.java | 35 ++ .../pekko/task/javadsl/CollectionHelpers.java | 36 ++ .../org/apache/pekko/task/javadsl/Fiber.java | 65 +++ .../apache/pekko/task/javadsl/Promise.java | 56 ++ .../apache/pekko/task/javadsl/Resource.java | 106 ++++ .../pekko/task/javadsl/RunningGraphs.java | 43 ++ .../apache/pekko/task/javadsl/Runtime.java | 72 +++ .../org/apache/pekko/task/javadsl/Task.java | 478 ++++++++++++++++++ .../apache/pekko/task/AbstractRuntime.scala | 138 +++++ .../org/apache/pekko/task/ClockDef.scala | 51 ++ .../org/apache/pekko/task/FiberDef.scala | 111 ++++ .../org/apache/pekko/task/PromiseDef.scala | 85 ++++ .../org/apache/pekko/task/ResourceDef.scala | 63 +++ .../org/apache/pekko/task/RunningGraph.scala | 38 ++ .../scala/org/apache/pekko/task/TaskDef.scala | 195 +++++++ .../apache/pekko/task/scaladsl/Fiber.scala | 28 + .../apache/pekko/task/scaladsl/Flows.scala | 31 ++ .../apache/pekko/task/scaladsl/Promise.scala | 36 ++ .../apache/pekko/task/scaladsl/Resource.scala | 54 ++ .../apache/pekko/task/scaladsl/Runtime.scala | 38 ++ 
.../apache/pekko/task/scaladsl/Sinks.scala | 36 ++ .../org/apache/pekko/task/scaladsl/Task.scala | 86 ++++ 24 files changed, 2187 insertions(+), 2 deletions(-) create mode 100644 stream-tests/src/test/java/org/apache/pekko/task/javadsl/TaskTest.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Clock.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/CollectionHelpers.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Fiber.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Promise.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Resource.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/RunningGraphs.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Runtime.java create mode 100644 stream/src/main/java/org/apache/pekko/task/javadsl/Task.java create mode 100644 stream/src/main/scala/org/apache/pekko/task/AbstractRuntime.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/ClockDef.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/FiberDef.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/PromiseDef.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/ResourceDef.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/RunningGraph.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/TaskDef.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Fiber.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Flows.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Promise.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Resource.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Runtime.scala create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Sinks.scala 
create mode 100644 stream/src/main/scala/org/apache/pekko/task/scaladsl/Task.scala diff --git a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala index 3e0713ea94d..235f8ed0ff1 100644 --- a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala +++ b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala @@ -25,9 +25,17 @@ import scala.annotation.nowarn @nowarn("msg=@SerialVersionUID has no effect") @SerialVersionUID(1L) @FunctionalInterface -trait Function[-T, +R] extends java.io.Serializable { +trait Function[-T, +R] extends java.io.Serializable { outer => @throws(classOf[Exception]) def apply(param: T): R + + /** Returns a function that applies [fn] to the result of this function. */ + def andThen[U](fn: Function[R, U]): Function[T, U] = new Function[T, U] { + override def apply(param: T) = fn(outer.apply(param)) + } + + /** Returns a Scala function representation for this function. */ + def toScala[T1 <: T, R1 >: R]: T1 => R1 = t => apply(t) } object Function { @@ -63,6 +71,21 @@ trait Function2[-T1, -T2, +R] extends java.io.Serializable { trait Procedure[-T] extends java.io.Serializable { @throws(classOf[Exception]) def apply(param: T): Unit + + def toScala[T1 <: T]: T1 => Unit = t => apply(t) +} + +/** + * A BiProcedure is like a BiFunction, but it doesn't produce a return value. + * `Serializable` is needed to be able to grab line number for Java 8 lambdas. + * Supports throwing `Exception` in the apply, which the `java.util.function.Consumer` counterpart does not. 
+ */ +@nowarn("msg=@SerialVersionUID has no effect") +@SerialVersionUID(1L) +@FunctionalInterface +trait BiProcedure[-T1, -T2] extends java.io.Serializable { + @throws(classOf[Exception]) + def apply(t1: T1, t2: T2): Unit } /** @@ -77,6 +100,9 @@ trait Effect extends java.io.Serializable { @throws(classOf[Exception]) def apply(): Unit + + /** Returns a Scala function representation for this function. */ + def toScala: () => Unit = () => apply() } /** @@ -98,11 +124,19 @@ trait Predicate[-T] extends java.io.Serializable { @nowarn("msg=@SerialVersionUID has no effect") @SerialVersionUID(1L) @FunctionalInterface -trait Creator[+T] extends Serializable { +trait Creator[+T] extends Serializable { outer => /** * This method must return a different instance upon every call. */ @throws(classOf[Exception]) def create(): T + + /** Returns a function that applies [fn] to the result of this function. */ + def andThen[U](fn: Function[T, U]): Creator[U] = new Creator[U] { + override def create() = fn(outer.create()) + } + + /** Returns a Scala function representation for this function. */ + def toScala[T1 >: T]: () => T1 = () => create() } diff --git a/stream-tests/src/test/java/org/apache/pekko/task/javadsl/TaskTest.java b/stream-tests/src/test/java/org/apache/pekko/task/javadsl/TaskTest.java new file mode 100644 index 00000000000..593b4b9939d --- /dev/null +++ b/stream-tests/src/test/java/org/apache/pekko/task/javadsl/TaskTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.javadsl; + +import org.apache.pekko.stream.StreamTest; +import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; +import org.apache.pekko.testkit.PekkoSpec; +import org.apache.pekko.stream.Materializer; +import org.apache.pekko.Done; + +import org.junit.ClassRule; +import org.junit.Test; + +import org.apache.pekko.japi.function.Creator; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import java.util.Optional; +import java.time.Duration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TaskTest extends StreamTest { + private final Runtime runtime = Runtime.create(Materializer.createMaterializer(system)); + + public TaskTest() { + super(actorSystemResource); + } + + @ClassRule + public static PekkoJUnitActorSystemResource actorSystemResource = + new PekkoJUnitActorSystemResource("TaskTest", PekkoSpec.testConf()); + + private T run(Task task) throws Throwable { + return runtime.run(task.timeout(Duration.ofSeconds(2))); + } + + @Test + public void can_run_task_from_lambda() throws Throwable { + assertEquals("Hello", run(Task.run(() -> "Hello"))); + } + + @Test + public void can_map() throws Throwable { + assertEquals(25, run(Task.run(() -> "25").map(Integer::parseInt)).intValue()); + } + + @Test + public void can_flatMap_to_run() throws Throwable { + assertEquals( + 25, 
run(Task.run(() -> "25").flatMap(s -> Task.run(() -> Integer.parseInt(s)))).intValue()); + } + + @Test + public void can_zipPar_two_tasks() throws Throwable { + Task task = + Task.run( + () -> { + return "Hello"; + }); + assertEquals("HelloHello", run(task.zipPar(task, (s1, s2) -> s1 + s2))); + } + + @Test + public void zipPar_interrupts_first_on_error_in_second() throws Throwable { + AtomicLong check = new AtomicLong(); + Task task1 = + Task.succeed("A").delayed(Duration.ofMillis(100)).before(Task.run(check::incrementAndGet)); + Task task2 = Task.fail(new RuntimeException("simulated failure")); + org.junit.Assert.assertThrows( + RuntimeException.class, () -> run(task1.zipPar(task2, (a, b) -> a + b))); + assertEquals(0, check.get()); + } + + @Test + public void zipPar_interrupts_second_on_error_in_first() throws Throwable { + AtomicLong check = new AtomicLong(); + Task task1 = + Task.succeed("A").delayed(Duration.ofMillis(100)).before(Task.run(check::incrementAndGet)); + Task task2 = Task.fail(new RuntimeException("simulated failure")); + org.junit.Assert.assertThrows( + RuntimeException.class, () -> run(task2.zipPar(task1, (a, b) -> a + b))); + assertEquals(0, check.get()); + } + + @Test + public void can_interrupt_forked_task() throws Throwable { + AtomicLong check = new AtomicLong(); + Task task = Task.run(() -> check.incrementAndGet()).delayed(Duration.ofMillis(100)); + run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().map(cancelled -> "cancelled"))); + assertEquals(0, check.get()); + } + + @Test(expected = InterruptedException.class) + public void joining_interrupted_fiber_yields_exception() throws Throwable { + Task task = Task.succeed(42L).delayed(Duration.ofMillis(100)); + run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().flatMap(cancelled -> fiber.join()))); + } + + @Test + public void can_run_graph() throws Throwable { + assertEquals( + Optional.of("hello"), run(Task.connect(Source.single("hello"), Sink.headOption()))); + } + + @Test + 
public void can_interrupt_graph() throws Throwable { + AtomicLong check = new AtomicLong(); + assertEquals( + Done.getInstance(), + run( + Task.connect( + Source.tick(Duration.ofMillis(1), Duration.ofMillis(1), ""), + Sink.foreach(s -> check.incrementAndGet())) + .forkDaemon() + .flatMap(fiber -> fiber.interrupt()))); + Thread.sleep(100); + assertTrue(check.get() < 10); + } + + @Test + public void resource_is_acquired_and_released() throws Throwable { + AtomicLong check = new AtomicLong(); + Resource res = + Resource.acquireRelease( + Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet())); + Task task = res.use(i -> Task.succeed(i)); + assertEquals(1L, run(task).longValue()); + assertEquals(0L, check.get()); + } + + @Test + public void resource_is_released_on_failure() throws Throwable { + AtomicLong check = new AtomicLong(); + Resource res = + Resource.acquireRelease( + Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet())); + Task task = res.use(i -> Task.fail(new RuntimeException("Simulated failure"))); + try { + run(task); + } catch (Exception ignored) { + } + assertEquals(0L, check.get()); + } + + @Test + public void resource_closes_AutoCloseable() throws Throwable { + AtomicLong created = new AtomicLong(); + AtomicLong closed = new AtomicLong(); + Resource res = + Resource.autoCloseable( + Task.run( + () -> { + created.incrementAndGet(); + return () -> closed.incrementAndGet(); + })); + run(res.use(ac -> Task.done)); + assertEquals(1L, created.get()); + assertEquals(1L, closed.get()); + } + + @Test + public void resource_is_released_when_interrupted() throws Throwable { + AtomicLong check = new AtomicLong(); + AtomicLong started = new AtomicLong(); + + Resource res = + Resource.acquireRelease( + Task.run( + () -> { + return check.incrementAndGet(); + }), + i -> + Task.run( + () -> { + return check.decrementAndGet(); + })); + + Task task = + res.use( + i -> + Task.run(() -> 
started.incrementAndGet()) + .before(Clock.sleep(Duration.ofMillis(100)))); + run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().delayed(Duration.ofMillis(50)))); + + assertEquals(0L, check.get()); + assertEquals(1L, started.get()); + } + + @Test + public void resource_can_fork() throws Throwable { + AtomicLong check = new AtomicLong(); + Resource res = + Resource.acquireRelease(Task.run(() -> check.incrementAndGet()), i -> Task.done); + Task task = res.fork().use(fiber -> fiber.join()); + run(task); + assertEquals(1L, check.get()); + } + + @Test + public void resource_is_released_when_fork_is_interrupted() throws Throwable { + AtomicLong check = new AtomicLong(); + Resource res = + Resource.acquireRelease( + Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet())); + Task task = res.fork().use(fiber -> fiber.interrupt()); + run(task); + assertEquals(0L, check.get()); + } + + @Test + public void resource_is_released_when_fork_is_completed() throws Throwable { + AtomicLong check = new AtomicLong(); + Resource res = + Resource.acquireRelease( + Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet())); + Task task = res.fork().use(fiber -> fiber.join()); + run(task); + assertEquals(0L, check.get()); + } + + @Test + public void can_create_and_complete_promise() throws Throwable { + Task task = + Promise.make() + .flatMap( + promise -> + promise + .await() + .forkDaemon() + .flatMap(fiber -> promise.succeed(42).andThen(fiber.join()))); + assertEquals(42, run(task).intValue()); + } + + @Test + public void can_race_two_tasks() throws Throwable { + Task task1 = Task.succeed(0).delayed(Duration.ofMillis(100)); + Task task2 = Task.succeed(42); + Task task = Task.raceAll(task1, task2); + assertEquals(42, run(task).intValue()); + } + + @Test + public void can_race_task_with_never() throws Throwable { + Task task1 = Task.succeed(42).delayed(Duration.ofMillis(100)); + Task task2 = Task.never(); + Task task = 
Task.raceAll(task1, task2); + assertEquals(42, run(task).intValue()); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Clock.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Clock.java new file mode 100644 index 00000000000..0fca21e6fcb --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Clock.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.javadsl; + +import java.time.Duration; +import java.time.Instant; + +import org.apache.pekko.task.GetRuntimeDef$; +import org.apache.pekko.task.ClockDef$; +import org.apache.pekko.task.TaskDef; +import org.apache.pekko.Done; + +public class Clock { + public static final Task nanoTime = new Task<>(ClockDef$.MODULE$.nanoTime()); + public static final Task now = new Task<>(ClockDef$.MODULE$.now()); + + public static Task sleep(Duration duration) { + return new Task<>(ClockDef$.MODULE$.sleep(duration)).asDone(); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/CollectionHelpers.java b/stream/src/main/java/org/apache/pekko/task/javadsl/CollectionHelpers.java new file mode 100644 index 00000000000..6b807d8fe1e --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/CollectionHelpers.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.javadsl; + +import java.util.ArrayList; +import java.util.function.Function; + +/** + * Functional helpers for collections. 
Prefixed with "c" so they don't conflict with local methods + * of the same name (in Java 8's limited name resolution) + */ +public class CollectionHelpers { + public static ArrayList cmap( + Iterable src, Function fn) { + ArrayList list = new ArrayList<>(); + for (T t : src) { + list.add(fn.apply(t)); + } + return list; + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Fiber.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Fiber.java new file mode 100644 index 00000000000..9bd2e9f02f8 --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Fiber.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.javadsl; + +import org.apache.pekko.task.FiberDef; +import org.apache.pekko.task.AwaitDef; +import org.apache.pekko.task.InterruptDef; +import org.apache.pekko.Done; + +/** + * A fiber represents the ongoing execution of a Task, eventually resulting in a value T or failing + * with an exception. + */ +public class Fiber { + private final FiberDef definition; + + public Fiber(FiberDef definition) { + this.definition = definition; + } + + /** Returns a Task that will complete when this fiber does. 
*/ + public Task join() { + return new Task<>(definition.join()); + } + + /** + * Returns a Task that will interrupt this fiber, causing its execution to stop. If the fiber has + * already completed, the task will complete with the fiber's result. Otherwise, the task will + * complete with InterruptedException. + */ + public Task interruptAndGet() { + return new Task<>(definition.interruptAndGet()); + } + + /** + * Returns a Task that will interrupt this fiber, causing its execution to stop. The task will + * complete once interruption has finished. + */ + public Task interrupt() { + return new Task<>(definition.interrupt()).asDone(); + } + + /** + * Returns a Resource wrapping this running fiber, which will interrupt the fiber when the + * resource's lifetime ends. + */ + public Resource> toResource() { + return Resource.acquireRelease(Task.succeed(this), f -> f.interrupt()); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Promise.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Promise.java new file mode 100644 index 00000000000..0844e874378 --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Promise.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.javadsl; + +import org.apache.pekko.Done; +import org.apache.pekko.task.AwaitDef; +import org.apache.pekko.task.CompleteDef; +import org.apache.pekko.task.PromiseDef; + +public class Promise { + private final PromiseDef definition; + + Promise(PromiseDef definition) { + this.definition = definition; + } + + public static Task> make() { + return Task.succeed(new Promise<>(new PromiseDef<>())); + } + + public Task await() { + return new Task<>(new AwaitDef<>(definition)); + } + + /** + * Completes the promise with the given result, or does nothing if the promise was already + * completed. + */ + // FIXME add a Boolean result here, returning if the promise was already completed. + public Task succeed(T result) { + return completeWith(Task.succeed(result)); + } + + public Task fail(Throwable exception) { + return completeWith(Task.fail(exception)); + } + + public Task completeWith(Task task) { + return new Task<>(new CompleteDef<>(definition, task.definition())).asDone(); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Resource.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Resource.java new file mode 100644 index 00000000000..7f52a4d1065 --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Resource.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.javadsl; + +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.japi.function.Function2; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.Done; +import org.apache.pekko.task.AbstractTask; +import org.apache.pekko.task.AbstractResource; +import org.apache.pekko.task.ResourceDef; + +import static org.apache.pekko.task.javadsl.Task.task; +import static org.apache.pekko.task.javadsl.Task.runTask; + +/** + * A Resource represents a value that can be created, but needs to be cleaned up after use. The use + * of Resource guarantees that values aren't leaked even in case of failure or interrupted fibers. + */ +public class Resource implements AbstractResource { + /** + * Creates a Resource that creates a value using [acquire], invoking [release] when such value + * needs to be cleaned up. It is important that only the Task returned by [release] performs any + * cleanup, not the lambda itself. 
+ */ + public static Resource acquireRelease( + Task acquire, Function> release) { + return new Resource<>( + ResourceDef.acquireRelease( + acquire.definition(), release.andThen(t -> t.definition()).toScala())); + } + + public static Resource autoCloseable(Task acquire) { + return acquireRelease(acquire, t -> Task.run(() -> t.close())); + } + + public static Resource succeedTask(Task task) { + return new Resource<>(ResourceDef.succeedTask(() -> task.definition())); + } + + public static Resource succeed(T value) { + return new Resource<>(ResourceDef.succeed(() -> value)); + } + + private final ResourceDef definition; + + /** + * @param create Task that returns the value that this resource governs, and a task can clean up + * that value. + */ + Resource(ResourceDef definition) { + this.definition = definition; + } + + public ResourceDef definition() { + return definition; + } + + public Resource map(Function fn) { + return new Resource<>(definition.map(fn.toScala())); + } + + public Resource mapTask(Function> fn) { + return new Resource<>(definition.mapTask(fn.andThen(t -> t.definition()).toScala())); + } + + public Resource flatMap(Function> fn) { + return new Resource<>(definition.flatMap(fn.andThen(t -> t.definition()).toScala())); + } + + public Resource zip( + Resource that, Function2 combine) { + return flatMap(t -> that.map(u -> combine.apply(t, u))); + } + + /** + * Returns a Resource that creates this resource's value in a background fiber, making sure to + * interrupt that fiber when closing the resource. + */ + public Resource> fork() { + return new Resource<>(definition.fork().map(Fiber::new)); + } + + /** + * Returns a Task that can safely use this resource in the given function, guaranteeing cleanup + * even if the function fails or the current fiber is interrupted. 
+ */ + public Task use(Function> fn) { + return new Task<>(definition.use(fn.andThen(t -> t.definition()).toScala())); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/RunningGraphs.java b/stream/src/main/java/org/apache/pekko/task/javadsl/RunningGraphs.java new file mode 100644 index 00000000000..bce1d955978 --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/RunningGraphs.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.javadsl; + +import java.util.concurrent.CompletionStage; + +import org.apache.pekko.task.RunningGraph; +import org.apache.pekko.Done; +import org.apache.pekko.stream.KillSwitch; + +public class RunningGraphs { + /** + * Creates a RunningGraph from the given CompletionStage, using shutdown on the given ks to + * interrupt + */ + public static RunningGraph withShutdown(KillSwitch ks, CompletionStage cs) { + return RunningGraph.create(Task.completeUninterruptable(cs), Task.run(() -> ks.shutdown())); + } + + /** + * Creates a RunningGraph from the given CompletionStage, using abort on the given ks to interrupt + */ + public static RunningGraph withAbort(KillSwitch ks, CompletionStage cs) { + return RunningGraph.create( + Task.completeUninterruptable(cs), + Task.run(() -> ks.abort(new InterruptedException("Task was interrupted")))); + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Runtime.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Runtime.java new file mode 100644 index 00000000000..ce3ea2de232 --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Runtime.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.pekko.task.javadsl; + + import org.apache.pekko.stream.Materializer; + import org.apache.pekko.task.ClockDef; + import org.apache.pekko.task.AbstractRuntime; + import org.apache.pekko.task.AbstractTask; + import org.apache.pekko.task.FiberRuntime; + + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutionException; + import java.time.Duration; + + public class Runtime extends AbstractRuntime { + public static Runtime create(Materializer materializer) { + return create(materializer, ClockDef.system()); + } + + public static Runtime create(Materializer materializer, ClockDef clock) { + return new Runtime(materializer, clock); + } + + private Runtime(Materializer materializer, ClockDef clock) { + super(materializer, clock); + } + + /** Runs the task, and returns a CompletableFuture that will complete with the task's result. */ + public CompletableFuture runAsync(AbstractTask task) { + CompletableFuture fut = new CompletableFuture<>(); + run( + new FiberRuntime(), + task.definition(), + res -> { + if (res.isSuccess()) { + fut.complete(res.get()); + } else { + fut.completeExceptionally(res.failed().get()); + } + return null; + }); + return fut; + } + + /** + * Runs the task, and blocks the current thread indefinitely. If you want a timeout, use this + * method with Task.timeout(). If the task fails, its failing exception is thrown (not wrapped in + * an ExecutionException). If the task was interrupted, InterruptedException is thrown. 
+ */ + public T run(AbstractTask task) throws Throwable { + try { + return runAsync(task).get(); + } catch (ExecutionException x) { + throw x.getCause(); + } + } +} diff --git a/stream/src/main/java/org/apache/pekko/task/javadsl/Task.java b/stream/src/main/java/org/apache/pekko/task/javadsl/Task.java new file mode 100644 index 00000000000..d88ea91e89b --- /dev/null +++ b/stream/src/main/java/org/apache/pekko/task/javadsl/Task.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.javadsl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; + +import org.apache.pekko.Done; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.japi.function.BiProcedure; +import org.apache.pekko.japi.function.Creator; +import org.apache.pekko.japi.function.Effect; +import org.apache.pekko.japi.function.Function2; +import org.apache.pekko.japi.function.Function; +import org.apache.pekko.japi.function.Procedure; +import org.apache.pekko.japi.function.Predicate; +import org.apache.pekko.stream.ActorAttributes.Dispatcher; +import org.apache.pekko.stream.ClosedShape; +import org.apache.pekko.stream.Graph; +import org.apache.pekko.stream.KillSwitch; +import org.apache.pekko.stream.KillSwitches; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.task.AbstractRuntime; +import org.apache.pekko.task.AbstractTask; +import org.apache.pekko.task.CallbackDef; +import org.apache.pekko.task.FiberDef; +import org.apache.pekko.task.FlatMapDef; +import org.apache.pekko.task.ForkDef; +import org.apache.pekko.task.InterruptabilityDef; +import org.apache.pekko.task.MapDef; +import org.apache.pekko.task.RunningGraph; +import org.apache.pekko.task.TaskDef; +import org.apache.pekko.task.ValueDef; + +import scala.Tuple2; +import scala.util.Failure; +import scala.util.Success; +import scala.util.Try; + +import static org.apache.pekko.task.javadsl.CollectionHelpers.*; + +import static scala.jdk.javaapi.FutureConverters.*; +import static scala.jdk.javaapi.CollectionConverters.*; + +/** + * A Task is a description of functionality (also called an "effect", or a "program") that, when + * run, either 
asynchronously yields a value of type T, or fails with an exception. + */ +public class Task implements AbstractTask { + public static final Task done = succeed(Done.getInstance()); + + /** Returns a task that runs all the given task in sequence, returning all results. */ + @SafeVarargs + public static Task> all(AbstractTask... tasks) { + return all(Arrays.asList(tasks)); + } + + /** Returns a task that runs all the given task in sequence, returning all results. */ + public static Task> all(Iterable> tasks) { + return new Task<>(TaskDef.all(asScala(cmap(tasks, t -> t.definition())))) + .map(seq -> asJava(seq)); + } + + /** + * Returns a task that runs all the given task in parallel, returning all results. If any task + * fails, the rest is interrupted. + */ + @SafeVarargs + public static Task> allPar(AbstractTask... tasks) { + return allPar(Arrays.asList(tasks)); + } + + /** + * Returns a task that runs all the given task in parallel, returning all results. If any task + * fails, the rest is interrupted. + */ + public static Task> allPar(Iterable> tasks) { + return new Task<>(TaskDef.allPar(asScala(cmap(tasks, t -> t.definition())))) + .map(seq -> asJava(seq)); + } + + /** + * Creates a Task that can integrate external code that can report results through a callback. The + * launch function is given a callback to invoke, and should return a task that can handle + * interruption of the running process. + * + *

For example: + * Task.callback(cb -> { + * myExternalProcess.onSuccess(t -> { cb.accept(t, null); }); + * myExternalProcess.onFailure(x -> { cb.accept(null, x); }); + * return Task.run(() -> myExternalProcess.cancel()); + * }) + * + */ + public static Task callback(Function, Task> launch) { + return new Task<>( + new CallbackDef<>( + callback -> + unchecked(() -> launch.apply((t, x) -> callback.apply(tryOf(t, x))).definition()))); + } + + /** Returns a task that completes the given CompletionStage, cancelling it on interruption. */ + public static Task complete(CompletionStage stage) { + return complete(() -> stage); + } + + /** Returns a task that creates and completes a CompletionStage, cancelling it on interruption. */ + public static Task complete(Creator> stage) { + return complete(stage, fut -> fut.cancel(false)); + } + + /** + * Returns a task that completes the given CompletionStage, leaving it running on interruption. + */ + public static Task completeUninterruptable(CompletionStage stage) { + return completeUninterruptable(() -> stage); + } + + /** + * Returns a task that creates and completes a CompletionStage, leaving it running on + * interruption. + */ + public static Task completeUninterruptable(Creator> stage) { + return complete(stage, fut -> {}); + } + + /** + * Returns a Task that connects the given source to a KillSwitch, and then through the given sink, + * shutting down the kill switch on interrupt. + */ + public static Task connect( + Source source, Sink> sink) { + return run(source.viaMat(KillSwitches.single(), Keep.right()), sink); + } + + /** Returns a Task that fails with the given exception. */ + public static Task fail(Throwable x) { + return new Task<>(TaskDef.fail(() -> x)); + } + + /** Returns a Task that fails with the given exception. */ + public static Task fail(Creator x) { + return new Task<>(TaskDef.fail(x.toScala())); + } + + /** Returns a Task that never completes. 
*/ + public static Task never() { + return new Task<>(TaskDef.never()); + } + + /** + * Returns a Task which executes all given tasks in parallel, returning whichever of them + * completes first, and the interrupts the rest. + */ + public static Task raceAll(Iterable> tasks) { + return new Task<>(TaskDef.raceAll(asScala(cmap(tasks, t -> t.definition())))); + } + + /** + * Returns a Task which executes all given tasks in parallel, returning whichever of them + * completes first, and the interrupts the rest. + */ + @SafeVarargs + public static Task raceAll(AbstractTask... tasks) { + return raceAll(Arrays.asList(tasks)); + } + + /** + * Returns a Task that invokes the given function when run, completing with its return value. The + * function is executed on the default dispatcher. + */ + public static Task run(Creator fn) { + return new Task<>(TaskDef.succeed(fn.toScala())); + } + + /** Returns a Task that invokes the given function when run, completing with Done. */ + public static Task run(Effect fn) { + return new Task<>(TaskDef.succeed(fn.toScala())).asDone(); + } + + /** + * Returns a Task that runs the given source through the given sink, shutting down the kill switch + * on interrupt. + */ + public static Task run( + Source source, Sink> sink) { + return run(source.toMat(sink, RunningGraphs::withShutdown)); + } + + /** Returns the given runnable graph, which must materialize into an instance of RunningGraph. */ + public static Task run(Graph> graph) { + return new Task(TaskDef.run(graph)); + } + + /** + * Returns a Task that invokes the given function when run, completing with its return value. The + * function is executed on the given dispatcher. For example, provide + * org.apache.pekko.dispatch.Dispatchers.DefaultBlockingDispatcherId to run this task on pekko's + * blocking I/O thread pool. 
+ */ + public static Task runOn(String dispatcher, Creator fn) { + return new Task<>(TaskDef.succeedOn(dispatcher, fn.toScala())); + } + + /** Returns a Task that invokes the given function when run, completing with Done. */ + public static Task runOn(String dispatcher, Effect fn) { + return new Task<>(TaskDef.succeedOn(dispatcher, fn.toScala())).asDone(); + } + + /** Returns a Task that invokes the given function when run, completing with its returned Task. */ + public static Task runTask(Creator> fn) { + return run(fn).flatMap(r -> r); + } + + /** Returns a Task that invokes the given function when run, completing with its returned Task. */ + public static Task runTaskOn(String dispatcher, Creator> fn) { + return runOn(dispatcher, fn).flatMap(r -> r); + } + + /** Returns a Task that succeeds with the given value */ + public static Task succeed(T value) { + return new Task<>(TaskDef.succeed(() -> value)); + } + + /** + * Returns a Task that uses the given function to construct an uninterruptable Task, which is + * allowed to contain interruptable blocks inside. For example: + * Task.uninterruptableMask(makeInterruptable -> + * acquireResource().andThen(makeInterruptable.apply(doSomeWork)).andThen(releaseResource()) + * ) + * + */ + public static Task uninterruptableMask(Function, Task>, Task> fn) { + return new Task( + TaskDef.uninterruptableMask( + restorer -> { + Function, Task> javaRestorer = + task -> new Task(restorer.apply(task.definition())); + return Task.runTask(() -> fn.apply(javaRestorer)).definition(); + })); + } + + // ================ Instance methods ==================== + + /** Returns a task that replaces this task's successful result with the given one. */ + public Task as(U value) { + return map(t -> value); + } + + /** Returns a task that replaces this task's successful result with Done. 
*/ + public Task asDone() { + return as(Done.getInstance()); + } + + /** + * Returns a task that runs [that] after this task completes with success, yielding the new task's + * result. + */ + public Task andThen(AbstractTask that) { + return flatMap(t -> that); + } + + /** + * Returns a task that runs [that] after this task completes with success, yielding this task's + * result. + */ + public Task before(AbstractTask that) { + return flatMap(t -> task(that).as(t)); + } + + /** Returns a task that recovers from the matching exceptions using the given handler. */ + public Task catchSome(Class exceptionType, Function handler) { + return catchSomeWith(exceptionType, x -> true, x -> succeed(handler.apply(x))); + } + + /** Returns a task that recovers from the matching exceptions using the given handler. */ + @SuppressWarnings("unchecked") + public Task catchSomeWith( + Class exceptionType, Function> handler) { + return catchSomeWith(exceptionType, x -> true, handler); + } + + /** Returns a task that recovers from the matching exceptions using the given handler. */ + @SuppressWarnings("unchecked") + public Task catchSomeWith( + Class exceptionType, Predicate test, Function> handler) { + return flatMapResult( + (t, x) -> { + if (x != null) { + return (exceptionType.isInstance(x) && test.test((X) x) + ? handler.apply((X) x) + : fail(x)); + } else { + return succeed(t); + } + }); + } + + /** Returns a Task that executes this task after the given duration. */ + public Task delayed(Duration duration) { + return Clock.sleep(duration).andThen(this); + } + + /** + * Returns a task that maps this task's successful value through the given function, and runs the + * resulting task after this one. + */ + public Task flatMap(Function> fn) { + return new Task(definition.flatMap(t -> unchecked(() -> fn.apply(t).definition()))); + } + + /** + * Returns a task that forks this task into a background Fiber, which does not stop when the + * calling fiber stops. 
+ */ + public Task> forkDaemon() { + return new Task<>(new ForkDef<>(definition)).map(Fiber::new); + } + + /** + * Return a Resource which runs this task in a background fiber when used, automatically + * interrupting the fiber when the resource is closed. + */ + public Resource> forkResource() { + return Resource.acquireRelease(forkDaemon(), fiber -> fiber.interrupt()); + } + + /** + * Returns a task that maps this task's result through the given function, and runs the resulting + * task after this one. On success, the function will receive a non-null first argument. On + * failure, the function will receive a non-null second argument. + */ + public Task flatMapResult(Function2> fn) { + return new Task( + definition.flatMapResult(res -> unchecked(() -> applyTry(res, fn).definition()))); + } + + /** Returns a task that maps this task's successful result through the given function */ + public Task map(Function fn) { + return new Task(definition.map(t -> unchecked(() -> fn.apply(t)))); + } + + /** + * Returns a task that runs the given side effect when this task completes. On success, the + * function will receive a non-null first argument. On failure, the function will receive a + * non-null second argument. + */ + public Task onComplete(Function2> fn) { + return new Task(definition.onComplete(res -> applyTry(res, fn).definition())); + } + + /** + * Runs this task and [that] in parallel, returning whichever completes first, interrupting the + * other. + */ + public Task race(AbstractTask that) { + return Task.raceAll(this, that); + } + + /** + * Returns a Task that after the given duration, automatically interrupts itself and fails with + * java.util.concurrent.TimeoutException. + */ + public Task timeout(Duration duration) { + return race( + Task.fail(() -> new TimeoutException("Task timed out after " + duration)) + .delayed(duration)); + } + + /** Returns this task as a resource, without a cleanup action. 
*/ + public Resource toResource() { + return Resource.succeedTask(this); + } + + /** + * Returns a task that runs [that] after this task completes with success, using [combine] to + * combine the results. + */ + public Task zip( + AbstractTask that, Function2 combine) { + return flatMap(t -> task(that).map(u -> combine.apply(t, u))); + } + + /** + * Returns a task that runs this and another task in parallel, using [combine] to combine the + * results. + */ + public Task zipPar( + AbstractTask that, Function2 combine) { + return new Task<>( + definition + .zipPar(that.definition()) + .map(t -> unchecked(() -> combine.apply(t._1(), t._2())))); + } + + /** Returns the underlying task definition. Not part of the Task API. */ + public TaskDef definition() { + return definition; + } + + private final TaskDef definition; + + Task(TaskDef definition) { + this.definition = definition; + } + + /** + * Returns a Task that when ran, creates the given completion stage, completing the task with its + * result, and invoking onCancel when the task's fiber is cancelled. + */ + private static Task complete( + Creator> stage, Procedure> onCancel) { + return callback( + cb -> { + CompletableFuture fut = stage.create().toCompletableFuture(); + fut.whenComplete( + (t, x) -> { + cb.accept(t, x); + }); + return Task.run(() -> onCancel.apply(fut)); + }); + } + + static Task task(AbstractTask t) { + return (t instanceof Task) ? (Task) t : new Task<>(t.definition()); + } + + private static Try tryOf(Creator fn) { + return Try.apply(fn.toScala()); + } + + private static Try tryOf(T success, Throwable failure) { + return (success != null) ? 
new Success<>(success) : new Failure<>(failure); + } + + private static T unchecked(Creator fn) { + try { + return fn.create(); + } catch (Throwable t) { + return sneakyThrow(t); + } + } + + @SuppressWarnings("unchecked") + private static R sneakyThrow(Throwable t) throws T { + throw (T) t; + } + + private static U applyTry(Try res, Function2 fn) { + T t = (res.isSuccess()) ? res.get() : null; + Throwable x = (res.isFailure()) ? res.failed().get() : null; + + return unchecked(() -> fn.apply(t, x)); + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/AbstractRuntime.scala b/stream/src/main/scala/org/apache/pekko/task/AbstractRuntime.scala new file mode 100644 index 00000000000..c9128495f24 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/AbstractRuntime.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task + +import scala.util.{ Failure, Success, Try } + +import org.apache.pekko.stream.Materializer +import org.apache.pekko.dispatch.Dispatchers + +object AbstractRuntime { + private val interruptedFailure = Failure(new InterruptedException("Fiber interrupted.")) +} + +abstract class AbstractRuntime(val materializer: Materializer, val clock: ClockDef) { + import AbstractRuntime._ + + private val executor = materializer.system.dispatchers.lookup(Dispatchers.DefaultDispatcherId) + private val blockingExecutor = materializer.system.dispatchers.lookup(Dispatchers.DefaultBlockingDispatcherId) + + protected def run[T](fiber: FiberRuntime[_], task: TaskDef[T])(onComplete: Try[T] => Unit): Unit = { + task match { + // The tasks below respond differently when a fiber is interrupted (rather than just being safely skipped). + case InterruptabilityDef(interruptable, mkTask) => + val (prev, callbacks) = fiber.setInterruptable(interruptable) + run(fiber, callbacks) { _ => + val restorer = new RestorerDef { + override def apply[U](task: TaskDef[U]) = InterruptabilityDef(prev, _ => task) + } + run(fiber, mkTask(restorer)) { res => + run(fiber, fiber.setInterruptable(prev)._2)(_ => onComplete(res)) + } + } + + case OnInterruptDef(base, handler) => + run(fiber, fiber.onInterrupt(handler)) { _ => + run(fiber, base) { res => + fiber.removeOnInterrupt(handler) + onComplete(res) + } + } + + case m @ MapDef(base, fn) => + def go(res: Try[m.Base]): Unit = onComplete(Try(fn.apply(res)).flatMap(t => t)) + + run(fiber, base) { res => + if (fiber.shouldInterrupt) { + go(interruptedFailure) + } else { + go(res) + } + } + + case f @ FlatMapDef(base, fn) => + def go(res: Try[f.Base]): Unit = executor.execute(() => { + Try(fn.apply(res)) match { + case Success(next) => run(fiber, next)(onComplete) + case Failure(x) => onComplete(Failure(x)) + } + }) + + run(fiber, base) { res => + if (fiber.shouldInterrupt) { + go(interruptedFailure) + } else { + go(res) + } + } + 
+ case _ => + // The tasks below can be safely skipped when a fiber is interrupted. + if (fiber.shouldInterrupt) { + onComplete(interruptedFailure) + } else task match { + case GetRuntimeDef => + onComplete(Success(this)) + + case CallbackDef(launch) => + val promise = new PromiseDef[T] + val callback: (Try[T]) => Unit = promise.complete(_) + val cancel: TaskDef[Any] = launch(callback).andThen(TaskDef.succeed(onComplete(interruptedFailure))) + run(fiber, fiber.onInterrupt(cancel)) { _ => + promise.onComplete { res => + onComplete(res) + fiber.removeOnInterrupt(cancel) + } + } + case ValueDef(fn, dispatcher) => + val ex = if (dispatcher eq Dispatchers.DefaultDispatcherId) + executor + else if (dispatcher eq Dispatchers.DefaultBlockingDispatcherId) + blockingExecutor + else + materializer.system.dispatchers.lookup(dispatcher) + ex.execute(() => { + onComplete(Try(fn.apply()).flatMap(r => r)) + }) + case f @ ForkDef(task) => + val childFiber = new FiberRuntime[f.Res]() + run(childFiber, task) { res => + childFiber.result.complete(res) + } + onComplete(Success(FiberDef(childFiber))) + case InterruptDef(cancelledFiber) => + run(fiber, cancelledFiber.interruptNow()) { _ => + cancelledFiber.onComplete { onComplete(_) } + } + case AwaitDef(promise) => + promise.onComplete { res => + onComplete(res) + } + case CompleteDef(promise, task) => + run(fiber, task) { res => + promise.complete(res) + onComplete(Success(())) + } + case MapDef(_, _) => throw new IllegalStateException // Unreachable, handled above + case FlatMapDef(_, _) => throw new IllegalStateException // Unreachable, handled above + case InterruptabilityDef(_, _) => throw new IllegalStateException // Unreachable, handled above + case OnInterruptDef(_, _) => throw new IllegalStateException // Unreachable, handled above + } + } + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/ClockDef.scala b/stream/src/main/scala/org/apache/pekko/task/ClockDef.scala new file mode 100644 index 00000000000..068629273f5 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pekko.task

import java.time.Duration
import java.time.Instant
import org.apache.pekko.util.JavaDurationConverters._
import scala.util.Success

/** Abstraction over time, so tasks that sleep or read the clock can be tested with a virtual clock. */
trait ClockDef {
  def nanoTime: TaskDef[java.lang.Long]
  def now: TaskDef[Instant]
  def sleep(duration: Duration): TaskDef[Unit]
}

object ClockDef {
  // Convenience accessors that delegate to the clock of the current runtime.
  val nanoTime: TaskDef[java.lang.Long] = GetRuntimeDef.flatMap(_.clock.nanoTime)
  val now: TaskDef[Instant] = GetRuntimeDef.flatMap(_.clock.now)
  def sleep(duration: Duration): TaskDef[Unit] = GetRuntimeDef.flatMap(_.clock.sleep(duration))

  /** The real system clock; sleeps are scheduled on the runtime's materializer. */
  val system: ClockDef = new ClockDef {
    override def nanoTime = TaskDef.succeed(System.nanoTime)
    override def now = TaskDef.succeed(Instant.now())
    override def sleep(duration: Duration) = GetRuntimeDef.flatMap(runtime =>
      CallbackDef { cb =>
        val scheduled = runtime.materializer.scheduleOnce(duration.asScala,
          () => {
            cb(Success(()))
          })
        // FIX: the original wrote `TaskDef.succeed(() -> { scheduled.cancel() })`, where Scala's
        // `->` (ArrowAssoc) builds a Tuple2 instead of a thunk. The cancel task should simply
        // cancel the scheduled wake-up and yield Unit.
        TaskDef.succeed {
          scheduled.cancel()
          ()
        }
      }
    )
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pekko.task

import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.HashSet
import scala.util.Try

/** A handle to a concurrently running task, which can be joined, interrupted, or mapped. */
trait FiberDef[+T] { base =>
  /** Returns a task that waits for this fiber and yields its result. */
  def join: TaskDef[T]

  /** Returns a task that interrupts this fiber and yields its final result. */
  def interruptAndGet: TaskDef[T] = InterruptDef(this)

  /** Returns a task that interrupts this fiber, swallowing the resulting InterruptedException. */
  def interrupt: TaskDef[Unit] = interruptAndGet.unit.catchSome {
    case _: InterruptedException => ()
  }

  private[task] def onComplete(handler: Try[T] => Unit): Unit
  private[task] def interruptNow(): TaskDef[Unit]

  /** Returns a view of this fiber whose result is mapped through [fn]; interruption is delegated. */
  def map[U](fn: T => U): FiberDef[U] = new FiberDef[U] {
    override def join = base.join.map(fn)
    override def onComplete(handler: Try[U] => Unit) = base.onComplete(res => handler(res.map(fn)))
    override def interruptNow() = base.interruptNow()
  }
}

object FiberDef {
  /** Wraps a FiberRuntime into the public FiberDef interface. */
  def apply[T](rt: FiberRuntime[T]): FiberDef[T] = new FiberDef[T] {
    override def join = AwaitDef(rt.result)
    override def onComplete(handler: Try[T] => Unit) = rt.result.onComplete(handler)
    override def interruptNow() = rt.interruptNow()
  }
}

object FiberRuntime {
  // An interrupt handler is an arbitrary task to run when the fiber is interrupted.
  type OnInterrupt = TaskDef[_]
}

/** Mutable runtime state of one fiber: its result promise plus interruption bookkeeping. */
class FiberRuntime[T] {
  import FiberRuntime._

  /** Completed exactly once with the fiber's final result. */
  val result = new PromiseDef[T]

  // Immutable state snapshot, swapped atomically via [state].
  // NOTE(review): handlers are kept in a Set, so two equal handler tasks would collapse into
  // one entry — confirm handler instances are always distinct.
  case class State(
      interruptable: Boolean = true,
      interrupted: Boolean = false,
      onInterrupt: Set[OnInterrupt] = HashSet.empty
  ) {
    def shouldInterrupt: Boolean = interruptable && interrupted

    def interrupt() = copy(interrupted = true)

    def addOnInterrupt(handler: OnInterrupt) = copy(onInterrupt = onInterrupt + handler)
    def removeOnInterrupt(handler: OnInterrupt) = copy(onInterrupt = onInterrupt - handler)
  }

  // Atomically drains the registered handlers if interruption is currently in effect, and
  // returns a task that runs them all (or a no-op task otherwise).
  private def invokeOnInterrupt(): TaskDef[Unit] = {
    val s = state.getAndUpdate(s => if (s.shouldInterrupt) s.copy(onInterrupt = HashSet.empty) else s)
    if (s.shouldInterrupt) {
      s.onInterrupt.foldLeft(TaskDef.unit)(_.before(_))
    } else {
      TaskDef.unit
    }
  }

  /** Registers an interrupt handler; the returned task runs it immediately if already interrupted. */
  def onInterrupt(handler: OnInterrupt): TaskDef[Unit] = {
    state.updateAndGet(_.addOnInterrupt(handler))

    invokeOnInterrupt()
  }

  /** Deregisters a previously registered interrupt handler. */
  def removeOnInterrupt(handler: OnInterrupt): Unit = {
    state.updateAndGet(_.removeOnInterrupt(handler))
  }

  private val state = new AtomicReference(State())

  /** Sends the interrupt signal and invokes handlers. Doesn't join on the fiber. */
  private[task] def interruptNow(): TaskDef[Unit] = {
    // We only set the interrupted flag if we're still running.
    result.whenRunning {
      state.getAndUpdate(_.interrupt())
    }

    // Invoke the handlers if we can interrupt now as well. Otherwise, setInterruptable() will
    // take care of it later, once the fiber becomes interruptable again.
    invokeOnInterrupt()
  }

  def shouldInterrupt: Boolean = state.get.shouldInterrupt

  /** Sets the interruptable flag, and returns the old state of the flag and any callbacks that must be made now. */
  private[task] def setInterruptable(i: Boolean): (Boolean, TaskDef[Unit]) = {
    val s = state.getAndUpdate(_.copy(interruptable = i))
    (s.interruptable, invokeOnInterrupt())
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task + +import scala.util.Try +import java.util.concurrent.atomic.AtomicReference + +object PromiseDef { + def make[T]: TaskDef[PromiseDef[T]] = TaskDef.succeed(new PromiseDef) +} + +class PromiseDef[T] { + type OnComplete = Try[T] => Unit + + val state = new AtomicReference(State()) + + sealed trait Completion + case class Running(onComplete: Seq[OnComplete] = Seq.empty) extends Completion { + def addOnComplete(f: OnComplete) = copy(onComplete = onComplete :+ f) + def invokeOnComplete(res: Try[T]): Unit = for (f <- onComplete) f(res) + } + case class Completed(result: Try[T]) extends Completion + case class State(completion: Completion = Running()) { + def complete(res: Try[T]) = completion match { + case Running(_) => copy(completion = Completed(res)) + case _ => this + } + def addOnComplete(f: OnComplete) = completion match { + case r @ Running(_) => copy(completion = r.addOnComplete(f)) + case _ => this + } + } + + def whenRunning(f: => Unit) = { + state.getAndUpdate(s => + s match { + case State(Running(_)) => f; s + case _ => s + }) + } + + def completeWith(task: TaskDef[T]): TaskDef[Unit] = CompleteDef(this, task) + + def await: TaskDef[T] = AwaitDef(this) + + def complete(res: Try[T]): Unit = { + // Complete the state, but only if it's still running + val s = state.getAndUpdate(_.complete(res)) + + s match { + case State(r @ Running(_)) => + // Invoke callbacks if we were running before + r.invokeOnComplete(res) + case _ => + // We were already completed, so nothing to do. 
+ } + } + + def onComplete(f: OnComplete): Unit = { + // Either it's already completed (don't add a listener), or it is completed (add a listener) + val s = state.getAndUpdate(_.addOnComplete(f)) + + // Directly apply the callback if we're already completed + s match { + case State(Completed(res)) => + f.apply(res) + case _ => + } + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/ResourceDef.scala b/stream/src/main/scala/org/apache/pekko/task/ResourceDef.scala new file mode 100644 index 00000000000..5893a22724d --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/ResourceDef.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.pekko.task

/** Common supertype of the Java and Scala resource wrappers. */
trait AbstractResource[+T] {
  def definition: ResourceDef[T]
}

object ResourceDef {
  /** Creates a resource from an acquire task and a matching release action. */
  def acquireRelease[T](acquire: TaskDef[T])(release: T => TaskDef[_]): ResourceDef[T] =
    ResourceDef(acquire.map(t => (t, release(t).unit)))

  /** Wraps a task as a resource with no cleanup action. */
  def succeedTask[T](task: => TaskDef[T]) = ResourceDef(task.map(value => (value, TaskDef.unit)))

  /** Wraps a lazily evaluated value as a resource with no cleanup action. */
  def succeed[T](value: => T) = ResourceDef(TaskDef.succeed((value, TaskDef.unit)))

  /** Combines resources into one yielding all values, acquired in order (and released in reverse, per flatMap). */
  def all[T](elements: Iterable[ResourceDef[T]]): ResourceDef[Seq[T]] = elements.size match {
    case 0 => succeed(Seq.empty)
    case 1 => elements.head.map(Vector(_))
    case _ => elements.foldLeft(succeed(Seq.empty[T]))((result, elem) => result.flatMap(seq => elem.map(t => seq :+ t)))
  }
}

/**
 * A recipe for acquiring a value of type T, bundled with the cleanup task that releases it.
 * [create] yields the value together with its cleanup.
 */
case class ResourceDef[+T](create: TaskDef[(T, TaskDef[Unit])]) {
  /** Maps the resource value; cleanup is unchanged. */
  def map[U](fn: T => U): ResourceDef[U] = ResourceDef(create.map { case (t, cleanup) => (fn(t), cleanup) })

  /** Maps the resource value through a task; cleanup is unchanged. */
  def mapTask[U](fn: T => TaskDef[U]): ResourceDef[U] = ResourceDef(create.flatMap { case (t, cleanup) =>
    fn(t).map(u => (u, cleanup))
  })

  /** Sequences two resources; cleanup runs U first, then T (reverse of acquisition order). */
  def flatMap[U](fn: T => ResourceDef[U]): ResourceDef[U] = ResourceDef(create.flatMap { case (t, cleanupT) =>
    fn(t).create.map { case (u, cleanupU) =>
      (u, cleanupU.andThen(cleanupT))
    }
  })

  /** Combines two resources into one, pairing their values through [combine]. */
  def zip[U, R](that: ResourceDef[U], combine: (T, U) => R): ResourceDef[R] = flatMap(t => that.map(u => combine(t, u)))

  /**
   * Acquires the resource, runs [fn] on it, and runs cleanup afterwards. Acquisition and cleanup
   * are uninterruptable; only the restored [fn] section may be interrupted.
   */
  def use[U](fn: T => TaskDef[U]): TaskDef[U] = {
    TaskDef.uninterruptableMask[U](restore =>
      create.flatMap { case (t, cleanup) =>
        restore(fn(t)).onComplete(_ => cleanup)
      })
  }

  /** Runs this resource's lifecycle in a background fiber; releasing the outer resource interrupts it. */
  def fork: ResourceDef[FiberDef[T]] =
    ResourceDef.acquireRelease(ForkDef(create))(fiber => InterruptDef(fiber).flatMap(_._2)).map(_.map(_._1))
}
b/stream/src/main/scala/org/apache/pekko/task/RunningGraph.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task + +object RunningGraph { + def create[T](_completion: AbstractTask[T], _interrupt: AbstractTask[_]): RunningGraph[T] = new RunningGraph[T] { + def completion = _completion + def interrupt = _interrupt + } +} + +/** + * An interface that represents a currently running graph with an eventual result, and the ability to + * be interrupted. + */ +trait RunningGraph[+T] { + + /** Returns a Task that will complete with the completion result of the graph. */ + def completion: AbstractTask[T] + + /** Returns a Task that, when invoked, will interrupt the graph. */ + def interrupt: AbstractTask[_] +} diff --git a/stream/src/main/scala/org/apache/pekko/task/TaskDef.scala b/stream/src/main/scala/org/apache/pekko/task/TaskDef.scala new file mode 100644 index 00000000000..8af21e17024 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/TaskDef.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task + +import scala.util.{ Failure, Success, Try } +import java.time.Duration +import org.apache.pekko.dispatch.Dispatchers +import org.apache.pekko.stream.Graph +import org.apache.pekko.stream.ClosedShape +import java.util.concurrent.atomic.AtomicInteger + +trait AbstractTask[+T] { + def definition: TaskDef[T] +} + +object TaskDef { + def run[T](graph: Graph[ClosedShape, RunningGraph[T]]): TaskDef[T] = GetRuntimeDef.flatMap { runtime => + val running = runtime.materializer.materialize(graph) + OnInterruptDef(running.completion.definition, running.interrupt.definition) + } + def fromTry[T](res: => Try[T]) = ValueDef(() => res) + def fail[T](x: => Throwable): TaskDef[T] = ValueDef(() => Failure(x)) + def succeed[T](t: => T): TaskDef[T] = ValueDef(() => Success(t)) + def succeedOn[T](dispatcher: String)(t: => T): TaskDef[T] = ValueDef(() => Success(t), dispatcher) + def unit: TaskDef[Unit] = ValueDef(() => Success(())) + def narrow[T](t: TaskDef[? <: T]): TaskDef[T] = t + def never[T]: TaskDef[T] = CallbackDef(_ => unit) + + def uninterruptableMask[T](fn: RestorerDef => TaskDef[T]): TaskDef[T] = { + InterruptabilityDef(false, r => fn(r)) + } + + /** Returns a task that runs all the given task in sequence, returning all results. 
*/ + def all[T](elements: Iterable[TaskDef[T]]): TaskDef[Seq[T]] = elements.size match { + case 0 => succeed(Seq.empty) + case 1 => elements.head.map(Vector(_)) + case _ => elements.foldLeft(succeed(Seq.empty[T]))((result, elem) => result.flatMap(seq => elem.map(t => seq :+ t))) + } + + /** Returns a task that runs all the given tasks in parallel, returning all results. If any task fails, the rest is interrupted. */ + def allPar[T](tasks: Iterable[TaskDef[T]]): TaskDef[Seq[T]] = (for { + done <- PromiseDef.make[Unit].toResource + waiting = new AtomicInteger(tasks.size) + fibers <- ResourceDef.all(tasks.toVector.map(task => + forkResource(task.onComplete { res => + TaskDef.succeed { + val remaining = waiting.decrementAndGet() + if (res.isFailure) { + done.complete(Failure(res.failed.get)) + } else if (remaining <= 0) { + done.complete(Success(())) + } + } + }))) + _ <- done.await.toResource + r <- TaskDef.all(fibers.map(_.join)).toResource + } yield r).use(t => TaskDef.succeed(t)) + + def forkDaemon[T](task: TaskDef[T]): TaskDef[FiberDef[T]] = ForkDef(task) + + def forkResource[T](task: TaskDef[T]): ResourceDef[FiberDef[T]] = + ResourceDef.acquireRelease[FiberDef[T]](ForkDef(task))(InterruptDef(_)) + + /** + * Returns a Task which executes all given tasks in parallel, returning whichever of them + * completes first, and then interrupts the rest.
+ */ + def raceAll[T](tasks: Iterable[TaskDef[T]]): TaskDef[T] = (for { + result <- PromiseDef.make[T].toResource + _ <- ResourceDef.all(tasks.toVector.map(task => forkResource(result.completeWith(task)))) + r <- result.await.toResource + } yield r).use(t => TaskDef.succeed(t)) +} +sealed trait TaskDef[+T] { + import TaskDef._ + + def map[U](fn: T => U): TaskDef[U] = MapDef(this, (res: Try[T]) => res.map(fn)) + def as[U](value: U): TaskDef[U] = map(_ => value) + def unit: TaskDef[Unit] = as(()) + + def zipPar[U](that: TaskDef[U]): TaskDef[(T, U)] = + TaskDef.allPar(Seq(this, that)).map(res => (res(0).asInstanceOf[T], res(1).asInstanceOf[U])) + + def asResult[U](result: Try[U]): TaskDef[U] = flatMapResult(_ => fromTry(result)) + + def flatMap[U](fn: T => TaskDef[U]): TaskDef[U] = FlatMapDef(this, + (_: Try[T]) match { + case Success(t) => fn(t) + case Failure(x) => fail(x) + }) + + def flatMapResult[U](fn: Try[T] => TaskDef[U]): TaskDef[U] = + FlatMapDef[T, U](this, res => fn(res)) + + def flatMapError[T1 >: T](fn: Throwable => TaskDef[T1]): TaskDef[T1] = FlatMapDef(this, + (_: Try[T]) match { + case Success(t) => succeed(t) + case Failure(x) => fn(x) + }) + + def onComplete(fn: Try[T] => TaskDef[?]): TaskDef[T] = + FlatMapDef[T, T](this, res => fn(res).asResult(res)) + + def andThen[U](that: TaskDef[U]): TaskDef[U] = FlatMapDef(this, (_: Try[T]) => that) + + def before(that: TaskDef[Any]): TaskDef[T] = FlatMapDef(this, (res: Try[T]) => that.asResult(res)) + + def toResource: ResourceDef[T] = ResourceDef(this.map(t => (t, TaskDef.unit))) + + def after(duration: Duration) = GetRuntimeDef.flatMap { rt => rt.clock.sleep(duration) }.andThen(this) + + def catchSome[T1 >: T](pf: PartialFunction[Throwable, T1]): TaskDef[T1] = catchSomeWith(pf.andThen(succeed(_))) + + def catchSomeWith[T1 >: T](pf: PartialFunction[Throwable, TaskDef[T1]]): TaskDef[T1] = + flatMapError(x => pf.applyOrElse(x, (_: Throwable) => fail(x))) +} + +/** A TaskDef that retrieves the current runtime 
*/ +case object GetRuntimeDef extends TaskDef[AbstractRuntime] + +/** A TaskDef that attaches a temporary extra onInterrupt handler to a given task */ +case class OnInterruptDef[T](base: TaskDef[T], onInterrupt: TaskDef[_]) extends TaskDef[T] + +/** + * A TaskDef that turns a callback into a Task. The [launch] function receives a "callback" argument which + * can be used to complete the task. [launch] should return a task itself, which will be invoked to interrupt + * the task. + */ +case class CallbackDef[T](launch: (Try[T] => Unit) => TaskDef[_]) extends TaskDef[T] { + type Res = T +} + +/** A TaskDef that runs a function to return a value. */ +case class ValueDef[+T](value: () => Try[T], dispatcher: String = Dispatchers.DefaultDispatcherId) extends TaskDef[T] + +/** A TaskDef that runs a function with the result of another TaskDef as input, returning a new value */ +case class MapDef[T, U](base: TaskDef[T], fn: Try[T] => Try[U]) extends TaskDef[U] { + type Base = T +} + +/** A TaskDef that runs a function with the result of another TaskDef as input, returning another TaskDef */ +case class FlatMapDef[T, U](base: TaskDef[T], fn: Try[T] => TaskDef[U]) extends TaskDef[U] { + type Base = T +} + +/** A TaskDef that waits for the given promise to complete, returning its result. */ +case class AwaitDef[T](promise: PromiseDef[T]) extends TaskDef[T] + +/** A TaskDef that completes a promise with the result of a task. */ +case class CompleteDef[T](promise: PromiseDef[T], result: TaskDef[T]) extends TaskDef[Unit] { + type Target = T +} + +/** A TaskDef that starts running another TaskDef in the background, returning a Fiber to interact with that process. */ +case class ForkDef[T](task: TaskDef[T]) extends TaskDef[FiberDef[T]] { + type Res = T +} + +/** + * A TaskDef that interrupts a background process, completing when + * that process has completely stopped. 
The returned task holds the + * fiber result if it was already completed, or a failed + * InterruptException if the fiber was interrupted. + */ +case class InterruptDef[T](fiber: FiberDef[T]) extends TaskDef[T] { + type Res = T +} + +//case class OnCompleteDef[T](fiber: FiberDef[T], fn: Try[T] => TaskDef[Unit]) extends TaskDef[Unit] + +trait RestorerDef { + def apply[T](task: TaskDef[T]): TaskDef[T] +} + +/** + * A TaskDef that changes whether the fiber running it is allowed to be interrupted. [mkTask] is + * invoked to create the actual task to run. That function can use the [Restorer] argument to wrap tasks + * that should run with the earlier value of the interruptable flag. + */ +case class InterruptabilityDef[T](interruptable: Boolean, mkTask: RestorerDef => TaskDef[T]) extends TaskDef[T] diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Fiber.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Fiber.scala new file mode 100644 index 00000000000..5e1644b9be0 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Fiber.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.task.FiberDef + +class Fiber[+T](definition: FiberDef[T]) { + def join: Task[T] = Task(definition.join) + + def interruptAndGet: Task[T] = Task(definition.interruptAndGet) + + def interrupt: Task[Unit] = Task(definition.interrupt) +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Flows.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Flows.scala new file mode 100644 index 00000000000..55700342c00 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Flows.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.task.AbstractTask +import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.NotUsed + +object Flows { + def mapAsync[T, U](parallelism: Int)(fn: T => AbstractTask[U]): Flow[T, U, NotUsed] = { + Flow.fromMaterializer { (mat, _) => + val runtime = Runtime(mat) + Flow.apply[T].mapAsync(parallelism)(t => runtime.runAsync(fn(t))) + }.mapMaterializedValue(_ => NotUsed) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Promise.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Promise.scala new file mode 100644 index 00000000000..9da08c47599 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Promise.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.task.AwaitDef +import org.apache.pekko.task.CompleteDef +import org.apache.pekko.task.PromiseDef + +object Promise { + def make[T]: Task[Promise[T]] = Task.succeed(new Promise(new PromiseDef())) +} + +class Promise[T](definition: PromiseDef[T]) { + def await: Task[T] = Task(AwaitDef(definition)) + + def completeWith(task: Task[T]): Task[Unit] = Task(CompleteDef(definition, task.definition)) + + def succeed(result: => T) = completeWith(Task.succeed(result)) + + def fail(x: => Throwable) = completeWith(Task.fail(x)) +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Resource.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Resource.scala new file mode 100644 index 00000000000..714d47eb97c --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Resource.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.task.AbstractTask +import org.apache.pekko.task.ResourceDef +import org.apache.pekko.task.AbstractResource + +object Resource { + def acquireRelease[T](acquire: AbstractTask[T], release: T => AbstractTask[_]): Resource[T] = + Resource(ResourceDef.acquireRelease(acquire.definition)(release.andThen(_.definition))) + + def succeed[T](value: => T) = Resource(ResourceDef.succeed(value)) + + def succeedTask[T](task: => AbstractTask[T]) = Resource(ResourceDef.succeedTask(task.definition)) + + def all[T](elements: Iterable[AbstractResource[T]]): Resource[Seq[T]] = + Resource(ResourceDef.all(elements.map(_.definition))) + + private[scaladsl] def resource[T](r: AbstractResource[T]): Resource[T] = + if (r.isInstanceOf[Resource[T]]) r.asInstanceOf[Resource[T]] else Resource(r.definition) +} + +case class Resource[+T](definition: ResourceDef[T]) extends AbstractResource[T] { + import Resource._ + + def map[U](fn: T => U): Resource[U] = Resource(definition.map(fn)) + + def mapTask[U](fn: T => AbstractTask[U]): Resource[U] = Resource(definition.mapTask(fn.andThen(_.definition))) + + def flatMap[U](fn: T => AbstractResource[U]): Resource[U] = Resource(definition.flatMap(fn.andThen(_.definition))) + + def zipWith[U, R](that: AbstractResource[U])(combine: (T, U) => R): Resource[R] = + flatMap(t => resource(that).map(u => combine(t, u))) + + def zip[U](that: Resource[U]): Resource[(T, U)] = zipWith(that)((_, _)) + + def use[U](fn: T => AbstractTask[U]): Task[U] = Task(definition.use(fn.andThen(_.definition))) +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Runtime.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Runtime.scala new file mode 100644 index 00000000000..bb93c876c14 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Runtime.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.stream.Materializer +import org.apache.pekko.task.ClockDef +import org.apache.pekko.task.AbstractRuntime +import org.apache.pekko.task.AbstractTask +import scala.concurrent.Future +import org.apache.pekko.task.FiberRuntime + +object Runtime { + def apply(materializer: Materializer, clock: ClockDef = ClockDef.system): Runtime = new Runtime(materializer, clock) +} + +class Runtime(materializer: Materializer, clock: ClockDef = ClockDef.system) + extends AbstractRuntime(materializer, clock) { + def runAsync[T](task: AbstractTask[T]): Future[T] = { + val p = scala.concurrent.Promise[T]() + run(new FiberRuntime[T](), task.definition)(res => p.complete(res)) + p.future + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Sinks.scala new file mode 100644 index 00000000000..18903ad3cd9 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Sinks.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko + +import pekko.Done +import pekko.dispatch.ExecutionContexts +import pekko.stream.scaladsl.Sink +import pekko.task.AbstractTask + +import scala.concurrent.Future + +object Sinks { + def foreach[T](fn: T => AbstractTask[Unit]): Sink[T, Future[Done]] = { + Sink.fromMaterializer { (mat, _) => + val runtime = Runtime(mat) + Sink.foreachAsync[T](1)(t => runtime.runAsync(fn(t))) + }.mapMaterializedValue(f => f.flatMap(r => r)(ExecutionContexts.parasitic)) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/task/scaladsl/Task.scala b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Task.scala new file mode 100644 index 00000000000..9e34f689084 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/task/scaladsl/Task.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.task.scaladsl + +import org.apache.pekko.task.AbstractTask +import org.apache.pekko.task.RestorerDef +import org.apache.pekko.task.TaskDef + +import scala.util.Try + +object Task { + def fromTry[T](res: => Try[T]) = Task(TaskDef.fromTry(res)) + + def fail[T](x: => Throwable): Task[T] = Task(TaskDef.fail(x)) + + def succeed[T](value: => T): Task[T] = Task(TaskDef.succeed(value)) + + val unit: Task[Unit] = Task(TaskDef.unit) + + trait Restorer { + def apply[T](task: AbstractTask[T]): Task[T] + } + object Restorer { + def apply(r: RestorerDef) = new Restorer { + override def apply[T](task: AbstractTask[T]): Task[T] = Task(r.apply(task.definition)) + } + } + + def uninterruptableMask[T](fn: Restorer => AbstractTask[T]): Task[T] = { + Task(TaskDef.uninterruptableMask(restorerDef => fn(Restorer(restorerDef)).definition)) + } + + def all[T](tasks: Iterable[Task[T]]): Task[Seq[T]] = Task(TaskDef.all(tasks.map(_.definition))) + + def raceAll[T](tasks: Iterable[Task[T]]): Task[T] = Task(TaskDef.raceAll(tasks.map(_.definition))) + + private[scaladsl] def task[T](t: AbstractTask[T]): Task[T] = + if (t.isInstanceOf[Task[T]]) t.asInstanceOf[Task[T]] else Task(t.definition) +} + +case class Task[+T](definition: TaskDef[T]) extends AbstractTask[T] { + import Task._ + + def map[U](fn: T => U): Task[U] = Task(definition.map(fn)) + + def as[U](value: U): Task[U] = map(_ => value) + + def asResult[U](result: Try[U]): Task[U] = flatMapResult(_ => fromTry(result)) + + def unit: Task[Unit] = as(()) + + def flatMap[U](fn: T => 
AbstractTask[U]): Task[U] = Task(definition.flatMap(fn.andThen(_.definition))) + + def zipWith[U, R](that: AbstractTask[U])(combine: (T, U) => R): Task[R] = + flatMap(t => task(that).map(u => combine(t, u))) + + def zip[U](that: AbstractTask[U]): Task[(T, U)] = zipWith(that)((_, _)) + + def andThen[U](that: AbstractTask[U]): Task[U] = flatMap(_ => that) + + def flatMapResult[U](fn: Try[T] => AbstractTask[U]): Task[U] = + Task(definition.flatMapResult(fn.andThen(_.definition))) + + def onComplete(fn: Try[T] => AbstractTask[?]) = Task(definition.onComplete(fn.andThen(_.definition))) + + def forkDaemon: Task[Fiber[T]] = Task(TaskDef.forkDaemon(definition).map(f => new Fiber(f))) + + def forkResource: Resource[Fiber[T]] = Resource(TaskDef.forkResource(definition).map(f => new Fiber(f))) + + def toResource: Resource[T] = Resource.succeedTask(this) +}