Skip to content

Commit 90eb625

Browse files
authored
Simplification of Builder Use (#897)
Added `of()` static functions to MQTT5 packet types to allow simplified creation with basic arguments.
1 parent 450ec89 commit 90eb625

4 files changed

Lines changed: 131 additions & 0 deletions

File tree

src/main/java/software/amazon/awssdk/crt/mqtt5/packets/PublishPacket.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.function.Function;
1212
import java.util.stream.Collectors;
1313
import java.util.stream.Stream;
14+
import java.util.Objects;
1415

1516
/**
1617
* Data model of an <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901100">MQTT5 PUBLISH</a> packet
@@ -181,6 +182,32 @@ public List<Long> getSubscriptionIdentifiers() {
181182
return this.subscriptionIdentifiers;
182183
}
183184

185+
/**
186+
* Creates a {@link PublishPacket} containing only the most common fields:
187+
* <em>topic</em>, <em>QoS</em>, and <em>payload</em>.
188+
* <p>
189+
* Internally this is just syntactic sugar around
190+
* {@link PublishPacketBuilder#PublishPacketBuilder(String, QOS, byte[])}
191+
* followed by {@link PublishPacketBuilder#build()}.
192+
* All optional MQTT 5 properties (retain flag, user properties, etc.)
193+
* are left {@code null} / unset.
194+
*
195+
* @param topic The topic this message should be published to.
196+
* @param qos The MQTT quality of service level the message should be delivered with.
197+
* @param payload The payload for the publish message.
198+
* @return an immutable {@code PublishPacket} ready for use
199+
*
200+
* @throws NullPointerException if {@code topic}, {@code qos}, or
201+
* {@code payload} is {@code null}
202+
*/
203+
public static PublishPacket of(String topic, QOS qos, byte[] payload) {
204+
Objects.requireNonNull(topic, "topic");
205+
Objects.requireNonNull(qos, "qos");
206+
Objects.requireNonNull(payload, "payload");
207+
208+
return new PublishPacketBuilder(topic, qos, payload).build();
209+
}
210+
184211
/**
185212
* Creates a Mqtt5Client options instance
186213
* @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT client structure

src/main/java/software/amazon/awssdk/crt/mqtt5/packets/SubscribePacket.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.function.Function;
1111
import java.util.stream.Collectors;
1212
import java.util.stream.Stream;
13+
import java.util.Objects;
1314

1415
import software.amazon.awssdk.crt.mqtt5.QOS;
1516

@@ -28,6 +29,27 @@ private SubscribePacket(SubscribePacketBuilder builder) {
2829
this.userProperties = builder.userProperties;
2930
}
3031

32+
/**
33+
* Creates a {@link SubscribePacket} containing only a single subscription topic and qos:
34+
* <em>topicFilter</em>, <em>QoS</em>.
35+
* <p>
36+
* Internally this is just syntactic sugar around
37+
* {@link SubscribePacketBuilder#SubscribePacketBuilder(String, QOS)}
38+
* followed by {@link SubscribePacketBuilder#build()}.
39+
*
40+
* @param topicFilter The topic filter to subscribe to.
41+
* @param qos The maximum QoS on which the subscriber will accept publish messages.
42+
* @return an immutable {@code SubscribePacket} ready for use
43+
*
44+
* @throws NullPointerException if {@code topicFilter} or {@code qos} is {@code null}
45+
*/
46+
public static SubscribePacket of(String topicFilter, QOS qos) {
47+
Objects.requireNonNull(topicFilter, "topicFilter");
48+
Objects.requireNonNull(qos, "qos");
49+
50+
return new SubscribePacketBuilder(topicFilter, qos).build();
51+
}
52+
3153
/**
3254
* Returns the list of subscriptions that the client wishes to listen to
3355
*

src/main/java/software/amazon/awssdk/crt/mqtt5/packets/UnsubscribePacket.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.util.ArrayList;
88
import java.util.List;
9+
import java.util.Objects;
910

1011
/**
1112
* Data model of an <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc384800445">MQTT5 UNSUBSCRIBE</a> packet.
@@ -20,6 +21,25 @@ private UnsubscribePacket(UnsubscribePacketBuilder builder) {
2021
this.subscriptions = builder.subscriptions;
2122
}
2223

24+
/**
25+
* Creates an {@link UnsubscribePacket} containing only a single subscription topic to unsubscribe from:
26+
* <em>topicFilter</em>.
27+
* <p>
28+
* Internally this is just syntactic sugar around
29+
* {@link UnsubscribePacketBuilder#UnsubscribePacketBuilder(String)}
30+
* followed by {@link UnsubscribePacketBuilder#build()}.
31+
*
32+
* @param topicFilter The topic filter to unsubscribe from.
33+
* @return an immutable {@code UnsubscribePacket} ready for use
34+
*
35+
* @throws NullPointerException if {@code topicFilter} is {@code null}
36+
*/
37+
public static UnsubscribePacket of(String topicFilter) {
38+
Objects.requireNonNull(topicFilter, "topicFilter");
39+
40+
return new UnsubscribePacketBuilder(topicFilter).build();
41+
}
42+
2343
/**
2444
* Returns a list of subscriptions that the client wishes to unsubscribe from.
2545
*

src/test/java/software/amazon/awssdk/crt/test/Mqtt5ClientTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2101,6 +2101,68 @@ public void Op_SharedSubscription() throws Exception {
21012101
CrtResource.waitForNoResources();
21022102
}
21032103

2104+
public void doOp_DirectPacketBuilders() {
2105+
String testUUID = UUID.randomUUID().toString();
2106+
String testTopic = "test/MQTT5_Binding_Java_" + testUUID;
2107+
try {
2108+
Mqtt5ClientOptionsBuilder builder = new Mqtt5ClientOptionsBuilder(AWS_TEST_MQTT5_IOT_CORE_HOST, 8883l);
2109+
LifecycleEvents_Futured events = new LifecycleEvents_Futured();
2110+
builder.withLifecycleEvents(events);
2111+
2112+
TlsContextOptions tlsOptions = TlsContextOptions.createWithMtlsFromPath(
2113+
AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
2114+
TlsContext tlsContext = new TlsContext(tlsOptions);
2115+
tlsOptions.close();
2116+
builder.withTlsContext(tlsContext);
2117+
2118+
PublishEvents_Futured publishEvents = new PublishEvents_Futured();
2119+
builder.withPublishEvents(publishEvents);
2120+
2121+
PublishPacket publishPacket = PublishPacket.of(testTopic, QOS.AT_LEAST_ONCE, "Hello World".getBytes());
2122+
SubscribePacket subscribePacket = SubscribePacket.of(testTopic, QOS.AT_LEAST_ONCE);
2123+
UnsubscribePacket unsubscribePacket = UnsubscribePacket.of(testTopic);
2124+
2125+
try (Mqtt5Client client = new Mqtt5Client(builder.build())) {
2126+
client.start();
2127+
events.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2128+
2129+
client.subscribe(subscribePacket).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2130+
2131+
client.publish(publishPacket).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2132+
publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2133+
2134+
publishEvents.publishReceivedFuture = new CompletableFuture<>();
2135+
publishEvents.publishPacket = null;
2136+
client.unsubscribe(unsubscribePacket).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2137+
client.publish(publishPacket).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
2138+
2139+
assertEquals(
2140+
"Publish after unsubscribe still arrived!",
2141+
publishEvents.publishPacket,
2142+
null);
2143+
2144+
client.stop();
2145+
}
2146+
2147+
if (tlsContext != null) {
2148+
tlsContext.close();
2149+
}
2150+
2151+
} catch (Exception ex) {
2152+
throw new RuntimeException(ex);
2153+
}
2154+
}
2155+
2156+
@Test
2157+
public void Op_DirectPacketBuilders() throws Exception {
2158+
skipIfNetworkUnavailable();
2159+
Assume.assumeNotNull(AWS_TEST_MQTT5_IOT_CORE_HOST, AWS_TEST_MQTT5_IOT_CORE_RSA_CERT, AWS_TEST_MQTT5_IOT_CORE_RSA_KEY);
2160+
2161+
TestUtils.doRetryableTest(this::doOp_DirectPacketBuilders, TestUtils::isRetryableTimeout, MAX_TEST_RETRIES, TEST_RETRY_SLEEP_MILLIS);
2162+
2163+
CrtResource.waitForNoResources();
2164+
}
2165+
21042166
/**
21052167
* ============================================================
21062168
* Error Operation Tests

0 commit comments

Comments
 (0)