Skip to content

Commit a63cc2e

Browse files
authored
Merge pull request #1513 from nats-io/doc-examples
Doc examples
2 parents d0c89db + 42dcbb9 commit a63cc2e

9 files changed

+447
-2
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
8+
import java.io.IOException;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
12+
public class QueueGroupsMixedSubscribers {
13+
public static void main(String[] args) {
14+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
15+
16+
// NATS-DOC-START
17+
// Set up 3 instances of the service
18+
Dispatcher dService1 = nc.createDispatcher(msg -> {
19+
byte[] response = calculateResponse(1, msg);
20+
nc.publish(msg.getReplyTo(), response);
21+
});
22+
dService1.subscribe("api.calculate", "api-workers-queue");
23+
24+
Dispatcher dService2 = nc.createDispatcher(msg -> {
25+
byte[] response = calculateResponse(2, msg);
26+
nc.publish(msg.getReplyTo(), response);
27+
});
28+
dService2.subscribe("api.calculate", "api-workers-queue");
29+
30+
Dispatcher dService3 = nc.createDispatcher(msg -> {
31+
byte[] response = calculateResponse(3, msg);
32+
nc.publish(msg.getReplyTo(), response);
33+
});
34+
dService3.subscribe("api.calculate", "api-workers-queue");
35+
36+
// Make requests - messages are balanced among the subscribers in the queue
37+
for (int x = 0; x < 10; x++) {
38+
Message m = nc.request("api.calculate", null, Duration.ofMillis(500));
39+
if (m == null) {
40+
System.out.println(x + ") No Response");
41+
}
42+
else {
43+
System.out.println(x + ") " + new String(m.getData()));
44+
}
45+
}
46+
// NATS-DOC-END
47+
}
48+
catch (InterruptedException e) {
49+
// can be thrown by connect
50+
Thread.currentThread().interrupt();
51+
}
52+
catch (IOException e) {
53+
// can be thrown by connect
54+
}
55+
}
56+
57+
private static byte[] calculateResponse(int i, Message msg) {
58+
return ("Result from service instance " + i).getBytes(StandardCharsets.UTF_8);
59+
}
60+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
8+
import java.io.IOException;
9+
import java.nio.charset.StandardCharsets;
10+
11+
public class QueueGroupsRequestReply {
12+
public static void main(String[] args) {
13+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
14+
15+
// NATS-DOC-START
16+
// Audit logger - receives all messages
17+
Dispatcher dAudit = nc.createDispatcher(msg -> {
18+
System.out.printf("[AUDIT] %s: %s\n", msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
19+
});
20+
dAudit.subscribe("orders.>");
21+
22+
Dispatcher dMetrics = nc.createDispatcher(msg -> {
23+
System.out.printf("[METRICS] %s: %s\n", msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
24+
});
25+
dMetrics.subscribe("orders.>");
26+
27+
Dispatcher dNewOrderWorker1 = nc.createDispatcher(msg -> {
28+
System.out.printf("[WORKER 1] %s: %s\n", msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
29+
});
30+
dNewOrderWorker1.subscribe("orders.new", "new-orders-queue");
31+
32+
Dispatcher dNewOrderWorker2 = nc.createDispatcher(msg -> {
33+
System.out.printf("[WORKER 2] %s: %s\n", msg.getSubject(), new String(msg.getData(), StandardCharsets.UTF_8));
34+
});
35+
dNewOrderWorker2.subscribe("orders.new", "new-orders-queue");
36+
37+
// Publish order
38+
nc.publish("orders.new", "Order 123".getBytes(StandardCharsets.UTF_8));
39+
nc.publish("orders.new", "Order 124".getBytes(StandardCharsets.UTF_8));
40+
// Audit and metrics see it, one worker processes it
41+
// NATS-DOC-END
42+
43+
Thread.sleep(100);
44+
}
45+
catch (InterruptedException e) {
46+
// can be thrown by connect
47+
Thread.currentThread().interrupt();
48+
}
49+
catch (IOException e) {
50+
// can be thrown by connect
51+
}
52+
catch (Exception e) {
53+
e.printStackTrace();
54+
}
55+
}
56+
57+
private static String processNewOrder(int i, Message msg) {
58+
return "Order processed by instance " + i;
59+
}
60+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
8+
import java.io.IOException;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
import java.util.Date;
12+
import java.util.concurrent.*;
13+
14+
public class RequestReplyBasic {
15+
public static void main(String[] args) {
16+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
17+
18+
// NATS-DOC-START
19+
// Set up the time service
20+
Dispatcher dTime = nc.createDispatcher(msg -> {
21+
nc.publish(msg.getReplyTo(), ("" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
22+
});
23+
dTime.subscribe("time");
24+
25+
// Make a request expecting a future
26+
CompletableFuture<Message> responseFuture = nc.request("time", null);
27+
try {
28+
Message m = responseFuture.get(500, TimeUnit.MILLISECONDS);
29+
System.out.println("1) Time is " + new Date(Long.parseLong(new String(m.getData()))));
30+
}
31+
catch (CancellationException | ExecutionException | TimeoutException e) {
32+
System.out.println("1) No Response");
33+
}
34+
35+
// Make a request with a timeout and direct response
36+
Message m = nc.request("time", null, Duration.ofMillis(500));
37+
if (m == null) {
38+
System.out.println("2) No Response");
39+
}
40+
else {
41+
System.out.println("2) Time is " + new Date(Long.parseLong(new String(m.getData()))));
42+
}
43+
// NATS-DOC-END
44+
}
45+
catch (InterruptedException e) {
46+
// can be thrown by connect
47+
Thread.currentThread().interrupt();
48+
}
49+
catch (IOException e) {
50+
// can be thrown by connect
51+
}
52+
}
53+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
8+
import java.io.IOException;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
import java.util.concurrent.*;
12+
13+
public class RequestReplyCalculator {
14+
public static void main(String[] args) {
15+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
16+
17+
// NATS-DOC-START
18+
// Set up the calculator add service
19+
Dispatcher dCalcAdd = nc.createDispatcher(msg -> {
20+
// data is in the form "x y"
21+
try {
22+
String[] parts = new String(msg.getData()).split(" ");
23+
if (parts.length == 2) {
24+
int x = Integer.parseInt(parts[0]);
25+
int y = Integer.parseInt(parts[1]);
26+
nc.publish(msg.getReplyTo(), ("" + (x + y)).getBytes(StandardCharsets.UTF_8));
27+
}
28+
}
29+
catch (Exception e) {
30+
// you could make some other reply here
31+
}
32+
});
33+
dCalcAdd.subscribe("calc.add");
34+
35+
// Make a request expecting a future
36+
CompletableFuture<Message> responseFuture = nc.request("calc.add", "5 3".getBytes(StandardCharsets.UTF_8));
37+
try {
38+
Message m = responseFuture.get(500, TimeUnit.MILLISECONDS);
39+
System.out.printf("5 + 3 = %s\n", new String(m.getData()));
40+
}
41+
catch (CancellationException | ExecutionException | TimeoutException e) {
42+
System.out.println("1) No Response");
43+
}
44+
45+
// Make a request with a timeout and direct response
46+
Message m = nc.request("calc.add", "10 7".getBytes(StandardCharsets.UTF_8), Duration.ofMillis(500));
47+
if (m == null) {
48+
System.out.println("2) No Response");
49+
}
50+
else {
51+
System.out.printf("10 + 7 = %s\n", new String(m.getData()));
52+
}
53+
// NATS-DOC-END
54+
}
55+
catch (InterruptedException e) {
56+
// can be thrown by connect
57+
Thread.currentThread().interrupt();
58+
}
59+
catch (IOException e) {
60+
// can be thrown by connect
61+
}
62+
}
63+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
import io.nats.client.impl.Headers;
8+
9+
import java.io.IOException;
10+
import java.time.Duration;
11+
import java.util.concurrent.*;
12+
13+
public class RequestReplyHeaders {
14+
public static void main(String[] args) {
15+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
16+
17+
// NATS-DOC-START
18+
// Set up the header echo service
19+
Dispatcher dService = nc.createDispatcher(msg -> {
20+
Headers hIncoming = msg.getHeaders();
21+
Headers hResponse = new Headers();
22+
for (String keys : hIncoming.keySet()) {
23+
hResponse.put(keys, hIncoming.get(keys));
24+
}
25+
nc.publish(msg.getReplyTo(), hResponse, null);
26+
});
27+
dService.subscribe("header.echo");
28+
29+
// Make a request expecting a future
30+
Headers headers1 = new Headers();
31+
headers1.put("X-Request-ID", "1");
32+
headers1.put("X-Priority", "high");
33+
CompletableFuture<Message> responseFuture = nc.request("header.echo", headers1, null);
34+
try {
35+
Message m = responseFuture.get(500, TimeUnit.MILLISECONDS);
36+
Headers hIncoming = m.getHeaders();
37+
System.out.println("Response Headers: " + hIncoming);
38+
}
39+
catch (CancellationException | ExecutionException | TimeoutException e) {
40+
System.out.println("1) No Response");
41+
}
42+
43+
// Make a request with a timeout and direct response
44+
Headers headers2 = new Headers();
45+
headers2.put("X-Request-ID", "2");
46+
headers2.put("X-Priority", "med");
47+
Message m = nc.request("header.echo", headers2, null, Duration.ofMillis(500));
48+
if (m == null) {
49+
System.out.println("2) No Response");
50+
}
51+
else {
52+
Headers hIncoming = m.getHeaders();
53+
System.out.println("Response Headers: " + hIncoming);
54+
}
55+
// NATS-DOC-END
56+
}
57+
catch (InterruptedException e) {
58+
// can be thrown by connect
59+
Thread.currentThread().interrupt();
60+
}
61+
catch (IOException e) {
62+
// can be thrown by connect
63+
}
64+
}
65+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.nats.examples.natsIoDoc;
2+
3+
import io.nats.client.Connection;
4+
import io.nats.client.Dispatcher;
5+
import io.nats.client.Message;
6+
import io.nats.client.Nats;
7+
8+
import java.io.IOException;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
import java.util.concurrent.*;
12+
13+
public class RequestReplyMultipleResponders {
14+
public static void main(String[] args) {
15+
try (Connection nc = Nats.connect("nats://localhost:4222")) {
16+
17+
// NATS-DOC-START
18+
// Set up 2 instances of the service
19+
Dispatcher dService1 = nc.createDispatcher(msg -> {
20+
byte[] response = calculateResponse(1, msg);
21+
nc.publish(msg.getReplyTo(), response);
22+
});
23+
dService1.subscribe("service");
24+
25+
Dispatcher dService2 = nc.createDispatcher(msg -> {
26+
byte[] response = calculateResponse(2, msg);
27+
nc.publish(msg.getReplyTo(), response);
28+
});
29+
dService2.subscribe("service");
30+
31+
// Make a request expecting a future
32+
CompletableFuture<Message> responseFuture = nc.request("service", null);
33+
try {
34+
Message m = responseFuture.get(500, TimeUnit.MILLISECONDS);
35+
System.out.println("1) " + new String(m.getData()));
36+
}
37+
catch (CancellationException | ExecutionException | TimeoutException e) {
38+
System.out.println("1) No Response");
39+
}
40+
41+
// Make a request with a timeout and direct response
42+
Message m = nc.request("service", null, Duration.ofMillis(500));
43+
if (m == null) {
44+
System.out.println("2) No Response");
45+
}
46+
else {
47+
System.out.println("2) " + new String(m.getData()));
48+
}
49+
// NATS-DOC-END
50+
}
51+
catch (InterruptedException e) {
52+
// can be thrown by connect
53+
Thread.currentThread().interrupt();
54+
}
55+
catch (IOException e) {
56+
// can be thrown by connect
57+
}
58+
}
59+
60+
private static byte[] calculateResponse(int i, Message msg) {
61+
return ("Result from service instance " + i).getBytes(StandardCharsets.UTF_8);
62+
}
63+
}

0 commit comments

Comments
 (0)