Skip to content

Commit 1fd2eef

Browse files
authored
Sample code for Nexus messaging (#776)
* Status checkin * Updating * Moving packages * Remove yarn.lock and node_modules/.yarn-integrity from tracking * Reviewing * Some tweaks * Splitting test cases apart * Tinkering * Reviewing and some updates * Updating a readme * Updated for code review comments * Fixes from code review * Some changes from code reviews * Fixed package name in a readme * Fixed typo
1 parent f279e5f commit 1fd2eef

29 files changed

Lines changed: 1643 additions & 0 deletions

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@ Load client configuration from TOML files with programmatic overrides.
173173

174174
- [**Context/Header Propagation**](/core/src/main/java/io/temporal/samples/nexuscontextpropagation): Demonstrates how to propagate context through Nexus operation headers.
175175

176+
- [**Nexus Messaging**](/core/src/main/java/io/temporal/samples/nexusmessaging): Demonstrates how to send signal, update and query messages through Nexus.
177+
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
178+
and sends messages to it.
176179
<!-- @@@SNIPEND -->
177180

178181
### Running SpringBoot Samples
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus
2+
operations. There are two self-contained examples, each in its own directory:
3+
4+
| | `callerpattern/` | `ondemandpattern/` |
5+
|---|---|--------------------------------------------------------------|
6+
| **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them |
7+
| **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
8+
| **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
9+
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |
10+
11+
Each directory is fully self-contained for clarity. The
12+
`GreetingWorkflow`, `GreetingWorkflowImpl`, `GreetingActivity` and `GreetingActivityImpl` classes are pretty much the same between the two — only the
13+
Nexus service interface and its implementation differ. This highlights that the same Workflow can be
14+
exposed through Nexus in different ways depending on whether the caller needs lifecycle control.
15+
16+
See each directory's README for running instructions.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
## Caller pattern
2+
3+
The handler worker starts a `GreetingWorkflow` for a User ID.
4+
`NexusGreetingServiceImpl` holds that ID and routes every Nexus operation to it.
5+
The caller's input does not have that Workflow ID as the caller doesn't know it - but the caller sends in the User ID,
6+
and `NexusGreetingServiceImpl` knows how to get the desired Workflow ID from that User ID (see the getWorkflowId call).
7+
8+
HandlerWorker is using the same getWorkflowId call to generate a Workflow ID from a User ID when it launches the Workflow.
9+
10+
The caller Workflow:
11+
1. Queries for supported languages (`getLanguages` — backed by a `@QueryMethod`)
12+
2. Changes the language to Arabic (`setLanguage` — backed by an `@UpdateMethod` that calls an activity)
13+
3. Confirms the change via a second query (`getLanguage`)
14+
4. Approves the Workflow (`approve` — backed by a `@SignalMethod`)
15+
16+
### Running
17+
18+
Start a Temporal server:
19+
20+
```bash
21+
temporal server start-dev
22+
```
23+
24+
Create the namespaces and Nexus endpoint:
25+
26+
```bash
27+
temporal operator namespace create --namespace nexus-messaging-handler-namespace
28+
temporal operator namespace create --namespace nexus-messaging-caller-namespace
29+
30+
temporal operator nexus endpoint create \
31+
--name nexus-messaging-nexus-endpoint \
32+
--target-namespace nexus-messaging-handler-namespace \
33+
--target-task-queue nexus-messaging-handler-task-queue
34+
```
35+
36+
This sample loads connection settings from `ClientConfigProfile`. The
37+
`nexus-messaging-handler` and `nexus-messaging-caller` profiles are defined in
38+
`core/src/main/resources/config.toml`. You can override settings with environment
39+
variables or by editing the TOML file (see the `envconfig` sample for details).
40+
41+
In one terminal, start the handler worker:
42+
43+
```bash
44+
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.handler.HandlerWorker
45+
```
46+
47+
In a second terminal, start the caller worker:
48+
49+
```bash
50+
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.caller.CallerWorker
51+
```
52+
53+
In a third terminal, run the following command to start the example:
54+
55+
```bash
56+
./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusmessaging.callerpattern.caller.CallerStarter
57+
```
58+
59+
Expected output:
60+
61+
```
62+
Supported languages: [CHINESE, ENGLISH]
63+
Language changed: ENGLISH -> ARABIC
64+
Workflow approved
65+
```
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.caller;
2+
3+
import io.temporal.client.WorkflowClient;
4+
import io.temporal.client.WorkflowOptions;
5+
import io.temporal.envconfig.ClientConfigProfile;
6+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
7+
import io.temporal.serviceclient.WorkflowServiceStubs;
8+
import java.nio.file.Paths;
9+
import java.util.List;
10+
import java.util.UUID;
11+
12+
public class CallerStarter {
13+
14+
public static void main(String[] args) {
15+
ClientConfigProfile profile;
16+
try {
17+
String configFilePath =
18+
Paths.get(CallerStarter.class.getResource("/config.toml").toURI()).toString();
19+
profile =
20+
ClientConfigProfile.load(
21+
LoadClientConfigProfileOptions.newBuilder()
22+
.setConfigFilePath(configFilePath)
23+
.setConfigFileProfile(CallerWorker.CONFIG_PROFILE)
24+
.build());
25+
} catch (Exception e) {
26+
throw new RuntimeException("Failed to load client configuration", e);
27+
}
28+
29+
WorkflowServiceStubs service =
30+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
31+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
32+
33+
CallerWorkflow workflow =
34+
client.newWorkflowStub(
35+
CallerWorkflow.class,
36+
WorkflowOptions.newBuilder()
37+
.setWorkflowId("nexus-messaging-caller-" + UUID.randomUUID())
38+
.setTaskQueue(CallerWorker.TASK_QUEUE)
39+
.build());
40+
41+
// Launch the worker, passing in an identifier which the Nexus service will use
42+
// to find the matching workflow (See NexusGreetingServiceImpl::getWorkflowId)
43+
List<String> log = workflow.run("user-1");
44+
log.forEach(System.out::println);
45+
}
46+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.caller;
2+
3+
import io.temporal.client.WorkflowClient;
4+
import io.temporal.envconfig.ClientConfigProfile;
5+
import io.temporal.envconfig.LoadClientConfigProfileOptions;
6+
import io.temporal.serviceclient.WorkflowServiceStubs;
7+
import io.temporal.worker.Worker;
8+
import io.temporal.worker.WorkerFactory;
9+
import io.temporal.worker.WorkflowImplementationOptions;
10+
import io.temporal.workflow.NexusServiceOptions;
11+
import java.nio.file.Paths;
12+
import java.util.Collections;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
public class CallerWorker {
17+
private static final Logger logger = LoggerFactory.getLogger(CallerWorker.class);
18+
19+
static final String CONFIG_PROFILE = "nexus-messaging-caller";
20+
public static final String TASK_QUEUE = "nexus-messaging-caller-task-queue";
21+
static final String NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint";
22+
23+
public static void main(String[] args) throws InterruptedException {
24+
ClientConfigProfile profile;
25+
try {
26+
String configFilePath =
27+
Paths.get(CallerWorker.class.getResource("/config.toml").toURI()).toString();
28+
profile =
29+
ClientConfigProfile.load(
30+
LoadClientConfigProfileOptions.newBuilder()
31+
.setConfigFilePath(configFilePath)
32+
.setConfigFileProfile(CONFIG_PROFILE)
33+
.build());
34+
} catch (Exception e) {
35+
throw new RuntimeException("Failed to load client configuration", e);
36+
}
37+
38+
WorkflowServiceStubs service =
39+
WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions());
40+
WorkflowClient client = WorkflowClient.newInstance(service, profile.toWorkflowClientOptions());
41+
42+
WorkerFactory factory = WorkerFactory.newInstance(client);
43+
Worker worker = factory.newWorker(TASK_QUEUE);
44+
worker.registerWorkflowImplementationTypes(
45+
WorkflowImplementationOptions.newBuilder()
46+
.setNexusServiceOptions(
47+
// The key must match the @Service-annotated interface name.
48+
Collections.singletonMap(
49+
"NexusGreetingService",
50+
NexusServiceOptions.newBuilder().setEndpoint(NEXUS_ENDPOINT).build()))
51+
.build(),
52+
CallerWorkflowImpl.class);
53+
54+
factory.start();
55+
logger.info("Caller worker started, ctrl+c to exit");
56+
Thread.currentThread().join();
57+
}
58+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.caller;
2+
3+
import io.temporal.workflow.WorkflowInterface;
4+
import io.temporal.workflow.WorkflowMethod;
5+
import java.util.List;
6+
7+
@WorkflowInterface
8+
public interface CallerWorkflow {
9+
@WorkflowMethod
10+
List<String> run(String userId);
11+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.caller;
2+
3+
import io.temporal.failure.ApplicationFailure;
4+
import io.temporal.samples.nexusmessaging.callerpattern.service.Language;
5+
import io.temporal.samples.nexusmessaging.callerpattern.service.NexusGreetingService;
6+
import io.temporal.workflow.NexusOperationOptions;
7+
import io.temporal.workflow.NexusServiceOptions;
8+
import io.temporal.workflow.Workflow;
9+
import java.time.Duration;
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import org.slf4j.Logger;
13+
14+
public class CallerWorkflowImpl implements CallerWorkflow {
15+
16+
private static final Logger logger = Workflow.getLogger(CallerWorkflowImpl.class);
17+
18+
// The endpoint is configured at the worker level in CallerWorker; only operation options are
19+
// set here.
20+
NexusGreetingService greetingService =
21+
Workflow.newNexusServiceStub(
22+
NexusGreetingService.class,
23+
NexusServiceOptions.newBuilder()
24+
.setOperationOptions(
25+
NexusOperationOptions.newBuilder()
26+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
27+
.build())
28+
.build());
29+
30+
@Override
31+
public List<String> run(String userId) {
32+
33+
// Messages in the log array are passed back to the caller who will then log them to report what
34+
// is happening.
35+
// The same message is also logged for demo purposes, so that things are visible in the caller
36+
// workflow output.
37+
List<String> log = new ArrayList<>();
38+
39+
// Call a Nexus operation backed by a query against the entity workflow.
40+
// The workflow must already be running on the handler, otherwise you will
41+
// get an error saying the workflow has already terminated.
42+
NexusGreetingService.GetLanguagesOutput languagesOutput =
43+
greetingService.getLanguages(new NexusGreetingService.GetLanguagesInput(false, userId));
44+
log.add("Supported languages: " + languagesOutput.getLanguages());
45+
logger.info("Supported languages: {}", languagesOutput.getLanguages());
46+
47+
// Following are examples for each of the three messaging types -
48+
// update, query, then signal.
49+
50+
// Call a Nexus operation backed by an update against the entity workflow.
51+
Language previousLanguage =
52+
greetingService.setLanguage(
53+
new NexusGreetingService.SetLanguageInput(Language.ARABIC, userId));
54+
55+
// Call a Nexus operation backed by a query to confirm the language change.
56+
Language currentLanguage =
57+
greetingService.getLanguage(new NexusGreetingService.GetLanguageInput(userId));
58+
if (currentLanguage != Language.ARABIC) {
59+
throw ApplicationFailure.newFailure(
60+
"Expected language ARABIC, got " + currentLanguage, "AssertionError");
61+
}
62+
63+
log.add("Language changed: " + previousLanguage.name() + " -> " + Language.ARABIC.name());
64+
logger.info("Language changed from {} to {}", previousLanguage, Language.ARABIC);
65+
66+
// Call a Nexus operation backed by a signal against the entity workflow.
67+
greetingService.approve(new NexusGreetingService.ApproveInput("caller", userId));
68+
log.add("Workflow approved");
69+
logger.info("Workflow approved");
70+
71+
return log;
72+
}
73+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.handler;
2+
3+
import io.temporal.activity.ActivityInterface;
4+
import io.temporal.activity.ActivityMethod;
5+
import io.temporal.samples.nexusmessaging.callerpattern.service.Language;
6+
7+
@ActivityInterface
8+
public interface GreetingActivity {
9+
// Simulates a call to a remote greeting service. Returns null if the language is not supported.
10+
@ActivityMethod
11+
String callGreetingService(Language language);
12+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.handler;
2+
3+
import io.temporal.samples.nexusmessaging.callerpattern.service.Language;
4+
import java.util.EnumMap;
5+
import java.util.Map;
6+
7+
public class GreetingActivityImpl implements GreetingActivity {
8+
9+
private static final Map<Language, String> GREETINGS = new EnumMap<>(Language.class);
10+
11+
static {
12+
GREETINGS.put(Language.ARABIC, "مرحبا بالعالم");
13+
GREETINGS.put(Language.CHINESE, "你好,世界");
14+
GREETINGS.put(Language.ENGLISH, "Hello, world");
15+
GREETINGS.put(Language.FRENCH, "Bonjour, monde");
16+
GREETINGS.put(Language.HINDI, "नमस्ते दुनिया");
17+
GREETINGS.put(Language.PORTUGUESE, "Olá mundo");
18+
GREETINGS.put(Language.SPANISH, "Hola mundo");
19+
}
20+
21+
@Override
22+
public String callGreetingService(Language language) {
23+
return GREETINGS.get(language);
24+
}
25+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.temporal.samples.nexusmessaging.callerpattern.handler;
2+
3+
import io.temporal.samples.nexusmessaging.callerpattern.service.Language;
4+
import io.temporal.samples.nexusmessaging.callerpattern.service.NexusGreetingService;
5+
import io.temporal.workflow.QueryMethod;
6+
import io.temporal.workflow.SignalMethod;
7+
import io.temporal.workflow.UpdateMethod;
8+
import io.temporal.workflow.UpdateValidatorMethod;
9+
import io.temporal.workflow.WorkflowInterface;
10+
import io.temporal.workflow.WorkflowMethod;
11+
12+
/**
13+
* A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. The
14+
* workflow exposes queries, an update, and a signal. These are private implementation details of
15+
* the Nexus service: the caller only interacts via Nexus operations.
16+
*/
17+
@WorkflowInterface
18+
public interface GreetingWorkflow {
19+
20+
@WorkflowMethod
21+
String run();
22+
23+
// Returns the languages currently supported by the workflow.
24+
@QueryMethod
25+
NexusGreetingService.GetLanguagesOutput getLanguages(
26+
NexusGreetingService.GetLanguagesInput input);
27+
28+
// Returns the currently active language.
29+
@QueryMethod
30+
Language getLanguage();
31+
32+
// Approves the workflow, allowing it to complete.
33+
@SignalMethod
34+
void approve(NexusGreetingService.ApproveInput input);
35+
36+
// Changes the active language synchronously (only supports languages already in the greetings
37+
// map).
38+
@UpdateMethod
39+
Language setLanguage(NexusGreetingService.SetLanguageInput input);
40+
41+
@UpdateValidatorMethod(updateName = "setLanguage")
42+
void validateSetLanguage(NexusGreetingService.SetLanguageInput input);
43+
44+
// Changes the active language, calling an activity to fetch a greeting for new languages.
45+
@UpdateMethod
46+
Language setLanguageUsingActivity(NexusGreetingService.SetLanguageInput input);
47+
}

0 commit comments

Comments
 (0)