Skip to content

Commit e702d1a

Browse files
committed
fix(sink): don't override dynamic properties
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 53e3836 commit e702d1a

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

internal/io/mqtt/sink.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
1+
// Copyright 2021-2025 EMQ Technologies Co., Ltd.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -109,12 +109,16 @@ func (ms *Sink) Collect(ctx api.StreamContext, item api.RawTuple) error {
109109
if transformed {
110110
tpc = temp
111111
}
112+
newProps := make(map[string]string, len(props))
112113
for k, v := range props {
113114
nv, ok := dp.DynamicProps(v)
114115
if ok {
115-
props[k] = nv
116+
newProps[k] = nv
117+
} else {
118+
newProps[k] = v
116119
}
117120
}
121+
props = newProps
118122
}
119123
traced, _, span := tracenode.TraceInput(ctx, item, fmt.Sprintf("%s_emit", ctx.GetOpId()))
120124
if traced {

0 commit comments

Comments
 (0)