Skip to content

Commit b702fa8

Browse files
WIP: fast batch reply
Signed-off-by: Maurice van Veen <[email protected]>
1 parent c09af0b commit b702fa8

File tree

5 files changed

+318
-159
lines changed

5 files changed

+318
-159
lines changed

server/errors.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2010,10 +2010,10 @@
20102010
"deprecates": ""
20112011
},
20122012
{
2013-
"constant": "JSBatchPublishMissingSeqErr",
2013+
"constant": "JSBatchPublishInvalidPatternErr",
20142014
"code": 400,
20152015
"error_code": 10203,
2016-
"description": "batch publish sequence is missing",
2016+
"description": "batch publish pattern is invalid",
20172017
"comment": "",
20182018
"help": "",
20192019
"url": "",

server/jetstream.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,7 +1458,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
14581458
smv StoreMsg
14591459
batchId string
14601460
batchSeq uint64
1461-
atomic bool
14621461
commit bool
14631462
commitEob bool
14641463
batchStoreDir string
@@ -1470,10 +1469,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
14701469
if err != nil || sm == nil {
14711470
goto SKIP
14721471
}
1473-
batchId, atomic = getBatchId(sm.hdr)
1472+
batchId = getBatchId(sm.hdr)
14741473
batchSeq, ok = getBatchSequence(sm.hdr)
14751474
commit = len(sliceHeader(JSBatchCommit, sm.hdr)) != 0
1476-
if batchId == _EMPTY_ || !atomic || !ok || commit {
1475+
if batchId == _EMPTY_ || !ok || commit {
14771476
goto SKIP
14781477
}
14791478
// We've observed a partial batch write. Write the remainder of the batch.

server/jetstream_batching_test.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2902,15 +2902,24 @@ func TestJetStreamFastBatchPublish(t *testing.T) {
29022902
_, err := jsStreamCreate(t, nc, cfg)
29032903
require_NoError(t, err)
29042904

2905-
reply := nats.NewInbox()
2906-
sub, err := nc.SubscribeSync(reply)
2905+
inbox := nats.NewInbox()
2906+
generateReply := func(batchId string, batchSeq uint64, flow uint64, gap string, v int) string {
2907+
return fmt.Sprintf("%s.%d.%s.%s.%d.%d.$FI", inbox, flow, gap, batchId, batchSeq, v)
2908+
}
2909+
generateNormalReply := func(batchId string, batchSeq uint64, gap string) string {
2910+
return generateReply(batchId, batchSeq, 0, gap, 0)
2911+
}
2912+
generateCommitReply := func(batchId string, batchSeq uint64, gap string) string {
2913+
return generateReply(batchId, batchSeq, 0, gap, 1)
2914+
}
2915+
2916+
sub, err := nc.SubscribeSync(fmt.Sprintf("%s.>", inbox))
29072917
require_NoError(t, err)
29082918
defer sub.Drain()
29092919

29102920
m := nats.NewMsg("foo.0")
2911-
m.Reply = reply
2921+
m.Reply = generateNormalReply("uuid", 0, JSFastBatchGapFail)
29122922
m.Data = []byte("foo.0")
2913-
m.Header.Set("Nats-Fast-Batch-Id", "uuid")
29142923

29152924
// Publish with batch publish disabled.
29162925
require_NoError(t, nc.PublishMsg(m))
@@ -2925,17 +2934,18 @@ func TestJetStreamFastBatchPublish(t *testing.T) {
29252934
_, err = jsStreamUpdate(t, nc, cfg)
29262935
require_NoError(t, err)
29272936

2928-
// Publish without batch sequence errors.
2937+
// Publish with incorrect batch sequence errors.
2938+
m.Reply = generateNormalReply("uuid", 0, JSFastBatchGapFail)
29292939
require_NoError(t, nc.PublishMsg(m))
29302940
rmsg, err = sub.NextMsg(time.Second)
29312941
require_NoError(t, err)
29322942
pubAck = JSPubAckResponse{}
29332943
require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck))
2934-
require_Error(t, pubAck.Error, NewJSBatchPublishMissingSeqError())
2944+
require_Error(t, pubAck.Error, NewJSBatchPublishInvalidPatternError())
29352945

29362946
// A batch ID must not exceed the maximum length.
29372947
longBatchId := strings.Repeat("A", 65)
2938-
m.Header.Set("Nats-Fast-Batch-Id", longBatchId)
2948+
m.Reply = generateNormalReply(longBatchId, 1, JSFastBatchGapFail)
29392949
require_NoError(t, nc.PublishMsg(m))
29402950
rmsg, err = sub.NextMsg(time.Second)
29412951
require_NoError(t, err)
@@ -2945,8 +2955,7 @@ func TestJetStreamFastBatchPublish(t *testing.T) {
29452955
require_Error(t, pubAck.Error, NewJSBatchPublishInvalidBatchIDError())
29462956

29472957
// Publish a batch, misses start.
2948-
m.Header.Set("Nats-Fast-Batch-Id", "uuid")
2949-
m.Header.Set("Nats-Batch-Sequence", "2")
2958+
m.Reply = generateNormalReply("uuid", 2, JSFastBatchGapFail)
29502959
require_NoError(t, nc.PublishMsg(m))
29512960
rmsg, err = sub.NextMsg(time.Second)
29522961
require_NoError(t, err)
@@ -2955,9 +2964,7 @@ func TestJetStreamFastBatchPublish(t *testing.T) {
29552964
require_Error(t, pubAck.Error, NewJSBatchPublishUnknownBatchIDError())
29562965

29572966
// Publish a "batch" which immediately commits.
2958-
m.Header.Set("Nats-Fast-Batch-Id", "uuid")
2959-
m.Header.Set("Nats-Batch-Sequence", "1")
2960-
m.Header.Set("Nats-Batch-Commit", "1")
2967+
m.Reply = generateCommitReply("uuid", 1, JSFastBatchGapFail)
29612968
require_NoError(t, nc.PublishMsg(m))
29622969
rmsg, err = sub.NextMsg(time.Second)
29632970
require_NoError(t, err)
@@ -2973,13 +2980,13 @@ func TestJetStreamFastBatchPublish(t *testing.T) {
29732980
require_Equal(t, pubAck.BatchSize, 1)
29742981

29752982
// Publish a batch of N messages.
2976-
m.Header.Del("Nats-Batch-Commit")
29772983
for seq, batch := uint64(1), uint64(5); seq <= batch; seq++ {
29782984
m.Subject = fmt.Sprintf("foo.%d", seq)
29792985
m.Data = []byte(m.Subject)
2980-
m.Header.Set("Nats-Batch-Sequence", strconv.FormatUint(seq, 10))
29812986
if seq == batch {
2982-
m.Header.Set("Nats-Batch-Commit", "1")
2987+
m.Reply = generateCommitReply("uuid", seq, JSFastBatchGapFail)
2988+
} else {
2989+
m.Reply = generateNormalReply("uuid", seq, JSFastBatchGapFail)
29832990
}
29842991
require_NoError(t, nc.PublishMsg(m))
29852992

server/jetstream_errors_generated.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ const (
4444
// JSBatchPublishInvalidGapModeErr batch publish gap mode is invalid
4545
JSBatchPublishInvalidGapModeErr ErrorIdentifier = 10206
4646

47-
// JSBatchPublishMissingSeqErr batch publish sequence is missing
48-
JSBatchPublishMissingSeqErr ErrorIdentifier = 10203
47+
// JSBatchPublishInvalidPatternErr batch publish pattern is invalid
48+
JSBatchPublishInvalidPatternErr ErrorIdentifier = 10203
4949

5050
// JSBatchPublishUnknownBatchIDErr batch publish ID unknown
5151
JSBatchPublishUnknownBatchIDErr ErrorIdentifier = 10205
@@ -639,7 +639,7 @@ var (
639639
JSBatchPublishDisabledErr: {Code: 400, ErrCode: 10202, Description: "batch publish is disabled"},
640640
JSBatchPublishInvalidBatchIDErr: {Code: 400, ErrCode: 10204, Description: "batch publish ID is invalid"},
641641
JSBatchPublishInvalidGapModeErr: {Code: 400, ErrCode: 10206, Description: "batch publish gap mode is invalid"},
642-
JSBatchPublishMissingSeqErr: {Code: 400, ErrCode: 10203, Description: "batch publish sequence is missing"},
642+
JSBatchPublishInvalidPatternErr: {Code: 400, ErrCode: 10203, Description: "batch publish pattern is invalid"},
643643
JSBatchPublishUnknownBatchIDErr: {Code: 400, ErrCode: 10205, Description: "batch publish ID unknown"},
644644
JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"},
645645
JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"},
@@ -999,14 +999,14 @@ func NewJSBatchPublishInvalidGapModeError(opts ...ErrorOption) *ApiError {
999999
return ApiErrors[JSBatchPublishInvalidGapModeErr]
10001000
}
10011001

1002-
// NewJSBatchPublishMissingSeqError creates a new JSBatchPublishMissingSeqErr error: "batch publish sequence is missing"
1003-
func NewJSBatchPublishMissingSeqError(opts ...ErrorOption) *ApiError {
1002+
// NewJSBatchPublishInvalidPatternError creates a new JSBatchPublishInvalidPatternErr error: "batch publish pattern is invalid"
1003+
func NewJSBatchPublishInvalidPatternError(opts ...ErrorOption) *ApiError {
10041004
eopts := parseOpts(opts)
10051005
if ae, ok := eopts.err.(*ApiError); ok {
10061006
return ae
10071007
}
10081008

1009-
return ApiErrors[JSBatchPublishMissingSeqErr]
1009+
return ApiErrors[JSBatchPublishInvalidPatternErr]
10101010
}
10111011

10121012
// NewJSBatchPublishUnknownBatchIDError creates a new JSBatchPublishUnknownBatchIDErr error: "batch publish ID unknown"

0 commit comments

Comments
 (0)