13
13
14
14
package io .nats .client ;
15
15
16
- import io .nats .client .api .ConsumerConfiguration ;
17
16
import io .nats .client .support .JsonParseException ;
18
17
import io .nats .client .support .JsonParser ;
19
18
import io .nats .client .support .JsonSerializable ;
20
19
import io .nats .client .support .JsonValue ;
21
20
22
21
import static io .nats .client .support .ApiConstants .*;
23
22
import static io .nats .client .support .JsonUtils .*;
23
+ import static io .nats .client .support .JsonValueUtils .readBoolean ;
24
24
import static io .nats .client .support .JsonValueUtils .readInteger ;
25
25
import static io .nats .client .support .JsonValueUtils .readLong ;
26
+ import static io .nats .client .support .JsonValueUtils .*;
26
27
27
28
/**
28
29
* Base Consume Options are provided to customize the way the consume and
@@ -40,13 +41,15 @@ public class BaseConsumeOptions implements JsonSerializable {
40
41
protected final int messages ;
41
42
protected final long bytes ;
42
43
protected final long expiresIn ;
43
- protected final long idleHeartbeat ;
44
44
protected final int thresholdPercent ;
45
- protected final boolean noWait ;
45
+ protected final long idleHeartbeat ;
46
+ protected final String group ;
47
+ protected final long minPending ;
48
+ protected final long minAckPending ;
46
49
protected final boolean raiseStatusWarnings ;
47
50
48
- @ SuppressWarnings ( "rawtypes" ) // Don't need the type of the builder to get its vars
49
- protected BaseConsumeOptions ( Builder b ) {
51
+ protected BaseConsumeOptions ( Builder <?, ?> b ) {
52
+ // Message / bytes is part of base and is calculated
50
53
bytes = b .bytes ;
51
54
if (bytes > 0 ) {
52
55
messages = b .messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b .messages ;
@@ -55,23 +58,17 @@ protected BaseConsumeOptions(Builder b) {
55
58
messages = b .messages < 0 ? DEFAULT_MESSAGE_COUNT : b .messages ;
56
59
}
57
60
58
- // validation handled in builder
61
+ // Validation for expiresIn, if any extra, is handled in subclass builder
62
+ expiresIn = b .expiresIn ;
59
63
thresholdPercent = b .thresholdPercent ;
60
- noWait = b .noWait ;
61
- raiseStatusWarnings = b .raiseStatusWarnings ;
62
64
63
- // if it's not noWait, it must have an expiresIn
64
- // we can't check this in the builder because we can't guarantee order
65
- // so we always default to LONG_UNSET in the builder and check it here.
66
- if (b .expiresIn == ConsumerConfiguration .LONG_UNSET && !noWait ) {
67
- expiresIn = DEFAULT_EXPIRES_IN_MILLIS ;
68
- }
69
- else {
70
- expiresIn = b .expiresIn ;
71
- }
72
-
73
- // calculated
65
+ // 3. idleHeartbeat is part of base and is calculated.
74
66
idleHeartbeat = Math .min (MAX_HEARTBEAT_MILLIS , expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100 );
67
+
68
+ this .group = b .group ;
69
+ this .minPending = b .minPending ;
70
+ this .minAckPending = b .minAckPending ;
71
+ raiseStatusWarnings = b .raiseStatusWarnings ;
75
72
}
76
73
77
74
@ Override
@@ -82,11 +79,16 @@ public String toJson() {
82
79
addField (sb , EXPIRES_IN , expiresIn );
83
80
addField (sb , IDLE_HEARTBEAT , idleHeartbeat );
84
81
addField (sb , THRESHOLD_PERCENT , thresholdPercent );
82
+ addField (sb , GROUP , group );
83
+ addField (sb , MIN_PENDING , minPending );
84
+ addField (sb , MIN_ACK_PENDING , minAckPending );
85
85
addFldWhenTrue (sb , RAISE_STATUS_WARNINGS , raiseStatusWarnings );
86
- addFldWhenTrue (sb , NO_WAIT , noWait );
86
+ subclassSpecificToJson (sb );
87
87
return endJson (sb ).toString ();
88
88
}
89
89
90
+ protected void subclassSpecificToJson (StringBuilder sb ) {}
91
+
90
92
public long getExpiresInMillis () {
91
93
return expiresIn ;
92
94
}
@@ -99,21 +101,31 @@ public int getThresholdPercent() {
99
101
return thresholdPercent ;
100
102
}
101
103
102
- public boolean isNoWait () {
103
- return noWait ;
104
- }
105
-
106
104
public boolean raiseStatusWarnings () {
107
105
return raiseStatusWarnings ;
108
106
}
109
107
108
+ public String getGroup () {
109
+ return group ;
110
+ }
111
+
112
+ public long getMinPending () {
113
+ return minPending ;
114
+ }
115
+
116
+ public long getMinAckPending () {
117
+ return minAckPending ;
118
+ }
119
+
110
120
protected static abstract class Builder <B , CO > {
111
121
protected int messages = -1 ;
112
122
protected long bytes = 0 ;
113
123
protected int thresholdPercent = DEFAULT_THRESHOLD_PERCENT ;
114
124
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS ;
115
- protected boolean noWait = false ;
116
125
protected boolean raiseStatusWarnings = false ;
126
+ protected String group ;
127
+ protected long minPending = -1 ;
128
+ protected long minAckPending = -1 ;
117
129
118
130
protected abstract B getThis ();
119
131
@@ -137,6 +149,10 @@ public B jsonValue(JsonValue jsonValue) {
137
149
bytes (readLong (jsonValue , BYTES , -1 ));
138
150
expiresIn (readLong (jsonValue , EXPIRES_IN , MIN_EXPIRES_MILLS ));
139
151
thresholdPercent (readInteger (jsonValue , THRESHOLD_PERCENT , -1 ));
152
+ raiseStatusWarnings (readBoolean (jsonValue , RAISE_STATUS_WARNINGS , false ));
153
+ group (readStringEmptyAsNull (jsonValue , GROUP ));
154
+ minPending (readLong (jsonValue , MIN_PENDING , -1 ));
155
+ minAckPending (readLong (jsonValue , MIN_ACK_PENDING , -1 ));
140
156
return getThis ();
141
157
}
142
158
@@ -210,6 +226,36 @@ public B raiseStatusWarnings(boolean raiseStatusWarnings) {
210
226
return getThis ();
211
227
}
212
228
229
+ /**
230
+ * Sets the group
231
+ * @param group the priority group for this pull
232
+ * @return Builder
233
+ */
234
+ public B group (String group ) {
235
+ this .group = group ;
236
+ return getThis ();
237
+ }
238
+
239
+ /**
240
+ * When specified, the consumer will only receive messages when the consumer has at least this many pending messages.
241
+ * @param minPending the min pending
242
+ * @return the builder
243
+ */
244
+ public B minPending (long minPending ) {
245
+ this .minPending = minPending < 1 ? -1 : minPending ;
246
+ return getThis ();
247
+ }
248
+
249
+ /**
250
+ * When specified, the consumer will only receive messages when the consumer has at least this many ack pending messages.
251
+ * @param minAckPending the min ack pending
252
+ * @return the builder
253
+ */
254
+ public B minAckPending (long minAckPending ) {
255
+ this .minAckPending = minAckPending < 1 ? -1 : minAckPending ;
256
+ return getThis ();
257
+ }
258
+
213
259
/**
214
260
* Build the options.
215
261
* @return the built options
0 commit comments