diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c6540511e4a..853e359aa32 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -23503,3 +23503,99 @@ func TestInterestStreamWithFilterSubjectsConsumer(t *testing.T) { t.Fatalf("expected 2 messages got %d", nfo.State.Msgs) } } + +func TestJetStreamHeadersToKeep(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _ = js + acc := s.GlobalAccount() + if _, err := acc.addStream(&StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + HeadersToKeep: []string{ + "bbb", + "ddd", + }, + // The that both options are mutually exclusive. + HeadersToRemove: []string{ + "bbb", + "ddd", + }, + }); err != nil { + t.Fatalf("Failed to add stream: %v", err) + } + + // Now add a message with a header. + pubAck, err := js.PublishMsg(&nats.Msg{ + Subject: "test.foo", + Header: nats.Header{ + JSMsgId: []string{"1234"}, + "aaa": []string{"111"}, + "bbb": []string{"222"}, + "ccc": []string{"333"}, + "ddd": []string{"444"}, + }, + }) + require_NoError(t, err) + + // Now check that the header was removed. + rawMsg, err := js.GetMsg("test", pubAck.Sequence) + require_NoError(t, err) + + require_Equal(t, rawMsg.Header.Get(JSMsgId), "1234") + require_Equal(t, rawMsg.Header.Get("aaa"), "") + require_Equal(t, rawMsg.Header.Get("bbb"), "222") + require_Equal(t, rawMsg.Header.Get("ccc"), "") + require_Equal(t, rawMsg.Header.Get("ddd"), "444") +} + +func TestJetStreamHeadersToRemove(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + // Client for API requests. + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _ = js + acc := s.GlobalAccount() + if _, err := acc.addStream(&StreamConfig{ + Name: "test", + Subjects: []string{"test.*"}, + HeadersToRemove: []string{ + "aaa", + "ccc", + JSMsgId, + }, + }); err != nil { + t.Fatalf("Failed to add stream: %v", err) + } + + // Now add a message with a header. + pubAck, err := js.PublishMsg(&nats.Msg{ + Subject: "test.foo", + Header: nats.Header{ + JSMsgId: []string{"1234"}, + "aaa": []string{"111"}, + "bbb": []string{"222"}, + "ccc": []string{"333"}, + "ddd": []string{"444"}, + }, + }) + require_NoError(t, err) + + // Now check that the header was removed. + rawMsg, err := js.GetMsg("test", pubAck.Sequence) + require_NoError(t, err) + + require_Equal(t, rawMsg.Header.Get(JSMsgId), "1234") + require_Equal(t, rawMsg.Header.Get("aaa"), "") + require_Equal(t, rawMsg.Header.Get("bbb"), "222") + require_Equal(t, rawMsg.Header.Get("ccc"), "") + require_Equal(t, rawMsg.Header.Get("ddd"), "444") +} diff --git a/server/stream.go b/server/stream.go index 0f7aff269fe..73746e6fe0e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -75,6 +75,14 @@ type StreamConfig struct { // Allow KV like semantics to also discard new on a per subject basis DiscardNewPer bool `json:"discard_new_per_subject,omitempty"` + // Specifies a list of headers that should be kept in each message. + // If empty, all headers will be kept. + // This property is mutually exclusive with HeadersToRemove. + HeadersToKeep []string `json:"headers_to_keep,omitempty"` + + // Specifies a list of headers that should be removed from each message. + HeadersToRemove []string `json:"headers_to_remove,omitempty"` + // Optional qualifiers. These can not be modified after set to true. // Sealed will seal a stream so no messages can get out or in. @@ -279,6 +287,12 @@ type stream struct { // to know if trace event should be sent after processing. mt map[uint64]*msgTrace + // headers to keep in each message + headersToKeep map[string]struct{} + + // headers to remove from each message + headersToRemove map[string]struct{} + // For non limits policy streams when they process an ack before the actual msg. // Can happen in stretch clusters, multi-cloud, or during catchup for a restarted server. preAcks map[uint64]map[*consumer]struct{} @@ -558,28 +572,49 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return nil, fmt.Errorf("no applicable tier found") } + // Create a hashmap for headers to keep or removed. + var headersToKeep, headersToRemove map[string]struct{} + if len(cfg.HeadersToKeep) > 0 { + headersToKeep = make(map[string]struct{}, len(cfg.HeadersToKeep)+1) + headersToKeep[JSMsgId] = struct{}{} // Always keep the message ID. + for _, h := range cfg.HeadersToKeep { + headersToKeep[h] = struct{}{} + } + } else if len(cfg.HeadersToRemove) > 0 { + headersToRemove = make(map[string]struct{}, len(cfg.HeadersToRemove)) + for _, h := range cfg.HeadersToRemove { + if h == JSMsgId { + continue + } + + headersToRemove[h] = struct{}{} + } + } + // Setup the internal clients. c := s.createInternalJetStreamClient() ic := s.createInternalJetStreamClient() qpfx := fmt.Sprintf("[ACC:%s] stream '%s' ", a.Name, config.Name) mset := &stream{ - acc: a, - jsa: jsa, - cfg: cfg, - js: js, - srv: s, - client: c, - sysc: ic, - tier: tier, - stype: cfg.Storage, - consumers: make(map[string]*consumer), - msgs: newIPQueue[*inMsg](s, qpfx+"messages"), - gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), - qch: make(chan struct{}), - mqch: make(chan struct{}), - uch: make(chan struct{}, 4), - sch: make(chan struct{}, 1), + acc: a, + jsa: jsa, + cfg: cfg, + js: js, + srv: s, + client: c, + sysc: ic, + tier: tier, + stype: cfg.Storage, + consumers: make(map[string]*consumer), + msgs: newIPQueue[*inMsg](s, qpfx+"messages"), + gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"), + qch: make(chan struct{}), + mqch: make(chan struct{}), + uch: make(chan struct{}, 4), + sch: make(chan struct{}, 1), + headersToKeep: headersToKeep, + headersToRemove: headersToRemove, } // Start our signaling routine to process consumers. @@ -3402,6 +3437,48 @@ func streamAndSeq(shdr string) (string, string, uint64) { } +func keepHeaders(hdr []byte, headers map[string]struct{}) []byte { + return processHeaders(hdr, headers, true) +} + +func removeHeaders(hdr []byte, headers map[string]struct{}) []byte { + return processHeaders(hdr, headers, false) +} + +func processHeaders(hdr []byte, headers map[string]struct{}, keep bool) []byte { + var index int + for { + if index >= len(hdr) { + return hdr + } + + // Find the end of the line + end := bytes.Index(hdr[index:], []byte(_CRLF_)) + if end < 0 { + return hdr + } + end += index + + // Find the end of the key + endKey := bytes.Index(hdr[index:end], []byte(":")) + if endKey < 0 { + index = end + len(_CRLF_) + continue + } + endKey += index + + if _, ok := headers[string(hdr[index:endKey])]; ok != keep { + hdr = append(hdr[:index], hdr[end+len(_CRLF_):]...) + + if len(hdr) <= len(emptyHdrLine) { + return nil + } + } else { + index = end + len(_CRLF_) + } + } +} + // Lock should be held. func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) { var state StreamState @@ -4662,6 +4739,13 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } + // Remove any headers that are not needed for storage. + if mset.headersToKeep != nil { + hdr = keepHeaders(hdr, mset.headersToKeep) + } else if mset.headersToRemove != nil { + hdr = removeHeaders(hdr, mset.headersToRemove) + } + // Response Ack. var ( response []byte