Skip to content

Commit 58cdabf

Browse files
authored
Add configurable concurrent bundle creation routines to fleet apply (#4292)
Issue: #4233 Make the number of concurrent bundle creation routines configurable via: - CLI flag: --bundle-creation-max-concurrency (default: 4) - Environment variable: FLEET_BUNDLE_CREATION_MAX_CONCURRENCY
1 parent 3b9fcb9 commit 58cdabf

File tree

5 files changed

+347
-97
lines changed

5 files changed

+347
-97
lines changed

internal/cmd/cli/apply.go

Lines changed: 45 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -42,30 +42,31 @@ type Apply struct {
4242
FleetClient
4343
BundleInputArgs
4444
OutputArgsNoDefault
45-
Label map[string]string `usage:"Labels to apply to created bundles" short:"l"`
46-
TargetsFile string `usage:"Addition source of targets and restrictions to be append"`
47-
Compress bool `usage:"Force all resources to be compress" short:"c"`
48-
ServiceAccount string `usage:"Service account to assign to bundle created" short:"a"`
49-
SyncGeneration int `usage:"Generation number used to force sync the deployment"`
50-
TargetNamespace string `usage:"Ensure this bundle goes to this target namespace"`
51-
Paused bool `usage:"Create bundles in a paused state"`
52-
Commit string `usage:"Commit to assign to the bundle" env:"COMMIT"`
53-
Username string `usage:"Basic auth username for helm repo" env:"HELM_USERNAME"`
54-
PasswordFile string `usage:"Path of file containing basic auth password for helm repo"`
55-
CACertsFile string `usage:"Path of custom cacerts for helm repo" name:"cacerts-file"`
56-
SSHPrivateKeyFile string `usage:"Path of ssh-private-key for helm repo" name:"ssh-privatekey-file"`
57-
HelmRepoURLRegex string `usage:"Helm credentials will be used if the helm repo matches this regex. Credentials will always be used if this is empty or not provided" name:"helm-repo-url-regex"`
58-
KeepResources bool `usage:"Keep resources created after the GitRepo or Bundle is deleted" name:"keep-resources"`
59-
DeleteNamespace bool `usage:"Delete GitRepo target namespace after the GitRepo or Bundle is deleted" name:"delete-namespace"`
60-
HelmCredentialsByPathFile string `usage:"Path of file containing helm credentials for paths" name:"helm-credentials-by-path-file"`
61-
HelmBasicHTTP bool `usage:"Uses plain HTTP connections when downloading from helm repositories" name:"helm-basic-http"`
62-
HelmInsecureSkipTLS bool `usage:"Skip TLS verification when downloading from helm repositories" name:"helm-insecure-skip-tls"`
63-
CorrectDrift bool `usage:"Rollback any change made from outside of Fleet" name:"correct-drift"`
64-
CorrectDriftForce bool `usage:"Use --force when correcting drift. Resources can be deleted and recreated" name:"correct-drift-force"`
65-
CorrectDriftKeepFailHistory bool `usage:"Keep helm history for failed rollbacks" name:"correct-drift-keep-fail-history"`
66-
OCIRegistrySecret string `usage:"OCI storage registry secret name" name:"oci-registry-secret"`
67-
DrivenScan bool `usage:"Use driven scan. Bundles are defined by the user" name:"driven-scan"`
68-
DrivenScanSeparator string `usage:"Separator to use for bundle folder and options file" name:"driven-scan-sep" default:":"`
45+
Label map[string]string `usage:"Labels to apply to created bundles" short:"l"`
46+
TargetsFile string `usage:"Addition source of targets and restrictions to be append"`
47+
Compress bool `usage:"Force all resources to be compress" short:"c"`
48+
ServiceAccount string `usage:"Service account to assign to bundle created" short:"a"`
49+
SyncGeneration int `usage:"Generation number used to force sync the deployment"`
50+
TargetNamespace string `usage:"Ensure this bundle goes to this target namespace"`
51+
Paused bool `usage:"Create bundles in a paused state"`
52+
Commit string `usage:"Commit to assign to the bundle" env:"COMMIT"`
53+
Username string `usage:"Basic auth username for helm repo" env:"HELM_USERNAME"`
54+
PasswordFile string `usage:"Path of file containing basic auth password for helm repo"`
55+
CACertsFile string `usage:"Path of custom cacerts for helm repo" name:"cacerts-file"`
56+
SSHPrivateKeyFile string `usage:"Path of ssh-private-key for helm repo" name:"ssh-privatekey-file"`
57+
HelmRepoURLRegex string `usage:"Helm credentials will be used if the helm repo matches this regex. Credentials will always be used if this is empty or not provided" name:"helm-repo-url-regex"`
58+
KeepResources bool `usage:"Keep resources created after the GitRepo or Bundle is deleted" name:"keep-resources"`
59+
DeleteNamespace bool `usage:"Delete GitRepo target namespace after the GitRepo or Bundle is deleted" name:"delete-namespace"`
60+
HelmCredentialsByPathFile string `usage:"Path of file containing helm credentials for paths" name:"helm-credentials-by-path-file"`
61+
HelmBasicHTTP bool `usage:"Uses plain HTTP connections when downloading from helm repositories" name:"helm-basic-http"`
62+
HelmInsecureSkipTLS bool `usage:"Skip TLS verification when downloading from helm repositories" name:"helm-insecure-skip-tls"`
63+
CorrectDrift bool `usage:"Rollback any change made from outside of Fleet" name:"correct-drift"`
64+
CorrectDriftForce bool `usage:"Use --force when correcting drift. Resources can be deleted and recreated" name:"correct-drift-force"`
65+
CorrectDriftKeepFailHistory bool `usage:"Keep helm history for failed rollbacks" name:"correct-drift-keep-fail-history"`
66+
OCIRegistrySecret string `usage:"OCI storage registry secret name" name:"oci-registry-secret"`
67+
DrivenScan bool `usage:"Use driven scan. Bundles are defined by the user" name:"driven-scan"`
68+
DrivenScanSeparator string `usage:"Separator to use for bundle folder and options file" name:"driven-scan-sep" default:":"`
69+
BundleCreationMaxConcurrency int `usage:"Maximum number of concurrent bundle creation routines" name:"bundle-creation-max-concurrency" default:"4" env:"FLEET_BUNDLE_CREATION_MAX_CONCURRENCY"`
6970
}
7071

7172
func (r *Apply) PersistentPre(_ *cobra.Command, _ []string) error {
@@ -108,25 +109,26 @@ func (a *Apply) run(cmd *cobra.Command, args []string) error {
108109

109110
name := ""
110111
opts := apply.Options{
111-
Namespace: a.Namespace,
112-
BundleFile: a.BundleFile,
113-
Output: writer.NewDefaultNone(a.Output),
114-
Compress: a.Compress,
115-
ServiceAccount: a.ServiceAccount,
116-
Labels: a.Label,
117-
TargetsFile: a.TargetsFile,
118-
TargetNamespace: a.TargetNamespace,
119-
Paused: a.Paused,
120-
SyncGeneration: int64(a.SyncGeneration),
121-
HelmRepoURLRegex: a.HelmRepoURLRegex,
122-
KeepResources: a.KeepResources,
123-
DeleteNamespace: a.DeleteNamespace,
124-
CorrectDrift: a.CorrectDrift,
125-
CorrectDriftForce: a.CorrectDriftForce,
126-
CorrectDriftKeepFailHistory: a.CorrectDriftKeepFailHistory,
127-
DrivenScan: a.DrivenScan,
128-
DrivenScanSeparator: a.DrivenScanSeparator,
129-
OCIRegistrySecret: a.OCIRegistrySecret,
112+
Namespace: a.Namespace,
113+
BundleFile: a.BundleFile,
114+
Output: writer.NewDefaultNone(a.Output),
115+
Compress: a.Compress,
116+
ServiceAccount: a.ServiceAccount,
117+
Labels: a.Label,
118+
TargetsFile: a.TargetsFile,
119+
TargetNamespace: a.TargetNamespace,
120+
Paused: a.Paused,
121+
SyncGeneration: int64(a.SyncGeneration),
122+
HelmRepoURLRegex: a.HelmRepoURLRegex,
123+
KeepResources: a.KeepResources,
124+
DeleteNamespace: a.DeleteNamespace,
125+
CorrectDrift: a.CorrectDrift,
126+
CorrectDriftForce: a.CorrectDriftForce,
127+
CorrectDriftKeepFailHistory: a.CorrectDriftKeepFailHistory,
128+
DrivenScan: a.DrivenScan,
129+
DrivenScanSeparator: a.DrivenScanSeparator,
130+
OCIRegistrySecret: a.OCIRegistrySecret,
131+
BundleCreationMaxConcurrency: a.BundleCreationMaxConcurrency,
130132
}
131133

132134
knownHostsPath, err := writeTmpKnownHosts()

internal/cmd/cli/apply/apply.go

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ var (
4444
)
4545

4646
const (
47-
JSONOutputEnvVar = "FLEET_JSON_OUTPUT"
48-
JobNameEnvVar = "JOB_NAME"
49-
FleetApplyConflictRetriesEnv = "FLEET_APPLY_CONFLICT_RETRIES"
50-
defaultApplyConflictRetries = 1
47+
JSONOutputEnvVar = "FLEET_JSON_OUTPUT"
48+
JobNameEnvVar = "JOB_NAME"
49+
FleetApplyConflictRetriesEnv = "FLEET_APPLY_CONFLICT_RETRIES"
50+
BundleCreationMaxConcurrencyEnv = "FLEET_BUNDLE_CREATION_MAX_CONCURRENCY"
51+
defaultApplyConflictRetries = 1
52+
defaultBundleCreationMaxConcurrency = 4
5153
)
5254

5355
type Getter interface {
@@ -64,30 +66,31 @@ type OCIRegistrySpec struct {
6466
}
6567

6668
type Options struct {
67-
Namespace string
68-
BundleFile string
69-
TargetsFile string
70-
Compress bool
71-
BundleReader io.Reader
72-
Output io.Writer
73-
ServiceAccount string
74-
TargetNamespace string
75-
Paused bool
76-
Labels map[string]string
77-
SyncGeneration int64
78-
Auth bundlereader.Auth
79-
HelmRepoURLRegex string
80-
KeepResources bool
81-
DeleteNamespace bool
82-
AuthByPath map[string]bundlereader.Auth
83-
CorrectDrift bool
84-
CorrectDriftForce bool
85-
CorrectDriftKeepFailHistory bool
86-
OCIRegistry OCIRegistrySpec
87-
OCIRegistrySecret string
88-
DrivenScan bool
89-
DrivenScanSeparator string
90-
JobNameEnvVar string
69+
Namespace string
70+
BundleFile string
71+
TargetsFile string
72+
Compress bool
73+
BundleReader io.Reader
74+
Output io.Writer
75+
ServiceAccount string
76+
TargetNamespace string
77+
Paused bool
78+
Labels map[string]string
79+
SyncGeneration int64
80+
Auth bundlereader.Auth
81+
HelmRepoURLRegex string
82+
KeepResources bool
83+
DeleteNamespace bool
84+
AuthByPath map[string]bundlereader.Auth
85+
CorrectDrift bool
86+
CorrectDriftForce bool
87+
CorrectDriftKeepFailHistory bool
88+
OCIRegistry OCIRegistrySpec
89+
OCIRegistrySecret string
90+
DrivenScan bool
91+
DrivenScanSeparator string
92+
JobNameEnvVar string
93+
BundleCreationMaxConcurrency int
9194
}
9295

9396
func globDirs(baseDir string) (result []string, err error) {
@@ -106,7 +109,12 @@ func globDirs(baseDir string) (result []string, err error) {
106109
return
107110
}
108111

109-
const bundleCreationMaxConcurrency = 4
112+
func getEffectiveMaxConcurrency(configured int) int {
113+
if configured <= 0 {
114+
return defaultBundleCreationMaxConcurrency
115+
}
116+
return configured
117+
}
110118

111119
// CreateBundles creates bundles from the baseDirs, their names are prefixed with
112120
// repoName. Depending on opts.Output the bundles are created in the cluster or
@@ -116,13 +124,15 @@ func CreateBundles(pctx context.Context, client client.Client, r record.EventRec
116124
baseDirs = []string{"."}
117125
}
118126

127+
maxConcurrency := getEffectiveMaxConcurrency(opts.BundleCreationMaxConcurrency)
128+
119129
// Using an errgroup to manage concurrency
120130
// 1. Goroutines will be launched, honouring the concurrency limit, and eventually block trying to write to `bundlesChan`.
121131
// 2. The main function will read from `bundlesChan`, hence unblocking the goroutines. This will continue to read from `bundlesChan` until it is closed.
122132
// 3. We use another goroutine to wait for all goroutines to finish, then close `bundlesChan`, finally unblocking the main function.
123133
bundlesChan := make(chan *fleet.Bundle)
124134
eg, ctx := errgroup.WithContext(pctx)
125-
eg.SetLimit(bundleCreationMaxConcurrency + 1) // extra goroutine for WalkDir loop
135+
eg.SetLimit(maxConcurrency + 1) // extra goroutine for WalkDir loop
126136
eg.Go(func() error {
127137
for _, baseDir := range baseDirs {
128138
matches, err := globDirs(baseDir)
@@ -218,18 +228,19 @@ func CreateBundlesDriven(pctx context.Context, client client.Client, r record.Ev
218228
baseDirs = []string{"."}
219229
}
220230

231+
maxConcurrency := getEffectiveMaxConcurrency(opts.BundleCreationMaxConcurrency)
232+
221233
// Using an errgroup to manage concurrency
222234
// 1. Goroutines will be launched, honouring the concurrency limit, and eventually block trying to write to `bundlesChan`.
223235
// 2. The main function will read from `bundlesChan`, hence unblocking the goroutines. This will continue to read from `bundlesChan` until it is closed.
224236
// 3. We use another goroutine to wait for all goroutines to finish, then close `bundlesChan`, finally unblocking the main function.
225237
bundlesChan := make(chan *fleet.Bundle)
226238
eg, ctx := errgroup.WithContext(pctx)
227-
eg.SetLimit(bundleCreationMaxConcurrency + 1) // extra goroutine for WalkDir loop
239+
eg.SetLimit(maxConcurrency + 1) // extra goroutine for scanning loop
228240
eg.Go(func() error {
229241
for _, baseDir := range baseDirs {
230242
opts := opts
231243
eg.Go(func() error {
232-
// verify if it also defines a fleetFile
233244
var err error
234245
baseDir, opts.BundleFile, err = getPathAndFleetYaml(baseDir, opts.DrivenScanSeparator)
235246
if err != nil {
@@ -817,18 +828,24 @@ func setAuthByPath(opts *Options, path string) error {
817828
return nil
818829
}
819830

820-
func GetOnConflictRetries() (int, error) {
821-
s := os.Getenv(FleetApplyConflictRetriesEnv)
822-
if s != "" {
823-
// check if we have a valid value
824-
// it must be an integer
825-
r, err := strconv.Atoi(s)
826-
if err != nil {
827-
return defaultApplyConflictRetries, err
828-
} else {
829-
return r, nil
830-
}
831+
// getIntEnvVar reads an integer from an environment variable, returning the default if unset or invalid.
832+
func getIntEnvVar(envVarName string, defaultValue int) (int, error) {
833+
s := os.Getenv(envVarName)
834+
if s == "" {
835+
return defaultValue, nil
831836
}
832837

833-
return defaultApplyConflictRetries, nil
838+
val, err := strconv.Atoi(s)
839+
if err != nil {
840+
return defaultValue, err
841+
}
842+
return val, nil
843+
}
844+
845+
func GetOnConflictRetries() (int, error) {
846+
return getIntEnvVar(FleetApplyConflictRetriesEnv, defaultApplyConflictRetries)
847+
}
848+
849+
func GetBundleCreationMaxConcurrency() (int, error) {
850+
return getIntEnvVar(BundleCreationMaxConcurrencyEnv, defaultBundleCreationMaxConcurrency)
834851
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package apply
2+
3+
import (
4+
"os"
5+
"testing"
6+
)
7+
8+
func TestGetEffectiveMaxConcurrency(t *testing.T) {
9+
tests := map[string]struct {
10+
input int
11+
expected int
12+
}{
13+
"zero defaults to 4": {0, 4},
14+
"negative defaults to 4": {-1, 4},
15+
"custom value 8": {8, 8},
16+
"custom value 16": {16, 16},
17+
"custom value 12": {12, 12},
18+
}
19+
20+
for name, tt := range tests {
21+
t.Run(name, func(t *testing.T) {
22+
if got := getEffectiveMaxConcurrency(tt.input); got != tt.expected {
23+
t.Errorf("expected %d, got %d", tt.expected, got)
24+
}
25+
})
26+
}
27+
}
28+
29+
func TestGetBundleCreationMaxConcurrency(t *testing.T) {
30+
tests := []struct {
31+
name string
32+
envValue string
33+
expectedValue int
34+
expectedError bool
35+
}{
36+
{
37+
name: "default when env var not set",
38+
envValue: "",
39+
expectedValue: 4,
40+
expectedError: false,
41+
},
42+
{
43+
name: "custom value 8",
44+
envValue: "8",
45+
expectedValue: 8,
46+
expectedError: false,
47+
},
48+
{
49+
name: "custom value 16",
50+
envValue: "16",
51+
expectedValue: 16,
52+
expectedError: false,
53+
},
54+
{
55+
name: "invalid value returns error",
56+
envValue: "not_a_number",
57+
expectedValue: 4,
58+
expectedError: true,
59+
},
60+
{
61+
name: "zero is valid but caller handles default",
62+
envValue: "0",
63+
expectedValue: 0,
64+
expectedError: false,
65+
},
66+
}
67+
68+
for _, tt := range tests {
69+
t.Run(tt.name, func(t *testing.T) {
70+
// Save and restore the environment variable
71+
oldVal, wasSet := os.LookupEnv(BundleCreationMaxConcurrencyEnv)
72+
defer func() {
73+
if wasSet {
74+
os.Setenv(BundleCreationMaxConcurrencyEnv, oldVal)
75+
} else {
76+
os.Unsetenv(BundleCreationMaxConcurrencyEnv)
77+
}
78+
}()
79+
80+
if tt.envValue != "" {
81+
os.Setenv(BundleCreationMaxConcurrencyEnv, tt.envValue)
82+
} else {
83+
os.Unsetenv(BundleCreationMaxConcurrencyEnv)
84+
}
85+
86+
got, err := GetBundleCreationMaxConcurrency()
87+
if (err != nil) != tt.expectedError {
88+
t.Errorf("expected error %v, got %v", tt.expectedError, err != nil)
89+
}
90+
if got != tt.expectedValue {
91+
t.Errorf("expected %d, got %d", tt.expectedValue, got)
92+
}
93+
})
94+
}
95+
}

0 commit comments

Comments
 (0)