refactor(e2e): add DAG-based concurrent task execution for cluster setup #8149
Open
r2k1 wants to merge 22 commits into main from r2k1/dag
+1,190 −73
Changes from all commits (22 commits)
71a4102 Add initial design spec for type-safe DAG execution library
9efee3e Rename taskflow to tasks, use named Deps structs throughout
1e5f8bf tasks: add core types — Task interface, Config, DAGError, TaskStatus
39b21c3 tasks: add graph discovery via reflection
b493f46 tasks: add cycle detection via topological sort
b3a2857 tasks: add concurrent scheduler with error strategies
2e5d06e tasks: add integration tests and clean up vet warnings
0c3f1d4 tasks: simplify scheduler and address review findings
6ce7e7a refactor(e2e): replace sequential prepareCluster with concurrent DAG
da1d4b2 refactor(e2e): simplify prepareCluster, remove clusterSetup struct
4f0e867 refactor(e2e): wire cluster as DAG task, pass functions directly
c58c727 refactor(e2e): eliminate all anonymous functions from prepareCluster
c9d0792 refactor(e2e): eliminate wrapper helpers, absorb args into functions
6efb867 refactor(e2e): use bind helpers, eliminate newClusterTask
d75c164 polish: improve naming, comments, and error wrapping in dag/tasks pac…
5fea3e4 refactor(e2e): remove unused tasks package in favor of dag package
4ad379a chore: remove design spec docs, revert config.go changes
388a96a refactor(e2e): remove bind/bindRun helpers, use inline closures
8718b71 docs(e2e): add note to keep prepareCluster minimal
31cd747 fix: address PR review comments on dag package and cluster.go
8ed5067 test(dag): expand coverage to 95.1% with 37 tests
8dbaa70 dag: recover panics in task goroutines and improve docs
cluster.go:

```diff
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/Azure/agentbaker/e2e/config"
+	"github.com/Azure/agentbaker/e2e/dag"
 	"github.com/Azure/agentbaker/e2e/toolkit"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
@@ -59,89 +60,82 @@ func (c *Cluster) MaxPodsPerNode() (int, error) {
 	return 0, fmt.Errorf("cluster agentpool profiles were nil or empty: %+v", c.Model)
 }
 
-func prepareCluster(ctx context.Context, cluster *armcontainerservice.ManagedCluster, isNetworkIsolated, attachPrivateAcr bool) (*Cluster, error) {
+// prepareCluster runs all cluster preparation steps as a concurrent DAG.
+// This function contains complex concurrent orchestration — keep it as
+// minimal as possible and push all non-trivial logic into the individual
+// task functions it calls.
+func prepareCluster(ctx context.Context, clusterModel *armcontainerservice.ManagedCluster, isNetworkIsolated, attachPrivateAcr bool) (*Cluster, error) {
 	defer toolkit.LogStepCtx(ctx, "preparing cluster")()
 	ctx, cancel := context.WithTimeout(ctx, config.Config.TestTimeoutCluster)
 	defer cancel()
-	cluster.Name = to.Ptr(fmt.Sprintf("%s-%s", *cluster.Name, hash(cluster)))
-	cluster, err := getOrCreateCluster(ctx, cluster)
-	if err != nil {
-		return nil, fmt.Errorf("get or create cluster: %w", err)
-	}
-
-	bastion, err := getOrCreateBastion(ctx, cluster)
-	if err != nil {
-		return nil, fmt.Errorf("get or create bastion: %w", err)
-	}
-
-	_, err = getOrCreateMaintenanceConfiguration(ctx, cluster)
-	if err != nil {
-		return nil, fmt.Errorf("get or create maintenance configuration: %w", err)
-	}
-
-	subnetID, err := getClusterSubnetID(ctx, *cluster.Properties.NodeResourceGroup)
-	if err != nil {
-		return nil, fmt.Errorf("get cluster subnet: %w", err)
-	}
-
-	resourceGroupName := config.ResourceGroupName(*cluster.Location)
-
-	kube, err := getClusterKubeClient(ctx, resourceGroupName, *cluster.Name)
+	clusterModel.Name = to.Ptr(fmt.Sprintf("%s-%s", *clusterModel.Name, hash(clusterModel)))
+	cluster, err := getOrCreateCluster(ctx, clusterModel)
 	if err != nil {
-		return nil, fmt.Errorf("get kube client using cluster %q: %w", *cluster.Name, err)
+		return nil, fmt.Errorf("get or create cluster: %w", err)
 	}
 
-	kubeletIdentity, err := getClusterKubeletIdentity(cluster)
-	if err != nil {
-		return nil, fmt.Errorf("getting cluster kubelet identity: %w", err)
-	}
+	g := dag.NewGroup(ctx)
 
-	if isNetworkIsolated || attachPrivateAcr {
-		// private acr must be created before we add the debug daemonsets
-		if err := addPrivateAzureContainerRegistry(ctx, cluster, kube, resourceGroupName, kubeletIdentity, true); err != nil {
-			return nil, fmt.Errorf("add private azure container registry (true): %w", err)
-		}
-		if err := addPrivateAzureContainerRegistry(ctx, cluster, kube, resourceGroupName, kubeletIdentity, false); err != nil {
-			return nil, fmt.Errorf("add private azure container registry (false): %w", err)
-		}
-	}
-	if isNetworkIsolated {
-		if err := addNetworkIsolatedSettings(ctx, cluster, *cluster.Location); err != nil {
-			return nil, fmt.Errorf("add network isolated settings: %w", err)
-		}
-	}
-	if !isNetworkIsolated { // network isolated cluster blocks all egress via NSG
-		if err := addFirewallRules(ctx, cluster, *cluster.Location); err != nil {
-			return nil, fmt.Errorf("add firewall rules: %w", err)
-		}
+	bastion := dag.Go(g, func(ctx context.Context) (*Bastion, error) {
+		return getOrCreateBastion(ctx, cluster)
+	})
+	dag.Run(g, func(ctx context.Context) error { return ensureMaintenanceConfiguration(ctx, cluster) })
+	subnet := dag.Go(g, func(ctx context.Context) (string, error) { return getClusterSubnetID(ctx, cluster) })
+	kube := dag.Go(g, func(ctx context.Context) (*Kubeclient, error) { return getClusterKubeClient(ctx, cluster) })
+	identity := dag.Go(g, func(ctx context.Context) (*armcontainerservice.UserAssignedIdentity, error) {
+		return getClusterKubeletIdentity(ctx, cluster)
+	})
+	dag.Run(g, func(ctx context.Context) error { return collectGarbageVMSS(ctx, cluster) })
+	var networkDeps []dag.Dep
+	if !isNetworkIsolated {
+		networkDeps = append(networkDeps, dag.Run(g, func(ctx context.Context) error { return addFirewallRules(ctx, cluster) }))
 	}
-
-	if err := kube.EnsureDebugDaemonsets(ctx, isNetworkIsolated, config.GetPrivateACRName(true, *cluster.Location)); err != nil {
-		return nil, fmt.Errorf("ensure debug daemonsets for %q: %w", *cluster.Name, err)
+	if isNetworkIsolated {
+		networkDeps = append(networkDeps, dag.Run(g, func(ctx context.Context) error { return addNetworkIsolatedSettings(ctx, cluster) }))
 	}
+	needACR := isNetworkIsolated || attachPrivateAcr
+	acrNonAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, true))
+	acrAnon := dag.Run2(g, kube, identity, addACR(cluster, needACR, false))
+	dag.Run1(g, kube, ensureDebugDaemonsets(cluster, isNetworkIsolated), append([]dag.Dep{acrNonAnon, acrAnon}, networkDeps...)...)
+	extract := dag.Go1(g, kube, extractClusterParams(cluster))
 
-	// sometimes tests can be interrupted and vmss are left behind
-	// don't waste resource and delete them
-	if err := collectGarbageVMSS(ctx, cluster); err != nil {
-		return nil, fmt.Errorf("collect garbage vmss: %w", err)
+	if err := g.Wait(); err != nil {
+		return nil, fmt.Errorf("prepare cluster tasks: %w", err)
 	}
+	return &Cluster{
+		Model:           cluster,
+		Kube:            kube.MustGet(),
+		KubeletIdentity: identity.MustGet(),
+		SubnetID:        subnet.MustGet(),
+		ClusterParams:   extract.MustGet(),
+		Bastion:         bastion.MustGet(),
+	}, nil
+}
 
-	clusterParams, err := extractClusterParameters(ctx, kube, cluster)
-	if err != nil {
-		return nil, fmt.Errorf("extracting cluster parameters: %w", err)
+func addACR(cluster *armcontainerservice.ManagedCluster, needACR, isNonAnonymousPull bool) func(context.Context, *Kubeclient, *armcontainerservice.UserAssignedIdentity) error {
+	return func(ctx context.Context, k *Kubeclient, id *armcontainerservice.UserAssignedIdentity) error {
+		if !needACR {
+			return nil
+		}
+		return addPrivateAzureContainerRegistry(ctx, cluster, k, id, isNonAnonymousPull)
 	}
+}
 
-	return &Cluster{
-		Model:           cluster,
-		Kube:            kube,
-		KubeletIdentity: kubeletIdentity,
-		SubnetID:        subnetID,
-		ClusterParams:   clusterParams,
-		Bastion:         bastion,
-	}, nil
+func ensureDebugDaemonsets(cluster *armcontainerservice.ManagedCluster, isNetworkIsolated bool) func(context.Context, *Kubeclient) error {
+	return func(ctx context.Context, k *Kubeclient) error {
+		return k.EnsureDebugDaemonsets(ctx, isNetworkIsolated, config.GetPrivateACRName(true, *cluster.Location))
+	}
+}
+
+func extractClusterParams(cluster *armcontainerservice.ManagedCluster) func(context.Context, *Kubeclient) (*ClusterParams, error) {
+	return func(ctx context.Context, k *Kubeclient) (*ClusterParams, error) {
+		return extractClusterParameters(ctx, cluster, k)
+	}
 }
 
-func getClusterKubeletIdentity(cluster *armcontainerservice.ManagedCluster) (*armcontainerservice.UserAssignedIdentity, error) {
+func getClusterKubeletIdentity(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*armcontainerservice.UserAssignedIdentity, error) {
 	if cluster == nil || cluster.Properties == nil || cluster.Properties.IdentityProfile == nil {
 		return nil, fmt.Errorf("cannot dereference cluster identity profile to extract kubelet identity ID")
 	}
@@ -152,7 +146,7 @@ func getClusterKubeletIdentity(cluster *armcontainerservice.ManagedCluster) (*ar
 	return kubeletIdentity, nil
 }
 
-func extractClusterParameters(ctx context.Context, kube *Kubeclient, cluster *armcontainerservice.ManagedCluster) (*ClusterParams, error) {
+func extractClusterParameters(ctx context.Context, cluster *armcontainerservice.ManagedCluster, kube *Kubeclient) (*ClusterParams, error) {
 	kubeconfig, err := clientcmd.Load(kube.KubeConfig)
 	if err != nil {
 		return nil, fmt.Errorf("loading cluster kubeconfig: %w", err)
@@ -423,16 +417,20 @@ func createNewAKSClusterWithRetry(ctx context.Context, cluster *armcontainerserv
 	return nil, fmt.Errorf("failed to create cluster after %d attempts due to persistent 409 Conflict: %w", maxRetries, lastErr)
 }
 
-func getOrCreateMaintenanceConfiguration(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*armcontainerservice.MaintenanceConfiguration, error) {
-	existingMaintenance, err := config.Azure.Maintenance.Get(ctx, config.ResourceGroupName(*cluster.Location), *cluster.Name, "default", nil)
+func ensureMaintenanceConfiguration(ctx context.Context, cluster *armcontainerservice.ManagedCluster) error {
+	_, err := config.Azure.Maintenance.Get(ctx, config.ResourceGroupName(*cluster.Location), *cluster.Name, "default", nil)
 	var azErr *azcore.ResponseError
 	if errors.As(err, &azErr) && azErr.StatusCode == 404 {
-		return createNewMaintenanceConfiguration(ctx, cluster)
+		_, err = createNewMaintenanceConfiguration(ctx, cluster)
+		if err != nil {
+			return fmt.Errorf("creating maintenance configuration for cluster %q: %w", *cluster.Name, err)
+		}
+		return nil
	}
 	if err != nil {
-		return nil, fmt.Errorf("failed to get maintenance configuration 'default' for cluster %q: %w", *cluster.Name, err)
+		return fmt.Errorf("failed to get maintenance configuration 'default' for cluster %q: %w", *cluster.Name, err)
 	}
-	return &existingMaintenance.MaintenanceConfiguration, nil
+	return nil
 }
 
 func createNewMaintenanceConfiguration(ctx context.Context, cluster *armcontainerservice.ManagedCluster) (*armcontainerservice.MaintenanceConfiguration, error) {
```
Review comment: prepareCluster no longer enforces the previous ordering where network changes (firewall rules / network-isolated subnet+NSG updates) completed before EnsureDebugDaemonsets runs. Because the daemonset creation triggers pod scheduling/image pulls, running it concurrently with route table/NSG updates can introduce e2e flakiness. Consider capturing the firewall/NSG task(s) as Effect values and adding them as explicit dependencies to the Run1(... ensureDebugDaemonsets ...) task (and possibly any other k8s API tasks that assume steady node egress).