@@ -29,6 +29,7 @@ import (
29
29
"gvisor.dev/gvisor/pkg/bitmap"
30
30
"gvisor.dev/gvisor/pkg/errors/linuxerr"
31
31
"gvisor.dev/gvisor/pkg/fd"
32
+ "gvisor.dev/gvisor/pkg/gohacks"
32
33
"gvisor.dev/gvisor/pkg/goid"
33
34
"gvisor.dev/gvisor/pkg/hostarch"
34
35
"gvisor.dev/gvisor/pkg/log"
@@ -362,6 +363,9 @@ func (f *MemoryFile) LoadFrom(ctx context.Context, r io.Reader, opts *LoadOpts)
362
363
)
363
364
if opts .PagesFile != nil {
364
365
aplg = & aplGoroutine {
366
+ apl : aplShared {
367
+ timeStartWaiters : math .MaxInt64 ,
368
+ },
365
369
f : f ,
366
370
q : aio .NewGoQueue (aplQueueCapacity ),
367
371
doneCallback : opts .OnAsyncPageLoadDone ,
@@ -451,8 +455,8 @@ func (f *MemoryFile) LoadFrom(ctx context.Context, r io.Reader, opts *LoadOpts)
451
455
return nil
452
456
}
453
457
454
- // aplShared holds asynchronous page loading state that is shared with
455
- // users of the MemoryFile .
458
+ // aplShared holds asynchronous page loading state that is shared with other
459
+ // goroutines .
456
460
type aplShared struct {
457
461
// minUnloaded is the MemoryFile offset of the first unloaded byte.
458
462
minUnloaded atomicbitops.Uint64
@@ -471,6 +475,29 @@ type aplShared struct {
471
475
// priority contains possibly-unstarted ranges in unloaded with at least
472
476
// one waiter.
473
477
priority ringdeque.Deque [memmap.FileRange ]
478
+
479
+ // numWaiters is the current number of waiting waiters.
480
+ numWaiters int
481
+
482
+ // totalWaiters is the number of waiters that have ever waited for pages
483
+ // from this MemoryFile.
484
+ totalWaiters int
485
+
486
+ // timeStartWaiters was the value of gohacks.Nanotime() when numWaiters
487
+ // most recently transitioned from 0 to 1. If numWaiters is 0,
488
+ // timeStartWaiters is MaxInt64.
489
+ timeStartWaiters int64
490
+
491
+ // nsWaitedOne is the duration for which at least one waiter was waiting
492
+ // for a load. nsWaitedTotal is the duration for which waiters were waiting
493
+ // for loads, summed across all waiters. bytesWaited is the number of bytes
494
+ // for which at least one waiter waited.
495
+ durWaitedOne time.Duration
496
+ durWaitedTotal time.Duration
497
+ bytesWaited uint64
498
+
499
+ // bytesLoaded is the number of bytes that have been loaded so far.
500
+ bytesLoaded uint64
474
501
}
475
502
476
503
// aplUnloadedInfo is the value type of aplShared.unloaded.
@@ -488,11 +515,17 @@ type aplUnloadedInfo struct {
488
515
489
516
type aplWaiter struct {
490
517
// wakeup is used by a caller of MemoryFile.awaitLoad() to block until all
491
- // pages in fr are loaded.
518
+ // pages in fr are loaded. wakeup is internally synchronized. fr is
519
+ // immutable after initialization.
492
520
wakeup syncevent.Waiter
493
521
fr memmap.FileRange
494
522
523
+ // timeStart was the value of gohacks.Nanotime() when this waiter started
524
+ // waiting. timeStart is immutable after initialization.
525
+ timeStart int64
526
+
495
527
// pending is the number of unloaded bytes that this waiter is waiting for.
528
+ // pending is protected by aplShared.mu.
496
529
pending uint64
497
530
}
498
531
@@ -552,27 +585,36 @@ func (apl *aplShared) awaitLoad(f *MemoryFile, fr memmap.FileRange) error {
552
585
apl .unloaded .MutateRange (fr , func (ulseg aplUnloadedIterator ) bool {
553
586
ul := ulseg .ValuePtr ()
554
587
ulFR := ulseg .Range ()
588
+ ullen := ulFR .Length ()
555
589
if len (ul .waiters ) == 0 && ! ul .started {
556
590
apl .priority .PushBack (ulFR )
591
+ apl .bytesWaited += ullen
557
592
if logAwaitedLoads {
558
593
log .Infof ("MemoryFile(%p): prioritize %v" , f , ulFR )
559
594
}
560
595
}
561
596
ul .waiters = append (ul .waiters , w )
562
- w .pending += ulFR . Length ()
597
+ w .pending += ullen
563
598
return true
564
599
})
565
600
pending := w .pending != 0
601
+ if pending {
602
+ w .timeStart = gohacks .Nanotime ()
603
+ if apl .numWaiters == 0 {
604
+ apl .timeStartWaiters = w .timeStart
605
+ }
606
+ apl .numWaiters ++
607
+ apl .totalWaiters ++
608
+ }
566
609
apl .mu .Unlock ()
567
610
if pending {
568
- var startWaitTime time.Time
569
611
if logAwaitedLoads {
570
- startWaitTime = time .Now ()
571
612
log .Infof ("MemoryFile(%p): awaitLoad goid %d start: %v (%d bytes)" , f , goid .Get (), fr , fr .Length ())
572
613
}
573
614
w .wakeup .WaitAndAckAll ()
574
615
if logAwaitedLoads {
575
- log .Infof ("MemoryFile(%p): awaitLoad goid %d waited %v: %v (%d bytes)" , f , goid .Get (), time .Since (startWaitTime ), fr , fr .Length ())
616
+ waitNS := gohacks .Nanotime () - w .timeStart
617
+ log .Infof ("MemoryFile(%p): awaitLoad goid %d waited %v: %v (%d bytes)" , f , goid .Get (), time .Duration (waitNS ), fr , fr .Length ())
576
618
}
577
619
}
578
620
return apl .err
@@ -854,9 +896,47 @@ func (g *aplGoroutine) main() {
854
896
}
855
897
defer dropDelayedDecRefs ()
856
898
857
- timeStart := time .Now ()
858
- loadedBytes := uint64 (0 )
859
- log .Debugf ("MemoryFile(%p): async page loading started" , f )
899
+ timeStart := gohacks .Nanotime ()
900
+ if log .IsLogging (log .Debug ) {
901
+ log .Debugf ("MemoryFile(%p): async page loading started" , f )
902
+ progressTicker := time .NewTicker (5 * time .Second )
903
+ progressStopC := make (chan struct {})
904
+ defer func () { close (progressStopC ) }()
905
+ go func () {
906
+ timeLast := timeStart
907
+ bytesLoadedLast := uint64 (0 )
908
+ for {
909
+ select {
910
+ case <- progressStopC :
911
+ progressTicker .Stop ()
912
+ return
913
+ case <- progressTicker .C :
914
+ // Take a snapshot of our progress.
915
+ apl .mu .Lock ()
916
+ totalWaiters := apl .totalWaiters
917
+ timeStartWaiters := apl .timeStartWaiters
918
+ durWaitedOne := apl .durWaitedOne
919
+ durWaitedTotal := apl .durWaitedTotal
920
+ bytesWaited := apl .bytesWaited
921
+ bytesLoaded := apl .bytesLoaded
922
+ apl .mu .Unlock ()
923
+ now := gohacks .Nanotime ()
924
+ durTotal := time .Duration (now - timeStart )
925
+ // apl can have at least one waiter for a very long time
926
+ // due to new waiters enqueueing before old ones are
927
+ // served; avoid apparent jumps in durWaitedOne.
928
+ if timeStartWaiters < now {
929
+ durWaitedOne += time .Duration (now - timeStartWaiters )
930
+ }
931
+ durDelta := time .Duration (now - timeLast )
932
+ bytesLoadedDelta := bytesLoaded - bytesLoadedLast
933
+ log .Infof ("MemoryFile(%p): async page loading in progress for %s (%d bytes, %.3f MB/s); since last update %s ago: %d bytes, %.3f MB/s; %d waiters waited %v~%v for %d bytes" , f , durTotal .Round (time .Millisecond ), bytesLoaded , float64 (bytesLoaded )* 1e-6 / durTotal .Seconds (), durDelta .Round (time .Millisecond ), bytesLoadedDelta , float64 (bytesLoadedDelta )* 1e-6 / durDelta .Seconds (), totalWaiters , durWaitedOne .Round (time .Millisecond ), durWaitedTotal .Round (time .Millisecond ), bytesWaited )
934
+ timeLast = now
935
+ bytesLoadedLast = bytesLoaded
936
+ }
937
+ }
938
+ }()
939
+ }
860
940
for {
861
941
// Enqueue as many reads as possible.
862
942
if ! g .canEnqueue () {
@@ -956,8 +1036,11 @@ func (g *aplGoroutine) main() {
956
1036
// so async page loading has completed successfully.
957
1037
apl .minUnloaded .Store (math .MaxUint64 )
958
1038
f .asyncPageLoad .Store (nil )
959
- dur := time .Since (timeStart )
960
- log .Infof ("MemoryFile(%p): async page loading completed in %s (%d bytes, %f bytes/second)" , f , dur , loadedBytes , float64 (loadedBytes )/ dur .Seconds ())
1039
+ timeFinish := gohacks .Nanotime ()
1040
+ durTotal := time .Duration (timeFinish - timeStart )
1041
+ apl .mu .Lock ()
1042
+ log .Infof ("MemoryFile(%p): async page loading completed in %s (%d bytes, %.3f MB/s); %d waiters waited %v~%v for %d bytes" , f , durTotal .Round (time .Millisecond ), apl .bytesLoaded , float64 (apl .bytesLoaded )* 1e-6 / durTotal .Seconds (), apl .totalWaiters , apl .durWaitedOne .Round (time .Millisecond ), apl .durWaitedTotal .Round (time .Millisecond ), apl .bytesWaited )
1043
+ apl .mu .Unlock ()
961
1044
return
962
1045
}
963
1046
panic (fmt .Sprintf ("unknown events in lfStatus: %#x" , ev ))
@@ -1001,6 +1084,7 @@ func (g *aplGoroutine) main() {
1001
1084
return
1002
1085
}
1003
1086
haveWaiters := false
1087
+ now := int64 (0 )
1004
1088
for _ , fr := range op .frs () {
1005
1089
// All pages in fr have been started and were split around fr
1006
1090
// when they were started (above), and apl.unloaded never
@@ -1009,7 +1093,7 @@ func (g *aplGoroutine) main() {
1009
1093
for ulseg := apl .unloaded .FindSegment (fr .Start ); ulseg .Ok () && ulseg .Start () < fr .End ; ulseg = apl .unloaded .Remove (ulseg ).NextSegment () {
1010
1094
ul := ulseg .ValuePtr ()
1011
1095
ullen := ulseg .Range ().Length ()
1012
- loadedBytes += ullen
1096
+ apl . bytesLoaded += ullen
1013
1097
if ! ul .started {
1014
1098
panic (fmt .Sprintf ("completion of %v includes pages %v that were never started" , fr , ulseg .Range ()))
1015
1099
}
@@ -1018,6 +1102,18 @@ func (g *aplGoroutine) main() {
1018
1102
w .pending -= ullen
1019
1103
if w .pending == 0 {
1020
1104
wakeups = append (wakeups , w )
1105
+ if now == 0 {
1106
+ now = gohacks .Nanotime ()
1107
+ }
1108
+ // This definition of "wait time" skips the time
1109
+ // taken for w to wake up (bad), but avoids having
1110
+ // to lock apl.mu again in apl.awaitLoad() (good).
1111
+ apl .durWaitedTotal += time .Duration (now - w .timeStart )
1112
+ apl .numWaiters --
1113
+ if apl .numWaiters == 0 {
1114
+ apl .durWaitedOne += time .Duration (now - apl .timeStartWaiters )
1115
+ apl .timeStartWaiters = math .MaxInt64
1116
+ }
1021
1117
}
1022
1118
}
1023
1119
}
0 commit comments