Skip to content

Commit 4e1b04e

Browse files
authored
Merge pull request #1295 from nats-io/ttl-211
Per Message TTL Support for 2.11
2 parents fcb126a + 01819be commit 4e1b04e

File tree

8 files changed

+216
-87
lines changed

8 files changed

+216
-87
lines changed

src/main/java/io/nats/client/PublishOptions.java

+67-17
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,17 @@ public class PublishOptions {
4545
private final long expectedLastSeq;
4646
private final long expectedLastSubSeq;
4747
private final String msgId;
48+
private final String messageTtl;
4849

49-
private PublishOptions(String stream, Duration streamTimeout, String expectedStream, String expectedLastId, long expectedLastSeq, long expectedLastSubSeq, String msgId) {
50-
this.stream = stream;
51-
this.streamTimeout = streamTimeout;
52-
this.expectedStream = expectedStream;
53-
this.expectedLastId = expectedLastId;
54-
this.expectedLastSeq = expectedLastSeq;
55-
this.expectedLastSubSeq = expectedLastSubSeq;
56-
this.msgId = msgId;
50+
private PublishOptions(Builder b) {
51+
this.stream = b.stream;
52+
this.streamTimeout = b.streamTimeout;
53+
this.expectedStream = b.expectedStream;
54+
this.expectedLastId = b.expectedLastId;
55+
this.expectedLastSeq = b.expectedLastSeq;
56+
this.expectedLastSubSeq = b.expectedLastSubSeq;
57+
this.msgId = b.msgId;
58+
this.messageTtl = b.messageTtl;
5759
}
5860

5961
/**
@@ -122,6 +124,15 @@ public String getMessageId() {
122124
return this.msgId;
123125
}
124126

127+
/**
128+
* Gets the message ttl string. Might be null. Might be "never".
129+
* 10 seconds would be "10s" for the server
130+
* @return the message ttl string
131+
*/
132+
public String getMessageTtl() {
133+
return messageTtl;
134+
}
135+
125136
/**
126137
* Creates a builder for the options.
127138
* @return the builder
@@ -144,6 +155,7 @@ public static class Builder {
144155
long expectedLastSeq = UNSET_LAST_SEQUENCE;
145156
long expectedLastSubSeq = UNSET_LAST_SEQUENCE;
146157
String msgId;
158+
String messageTtl;
147159

148160
/**
149161
* Constructs a new publish options Builder with the default values.
@@ -169,7 +181,7 @@ public Builder(Properties properties) {
169181
/**
170182
* Sets the stream name for publishing. The default is undefined.
171183
* @param stream The name of the stream.
172-
* @return Builder
184+
* @return The Builder
173185
*/
174186
public Builder stream(String stream) {
175187
this.stream = validateStreamName(stream, false);
@@ -180,7 +192,7 @@ public Builder stream(String stream) {
180192
* Sets the timeout to wait for a publish acknowledgement from a JetStream
181193
* enabled NATS server.
182194
* @param timeout the publish timeout.
183-
* @return Builder
195+
* @return The Builder
184196
*/
185197
public Builder streamTimeout(Duration timeout) {
186198
this.streamTimeout = validateDurationNotRequiredGtOrEqZero(timeout, DEFAULT_TIMEOUT);
@@ -191,7 +203,7 @@ public Builder streamTimeout(Duration timeout) {
191203
* Sets the expected stream for the publish. If the
192204
* stream does not match the server will not save the message.
193205
* @param stream expected stream
194-
* @return builder
206+
* @return The Builder
195207
*/
196208
public Builder expectedStream(String stream) {
197209
expectedStream = validateStreamName(stream, false);
@@ -202,7 +214,7 @@ public Builder expectedStream(String stream) {
202214
* Sets the expected last ID of the previously published message. If the
203215
* message ID does not match the server will not save the message.
204216
* @param lastMsgId the stream
205-
* @return builder
217+
* @return The Builder
206218
*/
207219
public Builder expectedLastMsgId(String lastMsgId) {
208220
expectedLastId = emptyAsNull(lastMsgId);
@@ -212,7 +224,7 @@ public Builder expectedLastMsgId(String lastMsgId) {
212224
/**
213225
* Sets the expected message sequence of the publish
214226
* @param sequence the expected last sequence number
215-
* @return builder
227+
* @return The Builder
216228
*/
217229
public Builder expectedLastSequence(long sequence) {
218230
// 0 has NO meaning to expectedLastSequence but we except 0 b/c the sequence is really a ulong
@@ -223,7 +235,7 @@ public Builder expectedLastSequence(long sequence) {
223235
/**
224236
* Sets the expected subject message sequence of the publish
225237
* @param sequence the expected last subject sequence number
226-
* @return builder
238+
* @return The Builder
227239
*/
228240
public Builder expectedLastSubjectSequence(long sequence) {
229241
expectedLastSubSeq = validateGtEqMinus1(sequence, "Last Subject Sequence");
@@ -234,17 +246,55 @@ public Builder expectedLastSubjectSequence(long sequence) {
234246
* Sets the message id. Message IDs are used for de-duplication
235247
* and should be unique to each message payload.
236248
* @param msgId the unique message id.
237-
* @return builder
249+
* @return The Builder
238250
*/
239251
public Builder messageId(String msgId) {
240252
this.msgId = emptyAsNull(msgId);
241253
return this;
242254
}
243255

256+
/**
257+
* Sets the TTL for this specific message to be published
258+
* @param msgTtlSeconds the ttl in seconds
259+
* @return The Builder
260+
*/
261+
public Builder messageTtlSeconds(int msgTtlSeconds) {
262+
this.messageTtl = msgTtlSeconds < 1 ? null : msgTtlSeconds + "s";
263+
return this;
264+
}
265+
266+
/**
267+
* Sets the TTL for this specific message to be published. Use at your own risk.
268+
* The current specification can be found here @see <a href="https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-43.md#per-message-ttl">JetStream Per-Message TTL</a>
269+
* @param messageTtlCustom the ttl in seconds
270+
* @return The Builder
271+
*/
272+
public Builder messageTtlCustom(String messageTtlCustom) {
273+
if (messageTtlCustom == null) {
274+
this.messageTtl = null;
275+
}
276+
else {
277+
this.messageTtl = messageTtlCustom.trim();
278+
if (this.messageTtl.isEmpty()) {
279+
this.messageTtl = null;
280+
}
281+
}
282+
return this;
283+
}
284+
285+
/**
286+
* Sets the TTL for this specific message to be published and never be expired
287+
* @return The Builder
288+
*/
289+
public Builder messageTtlNever() {
290+
this.messageTtl = "never";
291+
return this;
292+
}
293+
244294
/**
245295
* Clears the expected so the build can be re-used.
246296
* Clears the expectedLastId, expectedLastSequence and messageId fields.
247-
* @return builder
297+
* @return The Builder
248298
*/
249299
public Builder clearExpected() {
250300
expectedLastId = null;
@@ -259,7 +309,7 @@ public Builder clearExpected() {
259309
* @return publish options
260310
*/
261311
public PublishOptions build() {
262-
return new PublishOptions(stream, streamTimeout, expectedStream, expectedLastId, expectedLastSeq, expectedLastSubSeq, msgId);
312+
return new PublishOptions(this);
263313
}
264314
}
265315
}

src/main/java/io/nats/client/api/AckPolicy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public enum AckPolicy {
3333
*/
3434
Explicit("explicit");
3535

36-
private String policy;
36+
private final String policy;
3737

3838
AckPolicy(String p) {
3939
policy = p;

src/main/java/io/nats/client/api/StreamConfiguration.java

+65-65
Original file line numberDiff line numberDiff line change
@@ -71,42 +71,42 @@ public class StreamConfiguration implements JsonSerializable {
7171
private final Duration subjectDeleteMarkerTtl;
7272

7373
static StreamConfiguration instance(JsonValue v) {
74-
Builder builder = new Builder();
75-
builder.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)));
76-
builder.compressionOption(CompressionOption.get(readString(v, COMPRESSION)));
77-
builder.storageType(StorageType.get(readString(v, STORAGE)));
78-
builder.discardPolicy(DiscardPolicy.get(readString(v, DISCARD)));
79-
builder.name(readString(v, NAME));
80-
builder.description(readString(v, DESCRIPTION));
81-
builder.maxConsumers(readLong(v, MAX_CONSUMERS, -1));
82-
builder.maxMessages(readLong(v, MAX_MSGS, -1));
83-
builder.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1));
84-
builder.maxBytes(readLong(v, MAX_BYTES, -1));
85-
builder.maxAge(readNanos(v, MAX_AGE));
86-
builder.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1));
87-
builder.replicas(readInteger(v, NUM_REPLICAS, 1));
88-
builder.noAck(readBoolean(v, NO_ACK));
89-
builder.templateOwner(readString(v, TEMPLATE_OWNER));
90-
builder.duplicateWindow(readNanos(v, DUPLICATE_WINDOW));
91-
builder.subjects(readStringList(v, SUBJECTS));
92-
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
93-
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
94-
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
95-
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)));
96-
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
97-
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
98-
builder.sealed(readBoolean(v, SEALED));
99-
builder.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS));
100-
builder.allowDirect(readBoolean(v, ALLOW_DIRECT));
101-
builder.mirrorDirect(readBoolean(v, MIRROR_DIRECT));
102-
builder.denyDelete(readBoolean(v, DENY_DELETE));
103-
builder.denyPurge(readBoolean(v, DENY_PURGE));
104-
builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT));
105-
builder.metadata(readStringStringMap(v, METADATA));
106-
builder.firstSequence(readLong(v, FIRST_SEQ, 1));
107-
// builder.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL));
108-
// builder.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL));
109-
return builder.build();
74+
return new Builder()
75+
.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)))
76+
.compressionOption(CompressionOption.get(readString(v, COMPRESSION)))
77+
.storageType(StorageType.get(readString(v, STORAGE)))
78+
.discardPolicy(DiscardPolicy.get(readString(v, DISCARD)))
79+
.name(readString(v, NAME))
80+
.description(readString(v, DESCRIPTION))
81+
.maxConsumers(readLong(v, MAX_CONSUMERS, -1))
82+
.maxMessages(readLong(v, MAX_MSGS, -1))
83+
.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1))
84+
.maxBytes(readLong(v, MAX_BYTES, -1))
85+
.maxAge(readNanos(v, MAX_AGE))
86+
.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1))
87+
.replicas(readInteger(v, NUM_REPLICAS, 1))
88+
.noAck(readBoolean(v, NO_ACK))
89+
.templateOwner(readString(v, TEMPLATE_OWNER))
90+
.duplicateWindow(readNanos(v, DUPLICATE_WINDOW))
91+
.subjects(readStringList(v, SUBJECTS))
92+
.placement(Placement.optionalInstance(readValue(v, PLACEMENT)))
93+
.republish(Republish.optionalInstance(readValue(v, REPUBLISH)))
94+
.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)))
95+
.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)))
96+
.mirror(Mirror.optionalInstance(readValue(v, MIRROR)))
97+
.sources(Source.optionalListOf(readValue(v, SOURCES)))
98+
.sealed(readBoolean(v, SEALED))
99+
.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS))
100+
.allowDirect(readBoolean(v, ALLOW_DIRECT))
101+
.mirrorDirect(readBoolean(v, MIRROR_DIRECT))
102+
.denyDelete(readBoolean(v, DENY_DELETE))
103+
.denyPurge(readBoolean(v, DENY_PURGE))
104+
.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT))
105+
.metadata(readStringStringMap(v, METADATA))
106+
.firstSequence(readLong(v, FIRST_SEQ, 1))
107+
.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL))
108+
.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL))
109+
.build();
110110
}
111111

112112
// For the builder, assumes all validations are already done in builder
@@ -1060,35 +1060,35 @@ public Builder firstSequence(long firstSeq) {
10601060
return this;
10611061
}
10621062

1063-
// /**
1064-
// * Set to allow per message TTL to true
1065-
// * @return The Builder
1066-
// */
1067-
// public Builder allowMessageTtl() {
1068-
// this.allowMessageTtl = true;
1069-
// return this;
1070-
// }
1071-
1072-
// /**
1073-
// * Set allow per message TTL flag
1074-
// * @param allowMessageTtl the flag
1075-
// * @return The Builder
1076-
// */
1077-
// public Builder allowMessageTtl(boolean allowMessageTtl) {
1078-
// this.allowMessageTtl = allowMessageTtl;
1079-
// return this;
1080-
// }
1081-
1082-
// /**
1083-
// * The time delete marker TTL duration. Server accepts 1 second or more.
1084-
// * CLIENT DOES NOT VALIDATE
1085-
// * @param subjectDeleteMarkerTtl the TTL duration
1086-
// * @return The Builder
1087-
// */
1088-
// public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) {
1089-
// this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl;
1090-
// return this;
1091-
// }
1063+
/**
1064+
* Set to allow per message TTL to true
1065+
* @return The Builder
1066+
*/
1067+
public Builder allowMessageTtl() {
1068+
this.allowMessageTtl = true;
1069+
return this;
1070+
}
1071+
1072+
/**
1073+
* Set allow per message TTL flag
1074+
* @param allowMessageTtl the flag
1075+
* @return The Builder
1076+
*/
1077+
public Builder allowMessageTtl(boolean allowMessageTtl) {
1078+
this.allowMessageTtl = allowMessageTtl;
1079+
return this;
1080+
}
1081+
1082+
/**
1083+
* The time delete marker TTL duration. Server accepts 1 second or more.
1084+
* CLIENT DOES NOT VALIDATE
1085+
* @param subjectDeleteMarkerTtl the TTL duration
1086+
* @return The Builder
1087+
*/
1088+
public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) {
1089+
this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl;
1090+
return this;
1091+
}
10921092

10931093
/**
10941094
* Builds the StreamConfiguration

src/main/java/io/nats/client/impl/NatsJetStream.java

+1
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ private Headers mergePublishOptions(Headers headers, PublishOptions opts) {
198198
merged = mergeString(merged, EXPECTED_LAST_MSG_ID_HDR, opts.getExpectedLastMsgId());
199199
merged = mergeString(merged, EXPECTED_STREAM_HDR, opts.getExpectedStream());
200200
merged = mergeString(merged, MSG_ID_HDR, opts.getMessageId());
201+
merged = mergeString(merged, MSG_TTL_HDR, opts.getMessageTtl());
201202
}
202203

203204
return merged;

src/main/java/io/nats/client/support/NatsJetStreamConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,13 @@ public interface NatsJetStreamConstants {
8585
String EXPECTED_LAST_SEQ_HDR = "Nats-Expected-Last-Sequence";
8686
String EXPECTED_LAST_MSG_ID_HDR = "Nats-Expected-Last-Msg-Id";
8787
String EXPECTED_LAST_SUB_SEQ_HDR = "Nats-Expected-Last-Subject-Sequence";
88+
String MSG_TTL_HDR = "Nats-TTL";
8889

8990
String LAST_CONSUMER_HDR = "Nats-Last-Consumer";
9091
String LAST_STREAM_HDR = "Nats-Last-Stream";
9192
String CONSUMER_STALLED_HDR = "Nats-Consumer-Stalled";
9293
String MSG_SIZE_HDR = "Nats-Msg-Size";
94+
String NATS_MARKER_REASON_HDR = "Nats-Marker-Reason";
9395

9496
String ROLLUP_HDR = "Nats-Rollup";
9597
String ROLLUP_HDR_SUBJECT = "sub";

src/test/java/io/nats/client/api/StreamConfigurationTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ public void testConstruction() {
190190
.metadata(testSc.getMetadata())
191191
.firstSequence(testSc.getFirstSequence())
192192
.consumerLimits(testSc.getConsumerLimits())
193-
// .allowMessageTtl(testSc.isAllowMessageTtl())
194-
// .subjectDeleteMarkerTtl(testSc.getSubjectDeleteMarkerTtl())
193+
.allowMessageTtl(testSc.isAllowMessageTtl())
194+
.subjectDeleteMarkerTtl(testSc.getSubjectDeleteMarkerTtl())
195195
;
196196
validate(builder.build(), false, DEFAULT_STREAM_NAME);
197197
validate(builder.addSources((Source)null).build(), false, DEFAULT_STREAM_NAME);
@@ -518,8 +518,8 @@ private void validate(StreamConfiguration sc, boolean serverTest, String name) {
518518
assertEquals(StorageType.Memory, sc.getStorageType());
519519
assertSame(DiscardPolicy.New, sc.getDiscardPolicy());
520520

521-
// assertTrue(sc.isAllowMessageTtl());
522-
// assertEquals(Duration.ofNanos(73000000000L), sc.getSubjectDeleteMarkerTtl());
521+
assertTrue(sc.isAllowMessageTtl());
522+
assertEquals(Duration.ofNanos(73000000000L), sc.getSubjectDeleteMarkerTtl());
523523

524524
assertNotNull(sc.getPlacement());
525525
assertEquals("clstr", sc.getPlacement().getCluster());

0 commit comments

Comments
 (0)