@@ -116,7 +116,7 @@ func (sh *shard) flushSegment(ctx context.Context, wg *sync.WaitGroup) {
116
116
if s .debuginfo .movedHeads > 0 {
117
117
_ = level .Debug (s .logger ).Log ("msg" ,
118
118
"writing segment block done" ,
119
- "heads-count" , len (s .heads ),
119
+ "heads-count" , len (s .datasets ),
120
120
"heads-moved-count" , s .debuginfo .movedHeads ,
121
121
"inflight-duration" , s .debuginfo .waitInflight ,
122
122
"flush-heads-duration" , s .debuginfo .flushHeadsDuration ,
@@ -203,7 +203,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
203
203
s := & segment {
204
204
logger : log .With (sl , "segment-id" , id .String ()),
205
205
ulid : id ,
206
- heads : make (map [datasetKey ]dataset ),
206
+ datasets : make (map [datasetKey ]* dataset ),
207
207
sw : sw ,
208
208
sh : sh ,
209
209
shard : sk ,
@@ -216,7 +216,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
216
216
func (s * segment ) flush (ctx context.Context ) (err error ) {
217
217
span , ctx := opentracing .StartSpanFromContext (ctx , "segment.flush" , opentracing.Tags {
218
218
"block_id" : s .ulid .String (),
219
- "datasets" : len (s .heads ),
219
+ "datasets" : len (s .datasets ),
220
220
"shard" : s .shard ,
221
221
})
222
222
defer span .Finish ()
@@ -340,6 +340,10 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
340
340
lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , model .LabelNameProfileType , profileType )
341
341
}
342
342
343
+ if f .flushed .HasUnsymbolizedProfiles {
344
+ lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , metadata .LabelNameUnsymbolized , "true" )
345
+ }
346
+
343
347
// Other optional labels:
344
348
// lb.WithLabelSet("label_name", "label_value", ...)
345
349
ds .Labels = lb .Build ()
@@ -348,8 +352,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
348
352
}
349
353
350
354
func (s * segment ) flushHeads (ctx context.Context ) flushStream {
351
- heads := maps .Values (s .heads )
352
- slices .SortFunc (heads , func (a , b dataset ) int {
355
+ heads := maps .Values (s .datasets )
356
+ slices .SortFunc (heads , func (a , b * dataset ) int {
353
357
return a .key .compare (b .key )
354
358
})
355
359
@@ -364,15 +368,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream {
364
368
defer close (f .done )
365
369
flushed , err := s .flushHead (ctx , f .head )
366
370
if err != nil {
367
- level .Error (s .logger ).Log ("msg" , "failed to flush head " , "err" , err )
371
+ level .Error (s .logger ).Log ("msg" , "failed to flush dataset " , "err" , err )
368
372
return
369
373
}
370
374
if flushed == nil {
371
- level .Debug (s .logger ).Log ("msg" , "skipping nil head " )
375
+ level .Debug (s .logger ).Log ("msg" , "skipping nil dataset " )
372
376
return
373
377
}
374
378
if flushed .Meta .NumSamples == 0 {
375
- level .Debug (s .logger ).Log ("msg" , "skipping empty head " )
379
+ level .Debug (s .logger ).Log ("msg" , "skipping empty dataset " )
376
380
return
377
381
}
378
382
f .flushed = flushed
@@ -403,24 +407,24 @@ func (s *flushStream) Next() bool {
403
407
return false
404
408
}
405
409
406
- func (s * segment ) flushHead (ctx context.Context , e dataset ) (* memdb.FlushedHead , error ) {
410
+ func (s * segment ) flushHead (ctx context.Context , e * dataset ) (* memdb.FlushedHead , error ) {
407
411
th := time .Now ()
408
412
flushed , err := e .head .Flush (ctx )
409
413
if err != nil {
410
414
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
411
415
s .sw .metrics .flushServiceHeadError .WithLabelValues (s .sshard , e .key .tenant ).Inc ()
412
- return nil , fmt .Errorf ("failed to flush head : %w" , err )
416
+ return nil , fmt .Errorf ("failed to flush dataset : %w" , err )
413
417
}
414
418
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
415
419
level .Debug (s .logger ).Log (
416
- "msg" , "flushed head " ,
420
+ "msg" , "flushed dataset " ,
417
421
"tenant" , e .key .tenant ,
418
422
"service" , e .key .service ,
419
423
"profiles" , flushed .Meta .NumProfiles ,
420
424
"profiletypes" , fmt .Sprintf ("%v" , flushed .Meta .ProfileTypeNames ),
421
425
"mintime" , flushed .Meta .MinTimeNanos ,
422
426
"maxtime" , flushed .Meta .MaxTimeNanos ,
423
- "head -flush-duration" , time .Since (th ).String (),
427
+ "dataset -flush-duration" , time .Since (th ).String (),
424
428
)
425
429
return flushed , nil
426
430
}
@@ -443,7 +447,7 @@ type dataset struct {
443
447
}
444
448
445
449
type headFlush struct {
446
- head dataset
450
+ head * dataset
447
451
flushed * memdb.FlushedHead
448
452
// protects head
449
453
done chan struct {}
@@ -454,10 +458,12 @@ type segment struct {
454
458
shard shardKey
455
459
sshard string
456
460
inFlightProfiles sync.WaitGroup
457
- heads map [datasetKey ]dataset
458
- headsLock sync.RWMutex
459
- logger log.Logger
460
- sw * segmentsWriter
461
+
462
+ mu sync.RWMutex
463
+ datasets map [datasetKey ]* dataset
464
+
465
+ logger log.Logger
466
+ sw * segmentsWriter
461
467
462
468
// TODO(kolesnikovae): Revisit.
463
469
doneChan chan struct {}
@@ -501,11 +507,12 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
501
507
tenant : tenantID ,
502
508
service : model .Labels (labels ).Get (model .LabelNameServiceName ),
503
509
}
510
+ ds := s .datasetForIngest (k )
504
511
size := p .SizeVT ()
505
512
rules := s .sw .limits .IngestionRelabelingRules (tenantID )
506
513
usage := s .sw .limits .DistributorUsageGroups (tenantID ).GetUsageGroups (tenantID , labels )
507
514
appender := & sampleAppender {
508
- head : s . headForIngest ( k ) ,
515
+ dataset : ds ,
509
516
profile : p ,
510
517
id : id ,
511
518
annotations : annotations ,
@@ -519,7 +526,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
519
526
520
527
type sampleAppender struct {
521
528
id uuid.UUID
522
- head * memdb. Head
529
+ dataset * dataset
523
530
profile * profilev1.Profile
524
531
exporter * pprofmodel.SampleExporter
525
532
annotations []* typesv1.ProfileAnnotation
@@ -529,7 +536,7 @@ type sampleAppender struct {
529
536
}
530
537
531
538
// VisitProfile ingests the whole profile into the dataset head under the
// given series labels, tagged with the appender's profile ID and annotations.
func (v *sampleAppender) VisitProfile(labels []*typesv1.LabelPair) {
	v.dataset.head.Ingest(v.profile, v.id, labels, v.annotations)
}
534
541
535
542
func (v * sampleAppender ) VisitSampleSeries (labels []* typesv1.LabelPair , samples []* profilev1.Sample ) {
@@ -538,37 +545,36 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
538
545
}
539
546
var n profilev1.Profile
540
547
v .exporter .ExportSamples (& n , samples )
541
- v .head .Ingest (& n , v .id , labels , v .annotations )
548
+ v .dataset . head .Ingest (v . profile , v .id , labels , v .annotations )
542
549
}
543
550
544
551
// Discarded accumulates the number of profiles and bytes dropped during
// ingestion, so the caller can report them after the profile visit completes.
func (v *sampleAppender) Discarded(profiles, bytes int) {
	v.discardedBytes += bytes
	v.discardedProfiles += profiles
}
548
555
549
- func (s * segment ) headForIngest (k datasetKey ) * memdb. Head {
550
- s .headsLock .RLock ()
551
- h , ok := s .heads [k ]
552
- s .headsLock .RUnlock ()
556
+ func (s * segment ) datasetForIngest (k datasetKey ) * dataset {
557
+ s .mu .RLock ()
558
+ ds , ok := s .datasets [k ]
559
+ s .mu .RUnlock ()
553
560
if ok {
554
- return h . head
561
+ return ds
555
562
}
556
563
557
- s .headsLock .Lock ()
558
- defer s .headsLock .Unlock ()
559
- h , ok = s .heads [k ]
560
- if ok {
561
- return h .head
564
+ s .mu .Lock ()
565
+ defer s .mu .Unlock ()
566
+ if ds , ok = s .datasets [k ]; ok {
567
+ return ds
562
568
}
563
569
564
- nh := memdb .NewHead (s .sw .headMetrics )
565
-
566
- s .heads [k ] = dataset {
570
+ h := memdb .NewHead (s .sw .headMetrics )
571
+ ds = & dataset {
567
572
key : k ,
568
- head : nh ,
573
+ head : h ,
569
574
}
570
575
571
- return nh
576
+ s .datasets [k ] = ds
577
+ return ds
572
578
}
573
579
574
580
func (sw * segmentsWriter ) uploadBlock (ctx context.Context , blockData []byte , meta * metastorev1.BlockMeta , s * segment ) error {
0 commit comments