Skip to content

Commit a1424c1

Browse files
authored
[QT-420] Ensure that we exit 1 if we exceed our deadline (#109)
Signed-off-by: Ryan Cragun <[email protected]>
1 parent a089528 commit a1424c1

File tree

5 files changed

+144
-4
lines changed

5 files changed

+144
-4
lines changed

acceptance/scenario_run_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestAcc_Cmd_Scenario_Run(t *testing.T) {
4444

4545
enos := newAcceptanceRunner(t, skipUnlessTerraformCLI())
4646

47-
tmpDir, err := os.MkdirTemp("/tmp", "enos.launch")
47+
tmpDir, err := os.MkdirTemp("/tmp", "enos.run")
4848
require.NoError(t, err)
4949
t.Cleanup(func() { os.RemoveAll(tmpDir) })
5050

@@ -114,3 +114,43 @@ func TestAcc_Cmd_Scenario_Run(t *testing.T) {
114114
})
115115
}
116116
}
117+
118+
// TestAcc_Cmd_Scenario_Run_Timeout tests that a scenario that times out should fail.
119+
func TestAcc_Cmd_Scenario_Run_Timeout(t *testing.T) {
120+
t.Parallel()
121+
122+
for _, test := range []struct {
123+
dir string
124+
name string
125+
uid string
126+
}{
127+
{
128+
"scenario_run_timeout",
129+
"timeout",
130+
fmt.Sprintf("%x", sha256.Sum256([]byte("timeout"))),
131+
},
132+
} {
133+
test := test
134+
t.Run(fmt.Sprintf("%s %s", test.dir, test.name), func(t *testing.T) {
135+
t.Parallel()
136+
137+
enos := newAcceptanceRunner(t, skipUnlessTerraformCLI())
138+
139+
tmpDir, err := os.MkdirTemp("/tmp", "enos.run")
140+
require.NoError(t, err)
141+
t.Cleanup(func() { os.RemoveAll(tmpDir) })
142+
143+
outDir := filepath.Join(tmpDir, test.dir)
144+
err = os.MkdirAll(outDir, 0o755)
145+
require.NoError(t, err)
146+
outDir, err = filepath.EvalSymlinks(outDir)
147+
require.NoError(t, err)
148+
path, err := filepath.Abs(filepath.Join("./scenarios", test.dir))
149+
require.NoError(t, err)
150+
151+
cmd := fmt.Sprintf("scenario run --chdir %s --out %s --format json --timeout 1s %s", path, outDir, test.name)
152+
out, err := enos.run(context.Background(), cmd)
153+
require.Error(t, err, string(out))
154+
})
155+
}
156+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module "sleep" {
2+
source = "./modules/sleep"
3+
}
4+
5+
scenario "timeout" {
6+
step "sleep" {
7+
module = module.sleep
8+
}
9+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
resource "time_sleep" "wait_5s" {
2+
create_duration = "5s"
3+
}

internal/client/operation_stream.go

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,104 @@ func (c *Connection) StreamOperations(
3838
return res
3939
}
4040

41-
res.Responses = c.streamResponses(ctx, opRes.GetOperations(), ui)
41+
var moreDiags []*pb.Diagnostic
42+
res.Responses, moreDiags = c.streamResponses(ctx, opRes.GetOperations(), ui)
43+
res.Diagnostics = append(res.GetDiagnostics(), moreDiags...)
4244

4345
return res
4446
}
4547

4648
// streamResponses takes a context, workspace, and slice of operation references
4749
// and streams operation events to the ui. It will return a slice of operation
4850
// responses for each stream that completes.
51+
//
52+
//nolint:cyclop // This could probably be refactored to be less complex but right now its inlined.
4953
func (c *Connection) streamResponses(
5054
ctx context.Context,
5155
refs []*pb.Ref_Operation,
5256
ui uipkg.View,
53-
) []*pb.Operation_Response {
57+
) ([]*pb.Operation_Response, []*pb.Diagnostic) {
5458
mu := sync.Mutex{}
5559
wg := sync.WaitGroup{}
5660
res := []*pb.Operation_Response{}
5761

62+
diags := []*pb.Diagnostic{}
63+
diagC := make(chan *pb.Diagnostic)
64+
doneC := make(chan struct{})
65+
diagWg := sync.WaitGroup{}
66+
67+
select {
68+
case <-ctx.Done():
69+
cause := context.Cause(ctx)
70+
if cause != nil && cause != context.Canceled {
71+
// We have a custom error or our deadline was exceeded.
72+
diags = append(diags, diagnostics.FromErr(cause)...)
73+
}
74+
75+
return res, diags
76+
default:
77+
}
78+
79+
// Start the error diagnostic routine. This collects diagnostics generated at the client level.
80+
// that are unexpected. Per stream request diagnostics will be scoped to each ref.
81+
diagWg.Add(1)
82+
go func() {
83+
defer diagWg.Done()
84+
85+
drainDiags := func() {
86+
for {
87+
select {
88+
case diag := <-diagC:
89+
diags = append(diags, diag)
90+
91+
continue
92+
default:
93+
}
94+
95+
return
96+
}
97+
}
98+
99+
checkCtx := func() {
100+
err := ctx.Err()
101+
if err == nil {
102+
return
103+
}
104+
cause := context.Cause(ctx)
105+
if cause != context.Canceled {
106+
// We have a custom error or our deadline was exceeded.
107+
diags = append(diags, diagnostics.FromErr(cause)...)
108+
}
109+
}
110+
111+
for {
112+
select {
113+
case diag := <-diagC:
114+
diags = append(diags, diag)
115+
116+
continue
117+
default:
118+
}
119+
120+
select {
121+
case diag := <-diagC:
122+
diags = append(diags, diag)
123+
124+
continue
125+
case <-ctx.Done():
126+
drainDiags()
127+
checkCtx()
128+
129+
return
130+
case <-doneC:
131+
drainDiags()
132+
checkCtx()
133+
134+
return
135+
}
136+
}
137+
}()
138+
58139
for _, ref := range refs {
59140
ref := ref
60141
wg.Add(1)
@@ -110,6 +191,7 @@ func (c *Connection) streamResponses(
110191
for {
111192
select {
112193
case <-ctx.Done():
194+
113195
break LOOP
114196
case err := <-errC:
115197
if err != nil && err != io.EOF {
@@ -168,6 +250,8 @@ func (c *Connection) streamResponses(
168250
}
169251

170252
wg.Wait()
253+
close(doneC)
254+
diagWg.Wait()
171255

172-
return res
256+
return res, diags
173257
}

internal/flightplan/matrix.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,10 @@ func (v *Vector) EqualUnordered(other *Vector) bool {
295295

296296
// Elements returns a list of the Vectors Elements.
297297
func (v *Vector) Elements() []Element {
298+
if v == nil {
299+
return nil
300+
}
301+
298302
return v.elements
299303
}
300304

0 commit comments

Comments
 (0)