Skip to content

Commit 419eac5

Browse files
authored
Remove dependency on apiserver for IPAMD startup (#3243)
* remove apiserver dependency for ipamd startup * fix format issue in UT * wait apiserver connectivty for pod annotate feature * return maxPods value directly when parsing the local file
1 parent 0e8092b commit 419eac5

6 files changed

Lines changed: 306 additions & 62 deletions

File tree

cmd/aws-k8s-agent/main.go

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package main
1616

1717
import (
1818
"os"
19+
"time"
1920

2021
"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd"
2122
"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
@@ -24,6 +25,7 @@ import (
2425
"github.com/aws/amazon-vpc-cni-k8s/pkg/version"
2526
"github.com/aws/amazon-vpc-cni-k8s/utils"
2627
metrics "github.com/aws/amazon-vpc-cni-k8s/utils/prometheusmetrics"
28+
"k8s.io/client-go/kubernetes"
2729
)
2830

2931
const (
@@ -36,44 +38,108 @@ const (
3638

3739
// Environment variable to disable the IPAMD introspection endpoint on 61679
3840
envDisableIntrospection = "DISABLE_INTROSPECTION"
41+
42+
restCfgTimeout = 5 * time.Second
43+
pollInterval = 5 * time.Second
44+
pollTimeout = 30 * time.Second
3945
)
4046

4147
func main() {
4248
os.Exit(_main())
4349
}
4450

51+
// startBackgroundAPIServerCheck checks API connectivity in the background
52+
func startBackgroundAPIServerCheck(ipamContext *ipamd.IPAMContext) {
53+
go func() {
54+
log := logger.Get()
55+
log.Info("Starting background API server connectivity check...")
56+
57+
// Create a new client for API server check
58+
restCfg, err := k8sapi.GetRestConfig()
59+
if err != nil {
60+
log.Errorf("Failed to get REST config for background API check: %v", err)
61+
return
62+
}
63+
restCfg.Timeout = restCfgTimeout
64+
clientSet, err := kubernetes.NewForConfig(restCfg)
65+
if err != nil {
66+
log.Errorf("Failed to create k8s client for background API check: %v", err)
67+
return
68+
}
69+
70+
// Keep checking until connection is established
71+
for {
72+
version, err := clientSet.Discovery().ServerVersion()
73+
if err == nil {
74+
log.Infof("API server connectivity established in background! Cluster Version is: %s", version.GitVersion)
75+
76+
// Update IPAM context with new API server connectivity
77+
ipamContext.SetAPIServerConnectivity(true)
78+
79+
// Exit the goroutine after successful connection
80+
log.Info("Background API server check completed successfully")
81+
return
82+
}
83+
84+
log.Debugf("Still waiting for API server connectivity in background: %v", err)
85+
time.Sleep(pollInterval)
86+
}
87+
}()
88+
}
89+
4590
func _main() int {
4691
// Do not add anything before initializing logger
4792
log := logger.Get()
4893

4994
log.Infof("Starting L-IPAMD %s ...", version.Version)
5095
version.RegisterMetric()
5196

97+
enabledPodEni := ipamd.EnablePodENI()
98+
enabledCustomNetwork := ipamd.UseCustomNetworkCfg()
99+
enabledPodAnnotation := ipamd.EnablePodIPAnnotation()
100+
withApiServer := false
52101
// Check API Server Connectivity
53-
if err := k8sapi.CheckAPIServerConnectivity(); err != nil {
54-
log.Errorf("Failed to check API server connectivity: %s", err)
55-
return 1
102+
if enabledPodEni || enabledCustomNetwork || enabledPodAnnotation {
103+
log.Info("SGP, custom networking or pod annotation feature is in use, waiting for API server connectivity to start IPAMD")
104+
if err := k8sapi.CheckAPIServerConnectivity(); err != nil {
105+
log.Errorf("Failed to check API server connectivity: %s", err)
106+
return 1
107+
} else {
108+
log.Info("API server connectivity established.")
109+
withApiServer = true
110+
}
111+
} else {
112+
log.Infof("Waiting to connect API server for upto %s...", pollTimeout)
113+
// Try a quick check first
114+
if err := k8sapi.CheckAPIServerConnectivityWithTimeout(pollInterval, pollTimeout); err != nil {
115+
log.Warn("Proceeding without API server connectivity, will run background API server connectivity check")
116+
withApiServer = false
117+
} else {
118+
log.Info("API server connectivity established.")
119+
withApiServer = true
120+
}
56121
}
57-
58122
// Create Kubernetes client for API server requests
59123
k8sClient, err := k8sapi.CreateKubeClient(appName)
60124
if err != nil {
61125
log.Errorf("Failed to create kube client: %s", err)
62-
return 1
63126
}
64-
65127
// Create EventRecorder for use by IPAMD
66-
if err := eventrecorder.Init(k8sClient); err != nil {
128+
if err := eventrecorder.Init(k8sClient, withApiServer); err != nil {
67129
log.Errorf("Failed to create event recorder: %s", err)
68-
return 1
130+
log.Warn("Skipping event recorder initialization")
69131
}
70-
71-
ipamContext, err := ipamd.New(k8sClient)
132+
ipamContext, err := ipamd.New(k8sClient, withApiServer)
72133
if err != nil {
73134
log.Errorf("Initialization failure: %v", err)
74135
return 1
75136
}
76137

138+
// If not connected to API server yet, start background checks
139+
if !withApiServer {
140+
startBackgroundAPIServerCheck(ipamContext)
141+
}
142+
77143
// Pool manager
78144
go ipamContext.StartNodeIPPoolManager()
79145

pkg/ipamd/ipamd.go

Lines changed: 142 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"sync/atomic"
2525
"time"
2626

27+
"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
28+
2729
"github.com/aws/smithy-go"
2830

2931
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,7 +44,6 @@ import (
4244
"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
4345
"github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig"
4446
"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
45-
"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
4647
"github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils"
4748
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/cniutils"
4849
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
@@ -185,6 +186,13 @@ const (
185186
// envEnableNetworkPolicy is used to enable IPAMD/CNI to send pod create events to network policy agent.
186187
envNetworkPolicyMode = "NETWORK_POLICY_ENFORCING_MODE"
187188
defaultNetworkPolicyMode = "standard"
189+
190+
defaultMaxPodsFromKubelet = 110
191+
kubeletConfigPath = "/host/etc/kubernetes/kubelet/kubelet-config.json"
192+
eniMaxPodsFilePath = "/app/eni-max-pods.txt"
193+
194+
// Application name for k8s client
195+
appName = "aws-node"
188196
)
189197

190198
var log = logger.Get()
@@ -230,6 +238,11 @@ type IPAMContext struct {
230238
enablePodIPAnnotation bool
231239
maxPods int // maximum number of pods that can be scheduled on the node
232240
networkPolicyMode string
241+
withApiServer bool
242+
}
243+
244+
type kubeletConfig struct {
245+
MaxPods *int64 `json:"maxPods"`
233246
}
234247

235248
// setUnmanagedENIs will rebuild the set of ENI IDs for ENIs tagged as "no_manage"
@@ -335,7 +348,7 @@ func (c *IPAMContext) inInsufficientCidrCoolingPeriod() bool {
335348

336349
// New retrieves IP address usage information from Instance MetaData service and Kubelet
337350
// then initializes IP address pool data store
338-
func New(k8sClient client.Client) (*IPAMContext, error) {
351+
func New(k8sClient client.Client, withApiServer bool) (*IPAMContext, error) {
339352
prometheusRegister()
340353
c := &IPAMContext{}
341354
c.k8sClient = k8sClient
@@ -360,9 +373,9 @@ func New(k8sClient client.Client) (*IPAMContext, error) {
360373
c.warmIPTarget = getWarmIPTarget()
361374
c.minimumIPTarget = getMinimumIPTarget()
362375
c.warmPrefixTarget = getWarmPrefixTarget()
363-
c.enablePodENI = enablePodENI()
376+
c.enablePodENI = EnablePodENI()
364377
c.enableManageUntaggedMode = enableManageUntaggedMode()
365-
c.enablePodIPAnnotation = enablePodIPAnnotation()
378+
c.enablePodIPAnnotation = EnablePodIPAnnotation()
366379
c.numNetworkCards = len(c.awsClient.GetNetworkCards())
367380

368381
c.networkPolicyMode, err = getNetworkPolicyMode()
@@ -385,7 +398,7 @@ func New(k8sClient client.Client) (*IPAMContext, error) {
385398
c.myNodeName = os.Getenv(envNodeName)
386399
checkpointer := datastore.NewJSONFile(dsBackingStorePath())
387400
c.dataStore = datastore.NewDataStore(log, checkpointer, c.enablePrefixDelegation)
388-
401+
c.withApiServer = withApiServer
389402
if err := c.nodeInit(); err != nil {
390403
return nil, err
391404
}
@@ -523,21 +536,32 @@ func (c *IPAMContext) nodeInit() error {
523536
}, 30*time.Second)
524537
}
525538

526-
// Make a k8s client request for the current node so that max pods can be derived
527-
node, err := k8sapi.GetNode(ctx, c.k8sClient)
528-
if err != nil {
529-
log.Errorf("Failed to get node", err)
530-
podENIErrInc("nodeInit")
531-
return err
532-
}
533-
534-
maxPods, isInt64 := node.Status.Capacity.Pods().AsInt64()
535-
if !isInt64 {
536-
log.Errorf("Failed to parse max pods: %s", node.Status.Capacity.Pods().String)
537-
podENIErrInc("nodeInit")
538-
return errors.New("error while trying to determine max pods")
539+
// if apiserver is connected, get the maxPods from node
540+
var node corev1.Node
541+
if c.withApiServer {
542+
node, err := k8sapi.GetNode(ctx, c.k8sClient)
543+
if err != nil {
544+
log.Errorf("Failed to get node, %s", err)
545+
podENIErrInc("nodeInit")
546+
return err
547+
} else {
548+
maxPods, isInt64 := node.Status.Capacity.Pods().AsInt64()
549+
if !isInt64 {
550+
log.Errorf("Failed to parse max pods: %s", node.Status.Capacity.Pods().String)
551+
podENIErrInc("nodeInit")
552+
return errors.New("error while trying to determine max pods")
553+
}
554+
c.maxPods = int(maxPods)
555+
}
556+
} else {
557+
maxPods, err := c.getMaxPodsFromFile()
558+
if err != nil {
559+
log.Warnf("Using default maxPods as %d because reading from file failed: %v", defaultMaxPodsFromKubelet, err)
560+
c.maxPods = defaultMaxPodsFromKubelet
561+
} else {
562+
c.maxPods = int(maxPods)
563+
}
539564
}
540-
c.maxPods = int(maxPods)
541565

542566
if c.useCustomNetworking {
543567
// When custom networking is enabled and a valid ENIConfig is found, IPAMD patches the CNINode
@@ -1691,6 +1715,57 @@ func (c *IPAMContext) warmIPTargetsDefined() bool {
16911715
return c.warmIPTarget != noWarmIPTarget || c.minimumIPTarget != noMinimumIPTarget
16921716
}
16931717

1718+
// max pods from instance type mapping file
1719+
type instanceTypeMaxPodsMapping map[string]int64
1720+
1721+
// getMaxPodsFromFile reads the max pods value from the eni-max-pods.txt file
1722+
// based on the instance type
1723+
func (c *IPAMContext) getMaxPodsFromFile() (int64, error) {
1724+
instanceType := c.awsClient.GetInstanceType()
1725+
if instanceType == "" {
1726+
return 0, fmt.Errorf("failed to get instance type")
1727+
}
1728+
1729+
data, err := os.ReadFile(eniMaxPodsFilePath)
1730+
if err != nil {
1731+
return 0, fmt.Errorf("failed to read ENI max pods file: %w", err)
1732+
}
1733+
1734+
maxPods, err := parseMaxPodsForInstanceFromFile(string(data), instanceType)
1735+
return maxPods, err
1736+
}
1737+
1738+
// parseMaxPodsFile parses the eni-max-pods.txt file content and returns a mapping
1739+
// of instance type to max pods
1740+
func parseMaxPodsForInstanceFromFile(content string, instanceType string) (int64, error) {
1741+
lines := strings.Split(content, "\n")
1742+
1743+
for _, line := range lines {
1744+
// Skip comments and empty lines
1745+
line = strings.TrimSpace(line)
1746+
if line == "" || strings.HasPrefix(line, "#") {
1747+
continue
1748+
}
1749+
1750+
// Split the line into instance type and max pods
1751+
parts := strings.Fields(line)
1752+
if len(parts) != 2 {
1753+
continue
1754+
}
1755+
1756+
currInstanceType := parts[0]
1757+
if currInstanceType == instanceType {
1758+
maxPods, err := strconv.ParseInt(parts[1], 10, 64)
1759+
if err != nil {
1760+
return 0, fmt.Errorf("failed to parse maxPods for instance type %q: %v", instanceType, err)
1761+
}
1762+
return maxPods, nil
1763+
}
1764+
}
1765+
1766+
return 0, fmt.Errorf("instance type %q not found in ENI max pods file", instanceType)
1767+
}
1768+
16941769
// UseCustomNetworkCfg returns whether Pods needs to use pod specific configuration or not.
16951770
func UseCustomNetworkCfg() bool {
16961771
return parseBoolEnvVar(envCustomNetworkCfg, false)
@@ -1766,7 +1841,7 @@ func disableLeakedENICleanup() bool {
17661841
return isIPv6Enabled() || disableENIProvisioning() || utils.GetBoolAsStringEnvVar(envDisableLeakedENICleanup, false)
17671842
}
17681843

1769-
func enablePodENI() bool {
1844+
func EnablePodENI() bool {
17701845
return utils.GetBoolAsStringEnvVar(envEnablePodENI, false)
17711846
}
17721847

@@ -1796,7 +1871,7 @@ func enableManageUntaggedMode() bool {
17961871
return utils.GetBoolAsStringEnvVar(envManageUntaggedENI, true)
17971872
}
17981873

1799-
func enablePodIPAnnotation() bool {
1874+
func EnablePodIPAnnotation() bool {
18001875
return utils.GetBoolAsStringEnvVar(envAnnotatePodIP, false)
18011876
}
18021877

@@ -2355,3 +2430,49 @@ func (c *IPAMContext) AddFeatureToCNINode(ctx context.Context, featureName rcv1a
23552430
newCNINode.Spec.Features = append(newCNINode.Spec.Features, newFeature)
23562431
return c.k8sClient.Patch(ctx, newCNINode, client.MergeFromWithOptions(cniNode, client.MergeFromWithOptimisticLock{}))
23572432
}
2433+
2434+
// SetAPIServerConnectivity updates the API server connectivity status and reconfigures
2435+
// components that depend on API server access
2436+
func (c *IPAMContext) SetAPIServerConnectivity(connected bool) {
2437+
if c.withApiServer == connected {
2438+
// Status didn't change
2439+
return
2440+
}
2441+
2442+
log.Infof("Updating API server connectivity status from %v to %v", c.withApiServer, connected)
2443+
c.withApiServer = connected
2444+
2445+
if connected {
2446+
// API server is now available - update maxPods from node object
2447+
// First, try to recreate the client with caching enabled
2448+
newClient, err := k8sapi.CreateKubeClient(appName)
2449+
if err != nil {
2450+
log.Errorf("Failed to recreate k8s client with cache when API server became available: %v", err)
2451+
} else {
2452+
log.Info("Successfully recreated k8s client with cache after API server became available")
2453+
c.k8sClient = newClient
2454+
}
2455+
2456+
// Now get the node to update maxPods
2457+
ctx := context.TODO()
2458+
node, err := k8sapi.GetNode(ctx, c.k8sClient)
2459+
if err != nil {
2460+
log.Errorf("Failed to get node after API server connection established: %s", err)
2461+
} else {
2462+
maxPods, isInt64 := node.Status.Capacity.Pods().AsInt64()
2463+
if !isInt64 {
2464+
log.Errorf("Failed to parse max pods from node: %s", node.Status.Capacity.Pods().String())
2465+
} else {
2466+
// Update maxPods with the value from the node
2467+
oldMaxPods := c.maxPods
2468+
c.maxPods = int(maxPods)
2469+
log.Infof("Updated maxPods from %d to %d based on node capacity", oldMaxPods, c.maxPods)
2470+
}
2471+
}
2472+
} else {
2473+
// API server connection was lost
2474+
// No action needed here as we already have maxPods from file or kubelet
2475+
// and we want to keep working with that value
2476+
log.Info("API server connection lost, continuing with current maxPods value")
2477+
}
2478+
}

0 commit comments

Comments
 (0)