
Commit c38dd7e

Merge pull request #28 from crytic/additional-faults
Additional faults. Addresses #26 #16 #3
2 parents 34ab0e6 + 1e4a4cd commit c38dd7e

19 files changed: +465 -53 lines

README.md

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@
 3. Install chaos-mesh
    1. `kubectl create ns chaos-mesh`
    2. `helm repo add chaos-mesh https://charts.chaos-mesh.org`
-   3. `helm install chaos-mesh chaos-mesh/chaos-mesh -n=chaos-mesh --version 2.6.1 --set chaosDaemon.runtime=containerd --set chaosDaemon.socketPath=/run/containerd/containerd.sock --set dashboard.securityMode=false`
+   3. `helm install chaos-mesh chaos-mesh/chaos-mesh -n=chaos-mesh --version 2.6.1 --set chaosDaemon.runtime=containerd --set chaosDaemon.socketPath=/run/containerd/containerd.sock --set dashboard.securityMode=false --set bpfki.create=true`
    4. To access chaos dashboard, use `kubectl --namespace chaos-mesh port-forward svc/chaos-dashboard 2333`
 4. Install kurtosis locally.
 5. Run `kurtosis cluster set cloud`

cmd/attacknet/main.go

Lines changed: 1 addition & 0 deletions

@@ -49,6 +49,7 @@ func main() {
         err = pkg.StartTestSuite(ctx, cfg)
         if err != nil {
             log.Fatal(err)
+            os.Exit(1)
         }
     }
 }

pkg/chaos-mesh/session.go

Lines changed: 121 additions & 23 deletions

@@ -6,6 +6,7 @@ import (
     api "github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
     "github.com/kurtosis-tech/stacktrace"
     log "github.com/sirupsen/logrus"
+    v1 "k8s.io/api/core/v1"
     "reflect"
     "sigs.k8s.io/controller-runtime/pkg/client"
     "time"
@@ -23,38 +24,73 @@ const (
     Error FaultStatus = "Error"
 )
 
+var FaultHasNoDurationErr = fmt.Errorf("this fault has no expected duration")
+
 // succeeded (inject worked, now back to normal)
 // failure?
 // time out?
 
 type FaultSession struct {
-    client              *ChaosClient
-    faultKind           *api.ChaosKind
-    faultSpec           map[string]interface{}
-    Name                string
-    podsFailingRecovery map[string]*api.Record
-    TestStartTime       time.Time
-    TestDuration        *time.Duration
-    TestEndTime         time.Time
+    client                   *ChaosClient
+    faultKind                *api.ChaosKind
+    faultType                string
+    faultAction              string
+    faultSpec                map[string]interface{}
+    Name                     string
+    podsFailingRecovery      map[string]*api.Record
+    checkedForMissingPods    bool
+    podsExpectedMissing      int
+    TestStartTime            time.Time
+    TestDuration             *time.Duration
+    TestEndTime              *time.Time
+    TargetSelectionCompleted bool
 }
 
 func NewFaultSession(ctx context.Context, client *ChaosClient, faultKind *api.ChaosKind, faultSpec map[string]interface{}, name string) (*FaultSession, error) {
     now := time.Now()
 
+    faultKindStr, ok := faultSpec["kind"].(string)
+    if !ok {
+        return nil, stacktrace.NewError("failed to decode faultSpec.kind to string: %s", faultSpec["kind"])
+    }
+
+    spec, ok := faultSpec["spec"].(map[string]interface{})
+    if !ok {
+        return nil, stacktrace.NewError("failed to decode faultSpec.spec to map[string]interface{}")
+    }
+
+    faultAction, ok := spec["action"].(string)
+    if !ok {
+        return nil, stacktrace.NewError("failed to decode faultSpec.spec.action to string: %s", spec["action"])
+    }
+
     partial := &FaultSession{
-        client:              client,
-        faultKind:           faultKind,
-        faultSpec:           faultSpec,
-        Name:                name,
-        podsFailingRecovery: map[string]*api.Record{},
-        TestStartTime:       now,
+        client:                   client,
+        faultKind:                faultKind,
+        faultType:                faultKindStr,
+        faultSpec:                spec,
+        faultAction:              faultAction,
+        Name:                     name,
+        podsFailingRecovery:      map[string]*api.Record{},
+        TestStartTime:            now,
+        podsExpectedMissing:      0,
+        checkedForMissingPods:    false,
+        TargetSelectionCompleted: false,
     }
     duration, err := partial.getDuration(ctx)
     if err != nil {
-        return nil, err
+        if err == FaultHasNoDurationErr {
+            partial.TestDuration = nil
+            partial.TestEndTime = nil
+        } else {
+            return nil, err
+        }
+    } else {
+        partial.TestDuration = duration
+        endTime := now.Add(*duration)
+        partial.TestEndTime = &endTime
     }
-    partial.TestDuration = duration
-    partial.TestEndTime = now.Add(*duration)
+
     return partial, nil
 }
 
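To make the new constructor logic concrete, here is a rough sketch of the fault-spec shape NewFaultSession now decodes. The selector fields are omitted, the literal values are illustrative rather than taken from the repository, and chaosClient/podChaosKind stand in for whatever the caller already has:

    faultSpec := map[string]interface{}{
        "kind": "PodChaos", // stored as FaultSession.faultType
        "spec": map[string]interface{}{
            "action":   "pod-kill", // stored as FaultSession.faultAction
            "duration": "5m",       // optional; a nil duration now surfaces as FaultHasNoDurationErr, leaving TestDuration and TestEndTime nil
        },
    }
    session, err := NewFaultSession(ctx, chaosClient, podChaosKind, faultSpec, "pod-kill-example")
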
@@ -72,12 +108,39 @@ func (f *FaultSession) getKubeResource(ctx context.Context) (client.Object, error) {
     return resource, nil
 }
 
-func (f *FaultSession) getDetailedStatus(ctx context.Context) ([]*api.Record, error) {
+func (f *FaultSession) checkTargetSelectionCompleted(resource client.Object) error {
+    if f.TargetSelectionCompleted {
+        return nil
+    }
+    conditionsVal := reflect.ValueOf(resource).Elem().FieldByName("Status").FieldByName("ChaosStatus").FieldByName("Conditions")
+    conditions, ok := conditionsVal.Interface().([]api.ChaosCondition)
+    if !ok || conditions == nil {
+        return stacktrace.NewError("Unable to decode status.chaosstatus.conditions")
+    }
+    for _, condition := range conditions {
+        if condition.Type != api.ConditionSelected {
+            continue
+        }
+        if condition.Status == v1.ConditionTrue {
+            log.Info("chaos-mesh has identified pods to inject into")
+            f.TargetSelectionCompleted = true
+        }
+        break
+    }
+    return nil
+}
+
+func (f *FaultSession) getFaultRecords(ctx context.Context) ([]*api.Record, error) {
     resource, err := f.getKubeResource(ctx)
     if err != nil {
         return nil, err
     }
 
+    err = f.checkTargetSelectionCompleted(resource)
+    if err != nil {
+        return nil, err
+    }
+
     // Feel free to figure out a better way to do this. These fields are part of every Chaos status struct we support,
     // but since they don't implement a common interface containing the status fields, there's no clean or simple way
     // to extract the values in Go. One alternate option may be to serialize to json, then deserialize into an object
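
The reflection above is only there because the chaos kinds don't share an interface; against a single concrete kind, the same check reads roughly as follows (a sketch assuming the chaos-mesh v1alpha1 field layout implied by the reflection path):

    func podChaosTargetsSelected(pc *api.PodChaos) bool {
        for _, cond := range pc.Status.ChaosStatus.Conditions {
            if cond.Type != api.ConditionSelected {
                continue
            }
            return cond.Status == v1.ConditionTrue
        }
        return false
    }
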
@@ -117,18 +180,51 @@ func (f *FaultSession) checkForFailedRecovery(record *api.Record) (bool, []string) {
     return true, distinctMessages
 }
 
+/*
+Determines whether the fault will leave some pods in a terminated state, and how many pods will be impacted.
+This must be run after the fault manifest has been applied and the handler webhook has run.
+*/
+func (f *FaultSession) checkForMissingPods(records []*api.Record) error {
+    if !f.checkedForMissingPods {
+        f.checkedForMissingPods = true
+        // we expect missing pods when the fault is pod kill.
+
+        podsInjected := countInjectedPods(records)
+        log.Infof("Chaos-mesh has identified %d pods matching the targeting criteria", podsInjected)
+        if f.faultType == "PodChaos" && f.faultAction == "pod-kill" {
+            f.podsExpectedMissing = podsInjected
+            log.Infof("We're expecting %d pods to be terminated from the selected fault", f.podsExpectedMissing)
+        }
+    }
+    return nil
+}
+
+func countInjectedPods(records []*api.Record) int {
+    podsInjected := 0
+    for _, record := range records {
+        if record.Phase == "Injected" {
+            podsInjected += 1
+        }
+    }
+    return podsInjected
+}
+
 // todo: we need a better way of monitoring fault injection status. There's a ton of statefulness represented in
 // chaos-mesh that we're glancing over. Situations such as a pod crashing during a fault may produce unexpected behavior
 // in this code as it currently stands.
 func (f *FaultSession) GetStatus(ctx context.Context) (FaultStatus, error) {
-    records, err := f.getDetailedStatus(ctx)
+    records, err := f.getFaultRecords(ctx)
     if err != nil {
         return Error, err
     }
 
     if records == nil {
         return Starting, nil
     }
+    err = f.checkForMissingPods(records)
+    if err != nil {
+        return Error, err
+    }
 
     podsInjectedAndRecovered := 0
     podsInjectedNotRecovered := 0
@@ -152,15 +248,13 @@ func (f *FaultSession) GetStatus(ctx context.Context) (FaultStatus, error) {
         }
     }
 
-    // todo: check if unrecovered pods are failing to recover ^^ up here PodRecord.Events[-1].Operation = "Recover", Type="Failed". Emit Message
-
     if podsNotInjected > 0 {
         return Starting, nil
     }
-    if podsInjectedNotRecovered > 0 && podsInjectedAndRecovered == 0 {
+    if podsInjectedNotRecovered-f.podsExpectedMissing > 0 && podsInjectedAndRecovered == 0 {
         return InProgress, nil
     }
-    if podsInjectedAndRecovered+len(f.podsFailingRecovery) == len(records) {
+    if podsInjectedAndRecovered+len(f.podsFailingRecovery)+f.podsExpectedMissing == len(records) {
        return Completed, nil
     }
     if podsInjectedNotRecovered > 0 && podsInjectedAndRecovered > 0 {
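
Worked through for a pod-kill fault that selects three pods: none of the records ever reaches a recovered phase, so without the podsExpectedMissing term the session would report InProgress indefinitely. A standalone illustration of the adjusted checks, using plain integers rather than the real types:

    records := 3              // pods chaos-mesh selected and injected
    injectedAndRecovered := 0 // pod-kill targets never report recovery
    injectedNotRecovered := 3
    failingRecovery := 0
    expectedMissing := 3 // set by checkForMissingPods for PodChaos/pod-kill

    fmt.Println(injectedNotRecovered-expectedMissing > 0 && injectedAndRecovered == 0) // false: no longer stuck InProgress
    fmt.Println(injectedAndRecovered+failingRecovery+expectedMissing == records)       // true: Completed
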
@@ -182,6 +276,10 @@ func (f *FaultSession) getDuration(ctx context.Context) (*time.Duration, error)
     if !ok {
         return nil, stacktrace.NewError("unable to cast durationVal to string")
     }
+    if durationStr == nil {
+        return nil, FaultHasNoDurationErr
+    }
+
     duration, err := time.ParseDuration(*durationStr)
     if err != nil {
         return nil, err

pkg/grafana.go

Lines changed: 10 additions & 7 deletions

@@ -14,6 +14,7 @@ type GrafanaTunnel struct {
     Client                   *grafanaSdk.Client
     portForwardStopCh        chan struct{}
     allowPostFaultInspection bool
+    cleanedUp                bool
 }
 
 func CreateGrafanaClient(ctx context.Context, namespace string, config AttacknetConfig) (*GrafanaTunnel, error) {
@@ -42,14 +43,16 @@ func CreateGrafanaClient(ctx context.Context, namespace string, config AttacknetConfig) (*GrafanaTunnel, error) {
         return nil, stacktrace.Propagate(err, "unable to create Grafana client")
     }
 
-    return &GrafanaTunnel{client, stopCh, config.AllowPostFaultInspection}, nil
+    return &GrafanaTunnel{client, stopCh, config.AllowPostFaultInspection, false}, nil
 }
 
-func (t *GrafanaTunnel) Cleanup() {
-    if t.allowPostFaultInspection {
-        log.Info("Attacknet has completed, but since allowPostFaultInspection is set to true, the program will continue to run to facilitate the Grafana port-forward connection.")
-        log.Info("Press enter to terminate the port-forward connection.")
-        _, _ = fmt.Scanln()
+func (t *GrafanaTunnel) Cleanup(skipInspection bool) {
+    if !t.cleanedUp {
+        if t.allowPostFaultInspection && !skipInspection {
+            log.Info("Press enter to terminate the port-forward connection.")
+            _, _ = fmt.Scanln()
+        }
+        close(t.portForwardStopCh)
+        t.cleanedUp = true
     }
-    close(t.portForwardStopCh)
 }
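
A hypothetical caller-side sketch of the new Cleanup signature (runFaults and the surrounding variables are placeholders, not code from this commit): the cleanedUp flag makes repeated calls safe, and skipInspection lets error paths bypass the interactive prompt while the happy path still honors allowPostFaultInspection.

    tunnel, err := CreateGrafanaClient(ctx, namespace, cfg)
    if err != nil {
        return err
    }
    defer tunnel.Cleanup(true) // always release the port-forward, never block on the prompt

    if err := runFaults(ctx, tunnel); err != nil {
        tunnel.Cleanup(true) // explicit early cleanup; the deferred call becomes a no-op
        return err
    }
    tunnel.Cleanup(false) // happy path: allow post-fault inspection before tearing down
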

pkg/kubernetes/port-forward.go

Lines changed: 9 additions & 4 deletions

@@ -4,12 +4,12 @@ import (
     "errors"
     "fmt"
     "github.com/kurtosis-tech/stacktrace"
+    log "github.com/sirupsen/logrus"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/portforward"
     "k8s.io/client-go/transport/spdy"
     "net/http"
     "net/url"
-    "os"
     "time"
 )
 
@@ -31,11 +31,16 @@ func StartPortForwarding(pod, namespace string, port uint16, kubeConfig *rest.Config)
 
     stopCh = make(chan struct{}, 1)
     readyCh := make(chan struct{}, 1)
-    portForward, err := portforward.New(dialer, []string{portFwd}, stopCh, readyCh, os.Stdout, os.Stderr)
+    logger := log.New()
+
+    errLogger := CreatePrefixWriter("[port-forward] ", logger.WriterLevel(log.ErrorLevel))
+    stdLogger := CreatePrefixWriter("[port-forward] ", logger.WriterLevel(log.InfoLevel))
+
+    portForward, err := portforward.New(dialer, []string{portFwd}, stopCh, readyCh, stdLogger, errLogger)
     if err != nil {
         return nil, stacktrace.Propagate(err, "unable to create port forward dialer")
     }
-    fmt.Print("Starting port-forward to grafana pod")
+    log.Info("Starting port-forward to grafana pod")
 
     go func() {
         if err = portForward.ForwardPorts(); err != nil {
@@ -45,7 +50,7 @@ func StartPortForwarding(pod, namespace string, port uint16, kubeConfig *rest.Config)
 
     select {
     case <-readyCh:
-        fmt.Print("Port-forward established.")
+        log.Info("Port-forward established.")
     case <-time.After(time.Minute):
         return nil, errors.New("timed out after waiting to establish port forward")
     }
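
For context, a hypothetical caller of StartPortForwarding (assuming the named stopCh return is what the function hands back; the pod name, namespace, and port are placeholders): closing the returned channel is what stops the forwarder, which is exactly what GrafanaTunnel.Cleanup now does once.

    stopCh, err := StartPortForwarding("grafana-pod-0", "kt-devnet", 3000, kubeConfig)
    if err != nil {
        return err
    }
    defer close(stopCh) // terminates portForward.ForwardPorts()
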

pkg/kubernetes/utils.go

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
+package kubernetes
+
+import "io"
+
+type LogPrefixWriter struct {
+    prefix string
+    writer io.Writer
+}
+
+func CreatePrefixWriter(prefix string, writer io.Writer) *LogPrefixWriter {
+    return &LogPrefixWriter{prefix, writer}
+}
+
+func (pw *LogPrefixWriter) Write(p []byte) (n int, err error) {
+    return pw.writer.Write(append([]byte(pw.prefix), p...))
+}
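
A small usage sketch of the new writer, wrapping a logrus level writer the same way port-forward.go does; note that the prefix is applied per Write call, not per line:

    logger := log.New()
    w := CreatePrefixWriter("[port-forward] ", logger.WriterLevel(log.InfoLevel))
    fmt.Fprintln(w, "Forwarding from 127.0.0.1:2333 -> 2333") // example message; logged as "[port-forward] Forwarding from ..."
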

pkg/kurtosis.go

Lines changed: 8 additions & 8 deletions

@@ -139,24 +139,24 @@ func StartNetwork(ctx context.Context, enclaveCtx *EnclaveContextWrapper, harnes
     if progress != nil {
         progressMsgs := progress.CurrentStepInfo
         for i := progressIndex; i < len(progressMsgs); i++ {
-            log.Infof("Kurtosis: %s", progressMsgs[i])
+            log.Infof("[Kurtosis] %s", progressMsgs[i])
         }
         progressIndex = len(progressMsgs)
     }
 
     info := t.GetInfo()
     if info != nil {
-        log.Infof("Kurtosis: %s", info.InfoMessage)
+        log.Infof("[Kurtosis] %s", info.InfoMessage)
     }
 
     warn := t.GetWarning()
     if warn != nil {
-        log.Warnf("Kurtosis: %s", warn.WarningMessage)
+        log.Warnf("[Kurtosis] %s", warn.WarningMessage)
     }
 
     e := t.GetError()
     if e != nil {
-        log.Errorf("Kurtosis: %s", e.String())
+        log.Errorf("[Kurtosis] %s", e.String())
         return stacktrace.Propagate(errors.New("kurtosis deployment failed during execution"), "%s", e.String())
     }
 
@@ -166,17 +166,17 @@ func StartNetwork(ctx context.Context, enclaveCtx *EnclaveContextWrapper, harnes
 
     insRes := t.GetInstructionResult()
     if insRes != nil {
-        log.Infof("Kurtosis: %s", insRes.SerializedInstructionResult)
+        log.Infof("[Kurtosis] %s", insRes.SerializedInstructionResult)
     }
 
     finishRes := t.GetRunFinishedEvent()
     if finishRes != nil {
-        log.Infof("Kurtosis: %s", finishRes.GetSerializedOutput())
+        log.Infof("[Kurtosis] %s", finishRes.GetSerializedOutput())
         if finishRes.IsRunSuccessful {
-            log.Info("Kurtosis: Devnet genesis successful. Passing back to Attacknet")
+            log.Info("[Kurtosis] Devnet genesis successful. Passing back to Attacknet")
             return nil
         } else {
-            log.Error("Kurtosis: There was an error during genesis.")
+            log.Error("[Kurtosis] There was an error during genesis.")
             return stacktrace.Propagate(errors.New("kurtosis deployment failed"), "%s", finishRes.GetSerializedOutput())
         }
     }
