Skip to content

Commit 12e9407

Browse files
committed
infrabox parallel log
1 parent a2b552a commit 12e9407

1 file changed

Lines changed: 37 additions & 10 deletions

File tree

src/services/gcp/pkg/stub/handler.go

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package stub
22

33
import (
44
"bytes"
5+
"context"
56
"crypto/tls"
67
"crypto/x509"
78
b64 "encoding/base64"
89
"encoding/json"
910
"fmt"
11+
"golang.org/x/sync/semaphore"
1012
"io/ioutil"
1113
"mime/multipart"
1214
"net/http"
@@ -15,6 +17,7 @@ import (
1517
"path"
1618
"strconv"
1719
"strings"
20+
"sync"
1821
"time"
1922

2023
uuid "github.com/satori/go.uuid"
@@ -1111,6 +1114,13 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
11111114
log.Infof("Collecting data from GKE cluster %s", cluster.Name)
11121115
defer close(done)
11131116

1117+
parallelLogPulls := 1
1118+
if n, err := strconv.Atoi(os.Getenv("INFRABOX_PARALLEL_LOG_PULL")); err == nil {
1119+
if n > 0 && n < 10 {
1120+
parallelLogPulls = n
1121+
}
1122+
}
1123+
11141124
annotations := cr.GetAnnotations()
11151125
_, ok := annotations["infrabox.net/root-url"]
11161126
if !ok {
@@ -1144,23 +1154,40 @@ func retrieveLogs(cr *v1alpha1.GKECluster, cluster *RemoteCluster, log *logrus.E
11441154
return
11451155
}
11461156

1157+
// Do log collection in parallel, up to parallelLogPulls concurrent goroutines.
1158+
wg := sync.WaitGroup{}
1159+
sem := semaphore.NewWeighted(int64(parallelLogPulls))
11471160
for _, pod := range pods {
1161+
pod := pod // per-iteration copy captured by the goroutine below; needed before Go 1.22, which made loop variables per-iteration.
11481162
for _, container := range pod.Containers {
1149-
log.Debug("Collecting logs for pod: ", pod.PodID)
1150-
data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container)
1163+
container := container
1164+
err := sem.Acquire(context.Background(), 1)
11511165
if err != nil {
1152-
log.Warningf("Failed to get collected pod logs: %v", err)
1153-
continue
1166+
log.Errorf("Failed to get collected pod list, cannot acquire semaphore: %v", err)
1167+
return
11541168
}
11551169

1156-
filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
1157-
filename = path.Join(logPath, filename)
1158-
if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil {
1159-
log.Debugf("Failed to write pod logs: %v", err)
1160-
continue
1161-
}
1170+
wg.Add(1)
1171+
go func() {
1172+
defer sem.Release(1)
1173+
defer wg.Done()
1174+
1175+
log.Debug("Collecting logs for pod: ", pod.PodID)
1176+
data, err := doCollectorRequest(cluster, log, "/api/pods/"+pod.PodID+"/log/"+container)
1177+
if err != nil {
1178+
log.Warningf("Failed to get collected pod logs: %v", err)
1179+
return
1180+
}
1181+
filename := "pod_" + pod.Namespace + "_" + pod.Pod + "_" + container + ".txt"
1182+
filename = path.Join(logPath, filename)
1183+
if err := ioutil.WriteFile(filename, *data, os.ModePerm); err != nil {
1184+
log.Debugf("Failed to write pod logs: %v", err)
1185+
return
1186+
}
1187+
}()
11621188
}
11631189
}
1190+
wg.Wait()
11641191

11651192
archivePath := path.Join(logPath, "pods_log.zip")
11661193
err = archiver.Archive([]string{logPath}, archivePath)

0 commit comments

Comments
 (0)