Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ var (
ExtraScrapeMetrics: boolPtr(false),
MetricNameValidationScheme: model.UTF8Validation,
MetricNameEscapingScheme: model.AllowUTF8,
// Default to 1 MiB to avoid crashes from the 16 MiB encoding limit.
LabelNameLengthLimit: 1 << 20,
LabelValueLengthLimit: 1 << 21,
}

DefaultRuntimeConfig = RuntimeConfig{
Expand Down Expand Up @@ -698,6 +701,9 @@ func (c *GlobalConfig) UnmarshalYAML(unmarshal func(any) error) error {
return fmt.Errorf("%w for global config", err)
}
}
if gc.LabelNameLengthLimit == 0 {
gc.LabelNameLengthLimit = DefaultGlobalConfig.LabelNameLengthLimit
}

*c = *gc
return nil
Expand Down
6 changes: 6 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2843,6 +2843,8 @@ func TestGetScrapeConfigs(t *testing.T) {
AlwaysScrapeClassicHistograms: boolPtr(opts.AlwaysScrapeClassicHistograms),
ConvertClassicHistogramsToNHCB: boolPtr(opts.ConvertClassicHistToNHCB),
ExtraScrapeMetrics: boolPtr(opts.ExtraScrapeMetrics),
LabelNameLengthLimit: DefaultGlobalConfig.LabelNameLengthLimit,
LabelValueLengthLimit: DefaultGlobalConfig.LabelValueLengthLimit,
}
if opts.ScrapeProtocols == nil {
sc.ScrapeProtocols = DefaultScrapeProtocols
Expand Down Expand Up @@ -2927,6 +2929,8 @@ func TestGetScrapeConfigs(t *testing.T) {
AlwaysScrapeClassicHistograms: boolPtr(false),
ConvertClassicHistogramsToNHCB: boolPtr(false),
ExtraScrapeMetrics: boolPtr(false),
LabelNameLengthLimit: DefaultGlobalConfig.LabelValueLengthLimit,
LabelValueLengthLimit: DefaultGlobalConfig.LabelValueLengthLimit,

MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme,
Expand Down Expand Up @@ -2966,6 +2970,8 @@ func TestGetScrapeConfigs(t *testing.T) {
AlwaysScrapeClassicHistograms: boolPtr(false),
ConvertClassicHistogramsToNHCB: boolPtr(false),
ExtraScrapeMetrics: boolPtr(false),
LabelNameLengthLimit: DefaultGlobalConfig.LabelNameLengthLimit,
LabelValueLengthLimit: DefaultGlobalConfig.LabelValueLengthLimit,

HTTPClientConfig: config.HTTPClientConfig{
TLSConfig: config.TLSConfig{
Expand Down
6 changes: 6 additions & 0 deletions promql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,9 @@ func (ev *evaluator) evalLabelReplace(ctx context.Context, args parser.Expressio
indexes := regex.FindStringSubmatchIndex(srcVal)
if indexes != nil { // Only replace when regexp matches.
res := regex.ExpandString([]byte{}, repl, srcVal, indexes)
if len(res) > 1<<24 {
ev.errorf("label_replace: replacement value too long (%d bytes)", len(res))
}
lb.Reset(el.Metric)
lb.Set(dst, string(res))
matrix[i].Metric = lb.Labels()
Expand Down Expand Up @@ -2051,6 +2054,9 @@ func (ev *evaluator) evalLabelJoin(ctx context.Context, args parser.Expressions)
srcVals[i] = el.Metric.Get(src)
}
strval := strings.Join(srcVals, sep)
if len(strval) > 1<<24 {
ev.errorf("label_join: joined value too long (%d bytes)", len(strval))
}
lb.Reset(el.Metric)
lb.Set(dst, strval)
matrix[i].Metric = lb.Labels()
Expand Down
6 changes: 6 additions & 0 deletions scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,12 @@ loop:
lset = ce.lset
hash = ce.hash
} else {
// The label encoding format cannot represent strings longer than
// 2^24-1 bytes. Reject before constructing Labels to avoid a panic.
if len(met) > 1<<24 {
err = fmt.Errorf("metric string too long to encode (%d bytes)", len(met))
break loop
}
p.Labels(&lset)
hash = lset.Hash()

Expand Down
33 changes: 32 additions & 1 deletion storage/remote/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type writeHandler struct {
ingestSTZeroSample bool
enableTypeAndUnitLabels bool
appendMetadata bool

labelNameLengthLimit int
labelValueLengthLimit int
}

const maxAheadTime = 10 * time.Minute
Expand All @@ -57,7 +60,7 @@ const maxAheadTime = 10 * time.Minute
//
// NOTE(bwplotka): When accepting v2 proto and spec, partial writes are possible
// as per https://prometheus.io/docs/specs/remote_write_spec_2_0/#partial-write.
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestSTZeroSample, enableTypeAndUnitLabels, appendMetadata bool) http.Handler {
func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, acceptedMsgs remoteapi.MessageTypes, ingestSTZeroSample, enableTypeAndUnitLabels, appendMetadata bool, labelNameLengthLimit, labelValueLengthLimit int) http.Handler {
h := &writeHandler{
logger: logger,
appendable: appendable,
Expand All @@ -77,6 +80,9 @@ func NewWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable
ingestSTZeroSample: ingestSTZeroSample,
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
appendMetadata: appendMetadata,

labelNameLengthLimit: labelNameLengthLimit,
labelValueLengthLimit: labelValueLengthLimit,
}
return remoteapi.NewWriteHandler(h, acceptedMsgs, remoteapi.WithWriteHandlerLogger(logger))
}
Expand Down Expand Up @@ -146,6 +152,23 @@ func (h *writeHandler) Store(r *http.Request, msgType remoteapi.WriteMessageType
return wr, nil
}

// checkLabelLengths returns an error if any label name or value in lset exceeds
// the configured limits. A limit of 0 means no limit.
func (h *writeHandler) checkLabelLengths(lset labels.Labels) error {
if h.labelNameLengthLimit == 0 && h.labelValueLengthLimit == 0 {
return nil
}
return lset.Validate(func(l labels.Label) error {
if h.labelNameLengthLimit > 0 && len(l.Name) > h.labelNameLengthLimit {
return fmt.Errorf("label name too long (label: %.50s, length: %d, limit: %d)", l.Name, len(l.Name), h.labelNameLengthLimit)
}
if h.labelValueLengthLimit > 0 && len(l.Value) > h.labelValueLengthLimit {
return fmt.Errorf("label value too long (label: %.50s, length: %d, limit: %d)", l.Name, len(l.Value), h.labelValueLengthLimit)
}
return nil
})
}

func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
outOfOrderExemplarErrs := 0
samplesWithInvalidLabels := 0
Expand Down Expand Up @@ -181,6 +204,10 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
h.logger.Warn("Invalid labels for series.", "labels", ls.String(), "duplicated_label", duplicateLabel)
samplesWithInvalidLabels++
continue
} else if err := h.checkLabelLengths(ls); err != nil {
h.logger.Warn("Label length limit exceeded", "err", err)
samplesWithInvalidLabels++
continue
}

if err := h.appendV1Samples(app, ts.Samples, ls); err != nil {
Expand Down Expand Up @@ -345,6 +372,10 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
badRequestErrs = append(badRequestErrs, fmt.Errorf("invalid labels for series, labels %v, duplicated label %s", ls.String(), duplicateLabel))
samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms)
continue
} else if err := h.checkLabelLengths(ls); err != nil {
badRequestErrs = append(badRequestErrs, fmt.Errorf("label length limit exceeded for series %v: %w", ls.String(), err))
samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms)
continue
}

// Validate that the TimeSeries has at least one sample or histogram.
Expand Down
32 changes: 17 additions & 15 deletions storage/remote/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestRemoteWriteHandlerHeadersHandling_V1Message(t *testing.T) {
}

appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
}

appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) {
}

appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestRemoteWriteHandler_V1Message(t *testing.T) {
// in Prometheus, so keeping like this to not break existing 1.0 clients.

appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
appendExemplarErr: tc.appendExemplarErr,
updateMetadataErr: tc.updateMetadataErr,
}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestSTZeroSample, tc.enableTypeAndUnitLabels, tc.appendMetadata)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, tc.ingestSTZeroSample, tc.enableTypeAndUnitLabels, tc.appendMetadata, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -880,7 +880,7 @@ func TestRemoteWriteHandler_V2Message_NoDuplicateTypeAndUnitLabels(t *testing.T)
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)

appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, true, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, true, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -929,7 +929,7 @@ func TestOutOfOrderSample_V1Message(t *testing.T) {
require.NoError(t, err)

appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -971,7 +971,7 @@ func TestOutOfOrderExemplar_V1Message(t *testing.T) {
require.NoError(t, err)

appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestOutOfOrderHistogram_V1Message(t *testing.T) {
require.NoError(t, err)

appendable := &mockAppendable{latestSample: map[uint64]int64{labels.FromStrings("__name__", "test_metric").Hash(): 100}}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func BenchmarkRemoteWriteHandler(b *testing.B) {
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{tc.protoFormat}, false, false, false, 0, 0)
b.ResetTimer()
for b.Loop() {
b.StopTimer()
Expand All @@ -1084,7 +1084,7 @@ func TestCommitErr_V1Message(t *testing.T) {
require.NoError(t, err)

appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func TestHistogramValidationErrorHandling(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })

handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{protoMsg}, false, false, false, 0, 0)
recorder := httptest.NewRecorder()

var buf []byte
Expand Down Expand Up @@ -1195,7 +1195,7 @@ func TestCommitErr_V2Message(t *testing.T) {
req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion20HeaderValue)

appendable := &mockAppendable{commitErr: errors.New("commit error")}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{remoteapi.WriteV2MessageType}, false, false, false, 0, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand All @@ -1222,7 +1222,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
// TODO: test with other proto format(s)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, db.Head(), []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType}, false, false, false, 0, 0)

buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil, "snappy")
require.NoError(b, err)
Expand Down Expand Up @@ -1554,7 +1554,7 @@ func TestHistogramsReduction(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(string(protoMsg), func(t *testing.T) {
appendable := &mockAppendable{}
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false, false)
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []remoteapi.WriteMessageType{protoMsg}, false, false, false, 0, 0)

var (
err error
Expand Down Expand Up @@ -1650,6 +1650,8 @@ func TestRemoteWriteHandler_ResponseStats(t *testing.T) {
false,
false,
false,
0,
0,
)

if tt.forceInjectHeaders {
Expand Down
3 changes: 2 additions & 1 deletion web/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ func NewAPI(
}

if rwEnabled {
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, stZeroIngestionEnabled, enableTypeAndUnitLabels, appendMetadata)
cfg := configFunc()
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, stZeroIngestionEnabled, enableTypeAndUnitLabels, appendMetadata, int(cfg.GlobalConfig.LabelNameLengthLimit), int(cfg.GlobalConfig.LabelValueLengthLimit))
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, apV2, configFunc, remote.OTLPOptions{
Expand Down
Loading