Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Main for server v2.11 #1239

Merged
merged 66 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
62a73b3
Direct Batch EOD and loop changes
scottf Oct 10, 2024
c83faff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
aaecbff
Direct Batch EOD and loop changes
scottf Oct 10, 2024
1ee791b
Consumer Priority Group Overflow (#1233)
scottf Oct 10, 2024
ad60f23
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
cfa5cbc
Merge branch 'main' into main-2-11
scottf Oct 10, 2024
3dbc5fb
Merge branch 'main' into main-2-11
scottf Oct 11, 2024
d4667b4
Merge branch 'main' into main-2-11
scottf Oct 15, 2024
2b21962
Merge branch 'main' into main-2-11
scottf Oct 18, 2024
93510f2
Refactoring based on testing
scottf Oct 18, 2024
6b8d3df
Removed debug
scottf Oct 21, 2024
ad6694b
Removed debug
scottf Oct 21, 2024
f32791d
Support setting [min] sequence when needed
scottf Oct 21, 2024
6b82bca
In progress waiting for server PR's
scottf Oct 28, 2024
aa97394
Merge branch 'main' into main-2-11
scottf Oct 31, 2024
9cf1e16
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
788c160
Server main branch has all 2.11 features
scottf Nov 1, 2024
7a9e0a5
Merge branch 'main' into main-2-11
scottf Nov 1, 2024
e0df8c0
Fixing removal
scottf Nov 1, 2024
ed87573
MessageBatchGetRequest reform based on adr
scottf Nov 5, 2024
b7379bb
more testing
scottf Nov 5, 2024
e242724
multi last for with batch
scottf Nov 7, 2024
3bfa65c
Merge branch 'refs/heads/main' into main-2-11
scottf Nov 14, 2024
6e9c067
Merge branch 'main' into main-2-11
scottf Nov 14, 2024
c97c535
Added MessageInfo getMessage(String streamName, MessageGetRequest mes…
scottf Nov 15, 2024
151b309
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
050f106
Merge branch 'main' into main-2-11
scottf Nov 15, 2024
9c13f6a
Merge branch 'main' into main-2-11
scottf Nov 22, 2024
6a405ba
Update Doc
scottf Nov 24, 2024
0318b12
tuned request message batch
scottf Nov 25, 2024
592f596
Merge branch 'main' into main-2-11
scottf Nov 26, 2024
96271a1
tuned request message batch
scottf Nov 26, 2024
0aeb173
Merge branch 'main' into main-2-11
scottf Dec 3, 2024
e37d659
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
e34817a
Merge branch 'main' into main-2-11
scottf Dec 4, 2024
800a684
Merge branch 'main' into main-2-11
scottf Dec 18, 2024
c1d1b58
Merge branch 'main' into main-2-11
scottf Jan 6, 2025
6884f7e
Merge branch 'main' into main-2-11
scottf Jan 9, 2025
5108a14
Merge branch 'main' into main-2-11
scottf Jan 16, 2025
b2ca330
Removed Direct Batch to Orbit
scottf Feb 13, 2025
c371e64
Merge branch 'main' into main-2-11
scottf Feb 17, 2025
e95930a
Merge branch 'main' into main-2-11
scottf Feb 18, 2025
af57d78
Merge branch 'main' into main-2-11
scottf Feb 19, 2025
0b11ac0
Merge branch 'main' into main-2-11
scottf Feb 20, 2025
804808c
Add Message TTL Stream Configuration (#1280)
scottf Feb 24, 2025
2b795b0
Merge branch 'main' into main-2-11
scottf Feb 24, 2025
18d8a43
Merge branch 'main' into main-2-11
scottf Feb 25, 2025
8f3d5fc
Fix tests after merge
scottf Feb 25, 2025
debd85d
Merge branch 'main' into main-2-11
scottf Mar 10, 2025
3791f26
Replace ed25519 with BouncyCastle
scottf Mar 15, 2025
e3b8930
Replace ed25519 with BouncyCastle
scottf Mar 15, 2025
7c3d7e3
Replace ed25519 with BouncyCastle
scottf Mar 15, 2025
c9f13a4
Replace ed25519 with BouncyCastle
scottf Mar 15, 2025
21e89c6
Merge pull request #1288 from nats-io/bouncy-castle
scottf Mar 17, 2025
219b911
Merge branch 'main' into main-2-11
scottf Mar 17, 2025
12b7e05
Merge branch 'main' into main-2-11
scottf Mar 17, 2025
8d7af6f
Merge branch 'main' into main-2-11
scottf Mar 18, 2025
63ab265
commenting out the ability to set per message ttl on stream
scottf Mar 18, 2025
e3c9b5d
Merge branch 'refs/heads/main' into main-2-11
scottf Mar 19, 2025
e9a50f7
Merge branch 'refs/heads/main' into main-2-11
scottf Mar 19, 2025
8aab5df
merge main cleanup
scottf Mar 19, 2025
df354d8
Merge pull request #1294 from nats-io/main-2-11-merge-safe
scottf Mar 19, 2025
ca351e8
missed delete marker ttl - commented out for now
scottf Mar 19, 2025
9b88979
Merge remote-tracking branch 'origin/main-2-11' into main-2-11
scottf Mar 19, 2025
69d5a76
missed delete marker ttl - commented out for now
scottf Mar 19, 2025
7dfd719
better consume options
scottf Mar 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
cd $GITHUB_WORKSPACE
git clone https://github.com/nats-io/nats-server.git
cd nats-server
git checkout consumer-groups
go get
go build main.go
mkdir -p ~/.local/bin
Expand Down
71 changes: 63 additions & 8 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
package io.nats.client;

import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.support.JsonParseException;
import io.nats.client.support.JsonParser;
import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonValue;
import io.nats.client.support.*;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.*;
import static io.nats.client.support.JsonValueUtils.readBoolean;
import static io.nats.client.support.JsonValueUtils.readInteger;
import static io.nats.client.support.JsonValueUtils.readLong;
import static io.nats.client.support.JsonValueUtils.*;

/**
* Base Consume Options are provided to customize the way the consume and
Expand All @@ -44,17 +42,23 @@ public class BaseConsumeOptions implements JsonSerializable {
protected final long idleHeartbeat;
protected final int thresholdPercent;
protected final boolean noWait;
protected final String group;
protected final long minPending;
protected final long minAckPending;

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
protected BaseConsumeOptions(Builder b) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured out the correct way to do this

protected BaseConsumeOptions(Builder<?, ?> b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
}
else {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}

this.group = b.group;
this.minPending = b.minPending;
this.minAckPending = b.minAckPending;

// validation handled in builder
thresholdPercent = b.thresholdPercent;
noWait = b.noWait;
Expand Down Expand Up @@ -82,6 +86,9 @@ public String toJson() {
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
addFldWhenTrue(sb, NO_WAIT, noWait);
JsonUtils.addField(sb, GROUP, group);
JsonUtils.addField(sb, MIN_PENDING, minPending);
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
return endJson(sb).toString();
}

Expand All @@ -101,12 +108,27 @@ public boolean isNoWait() {
return noWait;
}

public String getGroup() {
return group;
}

public long getMinPending() {
return minPending;
}

public long getMinAckPending() {
return minAckPending;
}

protected static abstract class Builder<B, CO> {
protected int messages = -1;
protected long bytes = 0;
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT;
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
protected boolean noWait = false;
protected String group;
protected long minPending = -1;
protected long minAckPending = -1;

protected abstract B getThis();

Expand Down Expand Up @@ -137,6 +159,9 @@ public B jsonValue(JsonValue jsonValue) {
if (readBoolean(jsonValue, NO_WAIT, false)) {
noWait();
}
group(readStringEmptyAsNull(jsonValue, GROUP));
minPending(readLong(jsonValue, MIN_PENDING, -1));
minAckPending(readLong(jsonValue, MIN_ACK_PENDING, -1));
return getThis();
}

Expand Down Expand Up @@ -190,6 +215,36 @@ public B thresholdPercent(int thresholdPercent) {
return getThis();
}

/**
* Sets the group
* @param group the priority group for this pull
* @return Builder
*/
public B group(String group) {
this.group = group;
return getThis();
}

/**
* When specified, the consumer will only receive messages when the consumer has at least this many pending messages.
* @param minPending the min pending
* @return the builder
*/
public B minPending(long minPending) {
this.minPending = minPending < 1 ? -1 : minPending;
return getThis();
}

/**
* When specified, the consumer will only receive messages when the consumer has at least this many ack pending messages.
* @param minAckPending the min ack pending
* @return the builder
*/
public B minAckPending(long minAckPending) {
this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
return getThis();
}

/**
* Build the options.
* @return the built options
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/io/nats/client/PullRequestOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ public class PullRequestOptions implements JsonSerializable {
private final boolean noWait;
private final Duration expiresIn;
private final Duration idleHeartbeat;
private final String group;
private final long minPending;
private final long minAckPending;

public PullRequestOptions(Builder b) {
this.batchSize = b.batchSize;
this.maxBytes = b.maxBytes;
this.noWait = b.noWait;
this.expiresIn = b.expiresIn;
this.idleHeartbeat = b.idleHeartbeat;
this.group = b.group;
this.minPending = b.minPending < 0 ? -1 : b.minPending;
this.minAckPending = b.minAckPending < 0 ? -1 : b.minAckPending;
}

@Override
Expand All @@ -48,6 +54,10 @@ public String toJson() {
JsonUtils.addFldWhenTrue(sb, NO_WAIT, noWait);
JsonUtils.addFieldAsNanos(sb, EXPIRES, expiresIn);
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);

JsonUtils.addField(sb, GROUP, group);
JsonUtils.addField(sb, MIN_PENDING, minPending);
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
return JsonUtils.endJson(sb).toString();
}

Expand Down Expand Up @@ -91,6 +101,18 @@ public Duration getIdleHeartbeat() {
return idleHeartbeat;
}

public String getGroup() {
return group;
}

public long getMinPending() {
return minPending;
}

public long getMinAckPending() {
return minAckPending;
}

/**
* Creates a builder for the pull options, with batch size since it's always required
* @param batchSize the size of the batch. Must be greater than 0
Expand All @@ -115,6 +137,9 @@ public static class Builder {
private boolean noWait;
private Duration expiresIn;
private Duration idleHeartbeat;
private String group;
private long minPending = -1;
private long minAckPending = -1;

/**
* Set the batch size for the pull
Expand Down Expand Up @@ -195,6 +220,37 @@ public Builder idleHeartbeat(Duration idleHeartbeat) {
return this;
}

/**
* Sets the group
* Replaces any other groups set in the builder
* @param group the priority group for this pull
* @return Builder
*/
public Builder group(String group) {
this.group = group;
return this;
}

/**
* When specified, the pull request will only receive messages when the consumer has at least this many pending messages.
* @param minPending the min pending
* @return the builder
*/
public Builder minPending(long minPending) {
this.minPending = minPending < 1 ? -1 : minPending;
return this;
}

/**
* When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
* @param minAckPending the min ack pending
* @return the builder
*/
public Builder minAckPending(long minAckPending) {
this.minAckPending = minAckPending < 1 ? -1 : minAckPending;
return this;
}

/**
* Build the PullRequestOptions.
* <p>Validates that the batch size is greater than 0</p>
Expand Down
Loading
Loading