diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/README.MD b/core/src/main/java/io/temporal/samples/nexuscancellation/README.MD new file mode 100644 index 000000000..cf39c85c4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/README.MD @@ -0,0 +1,36 @@ +# Nexus Cancellation + +This sample shows how to cancel a Nexus operation from a caller workflow. + +From more details on Nexus and how to setup to run this samples please see the [Nexus Sample](../nexus/README.MD). + +In separate terminal windows: + +### Nexus handler worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.handler.HandlerWorker \ + --args="-target-host localhost:7233 -namespace my-target-namespace" +``` + +### Nexus caller worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.caller.CallerWorker \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Start caller workflow + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexuscancellation.caller.CallerStarter \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Output + +which should result in: +``` +INFO i.t.s.n.caller.CallerStarter - Started workflow workflowId: 326732dd-a2b1-4de7-9ddd-dcee4f9f0229 runId: d580499f-79d5-461d-bd49-6248b4e522ae +INFO i.t.s.n.caller.CallerStarter - Workflow result: Hallo Nexus πŸ‘‹ +``` diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerStarter.java b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerStarter.java new file mode 100644 index 000000000..1ee705b1a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerStarter.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.caller; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.nexus.options.ClientOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerStarter { + private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class); + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build(); + HelloCallerWorkflow helloWorkflow = + client.newWorkflowStub(HelloCallerWorkflow.class, workflowOptions); + WorkflowExecution execution = WorkflowClient.start(helloWorkflow::hello, "Nexus"); + logger.info( + "Started workflow workflowId: {} runId: {}", + execution.getWorkflowId(), + execution.getRunId()); + logger.info("Workflow result: {}", helloWorkflow.hello("Nexus")); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerWorker.java b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerWorker.java new file mode 100644 index 000000000..37f5eaa5f --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/CallerWorker.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexus.options.ClientOptions; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; + +public class CallerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-caller-workflow-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + Collections.singletonMap( + NexusService.class.getSimpleName(), + NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build())) + .build(), + HelloCallerWorkflowImpl.class); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflow.java b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflow.java new file mode 100644 index 000000000..7f77cc68c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflow.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.caller; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface HelloCallerWorkflow { + @WorkflowMethod + String hello(String message); +} diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflowImpl.java new file mode 100644 index 000000000..bc3666147 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/caller/HelloCallerWorkflowImpl.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.caller; + +import static io.temporal.samples.nexus.service.NexusService.Language.*; + +import io.temporal.failure.CanceledFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; + +public class HelloCallerWorkflowImpl implements HelloCallerWorkflow { + public static final Logger log = Workflow.getLogger(HelloCallerWorkflowImpl.class); + private static final NexusService.Language[] languages = + new NexusService.Language[] {EN, FR, DE, ES, TR}; + NexusService nexusService = + Workflow.newNexusServiceStub( + NexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String hello(String message) { + List> results = new ArrayList<>(languages.length); + + /* + * Create our CancellationScope. Within this scope we call the nexus operation asynchronously + * hello method asynchronously for each of our defined languages. + */ + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + for (NexusService.Language language : languages) { + results.add( + Async.function( + nexusService::hello, new NexusService.HelloInput(message, language))); + } + }); + + /* + * Execute all nexus operations within the CancellationScope. Note that this execution is + * non-blocking as the code inside our cancellation scope is also non-blocking. + */ + scope.run(); + + // We use "anyOf" here to wait for one of the nexus operation invocations to return + NexusService.HelloOutput result = Promise.anyOf(results).get(); + + // Trigger cancellation of all uncompleted nexus operations invocations within the cancellation + // scope + scope.cancel(); + // Optionally, wait for all nexus operations to complete + // + // Note: Once the workflow completes any pending cancellation requests are dropped by the + // server. + for (Promise promise : results) { + try { + promise.get(); + } catch (NexusOperationFailure e) { + // If the operation was cancelled, we can ignore the failure + if (e.getCause() instanceof CanceledFailure) { + log.info("Operation was cancelled"); + continue; + } + throw e; + } + } + return result.getMessage(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HandlerWorker.java new file mode 100644 index 000000000..a70a47abe --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HandlerWorker.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexus.handler.NexusServiceImpl; +import io.temporal.samples.nexus.options.ClientOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class HandlerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes(HelloHandlerWorkflowImpl.class); + worker.registerNexusServiceImplementation(new NexusServiceImpl()); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HelloHandlerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HelloHandlerWorkflowImpl.java new file mode 100644 index 000000000..246c25e93 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexuscancellation/handler/HelloHandlerWorkflowImpl.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexuscancellation.handler; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.samples.nexus.handler.HelloHandlerWorkflow; +import io.temporal.samples.nexus.service.NexusService; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class HelloHandlerWorkflowImpl implements HelloHandlerWorkflow { + @Override + public NexusService.HelloOutput hello(NexusService.HelloInput input) { + // Sleep for a random duration to simulate some work + Workflow.sleep(Duration.ofSeconds(Workflow.newRandom().nextInt(5))); + switch (input.getLanguage()) { + case EN: + return new NexusService.HelloOutput("Hello " + input.getName() + " πŸ‘‹"); + case FR: + return new NexusService.HelloOutput("Bonjour " + input.getName() + " πŸ‘‹"); + case DE: + return new NexusService.HelloOutput("Hallo " + input.getName() + " πŸ‘‹"); + case ES: + return new NexusService.HelloOutput("Β‘Hola! " + input.getName() + " πŸ‘‹"); + case TR: + return new NexusService.HelloOutput("Merhaba " + input.getName() + " πŸ‘‹"); + } + throw ApplicationFailure.newFailure( + "Unsupported language: " + input.getLanguage(), "UNSUPPORTED_LANGUAGE"); + } +}