Skip to content

feat: add __unsymbolized__ label on ingest path #4147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/experiment/block/metadata/metadata_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// Reserved dataset label names and values used on the ingest path.
const (
	LabelNameTenantDataset     = "__tenant_dataset__"
	LabelValueDatasetTSDBIndex = "dataset_tsdb_index"
	// LabelNameUnsymbolized marks datasets that contain profiles with
	// locations lacking function symbols (i.e. pending symbolization).
	LabelNameUnsymbolized = "__unsymbolized__"
)

type LabelBuilder struct {
Expand Down
23 changes: 19 additions & 4 deletions pkg/experiment/ingester/memdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

type FlushedHead struct {
Index []byte
Profiles []byte
Symbols []byte
Meta struct {
Index []byte
Profiles []byte
Symbols []byte
HasUnsymbolizedProfiles bool
Meta struct {
ProfileTypeNames []string
MinTimeNanos int64
MaxTimeNanos int64
Expand Down Expand Up @@ -153,6 +154,8 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) {
return res, nil
}

res.HasUnsymbolizedProfiles = HasUnsymbolizedProfiles(h.symbols.Symbols())

symbolsBuffer := bytes.NewBuffer(nil)
if err := symdb.WritePartition(h.symbols, symbolsBuffer); err != nil {
return nil, err
Expand All @@ -173,3 +176,15 @@ func (h *Head) flush(ctx context.Context) (*FlushedHead, error) {
}
return res, nil
}

// TODO: move into the symbolizer package when available
func HasUnsymbolizedProfiles(symbols *symdb.Symbols) bool {
locations := symbols.Locations
mappings := symbols.Mappings
for _, loc := range locations {
if !mappings[loc.MappingId].HasFunctions {
Copy link
Preview

Copilot AI May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a nil check or verifying that mappings[loc.MappingId] exists before accessing its HasFunctions field to avoid potential nil pointer dereferences.

Suggested change
if !mappings[loc.MappingId].HasFunctions {
mapping := mappings[loc.MappingId]
if mapping == nil {
continue
}
if !mapping.HasFunctions {

Copilot uses AI. Check for mistakes.

return true
}
}
return false
}
83 changes: 83 additions & 0 deletions pkg/experiment/ingester/memdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/pyroscope/pkg/og/convert/pprof/bench"
"github.com/grafana/pyroscope/pkg/phlaredb"
testutil2 "github.com/grafana/pyroscope/pkg/phlaredb/block/testutil"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/pprof/testhelper"
)
Expand Down Expand Up @@ -672,6 +673,88 @@ func Test_HeadFlush_DuplicateLabels(t *testing.T) {
&typesv1.LabelPair{Name: "pod", Value: "not-my-pod"},
)
}

// TODO: move into the symbolizer package when available
func TestUnsymbolized(t *testing.T) {
	// Table-driven check of HasUnsymbolizedProfiles: a partition counts as
	// unsymbolized when any location belongs to a mapping whose
	// HasFunctions flag is false.
	testCases := []struct {
		name               string
		profile            *profilev1.Profile
		expectUnsymbolized bool
	}{
		{
			// Every location resolves through a mapping with function
			// symbols, so nothing is reported as unsymbolized.
			name: "fully symbolized profile",
			profile: &profilev1.Profile{
				StringTable: []string{"", "a"},
				Function: []*profilev1.Function{
					{Id: 4, Name: 1},
				},
				Mapping: []*profilev1.Mapping{
					{Id: 239, HasFunctions: true},
				},
				Location: []*profilev1.Location{
					{Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
				},
				Sample: []*profilev1.Sample{
					{LocationId: []uint64{5}, Value: []int64{1}},
				},
			},
			expectUnsymbolized: false,
		},
		{
			// The sole mapping lacks function symbols.
			name: "mapping without functions",
			profile: &profilev1.Profile{
				StringTable: []string{"", "a"},
				Function: []*profilev1.Function{
					{Id: 4, Name: 1},
				},
				Mapping: []*profilev1.Mapping{
					{Id: 239, HasFunctions: false},
				},
				Location: []*profilev1.Location{
					{Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
				},
				Sample: []*profilev1.Sample{
					{LocationId: []uint64{5}, Value: []int64{1}},
				},
			},
			expectUnsymbolized: true,
		},
		{
			// One symbolized and one unsymbolized mapping: a single
			// unsymbolized location is enough to flag the whole profile.
			name: "multiple locations with mixed symbolization",
			profile: &profilev1.Profile{
				StringTable: []string{"", "a", "b"},
				Function: []*profilev1.Function{
					{Id: 4, Name: 1},
					{Id: 5, Name: 2},
				},
				Mapping: []*profilev1.Mapping{
					{Id: 239, HasFunctions: true},
					{Id: 240, HasFunctions: false},
				},
				Location: []*profilev1.Location{
					{Id: 5, MappingId: 239, Line: []*profilev1.Line{{FunctionId: 4, Line: 1}}},
					{Id: 6, MappingId: 240, Line: nil},
				},
				Sample: []*profilev1.Sample{
					{LocationId: []uint64{5, 6}, Value: []int64{1}},
				},
			},
			expectUnsymbolized: true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Write the raw pprof profile into a fresh symdb partition and
			// inspect the resulting in-memory symbols.
			symbols := symdb.NewPartitionWriter(0, &symdb.Config{
				Version: symdb.FormatV3,
			})
			symbols.WriteProfileSymbols(tc.profile)
			unsymbolized := HasUnsymbolizedProfiles(symbols.Symbols())
			assert.Equal(t, tc.expectUnsymbolized, unsymbolized)
		})
	}
}

func BenchmarkHeadIngestProfiles(t *testing.B) {
var (
profilePaths = []string{
Expand Down
78 changes: 42 additions & 36 deletions pkg/experiment/ingester/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (sh *shard) flushSegment(ctx context.Context) {
if s.debuginfo.movedHeads > 0 {
_ = level.Debug(s.logger).Log("msg",
"writing segment block done",
"heads-count", len(s.heads),
"heads-count", len(s.datasets),
"heads-moved-count", s.debuginfo.movedHeads,
"inflight-duration", s.debuginfo.waitInflight,
"flush-heads-duration", s.debuginfo.flushHeadsDuration,
Expand Down Expand Up @@ -195,7 +195,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
s := &segment{
logger: log.With(sl, "segment-id", id.String()),
ulid: id,
heads: make(map[datasetKey]dataset),
datasets: make(map[datasetKey]*dataset),
sw: sw,
sh: sh,
shard: sk,
Expand All @@ -208,7 +208,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
func (s *segment) flush(ctx context.Context) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "segment.flush", opentracing.Tags{
"block_id": s.ulid.String(),
"datasets": len(s.heads),
"datasets": len(s.datasets),
"shard": s.shard,
})
defer span.Finish()
Expand Down Expand Up @@ -332,6 +332,10 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, model.LabelNameProfileType, profileType)
}

if f.flushed.HasUnsymbolizedProfiles {
lb.WithLabelSet(model.LabelNameServiceName, f.head.key.service, metadata.LabelNameUnsymbolized, "true")
}

// Other optional labels:
// lb.WithLabelSet("label_name", "label_value", ...)
ds.Labels = lb.Build()
Expand All @@ -340,8 +344,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
}

func (s *segment) flushHeads(ctx context.Context) flushStream {
heads := maps.Values(s.heads)
slices.SortFunc(heads, func(a, b dataset) int {
heads := maps.Values(s.datasets)
slices.SortFunc(heads, func(a, b *dataset) int {
return a.key.compare(b.key)
})

Expand All @@ -356,15 +360,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream {
defer close(f.done)
flushed, err := s.flushHead(ctx, f.head)
if err != nil {
level.Error(s.logger).Log("msg", "failed to flush head", "err", err)
level.Error(s.logger).Log("msg", "failed to flush dataset", "err", err)
return
}
if flushed == nil {
level.Debug(s.logger).Log("msg", "skipping nil head")
level.Debug(s.logger).Log("msg", "skipping nil dataset")
return
}
if flushed.Meta.NumSamples == 0 {
level.Debug(s.logger).Log("msg", "skipping empty head")
level.Debug(s.logger).Log("msg", "skipping empty dataset")
return
}
f.flushed = flushed
Expand Down Expand Up @@ -395,24 +399,24 @@ func (s *flushStream) Next() bool {
return false
}

func (s *segment) flushHead(ctx context.Context, e dataset) (*memdb.FlushedHead, error) {
func (s *segment) flushHead(ctx context.Context, e *dataset) (*memdb.FlushedHead, error) {
th := time.Now()
flushed, err := e.head.Flush(ctx)
if err != nil {
s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
s.sw.metrics.flushServiceHeadError.WithLabelValues(s.sshard, e.key.tenant).Inc()
return nil, fmt.Errorf("failed to flush head : %w", err)
return nil, fmt.Errorf("failed to flush dataset : %w", err)
}
s.sw.metrics.flushServiceHeadDuration.WithLabelValues(s.sshard, e.key.tenant).Observe(time.Since(th).Seconds())
level.Debug(s.logger).Log(
"msg", "flushed head",
"msg", "flushed dataset",
"tenant", e.key.tenant,
"service", e.key.service,
"profiles", flushed.Meta.NumProfiles,
"profiletypes", fmt.Sprintf("%v", flushed.Meta.ProfileTypeNames),
"mintime", flushed.Meta.MinTimeNanos,
"maxtime", flushed.Meta.MaxTimeNanos,
"head-flush-duration", time.Since(th).String(),
"dataset-flush-duration", time.Since(th).String(),
)
return flushed, nil
}
Expand All @@ -435,7 +439,7 @@ type dataset struct {
}

type headFlush struct {
head dataset
head *dataset
flushed *memdb.FlushedHead
// protects head
done chan struct{}
Expand All @@ -446,10 +450,12 @@ type segment struct {
shard shardKey
sshard string
inFlightProfiles sync.WaitGroup
heads map[datasetKey]dataset
headsLock sync.RWMutex
logger log.Logger
sw *segmentsWriter

mu sync.RWMutex
datasets map[datasetKey]*dataset

logger log.Logger
sw *segmentsWriter

// TODO(kolesnikovae): Revisit.
doneChan chan struct{}
Expand Down Expand Up @@ -493,11 +499,12 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
tenant: tenantID,
service: model.Labels(labels).Get(model.LabelNameServiceName),
}
ds := s.datasetForIngest(k)
size := p.SizeVT()
rules := s.sw.limits.IngestionRelabelingRules(tenantID)
usage := s.sw.limits.DistributorUsageGroups(tenantID).GetUsageGroups(tenantID, labels)
appender := &sampleAppender{
head: s.headForIngest(k),
dataset: ds,
profile: p,
id: id,
annotations: annotations,
Expand All @@ -511,7 +518,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la

type sampleAppender struct {
id uuid.UUID
head *memdb.Head
dataset *dataset
profile *profilev1.Profile
exporter *pprofmodel.SampleExporter
annotations []*typesv1.ProfileAnnotation
Expand All @@ -521,7 +528,7 @@ type sampleAppender struct {
}

// VisitProfile ingests the entire pprof profile into the dataset head
// under the given series labels.
func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) {
	v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations)
}

func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
Expand All @@ -530,37 +537,36 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
}
var n profilev1.Profile
v.exporter.ExportSamples(&n, samples)
v.head.Ingest(&n, v.id, labels, v.annotations)
v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations)
}

// Discarded accumulates the number of profiles and bytes dropped during
// ingestion so the caller can report them after the visit completes.
func (v *sampleAppender) Discarded(profiles, bytes int) {
	v.discardedProfiles += profiles
	v.discardedBytes += bytes
}

func (s *segment) headForIngest(k datasetKey) *memdb.Head {
s.headsLock.RLock()
h, ok := s.heads[k]
s.headsLock.RUnlock()
func (s *segment) datasetForIngest(k datasetKey) *dataset {
s.mu.RLock()
ds, ok := s.datasets[k]
s.mu.RUnlock()
if ok {
return h.head
return ds
}

s.headsLock.Lock()
defer s.headsLock.Unlock()
h, ok = s.heads[k]
if ok {
return h.head
s.mu.Lock()
defer s.mu.Unlock()
if ds, ok = s.datasets[k]; ok {
return ds
}

nh := memdb.NewHead(s.sw.headMetrics)

s.heads[k] = dataset{
h := memdb.NewHead(s.sw.headMetrics)
ds = &dataset{
key: k,
head: nh,
head: h,
}

return nh
s.datasets[k] = ds
return ds
}

func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error {
Expand Down