Commit e790594: refactoring
1 parent 1475566 commit e790594

File tree: 4 files changed, +224 -62 lines changed

REQUIREMENTS.md

Lines changed: 157 additions & 7 deletions
@@ -98,13 +98,163 @@ All tests passing ✅
 
 ## Moving from direct scenario creation to CRD approach
 
-- I want to change the current implementation of the KRKN job creation: I want a CRD called `KrknChaosJob` that holds
-  all the details currently defined in the `createScenarioJob` method in internal/api/handlers.go to instantiate a job
-- I'd like the reconcile loop to keep track of the Job status and update the `KrknChaosJob` accordingly
-- I want a controller able to reconcile the `KrknChaosJob` and instantiate the chaos job as `createScenarioJob` does
-- I want the current /scenarios/run method to create the new CR `KrknChaosJob` and return the job uuid
-- I want `GetScenarioRunStatus` to eventually be adapted to this new behaviour
+### ✅ Implemented
+- Changed from direct Pod creation to a CRD-based approach with `KrknScenarioRun`
+- Controller reconciles `KrknScenarioRun` and creates jobs for each target cluster
+- API endpoints updated to use `scenarioRunName` as the primary identifier
+- Multi-cluster support with aggregated status
+
+### API Endpoints Structure (Nested Approach)
+
+```
+POST /api/v1/scenarios/run
+  → Creates KrknScenarioRun CR
+  → Returns: {scenarioRunName, clusterNames, totalTargets}
+
+GET /api/v1/scenarios/run/{scenarioRunName}
+  → Returns aggregated status with a list of clusterJobs (each with a jobId)
+
+GET /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}
+  → Returns status of a single job (TODO)
+
+GET /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs
+  → WebSocket stream of logs for a specific job (TODO - currently uses clusterName)
+
+DELETE /api/v1/scenarios/run/{scenarioRunName}
+  → Deletes the entire scenario run (all jobs)
+
+DELETE /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}
+  → Terminates a single job (TODO)
+```
+
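The nested flow above can be exercised end to end: create a run, then poll it by the returned `scenarioRunName`. A minimal client sketch, assuming the server listens on localhost:8080; the request body fields (`scenarioName`, `targetClusters`) are illustrative assumptions, only the response shape comes from this commit:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// RunResponse mirrors the documented response: {scenarioRunName, clusterNames, totalTargets}.
type RunResponse struct {
	ScenarioRunName string   `json:"scenarioRunName"`
	ClusterNames    []string `json:"clusterNames"`
	TotalTargets    int      `json:"totalTargets"`
}

func main() {
	// Hypothetical request body; the actual schema is not shown in this commit.
	body, _ := json.Marshal(map[string]any{
		"scenarioName":   "pod-scenarios",
		"targetClusters": []string{"cluster-a", "cluster-b"},
	})
	resp, err := http.Post("http://localhost:8080/api/v1/scenarios/run",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var run RunResponse
	if err := json.NewDecoder(resp.Body).Decode(&run); err != nil {
		panic(err)
	}
	// The aggregated status lives at /api/v1/scenarios/run/{scenarioRunName}.
	fmt.Printf("created %s targeting %d clusters\n", run.ScenarioRunName, run.TotalTargets)
}
```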
+### 🔧 TODO: Pod Recreation and Retry Logic
+
+#### Current Behavior
+- Controller creates one pod per cluster when KrknScenarioRun is created
+- No automatic retry on pod failure
+- No distinction between user-initiated deletion and failure
+
+#### Requirements
+
+**1. Automatic Retry on Failure**
+- When a pod fails (phase=Failed), the controller should retry creating a new pod
+- Maximum number of retry attempts should be configurable (suggested: 3)
+- Retry attempts should be tracked in ClusterJobStatus
+- Exponential backoff between retries (suggested: 10s, 30s, 60s; see the sketch below)
+
+**2. Manual Cancellation vs Failure**
+- User-initiated job deletion (DELETE /jobs/{jobId}) should NOT trigger a retry
+- Need to distinguish between "pod failed" and "user cancelled"
+- Proposed solution: add a field to ClusterJobStatus to track cancellation intent
+
+**3. Job Lifecycle States**
+```
+Pending → Running → Succeeded (terminal)
+                  → Failed → Retrying → Running → ...
+                  → Cancelled (terminal, no retry)
+                  → MaxRetriesExceeded (terminal)
+```
+
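A minimal sketch of the suggested 10s/30s/60s schedule, assuming the controller applies it by requeueing the reconcile request after a delay (the usual controller-runtime pattern) rather than sleeping; the helper name is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// retryDelay returns the wait before retry attempt n (1-based), following the
// suggested 10s/30s/60s schedule and capping at the final step.
func retryDelay(attempt int) time.Duration {
	schedule := []time.Duration{10 * time.Second, 30 * time.Second, 60 * time.Second}
	switch {
	case attempt < 1:
		attempt = 1
	case attempt > len(schedule):
		attempt = len(schedule)
	}
	return schedule[attempt-1]
}

func main() {
	// With maxRetries=3 the controller would wait 10s, 30s, then 60s.
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("retry %d after %s\n", attempt, retryDelay(attempt))
	}
}
```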
+#### Proposed Solution Options
+
+##### Option A: Cancellation Field in Status (Recommended)
+```go
+type ClusterJobStatus struct {
+	ClusterName string
+	JobId       string
+	PodName     string
+	Phase       string // Pending, Running, Succeeded, Failed, Cancelled, MaxRetriesExceeded
+
+	// NEW FIELDS
+	RetryCount      int          `json:"retryCount,omitempty"`
+	MaxRetries      int          `json:"maxRetries,omitempty"` // Default: 3
+	CancelRequested bool         `json:"cancelRequested,omitempty"`
+	LastRetryTime   *metav1.Time `json:"lastRetryTime,omitempty"`
+
+	StartTime      *metav1.Time
+	CompletionTime *metav1.Time
+	Message        string
+}
+```
+
+**Controller Logic**:
+```go
+// In updateClusterJobStatuses()
+if pod.Status.Phase == corev1.PodFailed {
+	if job.CancelRequested {
+		job.Phase = "Cancelled" // Terminal, no retry
+	} else if job.RetryCount < job.MaxRetries {
+		job.Phase = "Retrying"
+		job.RetryCount++
+		job.LastRetryTime = now
+		// Create new pod with new jobId
+		createClusterJob(ctx, scenarioRun, clusterName)
+	} else {
+		job.Phase = "MaxRetriesExceeded" // Terminal
+	}
+}
+```
+
+**DELETE /jobs/{jobId} Handler**:
+```go
+func (h *Handler) DeleteJob(w http.ResponseWriter, r *http.Request) {
+	// 1. Find KrknScenarioRun containing this jobId
+	// 2. Set job.CancelRequested = true in status
+	// 3. Delete the pod
+	// 4. Controller sees CancelRequested → does NOT retry
+}
+```
+
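The handler above is only step comments; a fuller sketch of Option A's cancellation path, assuming the controller-runtime client and `h.namespace` field already present in internal/api/handlers.go, the `krkn-job-id` pod label used by the logs endpoint, and the status fields from the struct above (routing that extracts `scenarioRunName`/`jobId` from the path is omitted):

```go
// Sketch only: imports and Handler fields match the handlers.go diff below.
func (h *Handler) deleteJob(w http.ResponseWriter, r *http.Request, scenarioRunName, jobId string) {
	ctx := r.Context()

	// 1. Find the KrknScenarioRun that owns this jobId.
	var run krknv1alpha1.KrknScenarioRun
	if err := h.client.Get(ctx, client.ObjectKey{Name: scenarioRunName, Namespace: h.namespace}, &run); err != nil {
		http.Error(w, "scenario run not found", http.StatusNotFound)
		return
	}

	// 2. Record cancellation intent in status so the controller skips the retry.
	found := false
	for i := range run.Status.ClusterJobs {
		if run.Status.ClusterJobs[i].JobId == jobId {
			run.Status.ClusterJobs[i].CancelRequested = true
			found = true
			break
		}
	}
	if !found {
		http.Error(w, "job not found", http.StatusNotFound)
		return
	}
	if err := h.client.Status().Update(ctx, &run); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// 3. Delete the pod(s) labeled with this jobId; the controller observes
	//    CancelRequested and marks the job Cancelled instead of retrying.
	if err := h.client.DeleteAllOf(ctx, &corev1.Pod{},
		client.InNamespace(h.namespace), client.MatchingLabels{"krkn-job-id": jobId}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// 4. Return success.
	w.WriteHeader(http.StatusAccepted)
}
```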
+##### Option B: Finalizers for Cancellation Tracking
+- Add finalizer `krkn.krkn-chaos.dev/job-cancellation` to ClusterJobStatus
+- When user deletes job, add finalizer before deleting pod
+- Controller checks for finalizer → skips retry
+- More complex, but leverages K8s patterns
+
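Since finalizers live in object metadata rather than in a status struct, this variant would presumably carry the finalizer on the chaos pod that ClusterJobStatus points at. A sketch using controller-runtime's controllerutil helpers (the two function names are illustrative, and the bool returns assume controller-runtime v0.12+):

```go
package controller

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const cancelFinalizer = "krkn.krkn-chaos.dev/job-cancellation"

// markCancelled is what the DELETE handler would call: tag the pod, then delete it.
func markCancelled(ctx context.Context, c client.Client, pod *corev1.Pod) error {
	if controllerutil.AddFinalizer(pod, cancelFinalizer) {
		if err := c.Update(ctx, pod); err != nil {
			return err
		}
	}
	return c.Delete(ctx, pod)
}

// onPodTerminating runs in the reconcile loop: a terminating pod carrying the
// finalizer was cancelled by the user, so release it and report "no retry".
func onPodTerminating(ctx context.Context, c client.Client, pod *corev1.Pod) (retry bool, err error) {
	cancelled := controllerutil.ContainsFinalizer(pod, cancelFinalizer)
	if controllerutil.RemoveFinalizer(pod, cancelFinalizer) {
		if err := c.Update(ctx, pod); err != nil {
			return false, err
		}
	}
	return !cancelled, nil
}
```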
+##### Option C: Separate CancellationRequest CR
+- Create a new CRD `KrknJobCancellation` to track cancellation intent
+- Controller watches both KrknScenarioRun and KrknJobCancellation
+- More decoupled, but adds complexity
+
+#### Implementation Plan (TODO)
+
+1. **Phase 1: Add Retry Fields to CRD**
+   - Update `ClusterJobStatus` with retry tracking fields
+   - Add `maxRetries` to `KrknScenarioRunSpec` (default: 3)
+   - Regenerate manifests
+
+2. **Phase 2: Implement Retry Logic in Controller**
+   - Detect pod failure vs cancellation
+   - Implement retry with exponential backoff
+   - Update job phase to reflect retry state
+   - Create new pod with new jobId on retry
+
+3. **Phase 3: Add DELETE /jobs/{jobId} Endpoint**
+   - Parse scenarioRunName and jobId from path
+   - Set CancelRequested flag in CR status
+   - Delete pod
+   - Return success
+
+4. **Phase 4: Update GET /jobs/{jobId} and Logs Endpoints**
+   - Change the logs key from `{clusterName}` to `{jobId}`
+   - Support nested path: `/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs`
+   - Add endpoint: `GET /scenarios/run/{scenarioRunName}/jobs/{jobId}` for single job status
+
+#### Configuration
+
+Add to KrknScenarioRunSpec:
+```yaml
+apiVersion: krkn.krkn-chaos.dev/v1alpha1
+kind: KrknScenarioRun
+spec:
+  # ... existing fields ...
+
+  # Retry configuration
+  maxRetries: 3             # Default: 3, set to 0 to disable retry
+  retryBackoff: exponential # exponential or fixed
+  retryDelay: 10s           # Initial delay for exponential, fixed delay for fixed
+```
 
 ### Overview
-TODO
+The CRD-based approach provides better state management, automatic reconciliation, and improved observability compared to direct Pod creation.
 
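For reference, the YAML above might map onto the spec roughly as follows. This is a sketch only: the kubebuilder markers and the `metav1.Duration` type are the usual conventions for such fields, not taken from this commit.

```go
package v1alpha1

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

type KrknScenarioRunSpec struct {
	// ... existing fields ...

	// MaxRetries is the per-job retry budget; 0 disables retries.
	// +kubebuilder:default=3
	// +optional
	MaxRetries int `json:"maxRetries,omitempty"`

	// RetryBackoff selects the schedule: "exponential" or "fixed".
	// +kubebuilder:validation:Enum=exponential;fixed
	// +optional
	RetryBackoff string `json:"retryBackoff,omitempty"`

	// RetryDelay is the initial delay for exponential backoff, or the fixed delay.
	// +optional
	RetryDelay *metav1.Duration `json:"retryDelay,omitempty"`
}
```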

config/rbac/role.yaml

Lines changed: 27 additions & 0 deletions
@@ -44,3 +44,30 @@ rules:
   - get
   - patch
   - update
+- apiGroups:
+  - krkn.krkn-chaos.dev
+  resources:
+  - krknoperatortargets
+  verbs:
+  - get
+  - list
+  - watch
+- apiGroups:
+  - krkn.krkn-chaos.dev
+  resources:
+  - krkntargetrequests
+  verbs:
+  - create
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - krkn.krkn-chaos.dev
+  resources:
+  - krkntargetrequests/status
+  verbs:
+  - get
+  - patch
+  - update
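In kubebuilder-based operators, role.yaml is normally regenerated from RBAC markers in the controller source; markers of roughly this shape would yield the new rules. Placement and grouping here are an assumption, since the markers themselves are not part of this diff.

```go
// Assumed markers, typically placed above the controller's Reconcile method
// and rendered into config/rbac/role.yaml by `make manifests`.
//+kubebuilder:rbac:groups=krkn.krkn-chaos.dev,resources=krknoperatortargets,verbs=get;list;watch
//+kubebuilder:rbac:groups=krkn.krkn-chaos.dev,resources=krkntargetrequests,verbs=create;get;list;patch;update;watch
//+kubebuilder:rbac:groups=krkn.krkn-chaos.dev,resources=krkntargetrequests/status,verbs=get;patch;update
```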

config/rbac/role_binding.yaml

Lines changed: 2 additions & 3 deletions
@@ -1,14 +1,13 @@
 apiVersion: rbac.authorization.k8s.io/v1
-kind: RoleBinding
+kind: ClusterRoleBinding
 metadata:
   labels:
     app.kubernetes.io/name: krkn-operator
     app.kubernetes.io/managed-by: kustomize
   name: manager-rolebinding
-  namespace: system
 roleRef:
   apiGroup: rbac.authorization.k8s.io
-  kind: Role
+  kind: ClusterRole
   name: manager-role
 subjects:
 - kind: ServiceAccount

internal/api/handlers.go

Lines changed: 38 additions & 52 deletions
@@ -1048,7 +1048,7 @@ var upgrader = websocket.Upgrader{
 	},
 }
 
-// GetScenarioRunLogs handles GET /api/v1/scenarios/run/{scenarioRunName}/logs/{clusterName} endpoint
+// GetScenarioRunLogs handles GET /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs endpoint
 // It streams the stdout/stderr logs of a running or completed job via WebSocket
 func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 	logger := log.Log.WithName("websocket-logs")
@@ -1064,8 +1064,8 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 	}
 	defer conn.Close()
 
-	// Extract scenarioRunName and clusterName from path
-	// Path format: /api/v1/scenarios/run/{scenarioRunName}/logs/{clusterName}
+	// Extract scenarioRunName and jobId from path
+	// Path format: /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs
 	path := r.URL.Path
 	prefix := "/api/v1/scenarios/run/"
 
@@ -1078,68 +1078,54 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 	// Remove prefix
 	remainder := path[len(prefix):]
 
-	// Split by "/logs/"
-	parts := strings.Split(remainder, "/logs/")
+	// Split by "/jobs/" and "/logs"
+	parts := strings.Split(remainder, "/jobs/")
 	if len(parts) != 2 {
 		logger.Error(nil, "Invalid logs endpoint path format", "path", path)
-		conn.WriteMessage(websocket.TextMessage, []byte("ERROR: Invalid path format. Expected: /api/v1/scenarios/run/{scenarioRunName}/logs/{clusterName}"))
+		conn.WriteMessage(websocket.TextMessage, []byte("ERROR: Invalid path format. Expected: /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs"))
 		return
 	}
 
 	scenarioRunName := parts[0]
-	clusterName := parts[1]
+	jobIdAndLogs := parts[1]
 
-	if scenarioRunName == "" || clusterName == "" {
-		logger.Error(nil, "Empty scenarioRunName or clusterName in request path", "path", path)
-		conn.WriteMessage(websocket.TextMessage, []byte("ERROR: scenarioRunName and clusterName cannot be empty"))
+	// Extract jobId (remove "/logs" suffix)
+	if !strings.HasSuffix(jobIdAndLogs, "/logs") {
+		logger.Error(nil, "Invalid logs endpoint path format", "path", path)
+		conn.WriteMessage(websocket.TextMessage, []byte("ERROR: Invalid path format. Expected: /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs"))
 		return
 	}
 
-	logger.Info("WebSocket connection established", "scenarioRunName", scenarioRunName, "clusterName", clusterName, "client_ip", r.RemoteAddr)
-
-	ctx := context.Background()
+	jobId := strings.TrimSuffix(jobIdAndLogs, "/logs")
 
-	// Fetch the KrknScenarioRun CR
-	var scenarioRun krknv1alpha1.KrknScenarioRun
-	if err := h.client.Get(ctx, client.ObjectKey{
-		Name:      scenarioRunName,
-		Namespace: h.namespace,
-	}, &scenarioRun); err != nil {
-		logger.Error(err, "Failed to fetch scenario run", "scenarioRunName", scenarioRunName)
-		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Scenario run '%s' not found", scenarioRunName)))
+	if scenarioRunName == "" || jobId == "" {
+		logger.Error(nil, "Empty scenarioRunName or jobId in request path", "path", path)
+		conn.WriteMessage(websocket.TextMessage, []byte("ERROR: scenarioRunName and jobId cannot be empty"))
 		return
 	}
 
-	// Find the job for the requested cluster
-	var jobId, podName string
-	for _, job := range scenarioRun.Status.ClusterJobs {
-		if job.ClusterName == clusterName {
-			jobId = job.JobId
-			podName = job.PodName
-			break
-		}
-	}
+	logger.Info("WebSocket connection established", "scenarioRunName", scenarioRunName, "jobId", jobId, "client_ip", r.RemoteAddr)
 
-	if jobId == "" {
-		logger.Error(nil, "Cluster not found in scenario run",
-			"scenarioRunName", scenarioRunName,
-			"clusterName", clusterName)
-		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Cluster '%s' not found in scenario run '%s'", clusterName, scenarioRunName)))
+	ctx := context.Background()
+
+	// Find pod by jobId label (no need to fetch the CR)
+	var podList corev1.PodList
+	if err := h.client.List(ctx, &podList, client.InNamespace(h.namespace), client.MatchingLabels{
+		"krkn-job-id": jobId,
+	}); err != nil {
+		logger.Error(err, "Failed to list pods", "jobId", jobId)
+		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Failed to list pods: %s", err.Error())))
 		return
 	}
 
-	// Fetch the pod
-	var pod corev1.Pod
-	if err := h.client.Get(ctx, client.ObjectKey{
-		Name:      podName,
-		Namespace: h.namespace,
-	}, &pod); err != nil {
-		logger.Error(err, "Failed to fetch pod", "podName", podName)
-		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Pod '%s' not found", podName)))
+	if len(podList.Items) == 0 {
+		logger.Error(nil, "Job not found", "jobId", jobId)
+		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Job with ID '%s' not found", jobId)))
 		return
 	}
 
-	logger.Info("Found pod for cluster", "scenarioRunName", scenarioRunName, "clusterName", clusterName, "podName", pod.Name, "podPhase", pod.Status.Phase)
+	pod := podList.Items[0]
+	logger.Info("Found pod for job", "scenarioRunName", scenarioRunName, "jobId", jobId, "podName", pod.Name, "podPhase", pod.Status.Phase)
 
 	// Parse query parameters
 	follow := r.URL.Query().Get("follow") == "true"
@@ -1163,7 +1149,7 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 
 	logger.Info("Opening log stream",
 		"scenarioRunName", scenarioRunName,
-		"clusterName", clusterName,
+		"jobId", jobId,
 		"podName", pod.Name,
 		"follow", follow,
 		"timestamps", timestamps)
@@ -1174,15 +1160,15 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		logger.Error(err, "Failed to open log stream",
 			"scenarioRunName", scenarioRunName,
-			"clusterName", clusterName,
+			"jobId", jobId,
 			"podName", pod.Name,
 			"namespace", h.namespace)
 		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Failed to open log stream: %s", err.Error())))
 		return
 	}
 	defer stream.Close()
 
-	logger.Info("Streaming logs started", "scenarioRunName", scenarioRunName, "clusterName", clusterName, "podName", pod.Name)
+	logger.Info("Streaming logs started", "scenarioRunName", scenarioRunName, "jobId", jobId, "podName", pod.Name)
 
 	// Read logs line by line and send via WebSocket
 	scanner := bufio.NewScanner(stream)
@@ -1193,7 +1179,7 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 		if err != nil {
 			logger.Error(err, "Failed to write log line to WebSocket, client likely disconnected",
 				"scenarioRunName", scenarioRunName,
-				"clusterName", clusterName,
+				"jobId", jobId,
 				"podName", pod.Name,
 				"linesStreamed", lineCount)
 			return
@@ -1205,7 +1191,7 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 	if err := scanner.Err(); err != nil {
 		logger.Error(err, "Log stream scanner error",
 			"scenarioRunName", scenarioRunName,
-			"clusterName", clusterName,
+			"jobId", jobId,
 			"podName", pod.Name,
 			"linesStreamed", lineCount)
 		conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("ERROR: Log stream error: %s", err.Error())))
@@ -1214,7 +1200,7 @@ func (h *Handler) GetScenarioRunLogs(w http.ResponseWriter, r *http.Request) {
 
 	logger.Info("Log streaming completed",
 		"scenarioRunName", scenarioRunName,
-		"clusterName", clusterName,
+		"jobId", jobId,
 		"podName", pod.Name,
 		"totalLines", lineCount)
 
@@ -1383,8 +1369,8 @@ func (h *Handler) ScenariosRunRouter(w http.ResponseWriter, r *http.Request) {
 	}
 
 	if strings.HasPrefix(path, "/api/v1/scenarios/run/") {
-		// Check for logs endpoint: /api/v1/scenarios/run/{scenarioRunName}/logs/{clusterName}
-		if strings.Contains(path, "/logs/") && r.Method == http.MethodGet {
+		// Check for logs endpoint: /api/v1/scenarios/run/{scenarioRunName}/jobs/{jobId}/logs
+		if strings.Contains(path, "/jobs/") && strings.HasSuffix(path, "/logs") && r.Method == http.MethodGet {
 			h.GetScenarioRunLogs(w, r)
 		} else if r.Method == http.MethodGet {
 			h.GetScenarioRunStatus(w, r)
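The new routing splits on "/jobs/" and trims the "/logs" suffix. A small table-test sketch of that logic, extracted into a hypothetical parseLogsPath helper (the handler itself inlines this parsing):

```go
package api

import (
	"strings"
	"testing"
)

// parseLogsPath mirrors the handler's parsing, shown here only to make the
// accepted and rejected path shapes explicit.
func parseLogsPath(path string) (scenarioRunName, jobId string, ok bool) {
	const prefix = "/api/v1/scenarios/run/"
	if !strings.HasPrefix(path, prefix) {
		return "", "", false
	}
	parts := strings.Split(path[len(prefix):], "/jobs/")
	if len(parts) != 2 || !strings.HasSuffix(parts[1], "/logs") {
		return "", "", false
	}
	scenarioRunName = parts[0]
	jobId = strings.TrimSuffix(parts[1], "/logs")
	return scenarioRunName, jobId, scenarioRunName != "" && jobId != ""
}

func TestParseLogsPath(t *testing.T) {
	cases := []struct {
		path     string
		run, job string
		ok       bool
	}{
		{"/api/v1/scenarios/run/my-run/jobs/abc-123/logs", "my-run", "abc-123", true},
		{"/api/v1/scenarios/run/my-run/logs/cluster-a", "", "", false}, // old shape, rejected
		{"/api/v1/scenarios/run/my-run/jobs/abc-123", "", "", false},   // missing /logs suffix
	}
	for _, c := range cases {
		run, job, ok := parseLogsPath(c.path)
		if run != c.run || job != c.job || ok != c.ok {
			t.Errorf("parseLogsPath(%q) = (%q, %q, %v), want (%q, %q, %v)",
				c.path, run, job, ok, c.run, c.job, c.ok)
		}
	}
}
```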
