Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per Message TTL Support for 2.11 #1295

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 67 additions & 17 deletions src/main/java/io/nats/client/PublishOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ public class PublishOptions {
private final long expectedLastSeq;
private final long expectedLastSubSeq;
private final String msgId;
private final String messageTtl;

private PublishOptions(String stream, Duration streamTimeout, String expectedStream, String expectedLastId, long expectedLastSeq, long expectedLastSubSeq, String msgId) {
this.stream = stream;
this.streamTimeout = streamTimeout;
this.expectedStream = expectedStream;
this.expectedLastId = expectedLastId;
this.expectedLastSeq = expectedLastSeq;
this.expectedLastSubSeq = expectedLastSubSeq;
this.msgId = msgId;
private PublishOptions(Builder b) {
this.stream = b.stream;
this.streamTimeout = b.streamTimeout;
this.expectedStream = b.expectedStream;
this.expectedLastId = b.expectedLastId;
this.expectedLastSeq = b.expectedLastSeq;
this.expectedLastSubSeq = b.expectedLastSubSeq;
this.msgId = b.msgId;
this.messageTtl = b.messageTtl;
}

/**
Expand Down Expand Up @@ -122,6 +124,15 @@ public String getMessageId() {
return this.msgId;
}

/**
* Gets the message ttl string. Might be null. Might be "never".
* 10 seconds would be "10s" for the server
* @return the message ttl string
*/
public String getMessageTtl() {
return messageTtl;
}

/**
* Creates a builder for the options.
* @return the builder
Expand All @@ -144,6 +155,7 @@ public static class Builder {
long expectedLastSeq = UNSET_LAST_SEQUENCE;
long expectedLastSubSeq = UNSET_LAST_SEQUENCE;
String msgId;
String messageTtl;

/**
* Constructs a new publish options Builder with the default values.
Expand All @@ -169,7 +181,7 @@ public Builder(Properties properties) {
/**
* Sets the stream name for publishing. The default is undefined.
* @param stream The name of the stream.
* @return Builder
* @return The Builder
*/
public Builder stream(String stream) {
this.stream = validateStreamName(stream, false);
Expand All @@ -180,7 +192,7 @@ public Builder stream(String stream) {
* Sets the timeout to wait for a publish acknowledgement from a JetStream
* enabled NATS server.
* @param timeout the publish timeout.
* @return Builder
* @return The Builder
*/
public Builder streamTimeout(Duration timeout) {
this.streamTimeout = validateDurationNotRequiredGtOrEqZero(timeout, DEFAULT_TIMEOUT);
Expand All @@ -191,7 +203,7 @@ public Builder streamTimeout(Duration timeout) {
* Sets the expected stream for the publish. If the
* stream does not match the server will not save the message.
* @param stream expected stream
* @return builder
* @return The Builder
*/
public Builder expectedStream(String stream) {
expectedStream = validateStreamName(stream, false);
Expand All @@ -202,7 +214,7 @@ public Builder expectedStream(String stream) {
* Sets the expected last ID of the previously published message. If the
* message ID does not match the server will not save the message.
* @param lastMsgId the stream
* @return builder
* @return The Builder
*/
public Builder expectedLastMsgId(String lastMsgId) {
expectedLastId = emptyAsNull(lastMsgId);
Expand All @@ -212,7 +224,7 @@ public Builder expectedLastMsgId(String lastMsgId) {
/**
* Sets the expected message sequence of the publish
* @param sequence the expected last sequence number
* @return builder
* @return The Builder
*/
public Builder expectedLastSequence(long sequence) {
// 0 has NO meaning to expectedLastSequence but we except 0 b/c the sequence is really a ulong
Expand All @@ -223,7 +235,7 @@ public Builder expectedLastSequence(long sequence) {
/**
* Sets the expected subject message sequence of the publish
* @param sequence the expected last subject sequence number
* @return builder
* @return The Builder
*/
public Builder expectedLastSubjectSequence(long sequence) {
expectedLastSubSeq = validateGtEqMinus1(sequence, "Last Subject Sequence");
Expand All @@ -234,17 +246,55 @@ public Builder expectedLastSubjectSequence(long sequence) {
* Sets the message id. Message IDs are used for de-duplication
* and should be unique to each message payload.
* @param msgId the unique message id.
* @return builder
* @return The Builder
*/
public Builder messageId(String msgId) {
this.msgId = emptyAsNull(msgId);
return this;
}

/**
* Sets the TTL for this specific message to be published
* @param msgTtlSeconds the ttl in seconds
* @return The Builder
*/
public Builder messageTtlSeconds(int msgTtlSeconds) {
this.messageTtl = msgTtlSeconds < 1 ? null : msgTtlSeconds + "s";
return this;
}

/**
* Sets the TTL for this specific message to be published. Use at your own risk.
* The current specification can be found here @see <a href="https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-43.md#per-message-ttl">JetStream Per-Message TTL</a>
* @param messageTtlCustom the ttl in seconds
* @return The Builder
*/
public Builder messageTtlCustom(String messageTtlCustom) {
if (messageTtlCustom == null) {
this.messageTtl = null;
}
else {
this.messageTtl = messageTtlCustom.trim();
if (this.messageTtl.isEmpty()) {
this.messageTtl = null;
}
}
return this;
}

/**
* Sets the TTL for this specific message to be published and never be expired
* @return The Builder
*/
public Builder messageTtlNever() {
this.messageTtl = "never";
return this;
}

/**
* Clears the expected so the build can be re-used.
* Clears the expectedLastId, expectedLastSequence and messageId fields.
* @return builder
* @return The Builder
*/
public Builder clearExpected() {
expectedLastId = null;
Expand All @@ -259,7 +309,7 @@ public Builder clearExpected() {
* @return publish options
*/
public PublishOptions build() {
return new PublishOptions(stream, streamTimeout, expectedStream, expectedLastId, expectedLastSeq, expectedLastSubSeq, msgId);
return new PublishOptions(this);
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/api/AckPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public enum AckPolicy {
*/
Explicit("explicit");

private String policy;
private final String policy;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I happened to be here and the IDE yelled at me, so I fixed it.


AckPolicy(String p) {
policy = p;
Expand Down
130 changes: 65 additions & 65 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,42 +71,42 @@ public class StreamConfiguration implements JsonSerializable {
private final Duration subjectDeleteMarkerTtl;

static StreamConfiguration instance(JsonValue v) {
Builder builder = new Builder();
builder.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)));
builder.compressionOption(CompressionOption.get(readString(v, COMPRESSION)));
builder.storageType(StorageType.get(readString(v, STORAGE)));
builder.discardPolicy(DiscardPolicy.get(readString(v, DISCARD)));
builder.name(readString(v, NAME));
builder.description(readString(v, DESCRIPTION));
builder.maxConsumers(readLong(v, MAX_CONSUMERS, -1));
builder.maxMessages(readLong(v, MAX_MSGS, -1));
builder.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1));
builder.maxBytes(readLong(v, MAX_BYTES, -1));
builder.maxAge(readNanos(v, MAX_AGE));
builder.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1));
builder.replicas(readInteger(v, NUM_REPLICAS, 1));
builder.noAck(readBoolean(v, NO_ACK));
builder.templateOwner(readString(v, TEMPLATE_OWNER));
builder.duplicateWindow(readNanos(v, DUPLICATE_WINDOW));
builder.subjects(readStringList(v, SUBJECTS));
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT)));
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH)));
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)));
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)));
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR)));
builder.sources(Source.optionalListOf(readValue(v, SOURCES)));
builder.sealed(readBoolean(v, SEALED));
builder.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS));
builder.allowDirect(readBoolean(v, ALLOW_DIRECT));
builder.mirrorDirect(readBoolean(v, MIRROR_DIRECT));
builder.denyDelete(readBoolean(v, DENY_DELETE));
builder.denyPurge(readBoolean(v, DENY_PURGE));
builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT));
builder.metadata(readStringStringMap(v, METADATA));
builder.firstSequence(readLong(v, FIRST_SEQ, 1));
// builder.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL));
// builder.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL));
return builder.build();
return new Builder()
.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION)))
.compressionOption(CompressionOption.get(readString(v, COMPRESSION)))
.storageType(StorageType.get(readString(v, STORAGE)))
.discardPolicy(DiscardPolicy.get(readString(v, DISCARD)))
.name(readString(v, NAME))
.description(readString(v, DESCRIPTION))
.maxConsumers(readLong(v, MAX_CONSUMERS, -1))
.maxMessages(readLong(v, MAX_MSGS, -1))
.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1))
.maxBytes(readLong(v, MAX_BYTES, -1))
.maxAge(readNanos(v, MAX_AGE))
.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1))
.replicas(readInteger(v, NUM_REPLICAS, 1))
.noAck(readBoolean(v, NO_ACK))
.templateOwner(readString(v, TEMPLATE_OWNER))
.duplicateWindow(readNanos(v, DUPLICATE_WINDOW))
.subjects(readStringList(v, SUBJECTS))
.placement(Placement.optionalInstance(readValue(v, PLACEMENT)))
.republish(Republish.optionalInstance(readValue(v, REPUBLISH)))
.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM)))
.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS)))
.mirror(Mirror.optionalInstance(readValue(v, MIRROR)))
.sources(Source.optionalListOf(readValue(v, SOURCES)))
.sealed(readBoolean(v, SEALED))
.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS))
.allowDirect(readBoolean(v, ALLOW_DIRECT))
.mirrorDirect(readBoolean(v, MIRROR_DIRECT))
.denyDelete(readBoolean(v, DENY_DELETE))
.denyPurge(readBoolean(v, DENY_PURGE))
.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT))
.metadata(readStringStringMap(v, METADATA))
.firstSequence(readLong(v, FIRST_SEQ, 1))
.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL))
.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL))
.build();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no sure why I didn't use the chain method before. This wasn't a necessary change.

}

// For the builder, assumes all validations are already done in builder
Expand Down Expand Up @@ -1060,35 +1060,35 @@ public Builder firstSequence(long firstSeq) {
return this;
}

// /**
// * Set to allow per message TTL to true
// * @return The Builder
// */
// public Builder allowMessageTtl() {
// this.allowMessageTtl = true;
// return this;
// }

// /**
// * Set allow per message TTL flag
// * @param allowMessageTtl the flag
// * @return The Builder
// */
// public Builder allowMessageTtl(boolean allowMessageTtl) {
// this.allowMessageTtl = allowMessageTtl;
// return this;
// }

// /**
// * The time delete marker TTL duration. Server accepts 1 second or more.
// * CLIENT DOES NOT VALIDATE
// * @param subjectDeleteMarkerTtl the TTL duration
// * @return The Builder
// */
// public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) {
// this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl;
// return this;
// }
/**
* Set to allow per message TTL to true
* @return The Builder
*/
public Builder allowMessageTtl() {
this.allowMessageTtl = true;
return this;
}

/**
* Set allow per message TTL flag
* @param allowMessageTtl the flag
* @return The Builder
*/
public Builder allowMessageTtl(boolean allowMessageTtl) {
this.allowMessageTtl = allowMessageTtl;
return this;
}

/**
* The time delete marker TTL duration. Server accepts 1 second or more.
* CLIENT DOES NOT VALIDATE
* @param subjectDeleteMarkerTtl the TTL duration
* @return The Builder
*/
public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) {
this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl;
return this;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coded, then commented out, then back in.


/**
* Builds the StreamConfiguration
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ private Headers mergePublishOptions(Headers headers, PublishOptions opts) {
merged = mergeString(merged, EXPECTED_LAST_MSG_ID_HDR, opts.getExpectedLastMsgId());
merged = mergeString(merged, EXPECTED_STREAM_HDR, opts.getExpectedStream());
merged = mergeString(merged, MSG_ID_HDR, opts.getMessageId());
merged = mergeString(merged, MSG_TTL_HDR, opts.getMessageTtl());
}

return merged;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@ public interface NatsJetStreamConstants {
String EXPECTED_LAST_SEQ_HDR = "Nats-Expected-Last-Sequence";
String EXPECTED_LAST_MSG_ID_HDR = "Nats-Expected-Last-Msg-Id";
String EXPECTED_LAST_SUB_SEQ_HDR = "Nats-Expected-Last-Subject-Sequence";
String MSG_TTL_HDR = "Nats-TTL";

String LAST_CONSUMER_HDR = "Nats-Last-Consumer";
String LAST_STREAM_HDR = "Nats-Last-Stream";
String CONSUMER_STALLED_HDR = "Nats-Consumer-Stalled";
String MSG_SIZE_HDR = "Nats-Msg-Size";
String NATS_MARKER_REASON_HDR = "Nats-Marker-Reason";

String ROLLUP_HDR = "Nats-Rollup";
String ROLLUP_HDR_SUBJECT = "sub";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ public void testConstruction() {
.metadata(testSc.getMetadata())
.firstSequence(testSc.getFirstSequence())
.consumerLimits(testSc.getConsumerLimits())
// .allowMessageTtl(testSc.isAllowMessageTtl())
// .subjectDeleteMarkerTtl(testSc.getSubjectDeleteMarkerTtl())
.allowMessageTtl(testSc.isAllowMessageTtl())
.subjectDeleteMarkerTtl(testSc.getSubjectDeleteMarkerTtl())
;
validate(builder.build(), false, DEFAULT_STREAM_NAME);
validate(builder.addSources((Source)null).build(), false, DEFAULT_STREAM_NAME);
Expand Down Expand Up @@ -518,8 +518,8 @@ private void validate(StreamConfiguration sc, boolean serverTest, String name) {
assertEquals(StorageType.Memory, sc.getStorageType());
assertSame(DiscardPolicy.New, sc.getDiscardPolicy());

// assertTrue(sc.isAllowMessageTtl());
// assertEquals(Duration.ofNanos(73000000000L), sc.getSubjectDeleteMarkerTtl());
assertTrue(sc.isAllowMessageTtl());
assertEquals(Duration.ofNanos(73000000000L), sc.getSubjectDeleteMarkerTtl());

assertNotNull(sc.getPlacement());
assertEquals("clstr", sc.getPlacement().getCluster());
Expand Down
Loading
Loading