Skip to content

Commit 88ced00

Browse files
WIP: finalize
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 78fd872 commit 88ced00

File tree

3 files changed

+41
-29
lines changed

3 files changed

+41
-29
lines changed

server/jetstream_batching.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ var (
3434
)
3535

3636
type batching struct {
37-
mu sync.Mutex
38-
group map[string]*batchGroup
37+
mu sync.Mutex
38+
atomic map[string]*batchGroup
39+
fast map[string]*batchGroup
3940
}
4041

4142
type batchGroup struct {
@@ -124,7 +125,7 @@ func (b *batchGroup) readyForCommit() bool {
124125
// Also returns whether a flow control message should be sent.
125126
// Lock should be held.
126127
func (batches *batching) fastBatchRegisterSequences(batchId string, batchSeq, streamSeq uint64) (*batchGroup, bool, string) {
127-
if b, ok := batches.group[batchId]; ok {
128+
if b, ok := batches.fast[batchId]; ok {
128129
b.sseq = streamSeq
129130
b.pseq = batchSeq
130131
// If the PubAck needs to be sent now as a result of a commit.
@@ -209,8 +210,10 @@ func (b *batchGroup) cleanupLocked(batchId string, batches *batching) {
209210
b.timer.Stop()
210211
if b.store != nil {
211212
b.store.Delete(true)
213+
delete(batches.atomic, batchId)
214+
} else {
215+
delete(batches.fast, batchId)
212216
}
213-
delete(batches.group, batchId)
214217
}
215218

216219
// Lock should be held.

server/jetstream_batching_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func TestJetStreamAtomicBatchPublish(t *testing.T) {
132132

133133
pubAck = JSPubAckResponse{}
134134
require_NoError(t, json.Unmarshal(rmsg.Data, &pubAck))
135+
require_True(t, pubAck.Error == nil)
135136
require_Equal(t, pubAck.Sequence, 6)
136137
require_Equal(t, pubAck.BatchId, "uuid")
137138
require_Equal(t, pubAck.BatchSize, 5)
@@ -393,7 +394,7 @@ func TestJetStreamAtomicBatchPublishLimits(t *testing.T) {
393394
return errors.New("batches not found")
394395
}
395396
batches.mu.Lock()
396-
groups := len(batches.group)
397+
groups := len(batches.atomic)
397398
batches.mu.Unlock()
398399
if groups != 0 {
399400
return fmt.Errorf("expected 0 groups, got %d", groups)
@@ -684,8 +685,8 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
684685
require_NotNil(t, batches)
685686
require_NotNil(t, batch)
686687
batches.mu.Lock()
687-
groups := len(batches.group)
688-
b := batches.group["uuid"]
688+
groups := len(batches.atomic)
689+
b := batches.atomic["uuid"]
689690
batches.mu.Unlock()
690691
require_Len(t, groups, 1)
691692
require_NotNil(t, b)
@@ -720,7 +721,7 @@ func TestJetStreamAtomicBatchPublishCleanup(t *testing.T) {
720721
return fmt.Errorf("expected no batches")
721722
}
722723
batches.mu.Lock()
723-
groups = len(batches.group)
724+
groups = len(batches.atomic)
724725
batches.mu.Unlock()
725726
if groups > 0 {
726727
return fmt.Errorf("expected 0 groups, got %d", groups)
@@ -2893,7 +2894,7 @@ func TestJetStreamAtomicBatchPublishCommitUnsupported(t *testing.T) {
28932894
batches := mset.batches
28942895
mset.mu.RUnlock()
28952896
batches.mu.Lock()
2896-
groups := len(batches.group)
2897+
groups := len(batches.atomic)
28972898
batches.mu.Unlock()
28982899
require_Len(t, groups, 0)
28992900
}

server/stream.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4548,14 +4548,18 @@ func (mset *stream) unsubscribeToStream(stopping, shuttingDown bool) error {
45484548
func (mset *stream) deleteInflightBatches(shuttingDown bool) {
45494549
if mset.batches != nil {
45504550
mset.batches.mu.Lock()
4551-
for batchId, b := range mset.batches.group {
4552-
// If shutting down, do fixup during startup. In-memory batches don't require manual cleanup.
4553-
if shuttingDown {
4554-
b.stopLocked()
4555-
} else {
4556-
b.cleanupLocked(batchId, mset.batches)
4551+
clearBatches := func(group map[string]*batchGroup) {
4552+
for batchId, b := range group {
4553+
// If shutting down, do fixup during startup. In-memory batches don't require manual cleanup.
4554+
if shuttingDown {
4555+
b.stopLocked()
4556+
} else {
4557+
b.cleanupLocked(batchId, mset.batches)
4558+
}
45574559
}
45584560
}
4561+
clearBatches(mset.batches.atomic)
4562+
clearBatches(mset.batches.fast)
45594563
mset.batches.mu.Unlock()
45604564
mset.batches = nil
45614565
}
@@ -6396,9 +6400,9 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
63966400
return nil
63976401
}
63986402

6399-
// processJetStreamBatchMsg processes a JetStream message that's part of an atomic or fast batch publish.
6403+
// processJetStreamAtomicBatchMsg processes a JetStream message that's part of an atomic batch publish.
64006404
// Handles constraints around the batch, storing messages, doing consistency checks, and performing the commit.
6401-
func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) {
6405+
func (mset *stream) processJetStreamAtomicBatchMsg(batchId, subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) {
64026406
mset.mu.RLock()
64036407
canRespond := !mset.cfg.NoAck && len(reply) > 0
64046408
name, stype := mset.cfg.Name, mset.cfg.Storage
@@ -6482,9 +6486,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
64826486

64836487
mset.mu.Lock()
64846488
if mset.batches == nil {
6485-
mset.batches = &batching{
6486-
group: make(map[string]*batchGroup, 1),
6487-
}
6489+
mset.batches = &batching{}
64886490
}
64896491
batches := mset.batches
64906492
mset.mu.Unlock()
@@ -6495,7 +6497,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
64956497

64966498
// Get batch.
64976499
batches.mu.Lock()
6498-
b, ok := batches.group[batchId]
6500+
b, ok := batches.atomic[batchId]
64996501
if !ok {
65006502
if batchSeq != 1 {
65016503
batches.mu.Unlock()
@@ -6523,7 +6525,7 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
65236525
}
65246526

65256527
// Confirm we can facilitate an additional batch.
6526-
if len(batches.group)+1 > maxInflightPerStream {
6528+
if len(batches.atomic)+1 > maxInflightPerStream {
65276529
batches.mu.Unlock()
65286530
return respondIncompleteBatch()
65296531
}
@@ -6542,7 +6544,10 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
65426544
batches.mu.Unlock()
65436545
return respondIncompleteBatch()
65446546
}
6545-
batches.group[batchId] = b
6547+
if batches.atomic == nil {
6548+
batches.atomic = make(map[string]*batchGroup, 1)
6549+
}
6550+
batches.atomic[batchId] = b
65466551
}
65476552

65486553
var commit, commitEob bool
@@ -6770,6 +6775,8 @@ func (mset *stream) processJetStreamBatchMsg(batchId, subject, reply string, hdr
67706775
return nil
67716776
}
67726777

6778+
// processJetStreamFastBatchMsg processes a JetStream message that's part of an atomic batch publish.
6779+
// Handles constraints around the batch, storing messages, doing consistency checks, and performing the commit.
67736780
func (mset *stream) processJetStreamFastBatchMsg(subject, reply string, hdr, msg []byte, mt *msgTrace) (retErr error) {
67746781
mset.mu.RLock()
67756782
canRespond := !mset.cfg.NoAck && len(reply) > 0
@@ -6854,16 +6861,14 @@ func (mset *stream) processJetStreamFastBatchMsg(subject, reply string, hdr, msg
68546861

68556862
mset.mu.Lock()
68566863
if mset.batches == nil {
6857-
mset.batches = &batching{
6858-
group: make(map[string]*batchGroup, 1),
6859-
}
6864+
mset.batches = &batching{}
68606865
}
68616866
batches := mset.batches
68626867
mset.mu.Unlock()
68636868

68646869
// Get batch.
68656870
batches.mu.Lock()
6866-
b, ok := batches.group[batch.id]
6871+
b, ok := batches.fast[batch.id]
68676872
if !ok {
68686873
if batch.seq != 1 {
68696874
batches.mu.Unlock()
@@ -6872,7 +6877,10 @@ func (mset *stream) processJetStreamFastBatchMsg(subject, reply string, hdr, msg
68726877
// We'll need a copy as we'll use it as a key and later for cleanup.
68736878
batchId := copyString(batch.id)
68746879
b = batches.newFastBatchGroup(mset, batchId, batch.gapOk, batch.flow)
6875-
batches.group[batchId] = b
6880+
if batches.fast == nil {
6881+
batches.fast = make(map[string]*batchGroup, 1)
6882+
}
6883+
batches.fast[batchId] = b
68766884
}
68776885

68786886
// The required API level can have the batch be rejected. But the header is always removed.
@@ -7264,7 +7272,7 @@ func (mset *stream) internalLoop() {
72647272
if len(im.rply) > 0 && strings.HasSuffix(im.rply, FastBatchSuffix) {
72657273
mset.processJetStreamFastBatchMsg(im.subj, im.rply, im.hdr, im.msg, im.mt)
72667274
} else if batchId := getBatchId(im.hdr); batchId != _EMPTY_ {
7267-
mset.processJetStreamBatchMsg(batchId, im.subj, im.rply, im.hdr, im.msg, im.mt)
7275+
mset.processJetStreamAtomicBatchMsg(batchId, im.subj, im.rply, im.hdr, im.msg, im.mt)
72687276
} else if isClustered {
72697277
mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg, im.mt, false)
72707278
} else {

0 commit comments

Comments
 (0)