@@ -47,7 +47,6 @@ type Coordinator struct {
4747 testHistory []types.Test
4848 testRegistryMutex sync.RWMutex
4949 testNotificationChan chan bool
50- maxConcurrentTests int
5150}
5251
5352type testDescriptorEntry struct {
@@ -56,7 +55,7 @@ type testDescriptorEntry struct {
5655 index uint64
5756}
5857
59- func NewCoordinator (config * Config , log logrus.FieldLogger , metricsPort , maxConcurrentTests int ) * Coordinator {
58+ func NewCoordinator (config * Config , log logrus.FieldLogger , metricsPort int ) * Coordinator {
6059 return & Coordinator {
6160 log : logger .NewLogger (& logger.ScopeOptions {
6261 Parent : log ,
@@ -70,15 +69,14 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConc
7069 testQueue : []types.Test {},
7170 testHistory : []types.Test {},
7271 testNotificationChan : make (chan bool , 1 ),
73- maxConcurrentTests : maxConcurrentTests ,
7472 }
7573}
7674
7775// Run executes the coordinator until completion.
7876func (c * Coordinator ) Run (ctx context.Context ) error {
7977 defer func () {
8078 if err := recover (); err != nil {
81- logrus .WithError (err .(error )).Errorf ("uncaught panic in coordinator.Run: %v, stack: %v" , err , string (debug .Stack ()))
79+ c . log . GetLogger () .WithError (err .(error )).Errorf ("uncaught panic in coordinator.Run: %v, stack: %v" , err , string (debug .Stack ()))
8280 }
8381 }()
8482
@@ -137,6 +135,9 @@ func (c *Coordinator) Run(ctx context.Context) error {
137135 // start test scheduler
138136 go c .runTestScheduler (ctx )
139137
138+ // start test cleanup routine
139+ go c .runTestCleanup (ctx )
140+
140141 // run tests
141142 c .runTestExecutionLoop (ctx )
142143
@@ -345,7 +346,12 @@ func (c *Coordinator) createTestRun(descriptor types.TestDescriptor, configOverr
345346}
346347
347348func (c * Coordinator ) runTestExecutionLoop (ctx context.Context ) {
348- semaphore := make (chan bool , c .maxConcurrentTests )
349+ concurrencyLimit := c .Config .Coordinator .MaxConcurrentTests
350+ if concurrencyLimit < 1 {
351+ concurrencyLimit = 1
352+ }
353+
354+ semaphore := make (chan bool , concurrencyLimit )
349355
350356 for {
351357 var nextTest types.Test
@@ -393,6 +399,12 @@ func (c *Coordinator) runTest(ctx context.Context, testRef types.Test) {
393399}
394400
395401func (c * Coordinator ) runTestScheduler (ctx context.Context ) {
402+ defer func () {
403+ if err := recover (); err != nil {
404+ c .log .GetLogger ().WithError (err .(error )).Panicf ("uncaught panic in coordinator.runTestScheduler: %v, stack: %v" , err , string (debug .Stack ()))
405+ }
406+ }()
407+
396408 // startup scheduler
397409 for _ , testDescr := range c .getStartupTests () {
398410 _ , err := c .ScheduleTest (testDescr , nil , false )
@@ -480,3 +492,49 @@ func (c *Coordinator) getCronTests(cronTime time.Time) []types.TestDescriptor {
480492
481493 return descriptors
482494}
495+
496+ func (c * Coordinator ) runTestCleanup (ctx context.Context ) {
497+ defer func () {
498+ if err := recover (); err != nil {
499+ c .log .GetLogger ().WithError (err .(error )).Panicf ("uncaught panic in coordinator.runTestCleanup: %v, stack: %v" , err , string (debug .Stack ()))
500+ }
501+ }()
502+
503+ retentionTime := c .Config .Coordinator .TestRetentionTime .Duration
504+ if retentionTime <= 0 {
505+ retentionTime = 14 * 24 * time .Hour
506+ }
507+
508+ cleanupInterval := 1 * time .Hour
509+ if retentionTime <= 4 * time .Hour {
510+ cleanupInterval = 10 * time .Minute
511+ }
512+
513+ for {
514+ select {
515+ case <- ctx .Done ():
516+ return
517+ case <- time .After (cleanupInterval ):
518+ }
519+
520+ c .cleanupTestHistory (retentionTime )
521+ }
522+ }
523+
524+ func (c * Coordinator ) cleanupTestHistory (retentionTime time.Duration ) {
525+ c .testRegistryMutex .Lock ()
526+ defer c .testRegistryMutex .Unlock ()
527+
528+ cleanedHistory := []types.Test {}
529+
530+ for _ , test := range c .testHistory {
531+ if test .Status () != types .TestStatusPending && test .StartTime ().Add (retentionTime ).Compare (time .Now ()) == - 1 {
532+ test .Logger ().Infof ("cleanup test" )
533+ continue
534+ }
535+
536+ cleanedHistory = append (cleanedHistory , test )
537+ }
538+
539+ c .testHistory = cleanedHistory
540+ }
0 commit comments