diff --git a/pkg/experiment/ingester/memdb/head.go b/pkg/experiment/ingester/memdb/head.go index d26f20bc81..5aee10db6a 100644 --- a/pkg/experiment/ingester/memdb/head.go +++ b/pkg/experiment/ingester/memdb/head.go @@ -7,6 +7,8 @@ import ( "math" "sync" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -18,6 +20,7 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/labels" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" "github.com/grafana/pyroscope/pkg/phlaredb/symdb" + "github.com/grafana/pyroscope/pkg/validation" ) type FlushedHead struct { @@ -43,11 +46,15 @@ type Head struct { totalSamples *atomic.Uint64 profiles *profilesIndex metrics *HeadMetrics + logger log.Logger + limits validation.LabelValidationLimits } -func NewHead(metrics *HeadMetrics) *Head { +func NewHead(metrics *HeadMetrics, logger log.Logger, limits validation.LabelValidationLimits) *Head { h := &Head{ metrics: metrics, + logger: logger, + limits: limits, symbols: symdb.NewPartitionWriter(0, &symdb.Config{ Version: symdb.FormatV3, }), @@ -60,11 +67,17 @@ func NewHead(metrics *HeadMetrics) *Head { return h } -func (h *Head) Ingest(p *profilev1.Profile, id uuid.UUID, externalLabels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) { +func (h *Head) Ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, externalLabels []*typesv1.LabelPair, annotations []*typesv1.ProfileAnnotation) { if len(p.Sample) == 0 { return } + if err := validation.ValidateLabels(h.limits, tenantID, externalLabels); err != nil { + level.Warn(h.logger).Log("msg", "labels validation failed", "err", err) + // TODO aleks-p: propagate the error upward + return + } + // Delta not supported. externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameDelta) // Label order is enforced to ensure that __profile_type__ and __service_name__ always diff --git a/pkg/experiment/ingester/memdb/head_test.go b/pkg/experiment/ingester/memdb/head_test.go index a05a835fb8..9e89f8b5bc 100644 --- a/pkg/experiment/ingester/memdb/head_test.go +++ b/pkg/experiment/ingester/memdb/head_test.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/parquet-go/parquet-go" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -30,14 +31,25 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/symdb" "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/pprof/testhelper" + "github.com/grafana/pyroscope/pkg/test" + "github.com/grafana/pyroscope/pkg/validation" ) var defaultAnnotations []*typesv1.ProfileAnnotation +var ( + baseLabels = []*typesv1.LabelPair{ + {Name: "namespace", Value: "phlare"}, + {Name: "__name__", Value: "process_cpu"}, + } +) + func TestHeadLabelValues(t *testing.T) { - head := newTestHead() - head.Ingest(newProfileFoo(), uuid.New(), []*typesv1.LabelPair{{Name: "job", Value: "foo"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) - head.Ingest(newProfileBar(), uuid.New(), []*typesv1.LabelPair{{Name: "job", Value: "bar"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) + head := newTestHead(t) + fooLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "foo").Set("service_name", "svc_foo").Labels() + barLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "bar").Set("service_name", "svc_bar").Labels() + head.Ingest("", newProfileFoo(), uuid.New(), fooLabels, defaultAnnotations) + head.Ingest("", newProfileBar(), uuid.New(), barLabels, defaultAnnotations) q := flushTestHead(t, head) @@ -51,32 +63,38 @@ func TestHeadLabelValues(t *testing.T) { } func TestHeadLabelNames(t *testing.T) { - head := newTestHead() - head.Ingest(newProfileFoo(), uuid.New(), []*typesv1.LabelPair{{Name: "job", Value: "foo"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) - head.Ingest(newProfileBar(), uuid.New(), []*typesv1.LabelPair{{Name: "job", Value: "bar"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) + head := newTestHead(t) + fooLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "foo").Set("service_name", "svc_foo").Labels() + barLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "bar").Set("service_name", "svc_bar").Labels() + head.Ingest("", newProfileFoo(), uuid.New(), fooLabels, defaultAnnotations) + head.Ingest("", newProfileBar(), uuid.New(), barLabels, defaultAnnotations) q := flushTestHead(t, head) res, err := q.LabelNames(context.Background(), connect.NewRequest(&typesv1.LabelNamesRequest{})) require.NoError(t, err) - require.Equal(t, []string{"__period_type__", "__period_unit__", "__profile_type__", "__type__", "__unit__", "job", "namespace"}, res.Msg.Names) + require.Equal(t, []string{"__name__", "__period_type__", "__period_unit__", "__profile_type__", "__service_name__", "__type__", "__unit__", "job", "namespace", "service_name"}, res.Msg.Names) } func TestHeadSeries(t *testing.T) { - head := newTestHead() - fooLabels := phlaremodel.NewLabelsBuilder(nil).Set("namespace", "phlare").Set("job", "foo").Labels() - barLabels := phlaremodel.NewLabelsBuilder(nil).Set("namespace", "phlare").Set("job", "bar").Labels() - head.Ingest(newProfileFoo(), uuid.New(), fooLabels, defaultAnnotations) - head.Ingest(newProfileBar(), uuid.New(), barLabels, defaultAnnotations) + head := newTestHead(t) + fooLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "foo").Set("service_name", "svc_foo").Labels() + barLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "bar").Set("service_name", "svc_bar").Labels() + head.Ingest("", newProfileFoo(), uuid.New(), fooLabels, defaultAnnotations) + head.Ingest("", newProfileBar(), uuid.New(), barLabels, defaultAnnotations) lblBuilder := phlaremodel.NewLabelsBuilder(nil). Set("namespace", "phlare"). Set("job", "foo"). + Set("service_name", "svc_foo"). + Set("__name__", "process_cpu"). Set("__period_type__", "type"). Set("__period_unit__", "unit"). Set("__type__", "type"). Set("__unit__", "unit"). - Set("__profile_type__", ":type:unit:type:unit") + Set("__profile_type__", "process_cpu:type:unit:type:unit"). + Set("__service_name__", "svc_foo") + expected := lblBuilder.Labels() q := flushTestHead(t, head) @@ -98,9 +116,11 @@ func TestHeadSeries(t *testing.T) { } func TestHeadProfileTypes(t *testing.T) { - head := newTestHead() - head.Ingest(newProfileFoo(), uuid.New(), []*typesv1.LabelPair{{Name: "__name__", Value: "foo"}, {Name: "job", Value: "foo"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) - head.Ingest(newProfileBar(), uuid.New(), []*typesv1.LabelPair{{Name: "__name__", Value: "bar"}, {Name: "namespace", Value: "phlare"}}, defaultAnnotations) + head := newTestHead(t) + fooLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "foo").Set("service_name", "svc_foo").Set("__name__", "foo").Labels() + barLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "bar").Set("service_name", "svc_bar").Set("__name__", "bar").Labels() + head.Ingest("", newProfileFoo(), uuid.New(), fooLabels, defaultAnnotations) + head.Ingest("", newProfileBar(), uuid.New(), barLabels, defaultAnnotations) q := flushTestHead(t, head) @@ -114,22 +134,20 @@ func TestHeadProfileTypes(t *testing.T) { func TestHead_SelectMatchingProfiles_Order(t *testing.T) { const n = 15 - head := NewHead(NewHeadMetricsWithPrefix(nil, "")) + head := NewHead(NewHeadMetricsWithPrefix(nil, ""), test.NewTestingLogger(t), validation.MockDefaultOverrides()) + fooLabels := phlaremodel.NewLabelsBuilder(baseLabels).Set("job", "foo").Set("service_name", "svc_foo").Set("__name__", "foo") now := time.Now() for i := 0; i < n; i++ { x := newProfileFoo() // Make sure some of our profiles have matching timestamps. x.TimeNanos = now.Add(time.Second * time.Duration(i-i%2)).UnixNano() - head.Ingest(x, uuid.UUID{}, []*typesv1.LabelPair{ - {Name: "job", Value: "foo"}, - {Name: "x", Value: strconv.Itoa(i)}, - }, defaultAnnotations) + head.Ingest("", x, uuid.UUID{}, fooLabels.Set("x", strconv.Itoa(i)).Labels(), defaultAnnotations) } q := flushTestHead(t, head) - typ, err := phlaremodel.ParseProfileTypeSelector(":type:unit:type:unit") + typ, err := phlaremodel.ParseProfileTypeSelector("foo:type:unit:type:unit") require.NoError(t, err) req := &ingestv1.SelectProfilesRequest{ LabelSelector: "{}", @@ -172,11 +190,12 @@ func TestHeadFlushQuery(t *testing.T) { td.profile = p } - head := newTestHead() + head := newTestHead(t) ctx := context.Background() for pos := range testdata { - head.Ingest(testdata[pos].profile.CloneVT(), uuid.New(), []*typesv1.LabelPair{ + head.Ingest("", testdata[pos].profile.CloneVT(), uuid.New(), []*typesv1.LabelPair{ + {Name: labels.MetricName, Value: "foo"}, {Name: phlaremodel.LabelNameServiceName, Value: testdata[pos].svc}, }, defaultAnnotations) } @@ -188,14 +207,14 @@ func TestHeadFlushQuery(t *testing.T) { assert.Equal(t, 11, int(flushed.Meta.NumSeries)) // different value from original phlaredb test because service_name label added assert.Equal(t, 11, int(flushed.Meta.NumProfiles)) assert.Equal(t, []string{ - ":CPU:nanoseconds:CPU:nanoseconds", - ":alloc_objects:count:space:bytes", - ":alloc_space:bytes:space:bytes", - ":cpu:nanoseconds:cpu:nanoseconds", - ":inuse_objects:count:space:bytes", - ":inuse_space:bytes:space:bytes", - ":sample:count:CPU:nanoseconds", - ":samples:count:cpu:nanoseconds", + "foo:CPU:nanoseconds:CPU:nanoseconds", + "foo:alloc_objects:count:space:bytes", + "foo:alloc_space:bytes:space:bytes", + "foo:cpu:nanoseconds:cpu:nanoseconds", + "foo:inuse_objects:count:space:bytes", + "foo:inuse_space:bytes:space:bytes", + "foo:sample:count:CPU:nanoseconds", + "foo:samples:count:cpu:nanoseconds", }, flushed.Meta.ProfileTypeNames) q := createBlockFromFlushedHead(t, flushed) @@ -218,7 +237,7 @@ func TestHeadFlushQuery(t *testing.T) { } func TestHead_Concurrent_Ingest(t *testing.T) { - head := newTestHead() + head := newTestHead(t) wg := sync.WaitGroup{} @@ -251,17 +270,17 @@ func profileWithID(id int) (*profilev1.Profile, uuid.UUID) { } func TestHead_ProfileOrder(t *testing.T) { - head := newTestHead() + head := newTestHead(t) p, u := profileWithID(1) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "memory"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-a"}, }, defaultAnnotations) p, u = profileWithID(2) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "memory"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-b"}, @@ -269,7 +288,7 @@ func TestHead_ProfileOrder(t *testing.T) { }, defaultAnnotations) p, u = profileWithID(3) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "memory"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-c"}, @@ -277,22 +296,22 @@ func TestHead_ProfileOrder(t *testing.T) { }, defaultAnnotations) p, u = profileWithID(4) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "cpu"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-a"}, - {Name: "000Label", Value: "important"}, + {Name: "a_label", Value: "important"}, }, defaultAnnotations) p, u = profileWithID(5) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "cpu"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-b"}, }, defaultAnnotations) p, u = profileWithID(6) - head.Ingest(p, u, []*typesv1.LabelPair{ + head.Ingest("", p, u, []*typesv1.LabelPair{ {Name: phlaremodel.LabelNameProfileName, Value: "cpu"}, {Name: phlaremodel.LabelNameOrder, Value: phlaremodel.LabelOrderEnforced}, {Name: phlaremodel.LabelNameServiceName, Value: "service-b"}, @@ -311,7 +330,7 @@ func TestHead_ProfileOrder(t *testing.T) { } func TestFlushEmptyHead(t *testing.T) { - head := newTestHead() + head := newTestHead(t) flushed, err := head.Flush(context.Background()) require.NoError(t, err) require.NotNil(t, flushed) @@ -328,12 +347,13 @@ func TestMergeProfilesStacktraces(t *testing.T) { step = 15 * time.Second ) - db := newTestHead() + db := newTestHead(t) ingestProfiles(t, db, cpuProfileGenerator, start.UnixNano(), end.UnixNano(), step, defaultAnnotations, &typesv1.LabelPair{Name: "namespace", Value: "my-namespace"}, &typesv1.LabelPair{Name: "pod", Value: "my-pod"}, + &typesv1.LabelPair{Name: "service_name", Value: "my-namespace/my-pod"}, ) q := flushTestHead(t, db) @@ -453,7 +473,7 @@ func TestMergeProfilesLabels(t *testing.T) { step = 15 * time.Second ) - db := newTestHead() + db := newTestHead(t) ingestProfiles(t, db, cpuProfileGenerator, start.UnixNano(), end.UnixNano(), step, []*typesv1.ProfileAnnotation{ @@ -461,6 +481,7 @@ func TestMergeProfilesLabels(t *testing.T) { }, &typesv1.LabelPair{Name: "namespace", Value: "my-namespace"}, &typesv1.LabelPair{Name: "pod", Value: "my-pod"}, + &typesv1.LabelPair{Name: "service_name", Value: "my-namespace/my-pod"}, ) q := flushTestHead(t, db) @@ -520,12 +541,13 @@ func TestMergeProfilesPprof(t *testing.T) { step = 15 * time.Second ) - db := NewHead(NewHeadMetricsWithPrefix(nil, "")) + db := NewHead(NewHeadMetricsWithPrefix(nil, ""), test.NewTestingLogger(t), validation.MockDefaultOverrides()) ingestProfiles(t, db, cpuProfileGenerator, start.UnixNano(), end.UnixNano(), step, defaultAnnotations, &typesv1.LabelPair{Name: "namespace", Value: "my-namespace"}, &typesv1.LabelPair{Name: "pod", Value: "my-pod"}, + &typesv1.LabelPair{Name: "service_name", Value: "my-namespace/my-pod"}, ) q := flushTestHead(t, db) @@ -664,7 +686,7 @@ func Test_HeadFlush_DuplicateLabels(t *testing.T) { step = 15 * time.Second ) - head := newTestHead() + head := newTestHead(t) ingestProfiles(t, head, cpuProfileGenerator, start.UnixNano(), end.UnixNano(), step, defaultAnnotations, @@ -764,21 +786,21 @@ func BenchmarkHeadIngestProfiles(t *testing.B) { profileCount = 0 ) - head := newTestHead() + head := newTestHead(t) t.ReportAllocs() for n := 0; n < t.N; n++ { for pos := range profilePaths { p := parseProfile(t, profilePaths[pos]) - head.Ingest(p, uuid.New(), []*typesv1.LabelPair{}, defaultAnnotations) + head.Ingest("", p, uuid.New(), []*typesv1.LabelPair{}, defaultAnnotations) profileCount++ } } } -func newTestHead() *Head { - head := NewHead(NewHeadMetricsWithPrefix(nil, "")) +func newTestHead(t testing.TB) *Head { + head := NewHead(NewHeadMetricsWithPrefix(nil, ""), test.NewTestingLogger(t), validation.MockDefaultOverrides()) return head } @@ -899,12 +921,13 @@ func newProfileBar() *profilev1.Profile { var streams = []string{"stream-a", "stream-b", "stream-c"} -func ingestThreeProfileStreams(i int, ingest func(*profilev1.Profile, uuid.UUID, []*typesv1.LabelPair, []*typesv1.ProfileAnnotation)) { +func ingestThreeProfileStreams(i int, ingest func(string, *profilev1.Profile, uuid.UUID, []*typesv1.LabelPair, []*typesv1.ProfileAnnotation)) { p := testhelper.NewProfileBuilder(time.Second.Nanoseconds() * int64(i)) p.CPUProfile() p.WithLabels( "job", "foo", "stream", streams[i%3], + "service_name", "foo_svc", ) p.UUID = uuid.MustParse(fmt.Sprintf("00000000-0000-0000-0000-%012d", i)) p.ForStacktraceString("func1", "func2").AddSamples(10) @@ -913,17 +936,18 @@ func ingestThreeProfileStreams(i int, ingest func(*profilev1.Profile, uuid.UUID, {Key: "foo", Value: "bar"}, } - ingest(p.Profile, p.UUID, p.Labels, p.Annotations) + ingest("", p.Profile, p.UUID, p.Labels, p.Annotations) } func profileTypeFromProfile(p *profilev1.Profile, stIndex int) *typesv1.ProfileType { t := &typesv1.ProfileType{ + Name: "foo", SampleType: p.StringTable[p.SampleType[stIndex].Type], SampleUnit: p.StringTable[p.SampleType[stIndex].Unit], PeriodType: p.StringTable[p.PeriodType.Type], PeriodUnit: p.StringTable[p.PeriodType.Unit], } - t.ID = fmt.Sprintf(":%s:%s:%s:%s", t.SampleType, t.SampleUnit, t.PeriodType, t.PeriodUnit) + t.ID = fmt.Sprintf("%s:%s:%s:%s:%s", t.Name, t.SampleType, t.SampleUnit, t.PeriodType, t.PeriodUnit) return t } @@ -962,12 +986,7 @@ func ingestProfiles(b testing.TB, db *Head, generator func(tsNano int64, t testi b.Helper() for i := from; i <= to; i += int64(step) { p, name := generator(i, b) - db.Ingest( - p, - uuid.New(), - append(externalLabels, &typesv1.LabelPair{Name: model.MetricNameLabel, Value: name}), - annotations, - ) + db.Ingest("", p, uuid.New(), append(externalLabels, &typesv1.LabelPair{Name: model.MetricNameLabel, Value: name}), annotations) } } diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index ed008c76e6..6608262218 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -512,6 +512,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels) appender := &sampleAppender{ head: s.headForIngest(k), + tenantID: tenantID, profile: p, id: id, annotations: annotations, @@ -524,6 +525,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la } type sampleAppender struct { + tenantID string id uuid.UUID head *memdb.Head profile *profilev1.Profile @@ -535,7 +537,7 @@ type sampleAppender struct { } func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) { - v.head.Ingest(v.profile, v.id, labels, v.annotations) + v.head.Ingest(v.tenantID, v.profile, v.id, labels, v.annotations) } func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) { @@ -544,7 +546,7 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples } var n profilev1.Profile v.exporter.ExportSamples(&n, samples) - v.head.Ingest(&n, v.id, labels, v.annotations) + v.head.Ingest(v.tenantID, &n, v.id, labels, v.annotations) } func (v *sampleAppender) Discarded(profiles, bytes int) { @@ -567,7 +569,7 @@ func (s *segment) headForIngest(k datasetKey) *memdb.Head { return h.head } - nh := memdb.NewHead(s.sw.headMetrics) + nh := memdb.NewHead(s.sw.headMetrics, s.logger, s.sw.limits) s.heads[k] = dataset{ key: k, diff --git a/pkg/experiment/ingester/service.go b/pkg/experiment/ingester/service.go index 0e5db05b44..e29996678c 100644 --- a/pkg/experiment/ingester/service.go +++ b/pkg/experiment/ingester/service.go @@ -84,6 +84,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Limits interface { IngestionRelabelingRules(tenantID string) []*relabel.Config DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig + + validation.LabelValidationLimits } type SegmentWriterService struct { diff --git a/pkg/pprof/testhelper/profile_builder.go b/pkg/pprof/testhelper/profile_builder.go index 2bddc1d0ac..852c88272a 100644 --- a/pkg/pprof/testhelper/profile_builder.go +++ b/pkg/pprof/testhelper/profile_builder.go @@ -208,6 +208,10 @@ func (m *ProfileBuilder) ForStacktraceString(stacktraces ...string) *StacktraceB } } +func (m *ProfileBuilder) AddString(s string) int64 { + return m.addString(s) +} + func (m *ProfileBuilder) addString(s string) int64 { i, ok := m.strings[s] if !ok { diff --git a/pkg/test/integration/ingest_pprof_test.go b/pkg/test/integration/ingest_pprof_test.go index 6db88d7ca3..ac91e7f51f 100644 --- a/pkg/test/integration/ingest_pprof_test.go +++ b/pkg/test/integration/ingest_pprof_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/grafana/pyroscope/pkg/og/convert/pprof/strprofile" + "github.com/grafana/pyroscope/pkg/pprof/testhelper" "connectrpc.com/connect" @@ -319,6 +321,67 @@ func TestIngestPPROFFixPythonLinenumbers(t *testing.T) { }) } +func TestIngestPPROFSanitizeOtelLabels(t *testing.T) { + EachPyroscopeTest(t, func(p *PyroscopeTest, t *testing.T) { + + p1 := testhelper.NewProfileBuilder(time.Now().Add(-time.Second).UnixNano()). + CPUProfile(). + ForStacktraceString("my", "other"). + AddSamples(239) + p1.Sample[0].Label = []*profilev1.Label{ + { + Key: p1.AddString("foo.bar"), + Str: p1.AddString("qwe-asd"), + }, + } + p1bs, err := p1.Profile.MarshalVT() + require.NoError(t, err) + + rb := p.NewRequestBuilder(t) + rb.Push(rb.PushPPROFRequestFromBytes(p1bs, "process_cpu"), 200, "") + + renderedProfile := rb.SelectMergeProfile("process_cpu:cpu:nanoseconds:cpu:nanoseconds", map[string]string{ + "foo_bar": "qwe-asd", + }) + actual, err := strprofile.Stringify(renderedProfile.Msg, strprofile.Options{ + NoTime: true, + NoDuration: true, + }) + require.NoError(t, err) + expected := `{ + "sample_types": [ + { + "type": "cpu", + "unit": "nanoseconds" + } + ], + "samples": [ + { + "locations": [ + { + "address": "0x0", + "lines": [ + "my[]@:0" + ], + "mapping": "0x0-0x0@0x0 ()" + }, + { + "address": "0x0", + "lines": [ + "other[]@:0" + ], + "mapping": "0x0-0x0@0x0 ()" + } + ], + "values": "239" + } + ], + "period": "1000000000" +}` + assert.JSONEq(t, expected, actual) + }) +} + func TestGodeltaprofRelabelPush(t *testing.T) { const blockSize = 1024 const metric = "godeltaprof_memory"