@@ -108,7 +108,7 @@ func (sh *shard) flushSegment(ctx context.Context, lastTick bool) {
108
108
if s .debuginfo .movedHeads > 0 {
109
109
_ = level .Debug (s .logger ).Log ("msg" ,
110
110
"writing segment block done" ,
111
- "heads-count" , len (s .datasets ),
111
+ "heads-count" , len (s .heads ),
112
112
"heads-moved-count" , s .debuginfo .movedHeads ,
113
113
"inflight-duration" , s .debuginfo .waitInflight ,
114
114
"flush-heads-duration" , s .debuginfo .flushHeadsDuration ,
@@ -200,7 +200,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
200
200
s := & segment {
201
201
logger : log .With (sl , "segment-id" , id .String ()),
202
202
ulid : id ,
203
- datasets : make (map [datasetKey ]* dataset ),
203
+ heads : make (map [datasetKey ]dataset ),
204
204
sw : sw ,
205
205
sh : sh ,
206
206
shard : sk ,
@@ -213,7 +213,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
213
213
func (s * segment ) flush (ctx context.Context ) (err error ) {
214
214
span , ctx := opentracing .StartSpanFromContext (ctx , "segment.flush" , opentracing.Tags {
215
215
"block_id" : s .ulid .String (),
216
- "datasets" : len (s .datasets ),
216
+ "datasets" : len (s .heads ),
217
217
"shard" : s .shard ,
218
218
})
219
219
defer span .Finish ()
@@ -337,10 +337,6 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
337
337
lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , model .LabelNameProfileType , profileType )
338
338
}
339
339
340
- if f .flushed .HasUnsymbolizedProfiles {
341
- lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , metadata .LabelNameUnsymbolized , "true" )
342
- }
343
-
344
340
// Other optional labels:
345
341
// lb.WithLabelSet("label_name", "label_value", ...)
346
342
ds .Labels = lb .Build ()
@@ -349,8 +345,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
349
345
}
350
346
351
347
func (s * segment ) flushHeads (ctx context.Context ) flushStream {
352
- heads := maps .Values (s .datasets )
353
- slices .SortFunc (heads , func (a , b * dataset ) int {
348
+ heads := maps .Values (s .heads )
349
+ slices .SortFunc (heads , func (a , b dataset ) int {
354
350
return a .key .compare (b .key )
355
351
})
356
352
@@ -365,15 +361,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream {
365
361
defer close (f .done )
366
362
flushed , err := s .flushHead (ctx , f .head )
367
363
if err != nil {
368
- level .Error (s .logger ).Log ("msg" , "failed to flush dataset " , "err" , err )
364
+ level .Error (s .logger ).Log ("msg" , "failed to flush head " , "err" , err )
369
365
return
370
366
}
371
367
if flushed == nil {
372
- level .Debug (s .logger ).Log ("msg" , "skipping nil dataset " )
368
+ level .Debug (s .logger ).Log ("msg" , "skipping nil head " )
373
369
return
374
370
}
375
371
if flushed .Meta .NumSamples == 0 {
376
- level .Debug (s .logger ).Log ("msg" , "skipping empty dataset " )
372
+ level .Debug (s .logger ).Log ("msg" , "skipping empty head " )
377
373
return
378
374
}
379
375
f .flushed = flushed
@@ -404,24 +400,24 @@ func (s *flushStream) Next() bool {
404
400
return false
405
401
}
406
402
407
- func (s * segment ) flushHead (ctx context.Context , e * dataset ) (* memdb.FlushedHead , error ) {
403
+ func (s * segment ) flushHead (ctx context.Context , e dataset ) (* memdb.FlushedHead , error ) {
408
404
th := time .Now ()
409
405
flushed , err := e .head .Flush (ctx )
410
406
if err != nil {
411
407
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
412
408
s .sw .metrics .flushServiceHeadError .WithLabelValues (s .sshard , e .key .tenant ).Inc ()
413
- return nil , fmt .Errorf ("failed to flush dataset : %w" , err )
409
+ return nil , fmt .Errorf ("failed to flush head : %w" , err )
414
410
}
415
411
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
416
412
level .Debug (s .logger ).Log (
417
- "msg" , "flushed dataset " ,
413
+ "msg" , "flushed head " ,
418
414
"tenant" , e .key .tenant ,
419
415
"service" , e .key .service ,
420
416
"profiles" , flushed .Meta .NumProfiles ,
421
417
"profiletypes" , fmt .Sprintf ("%v" , flushed .Meta .ProfileTypeNames ),
422
418
"mintime" , flushed .Meta .MinTimeNanos ,
423
419
"maxtime" , flushed .Meta .MaxTimeNanos ,
424
- "dataset -flush-duration" , time .Since (th ).String (),
420
+ "head -flush-duration" , time .Since (th ).String (),
425
421
)
426
422
return flushed , nil
427
423
}
@@ -444,7 +440,7 @@ type dataset struct {
444
440
}
445
441
446
442
type headFlush struct {
447
- head * dataset
443
+ head dataset
448
444
flushed * memdb.FlushedHead
449
445
// protects head
450
446
done chan struct {}
@@ -455,12 +451,10 @@ type segment struct {
455
451
shard shardKey
456
452
sshard string
457
453
inFlightProfiles sync.WaitGroup
458
-
459
- mu sync.RWMutex
460
- datasets map [datasetKey ]* dataset
461
-
462
- logger log.Logger
463
- sw * segmentsWriter
454
+ heads map [datasetKey ]dataset
455
+ headsLock sync.RWMutex
456
+ logger log.Logger
457
+ sw * segmentsWriter
464
458
465
459
// TODO(kolesnikovae): Revisit.
466
460
doneChan chan struct {}
@@ -504,12 +498,11 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
504
498
tenant : tenantID ,
505
499
service : model .Labels (labels ).Get (model .LabelNameServiceName ),
506
500
}
507
- ds := s .datasetForIngest (k )
508
501
size := p .SizeVT ()
509
502
rules := s .sw .limits .IngestionRelabelingRules (tenantID )
510
503
usage := s .sw .limits .DistributorUsageGroups (tenantID ).GetUsageGroups (tenantID , labels )
511
504
appender := & sampleAppender {
512
- dataset : ds ,
505
+ head : s . headForIngest ( k ) ,
513
506
profile : p ,
514
507
id : id ,
515
508
annotations : annotations ,
@@ -523,7 +516,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
523
516
524
517
type sampleAppender struct {
525
518
id uuid.UUID
526
- dataset * dataset
519
+ head * memdb. Head
527
520
profile * profilev1.Profile
528
521
exporter * pprofmodel.SampleExporter
529
522
annotations []* typesv1.ProfileAnnotation
@@ -533,7 +526,7 @@ type sampleAppender struct {
533
526
}
534
527
535
528
func (v * sampleAppender ) VisitProfile (labels []* typesv1.LabelPair ) {
536
- v .dataset . head .Ingest (v .profile , v .id , labels , v .annotations )
529
+ v .head .Ingest (v .profile , v .id , labels , v .annotations )
537
530
}
538
531
539
532
func (v * sampleAppender ) VisitSampleSeries (labels []* typesv1.LabelPair , samples []* profilev1.Sample ) {
@@ -542,36 +535,37 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
542
535
}
543
536
var n profilev1.Profile
544
537
v .exporter .ExportSamples (& n , samples )
545
- v .dataset . head .Ingest (v . profile , v .id , labels , v .annotations )
538
+ v .head .Ingest (& n , v .id , labels , v .annotations )
546
539
}
547
540
548
541
func (v * sampleAppender ) Discarded (profiles , bytes int ) {
549
542
v .discardedProfiles += profiles
550
543
v .discardedBytes += bytes
551
544
}
552
545
553
- func (s * segment ) datasetForIngest (k datasetKey ) * dataset {
554
- s .mu .RLock ()
555
- ds , ok := s .datasets [k ]
556
- s .mu .RUnlock ()
546
+ func (s * segment ) headForIngest (k datasetKey ) * memdb. Head {
547
+ s .headsLock .RLock ()
548
+ h , ok := s .heads [k ]
549
+ s .headsLock .RUnlock ()
557
550
if ok {
558
- return ds
551
+ return h . head
559
552
}
560
553
561
- s .mu .Lock ()
562
- defer s .mu .Unlock ()
563
- if ds , ok = s .datasets [k ]; ok {
564
- return ds
554
+ s .headsLock .Lock ()
555
+ defer s .headsLock .Unlock ()
556
+ h , ok = s .heads [k ]
557
+ if ok {
558
+ return h .head
565
559
}
566
560
567
- h := memdb .NewHead (s .sw .headMetrics )
568
- ds = & dataset {
561
+ nh := memdb .NewHead (s .sw .headMetrics )
562
+
563
+ s .heads [k ] = dataset {
569
564
key : k ,
570
- head : h ,
565
+ head : nh ,
571
566
}
572
567
573
- s .datasets [k ] = ds
574
- return ds
568
+ return nh
575
569
}
576
570
577
571
func (sw * segmentsWriter ) uploadBlock (ctx context.Context , blockData []byte , meta * metastorev1.BlockMeta , s * segment ) error {
0 commit comments