Skip to content

Commit b05f1bf

Browse files
author
Igor Velichkovich
committed
fix(remediation): retry on errors and throw errors to trigger retries
Signed-off-by: Igor Velichkovich <[email protected]>
1 parent fd4466e commit b05f1bf

23 files changed

+2050
-656
lines changed

.gitignore

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,30 +140,7 @@ dist/
140140
### JetBrains IDEs (GoLand, PyCharm, IntelliJ) ###
141141
### JetBrains IDEs (GoLand, PyCharm, IntelliJ) ###
142142
# User-specific stuff
143-
.idea/**/workspace.xml
144-
.idea/**/tasks.xml
145-
.idea/**/usage.statistics.xml
146-
.idea/**/dictionaries
147-
.idea/**/shelf
148-
149-
# AWS User-specific
150-
.idea/**/aws.xml
151-
152-
# Generated files
153-
.idea/**/contentModel.xml
154-
155-
# Sensitive or high-churn files
156-
.idea/**/dataSources/
157-
.idea/**/dataSources.ids
158-
.idea/**/dataSources.local.xml
159-
.idea/**/sqlDataSources.xml
160-
.idea/**/dynamic.xml
161-
.idea/**/uiDesigner.xml
162-
.idea/**/dbnavigator.xml
163-
164-
# Gradle
165-
.idea/**/gradle.xml
166-
.idea/**/libraries
143+
.idea/
167144

168145
# CMake
169146
cmake-build-*/

fault-remediation/main.go

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"flag"
2121
"fmt"
2222
"log/slog"
23+
"net/http"
2324
"os"
2425
"os/signal"
2526
"strconv"
@@ -100,28 +101,16 @@ func run() error {
100101
TomlConfigPath: tomlConfigPath,
101102
DryRun: dryRun,
102103
EnableLogCollector: enableLogCollector,
104+
UseCtrlRuntime: enableControllerRuntime,
103105
}
104106

105-
components, err := initializer.InitializeAll(ctx, params)
106-
if err != nil {
107-
return fmt.Errorf("initialization failed: %w", err)
108-
}
109-
110-
reconciler := components.FaultRemediationReconciler
111-
112-
defer func() {
113-
if err := reconciler.CloseAll(ctx); err != nil {
114-
slog.Error("failed to close datastore components", "error", err)
115-
}
116-
}()
117-
118107
if enableControllerRuntime {
119-
err = setupCtrlRuntimeManagement(ctx, components)
108+
err := setupCtrlRuntimeManagement(ctx, params)
120109
if err != nil {
121110
return err
122111
}
123112
} else {
124-
err = setupNonCtrlRuntimeManaged(ctx, components)
113+
err := setupNonCtrlRuntimeManaged(ctx, params)
125114
if err != nil {
126115
return err
127116
}
@@ -130,9 +119,22 @@ func run() error {
130119
return nil
131120
}
132121

133-
func setupNonCtrlRuntimeManaged(ctx context.Context, components *initializer.Components) error {
122+
func setupNonCtrlRuntimeManaged(ctx context.Context, params initializer.InitializationParams) error {
134123
slog.Info("Running without controller runtime management")
135124

125+
components, err := initializer.InitializeAll(ctx, params, nil)
126+
if err != nil {
127+
return fmt.Errorf("initialization failed: %w", err)
128+
}
129+
130+
reconciler := components.FaultRemediationReconciler
131+
132+
defer func() {
133+
if err := reconciler.CloseAll(ctx); err != nil {
134+
slog.Error("failed to close datastore components", "error", err)
135+
}
136+
}()
137+
136138
metricsAddr = strings.TrimPrefix(metricsAddr, ":")
137139

138140
portInt, err := strconv.Atoi(metricsAddr)
@@ -165,7 +167,19 @@ func setupNonCtrlRuntimeManaged(ctx context.Context, components *initializer.Com
165167

166168
for event := range components.FaultRemediationReconciler.Watcher.Events() {
167169
slog.Info("Event received", "event", event)
168-
_, _ = components.FaultRemediationReconciler.Reconcile(gCtx, &event)
170+
171+
for i := 1; i <= components.FaultRemediationReconciler.Config.UpdateMaxRetries; i++ {
172+
_, err = components.FaultRemediationReconciler.Reconcile(gCtx, &event)
173+
if err == nil {
174+
break
175+
}
176+
177+
slog.Error("Error processing event", "event", event, "error", err)
178+
179+
if i < components.FaultRemediationReconciler.Config.UpdateMaxRetries {
180+
time.Sleep(components.FaultRemediationReconciler.Config.UpdateRetryDelay)
181+
}
182+
}
169183
}
170184

171185
return nil
@@ -174,10 +188,16 @@ func setupNonCtrlRuntimeManaged(ctx context.Context, components *initializer.Com
174188
return g.Wait()
175189
}
176190

177-
func setupCtrlRuntimeManagement(ctx context.Context, components *initializer.Components) error {
191+
func setupCtrlRuntimeManagement(ctx context.Context, params initializer.InitializationParams) error {
178192
slog.Info("Running in controller runtime managed mode")
179193

180-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
194+
cfg := ctrl.GetConfigOrDie()
195+
cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
196+
return auditlogger.NewAuditingRoundTripper(rt)
197+
})
198+
199+
//TODO: setup informers for node and job
200+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
181201
Scheme: scheme,
182202
Metrics: metricsserver.Options{
183203
BindAddress: metricsAddr,
@@ -205,6 +225,19 @@ func setupCtrlRuntimeManagement(ctx context.Context, components *initializer.Com
205225
return err
206226
}
207227

228+
components, err := initializer.InitializeAll(ctx, params, mgr.GetClient())
229+
if err != nil {
230+
return fmt.Errorf("initialization failed: %w", err)
231+
}
232+
233+
reconciler := components.FaultRemediationReconciler
234+
235+
defer func() {
236+
if err := reconciler.CloseAll(ctx); err != nil {
237+
slog.Error("failed to close datastore components", "error", err)
238+
}
239+
}()
240+
208241
err = components.FaultRemediationReconciler.SetupWithManager(ctx, mgr)
209242
if err != nil {
210243
return fmt.Errorf("SetupWithManager failed: %w", err)
@@ -235,7 +268,7 @@ func parseFlags() {
235268
" (otherwise metrics and health are on same port).",
236269
)
237270

238-
flag.StringVar(&kubeconfigPath, "kubeconfig-path", "", "path to kubeconfig file")
271+
flag.StringVar(&kubeconfigPath, "kubeconfig", "", "path to kubeconfig file")
239272

240273
flag.StringVar(&tomlConfigPath, "config-path", "/etc/config/config.toml",
241274
"path where the fault remediation config file is present")
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package annotation
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log/slog"
8+
"time"
9+
10+
corev1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/types"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
)
14+
15+
// CtrlRuntimeAnnotationManager manages node annotations for tracking remediation state.
16+
type CtrlRuntimeAnnotationManager struct {
17+
client client.Client
18+
}
19+
20+
// NewCtrlRuntimeAnnotationManager creates a new CtrlRuntimeAnnotationManager.
21+
func NewCtrlRuntimeAnnotationManager(client client.Client) *CtrlRuntimeAnnotationManager {
22+
return &CtrlRuntimeAnnotationManager{
23+
client: client,
24+
}
25+
}
26+
27+
// GetRemediationState retrieves the current remediation state from node annotation
28+
func (m *CtrlRuntimeAnnotationManager) GetRemediationState(
29+
ctx context.Context,
30+
nodeName string,
31+
) (*RemediationStateAnnotation, *corev1.Node, error) {
32+
node := &corev1.Node{}
33+
34+
err := m.client.Get(ctx, types.NamespacedName{
35+
Name: nodeName,
36+
}, node)
37+
if err != nil {
38+
return nil, node, fmt.Errorf("failed to get node %s: %w", nodeName, err)
39+
}
40+
// TODO: maybe split this up so it's not returning both node and state
41+
42+
annotationValue, exists := node.Annotations[AnnotationKey]
43+
if !exists {
44+
// No annotation means no active remediations
45+
return &RemediationStateAnnotation{
46+
EquivalenceGroups: make(map[string]EquivalenceGroupState),
47+
}, node, nil
48+
}
49+
50+
var state RemediationStateAnnotation
51+
if err = json.Unmarshal([]byte(annotationValue), &state); err != nil {
52+
slog.Error("Failed to unmarshal annotation", "node", nodeName, "error", err)
53+
// Return empty state if unmarshal fails
54+
return &RemediationStateAnnotation{
55+
EquivalenceGroups: make(map[string]EquivalenceGroupState),
56+
}, node, nil
57+
}
58+
59+
return &state, node, nil
60+
}
61+
62+
// UpdateRemediationState updates the node annotation with new remediation state
63+
func (m *CtrlRuntimeAnnotationManager) UpdateRemediationState(ctx context.Context, nodeName string,
64+
group string, crName string) error {
65+
// Get current state
66+
state, node, err := m.GetRemediationState(ctx, nodeName)
67+
if err != nil {
68+
// Log but continue with empty state
69+
slog.Warn("Failed to get current remediation state", "node", nodeName, "error", err)
70+
71+
state = &RemediationStateAnnotation{
72+
EquivalenceGroups: make(map[string]EquivalenceGroupState),
73+
}
74+
}
75+
76+
// Update state for the group
77+
state.EquivalenceGroups[group] = EquivalenceGroupState{
78+
MaintenanceCR: crName,
79+
CreatedAt: time.Now().UTC(),
80+
}
81+
82+
// Marshal to JSON
83+
stateJSON, err := json.Marshal(state)
84+
if err != nil {
85+
return fmt.Errorf("failed to marshal remediation state: %w", err)
86+
}
87+
88+
patchBase := node.DeepCopy()
89+
if node.Annotations == nil {
90+
node.Annotations = map[string]string{}
91+
}
92+
93+
node.Annotations[AnnotationKey] = string(stateJSON)
94+
95+
if err = m.client.Patch(ctx, node, client.MergeFrom(patchBase)); err != nil {
96+
return fmt.Errorf("failed to patch node annotation: %w", err)
97+
}
98+
99+
slog.Info("Updated remediation state annotation for node",
100+
"node", nodeName,
101+
"group", group,
102+
"crName", crName)
103+
104+
return nil
105+
}
106+
107+
// ClearRemediationState removes the remediation state annotation from a node
108+
func (m *CtrlRuntimeAnnotationManager) ClearRemediationState(ctx context.Context, nodeName string) error {
109+
node := &corev1.Node{}
110+
111+
err := m.client.Get(ctx, types.NamespacedName{
112+
Name: nodeName,
113+
}, node)
114+
if err != nil {
115+
return fmt.Errorf("failed to get node %s: %w", nodeName, err)
116+
}
117+
118+
if node.Annotations == nil {
119+
return nil
120+
}
121+
122+
patchBase := node.DeepCopy()
123+
delete(node.Annotations, AnnotationKey)
124+
125+
if err = m.client.Patch(ctx, node, client.MergeFrom(patchBase)); err != nil {
126+
return fmt.Errorf("failed to patch node annotation: %w", err)
127+
}
128+
129+
slog.Info("Cleared remediation state annotation for node", "node", nodeName)
130+
131+
return nil
132+
}
133+
134+
// RemoveGroupFromState removes a specific group from the remediation state
135+
func (m *CtrlRuntimeAnnotationManager) RemoveGroupFromState(ctx context.Context, nodeName string, group string) error {
136+
state, node, err := m.GetRemediationState(ctx, nodeName)
137+
if err != nil {
138+
return fmt.Errorf("failed to get current remediation state: %w", err)
139+
}
140+
141+
// Remove the group
142+
delete(state.EquivalenceGroups, group)
143+
144+
// If no groups remain, clear the entire annotation
145+
if len(state.EquivalenceGroups) == 0 {
146+
return m.ClearRemediationState(ctx, nodeName)
147+
}
148+
149+
// Marshal to JSON
150+
stateJSON, err := json.Marshal(state)
151+
if err != nil {
152+
return fmt.Errorf("failed to marshal remediation state: %w", err)
153+
}
154+
155+
patchBase := node.DeepCopy()
156+
if node.Annotations == nil {
157+
node.Annotations = map[string]string{}
158+
}
159+
160+
node.Annotations[AnnotationKey] = string(stateJSON)
161+
162+
if err = m.client.Patch(ctx, node, client.MergeFrom(patchBase)); err != nil {
163+
return fmt.Errorf("failed to patch node annotation: %w", err)
164+
}
165+
166+
slog.Info("Removed group from remediation state for node", "node", nodeName, "group", group)
167+
168+
return nil
169+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package annotation
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
corev1 "k8s.io/api/core/v1"
8+
)
9+
10+
const (
11+
// AnnotationKey is the key for the node annotation that tracks remediation state
12+
AnnotationKey = "latestFaultRemediationState"
13+
)
14+
15+
// NodeAnnotationManagerInterface defines the interface for managing node annotations
16+
type NodeAnnotationManagerInterface interface {
17+
GetRemediationState(ctx context.Context, nodeName string) (*RemediationStateAnnotation, *corev1.Node, error)
18+
UpdateRemediationState(ctx context.Context, nodeName string, group string, crName string) error
19+
ClearRemediationState(ctx context.Context, nodeName string) error
20+
RemoveGroupFromState(ctx context.Context, nodeName string, group string) error
21+
}
22+
23+
// RemediationStateAnnotation represents the structure of the node annotation
24+
type RemediationStateAnnotation struct {
25+
EquivalenceGroups map[string]EquivalenceGroupState `json:"equivalenceGroups"`
26+
}
27+
28+
// EquivalenceGroupState represents the state of a single equivalence group
29+
type EquivalenceGroupState struct {
30+
MaintenanceCR string `json:"maintenanceCR"`
31+
CreatedAt time.Time `json:"createdAt"`
32+
}

0 commit comments

Comments
 (0)