@@ -17,12 +17,14 @@ package interruption
1717import (
1818 "context"
1919 "fmt"
20+ "sync"
2021 "time"
2122
2223 "github.com/awslabs/operatorpkg/reconciler"
2324 "github.com/awslabs/operatorpkg/singleton"
2425 "go.uber.org/multierr"
2526 "k8s.io/client-go/util/workqueue"
27+ "k8s.io/utils/clock"
2628 controllerruntime "sigs.k8s.io/controller-runtime"
2729 "sigs.k8s.io/controller-runtime/pkg/client"
2830 "sigs.k8s.io/controller-runtime/pkg/log"
@@ -31,36 +33,59 @@ import (
3133 "sigs.k8s.io/karpenter/pkg/events"
3234 "sigs.k8s.io/karpenter/pkg/operator/injection"
3335
34- "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages/instancestatusfailure"
36+ "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages"
37+ instancestatusmsg "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages/instancestatus"
3538 awserrors "github.com/aws/karpenter-provider-aws/pkg/errors"
3639 "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus"
3740)
3841
// unhealthyKey is the deduplication identity of a single unhealthy status
// check: an EC2 instance ID paired with the status category reported for it.
// It is used as a map key so that the unhealthy metric is incremented only
// the first time a given instance+category pair is observed.
type unhealthyKey struct {
	instanceID, category string
}
48+
3949var (
4050 // InstanceStatusInterval is the polling interval for the EC2 DescribeInstanceStatus API.
4151 InstanceStatusInterval = 1 * time .Minute
52+ // InstanceStatusDryRun controls whether the instance status controller takes action on
53+ // unhealthy instances. When true, the controller only emits metrics without cordoning
54+ // and draining affected nodes. Default is false (full remediation enabled).
55+ InstanceStatusDryRun = false
56+ // categoryToKind maps EC2 DescribeInstanceStatus categories to message kinds.
57+ categoryToKind = map [instancestatus.Category ]messages.Kind {
58+ instancestatus .InstanceStatus : messages .InstanceStatusKind ,
59+ instancestatus .SystemStatus : messages .SystemStatusKind ,
60+ instancestatus .EventStatus : messages .EventStatusKind ,
61+ }
4262)
4363
4464// InstanceStatusController polls EC2 DescribeInstanceStatus to detect unhealthy instances
4565// and scheduled maintenance events, then cordons and drains affected nodes.
4666type InstanceStatusController struct {
4767 InterruptionHandler
4868 instanceStatusProvider instancestatus.Provider
69+ seen map [unhealthyKey ]struct {}
70+ mu sync.Mutex
4971}
5072
5173func NewInstanceStatusController (
5274 kubeClient client.Client ,
75+ clk clock.Clock ,
5376 cloudProvider cloudprovider.CloudProvider ,
5477 recorder events.Recorder ,
5578 instanceStatusProvider instancestatus.Provider ,
5679) * InstanceStatusController {
5780 return & InstanceStatusController {
5881 InterruptionHandler : InterruptionHandler {
5982 kubeClient : kubeClient ,
83+ clk : clk ,
6084 cloudProvider : cloudProvider ,
6185 recorder : recorder ,
6286 },
6387 instanceStatusProvider : instanceStatusProvider ,
88+ seen : map [unhealthyKey ]struct {}{},
6489 }
6590}
6691
@@ -76,25 +101,68 @@ func (c *InstanceStatusController) Reconcile(ctx context.Context) (reconciler.Re
76101 return reconciler.Result {}, fmt .Errorf ("getting instance statuses, %w" , err )
77102 }
78103
104+ // Build the set of keys observed in this poll cycle for pruning stale entries.
105+ currentKeys := make (map [unhealthyKey ]struct {})
79106 errs := make ([]error , len (instanceStatuses ))
80107 workqueue .ParallelizeUntil (ctx , 10 , len (instanceStatuses ), func (i int ) {
81- categories := map [string ]bool {}
82- for _ , d := range instanceStatuses [i ].Details {
83- categories [string (d .Category )] = true
84- }
85- for cat := range categories {
86- InstanceStatusUnhealthy .Inc (map [string ]string {categoryLabel : cat })
87- }
88- if err := c .handleMessage (ctx , instancestatusfailure .Message (instanceStatuses [i ])); err != nil {
89- errs [i ] = fmt .Errorf ("handling instance status check message, %w" , err )
90- }
108+ errs [i ] = c .handleHealthStatus (ctx , instanceStatuses [i ], currentKeys )
91109 })
110+
111+ // Prune entries for instances that are no longer reported as unhealthy,
112+ // so that if the same instance becomes unhealthy again later it gets counted again.
113+ c .mu .Lock ()
114+ for key := range c .seen {
115+ if _ , ok := currentKeys [key ]; ! ok {
116+ delete (c .seen , key )
117+ }
118+ }
119+ c .mu .Unlock ()
120+
92121 if err = multierr .Combine (errs ... ); err != nil {
93122 return reconciler.Result {}, err
94123 }
95124 return reconciler.Result {RequeueAfter : InstanceStatusInterval }, nil
96125}
97126
127+ // handleHealthStatus dispatches a message per EC2 status category and records metrics.
128+ func (c * InstanceStatusController ) handleHealthStatus (ctx context.Context , hs instancestatus.HealthStatus , currentKeys map [unhealthyKey ]struct {}) error {
129+ categories := make (map [instancestatus.Category ]struct {})
130+ for _ , d := range hs .Details {
131+ categories [d .Category ] = struct {}{}
132+ }
133+ for category := range categories {
134+ kind , ok := categoryToKind [category ]
135+ if ! ok {
136+ continue
137+ }
138+ f , err := c .handleMessage (ctx , instancestatusmsg .New (hs .InstanceID , kind , hs .ImpairedSince ), InstanceStatusDryRun )
139+ if err != nil {
140+ return fmt .Errorf ("handling instance status check message, %w" , err )
141+ }
142+ if f {
143+ c .recordUnhealthyInstance (ctx , hs .InstanceID , category , currentKeys )
144+ }
145+ }
146+ return nil
147+ }
148+
149+ func (c * InstanceStatusController ) recordUnhealthyInstance (ctx context.Context , instanceID string , category instancestatus.Category , currentKeys map [unhealthyKey ]struct {}) {
150+ key := unhealthyKey {instanceID : instanceID , category : string (category )}
151+ c .mu .Lock ()
152+ currentKeys [key ] = struct {}{}
153+ _ , already := c .seen [key ]
154+ if ! already {
155+ c .seen [key ] = struct {}{}
156+ }
157+ c .mu .Unlock ()
158+ if ! already {
159+ log .FromContext (ctx ).Info ("detected unhealthy instance owned by cluster" ,
160+ "instanceID" , instanceID ,
161+ "category" , string (category ))
162+ InstanceStatusUnhealthy .Inc (map [string ]string {categoryLabel : string (category )})
163+ }
164+ }
165+
98166func (c * InstanceStatusController ) Register (_ context.Context , m manager.Manager ) error {
99167 return controllerruntime .NewControllerManagedBy (m ).
100168 Named ("interruption.instancestatus" ).
0 commit comments