Commit 943355b

(2.14) [IMPROVED] JetStream header indexing
Signed-off-by: Maurice van Veen <[email protected]>
1 parent ee1261a commit 943355b

File tree

7 files changed, +404 -117 lines changed

server/jetstream.go

Lines changed: 4 additions & 1 deletion
@@ -1587,7 +1587,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits, tq c
 			if commitEob && seq == state.LastSeq {
 				hdr = genHeader(hdr, JSBatchCommit, "1")
 			}
-			mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, sm.msg, 0, 0, nil, false, true)
+			var hdrIdx *jsHdrIndex
+			hdr, hdrIdx = indexJsHdr(hdr)
+			mset.processJetStreamMsg(sm.subj, _EMPTY_, hdr, hdrIdx, sm.msg, 0, 0, nil, false, true)
+			hdrIdx.returnToPool()
 		}
 		store.Delete(true)
 	SKIP:

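The hunk above shows the calling pattern this commit applies throughout the server: index the message headers once with indexJsHdr, pass the resulting *jsHdrIndex to processJetStreamMsg alongside the raw hdr bytes, and hand the index back with returnToPool when the message has been processed. The jsHdrIndex implementation itself is not among the hunks on this page, so the following is only a minimal, self-contained sketch of a pool-backed header index, using hypothetical names (hdrIndex, indexHdr) and just two cached fields to illustrate the idea; the real indexJsHdr also returns the hdr slice and covers many more JetStream headers.

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// hdrIndex is a hypothetical stand-in for the jsHdrIndex introduced by this
// commit: the interesting header values are located once and cached so that
// later checks are field reads instead of repeated scans of the raw block.
type hdrIndex struct {
	msgID  []byte
	rollup []byte
}

var hdrIndexPool = sync.Pool{New: func() any { return new(hdrIndex) }}

// indexHdr walks the NATS-style header block a single time, in the spirit of
// indexJsHdr in the diff (the real function also returns the hdr slice).
func indexHdr(hdr []byte) *hdrIndex {
	idx := hdrIndexPool.Get().(*hdrIndex)
	for _, line := range bytes.Split(hdr, []byte("\r\n")) {
		if v, ok := bytes.CutPrefix(line, []byte("Nats-Msg-Id: ")); ok {
			idx.msgID = v
		} else if v, ok := bytes.CutPrefix(line, []byte("Nats-Rollup: ")); ok {
			idx.rollup = v
		}
	}
	return idx
}

func (idx *hdrIndex) getMsgId() string  { return string(idx.msgID) }
func (idx *hdrIndex) getRollup() string { return string(idx.rollup) }

// returnToPool resets the index and hands it back for reuse.
func (idx *hdrIndex) returnToPool() {
	*idx = hdrIndex{}
	hdrIndexPool.Put(idx)
}

func main() {
	hdr := []byte("NATS/1.0\r\nNats-Msg-Id: abc\r\nNats-Rollup: sub\r\n\r\n")
	idx := indexHdr(hdr)
	fmt.Println(idx.getMsgId(), idx.getRollup()) // abc sub
	idx.returnToPool()
}

Pooling the index keeps per-message allocations off the hot path, which is presumably why the diff returns the index explicitly instead of letting it be garbage collected.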
server/jetstream_batching.go

Lines changed: 24 additions & 21 deletions
@@ -238,7 +238,7 @@ func (batch *batchApply) rejectBatchState(mset *stream) {
 // mset.mu lock must NOT be held or used.
 // mset.clMu lock must be held.
 func checkMsgHeadersPreClusteredProposal(
-	diff *batchStagedDiff, mset *stream, subject string, hdr []byte, msg []byte, sourced bool, name string,
+	diff *batchStagedDiff, mset *stream, subject string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, sourced bool, name string,
 	jsa *jsAccount, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules bool,
 	discard DiscardPolicy, discardNewPer bool, maxMsgSize int, maxMsgs int64, maxMsgsPer int64, maxBytes int64,
 ) ([]byte, []byte, uint64, *ApiError, error) {
@@ -252,10 +252,13 @@ func checkMsgHeadersPreClusteredProposal(
 			err := fmt.Errorf("JetStream header size exceeds limits for '%s > %s'", jsa.acc().Name, mset.cfg.Name)
 			return hdr, msg, 0, NewJSStreamHeaderExceedsMaximumError(), err
 		}
+	}
+
+	if hdrIdx != nil {
 		// Counter increments.
 		// Only supported on counter streams, and payload must be empty (if not coming from a source).
 		var ok bool
-		if incr, ok = getMessageIncr(hdr); !ok {
+		if incr, ok = hdrIdx.getMessageIncr(); !ok {
 			apiErr := NewJSMessageIncrInvalidError()
 			return hdr, msg, 0, apiErr, apiErr
 		} else if incr != nil && !sourced {
@@ -269,14 +272,14 @@ func checkMsgHeadersPreClusteredProposal(
 		} else {
 			// Check for incompatible headers.
 			var doErr bool
-			if getRollup(hdr) != _EMPTY_ ||
-				getExpectedStream(hdr) != _EMPTY_ ||
-				getExpectedLastMsgId(hdr) != _EMPTY_ ||
-				getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
+			if hdrIdx.getRollup() != _EMPTY_ ||
+				hdrIdx.getExpectedStream() != _EMPTY_ ||
+				hdrIdx.getExpectedLastMsgId() != _EMPTY_ ||
+				hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ {
 				doErr = true
-			} else if _, ok = getExpectedLastSeq(hdr); ok {
+			} else if _, ok = hdrIdx.getExpectedLastSeq(); ok {
 				doErr = true
-			} else if _, ok = getExpectedLastSeqPerSubject(hdr); ok {
+			} else if _, ok = hdrIdx.getExpectedLastSeqPerSubject(); ok {
 				doErr = true
 			}

@@ -287,11 +290,11 @@ func checkMsgHeadersPreClusteredProposal(
 			}
 		}
 		// Expected stream name can also be pre-checked.
-		if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
+		if sname := hdrIdx.getExpectedStream(); sname != _EMPTY_ && sname != name {
 			return hdr, msg, 0, NewJSStreamNotMatchError(), errStreamMismatch
 		}
 		// TTL'd messages are rejected entirely if TTLs are not enabled on the stream, or if the TTL is invalid.
-		if ttl, err := getMessageTTL(hdr); !sourced && (ttl != 0 || err != nil) {
+		if ttl, err := hdrIdx.getMessageTTL(); !sourced && (ttl != 0 || err != nil) {
 			if !allowTTL {
 				return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
 			} else if err != nil {
@@ -300,7 +303,7 @@ func checkMsgHeadersPreClusteredProposal(
 		}
 		// Check for MsgIds here at the cluster level to avoid excessive CLFS accounting.
 		// Will help during restarts.
-		if msgId := getMsgId(hdr); msgId != _EMPTY_ {
+		if msgId := hdrIdx.getMsgId(); msgId != _EMPTY_ {
 			// Dedupe if staged.
 			if _, ok = diff.msgIds[msgId]; ok {
 				return hdr, msg, 0, NewJSAtomicPublishContainsDuplicateMessageError(), errMsgIdDuplicate
@@ -439,9 +442,9 @@ func checkMsgHeadersPreClusteredProposal(
 		}
 	}

-	if len(hdr) > 0 {
+	if hdrIdx != nil {
 		// Expected last sequence.
-		if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.clseq-mset.clfs {
+		if seq, exists := hdrIdx.getExpectedLastSeq(); exists && seq != mset.clseq-mset.clfs {
 			mlseq := mset.clseq - mset.clfs
 			err := fmt.Errorf("last sequence mismatch: %d vs %d", seq, mlseq)
 			return hdr, msg, 0, NewJSStreamWrongLastSequenceError(mlseq), err
@@ -452,10 +455,10 @@ func checkMsgHeadersPreClusteredProposal(
 		}

 		// Expected last sequence per subject.
-		if seq, exists := getExpectedLastSeqPerSubject(hdr); exists {
+		if seq, exists := hdrIdx.getExpectedLastSeqPerSubject(); exists {
 			// Allow override of the subject used for the check.
 			seqSubj := subject
-			if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
+			if optSubj := hdrIdx.getExpectedLastSeqPerSubjectForSubject(); optSubj != _EMPTY_ {
 				seqSubj = optSubj
 			}

@@ -509,13 +512,13 @@ func checkMsgHeadersPreClusteredProposal(
 					diff.expectedPerSubject[seqSubj] = e
 				}
 			}
-		} else if getExpectedLastSeqPerSubjectForSubject(hdr) != _EMPTY_ {
+		} else if hdrIdx.getExpectedLastSeqPerSubjectForSubject() != _EMPTY_ {
 			apiErr := NewJSStreamExpectedLastSeqPerSubjectInvalidError()
 			return hdr, msg, 0, apiErr, apiErr
 		}

 		// Message scheduling.
-		if schedule, ok := getMessageSchedule(hdr); !ok {
+		if schedule, ok := hdrIdx.getMessageSchedule(); !ok {
 			apiErr := NewJSMessageSchedulesPatternInvalidError()
 			if !allowMsgSchedules {
 				apiErr = NewJSMessageSchedulesDisabledError()
@@ -525,12 +528,12 @@ func checkMsgHeadersPreClusteredProposal(
 			if !allowMsgSchedules {
 				apiErr := NewJSMessageSchedulesDisabledError()
 				return hdr, msg, 0, apiErr, apiErr
-			} else if scheduleTtl, ok := getMessageScheduleTTL(hdr); !ok {
+			} else if scheduleTtl, ok := hdrIdx.getMessageScheduleTTL(); !ok {
 				apiErr := NewJSMessageSchedulesTTLInvalidError()
 				return hdr, msg, 0, apiErr, apiErr
 			} else if scheduleTtl != _EMPTY_ && !allowTTL {
 				return hdr, msg, 0, NewJSMessageTTLDisabledError(), errMsgTTLDisabled
-			} else if scheduleTarget := getMessageScheduleTarget(hdr); scheduleTarget == _EMPTY_ ||
+			} else if scheduleTarget := hdrIdx.getMessageScheduleTarget(); scheduleTarget == _EMPTY_ ||
 				!IsValidPublishSubject(scheduleTarget) || SubjectsCollide(scheduleTarget, subject) {
 				apiErr := NewJSMessageSchedulesTargetInvalidError()
 				return hdr, msg, 0, apiErr, apiErr
@@ -547,7 +550,7 @@ func checkMsgHeadersPreClusteredProposal(

 			// Add a rollup sub header if it doesn't already exist.
 			// Otherwise, it must exist already as a rollup on the subject.
-			if rollup := getRollup(hdr); rollup == _EMPTY_ {
+			if rollup := hdrIdx.getRollup(); rollup == _EMPTY_ {
 				hdr = genHeader(hdr, JSMsgRollup, JSMsgRollupSubject)
 			} else if rollup != JSMsgRollupSubject {
 				apiErr := NewJSMessageSchedulesRollupInvalidError()
@@ -557,7 +560,7 @@ func checkMsgHeadersPreClusteredProposal(
 		}

 		// Check for any rollups.
-		if rollup := getRollup(hdr); rollup != _EMPTY_ {
+		if rollup := hdrIdx.getRollup(); rollup != _EMPTY_ {
 			if !allowRollup || denyPurge {
 				err := errors.New("rollup not permitted")
 				return hdr, msg, 0, NewJSStreamRollupFailedError(err), err

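Inside checkMsgHeadersPreClusteredProposal, the helpers that each rescanned the raw header block (getRollup(hdr), getMsgId(hdr), getExpectedStream(hdr), and so on) become lookups on the index (hdrIdx.getRollup(), hdrIdx.getMsgId(), ...), and the surrounding guard changes from "if len(hdr) > 0" to "if hdrIdx != nil". The snippet below is a rough, self-contained contrast of the two shapes; it uses a plain map and simplified parsing rather than the server's pooled struct.

package main

import (
	"bytes"
	"fmt"
)

// getHeaderValue scans the whole header block on every call, the shape of
// the old getMsgId(hdr)/getRollup(hdr)-style helpers.
func getHeaderValue(hdr []byte, key string) string {
	for _, line := range bytes.Split(hdr, []byte("\r\n")) {
		if v, ok := bytes.CutPrefix(line, []byte(key+": ")); ok {
			return string(v)
		}
	}
	return ""
}

// indexHeaders walks the block once and caches every key, the shape of the
// new hdrIdx.getMsgId()/hdrIdx.getRollup()-style lookups.
func indexHeaders(hdr []byte) map[string]string {
	idx := make(map[string]string)
	for _, line := range bytes.Split(hdr, []byte("\r\n")) {
		if k, v, ok := bytes.Cut(line, []byte(": ")); ok {
			idx[string(k)] = string(v)
		}
	}
	return idx
}

func main() {
	hdr := []byte("NATS/1.0\r\nNats-Msg-Id: abc\r\nNats-Rollup: sub\r\nNats-Expected-Stream: TEST\r\n\r\n")

	// Old shape: three checks, three full scans of the header block.
	_ = getHeaderValue(hdr, "Nats-Msg-Id")
	_ = getHeaderValue(hdr, "Nats-Rollup")
	_ = getHeaderValue(hdr, "Nats-Expected-Stream")

	// New shape: one scan, then cheap lookups.
	idx := indexHeaders(hdr)
	fmt.Println(idx["Nats-Msg-Id"], idx["Nats-Rollup"], idx["Nats-Expected-Stream"])
}

With a dozen or so header checks per message in this function, the header block is now walked once per message instead of once per check.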
server/jetstream_batching_test.go

Lines changed: 9 additions & 3 deletions
@@ -1430,7 +1430,9 @@ func TestJetStreamAtomicBatchPublishStageAndCommit(t *testing.T) {
 					hdr = genHeader(hdr, key, value)
 				}
 			}
-			_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
+			_, hdrIdx := indexJsHdr(hdr)
+			_, _, _, _, err = checkMsgHeadersPreClusteredProposal(diff, mset, m.subject, hdr, hdrIdx, m.msg, false, "TEST", nil, test.allowRollup, test.denyPurge, test.allowTTL, test.allowMsgCounter, test.allowMsgSchedules, discard, discardNewPer, -1, maxMsgs, maxMsgsPer, maxBytes)
+			hdrIdx.returnToPool()
 			if m.err != nil {
 				require_Error(t, err, m.err)
 			} else if err != nil {
@@ -1627,7 +1629,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecovery(t *testing.T) {
 	require_True(t, commitReady)

 	// Simulate the first message of the batch is committed.
-	err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
+	_, hdrIdx := indexJsHdr(hdr1)
+	err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
+	hdrIdx.returnToPool()
 	require_NoError(t, err)

 	// Simulate a hard kill, upon recovery the rest of the batch should be applied.
@@ -1717,7 +1721,9 @@ func TestJetStreamAtomicBatchPublishSingleServerRecoveryCommitEob(t *testing.T)
 	require_True(t, commitReady)

 	// Simulate the first message of the batch is committed.
-	err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, nil, 0, 0, nil, false, true)
+	_, hdrIdx := indexJsHdr(hdr1)
+	err = mset.processJetStreamMsg("foo", _EMPTY_, hdr1, hdrIdx, nil, 0, 0, nil, false, true)
+	hdrIdx.returnToPool()
 	require_NoError(t, err)

 	// Simulate a hard kill, upon recovery the rest of the batch should be applied.

server/jetstream_cluster.go

Lines changed: 10 additions & 5 deletions
@@ -3763,7 +3763,10 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
 			mt = mset.getAndDeleteMsgTrace(lseq)
 		}
 		// Process the actual message here.
-		err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
+		var hdrIdx *jsHdrIndex
+		hdr, hdrIdx = indexJsHdr(hdr)
+		err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
+		hdrIdx.returnToPool()

 		// If we have inflight make sure to clear after processing.
 		// TODO(dlc) - technically check on inflight != nil could cause datarace.
@@ -3830,7 +3833,9 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
 			if state.Msgs == 0 {
 				mset.store.Compact(lseq + 1)
 				// Retry
-				err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts, mt, sourced, needLock)
+				hdr, hdrIdx = indexJsHdr(hdr)
+				err = mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, lseq, ts, mt, sourced, needLock)
+				hdrIdx.returnToPool()
 			}
 			// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
 			// and what we got.
@@ -8927,7 +8932,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
 const streamLagWarnThreshold = 10_000

 // processClusteredInboundMsg will propose the inbound message to the underlying raft group.
-func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
+func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr []byte, hdrIdx *jsHdrIndex, msg []byte, mt *msgTrace, sourced bool) (retErr error) {
 	// For possible error response.
 	var response []byte

@@ -8945,7 +8950,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	// We also invoke this in clustering mode for message tracing when not
 	// performing message delivery.
 	if node == nil || mt.traceOnly() {
-		return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0, mt, sourced, true)
+		return mset.processJetStreamMsg(subject, reply, hdr, hdrIdx, msg, 0, 0, mt, sourced, true)
 	}

 	// If message tracing (with message delivery), we will need to send the
@@ -9049,7 +9054,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 		err error
 	)
 	diff := &batchStagedDiff{}
-	if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
+	if hdr, msg, dseq, apiErr, err = checkMsgHeadersPreClusteredProposal(diff, mset, subject, hdr, hdrIdx, msg, sourced, name, jsa, allowRollup, denyPurge, allowTTL, allowMsgCounter, allowMsgSchedules, discard, discardNewPer, maxMsgSize, maxMsgs, maxMsgsPer, maxBytes); err != nil {
 		mset.clMu.Unlock()
 		if err == errMsgIdDuplicate && dseq > 0 {
 			var buf [256]byte

server/jetstream_cluster_4_test.go

Lines changed: 1 addition & 1 deletion
@@ -4542,7 +4542,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
 	validateStreamState(snap)

 	// Simulate a message being stored, but not calling Applied yet.
-	err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
+	err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 1, time.Now().UnixNano(), nil, false, true)
 	require_NoError(t, err)

 	// Simulate the stream being stopped before we're able to call Applied.

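Call sites without headers, like the test above, now pass nil for both hdr and the new *jsHdrIndex argument, and checkMsgHeadersPreClusteredProposal guards its header checks with "if hdrIdx != nil". Go also allows methods to be called on a nil pointer receiver, so accessors and returnToPool can be written to tolerate a missing index; whether the server's getters do exactly that is not visible in this diff, but the language-level pattern, as a small stand-alone example, is:

package main

import "fmt"

type hdrIndex struct {
	msgID []byte
}

// A method may be invoked on a nil *hdrIndex as long as it checks the
// receiver before dereferencing it.
func (idx *hdrIndex) getMsgId() string {
	if idx == nil {
		return ""
	}
	return string(idx.msgID)
}

func (idx *hdrIndex) returnToPool() {
	if idx == nil {
		return // nothing was borrowed from the pool, nothing to return
	}
	// reset and hand the struct back to a sync.Pool here
}

func main() {
	var idx *hdrIndex                  // message with no headers: no index
	fmt.Printf("%q\n", idx.getMsgId()) // ""
	idx.returnToPool()                 // safe no-op
}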
server/jetstream_test.go

Lines changed: 32 additions & 2 deletions
@@ -22023,7 +22023,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
 	mset, err := s.globalAccount().lookupStream("TEST")
 	require_NoError(t, err)
 	for range 2 {
-		require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true))
+		require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true))
 	}

 	// We'll lock the message blocks such that we can't read, but NumPending should still function.
@@ -22051,7 +22051,7 @@ func TestJetStreamDirectGetBatchParallelWriteDeadlock(t *testing.T) {
 		read.Wait()
 		<-time.After(100 * time.Millisecond)
 		wg.Done()
-		mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 0, 0, nil, false, true)
+		mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, nil, 0, 0, nil, false, true)
 	}()
 	go func() {
 		// Run some time after we've entered processJetStreamMsg above.
@@ -22329,3 +22329,33 @@ func TestJetStreamServerEncryptionRecoveryWithoutStreamStateFile(t *testing.T) {
 		})
 	}
 }
+
+func TestJetStreamHdrIndexUpdateHdr(t *testing.T) {
+	updateKey := "Nats-Update-Header"
+	for _, test := range []struct {
+		title     string
+		updateHdr func(hdr []byte)
+	}{
+		{title: "SetHeader", updateHdr: func(hdr []byte) { setHeader(updateKey, "s", hdr) }},
+		{title: "GenHeader", updateHdr: func(hdr []byte) { genHeader(hdr, updateKey, "s") }},
+		{title: "RemoveHeaderIfPresent", updateHdr: func(hdr []byte) { removeHeaderIfPresent(hdr, updateKey) }},
+		{title: "RemoveHeaderIfPrefixPresent", updateHdr: func(hdr []byte) { removeHeaderIfPrefixPresent(hdr, updateKey) }},
+	} {
+		t.Run(test.title, func(t *testing.T) {
+			hdr := genHeader(nil, "Nats-Batch-Id", "uuid")
+			hdr = genHeader(hdr, updateKey, "long_value")
+			hdr = genHeader(hdr, "Nats-Batch-Sequence", "seq")
+
+			var idx *jsHdrIndex
+			hdr, idx = indexJsHdr(hdr)
+			defer idx.returnToPool()
+			require_NotNil(t, idx)
+			require_Equal(t, string(idx.batchId), "uuid")
+			require_Equal(t, string(idx.batchSeq), "seq")
+
+			test.updateHdr(hdr)
+			require_Equal(t, string(idx.batchId), "uuid")
+			require_Equal(t, string(idx.batchSeq), "seq")
+		})
+	}
+}

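The new TestJetStreamHdrIndexUpdateHdr pins down an aliasing concern: the index holds sub-slices such as idx.batchId and idx.batchSeq taken from the header buffer, and the test asserts they still read "uuid" and "seq" after the buffer is rewritten with setHeader, genHeader, removeHeaderIfPresent, and removeHeaderIfPrefixPresent. The stand-alone snippet below shows the Go slice behaviour that makes such a test worth having; it is illustrative only, not the server's code.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	hdr := []byte("NATS/1.0\r\nNats-Batch-Id: uuid\r\n\r\n")

	// "Index" a value by slicing into the buffer, the way a header index
	// might hold on to the bytes it located.
	i := bytes.Index(hdr, []byte("uuid"))
	batchId := hdr[i : i+4]
	fmt.Println(string(batchId)) // uuid

	// In-place mutation of the aliased region is visible through the indexed
	// slice; this is the hazard the test guards against for header updates.
	copy(hdr[i:i+4], "XXXX")
	fmt.Println(string(batchId)) // XXXX

	// Appending only writes past the current length (and usually reallocates
	// here), so previously indexed bytes are not clobbered by it.
	hdr = append(hdr, "Other: value\r\n"...)
	fmt.Println(string(batchId)) // still XXXX
}

That the assertions hold for all four helpers suggests the indexed fields are either copied or left untouched by the rewrites; which of the two the server does is not shown in the hunks on this page.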