-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathmerge_test.go
179 lines (167 loc) · 4.41 KB
/
merge_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package pipeline
import (
"errors"
"fmt"
"testing"
"time"
)
// task will wait for a specified duration of time before returning a certain number of errors
type task struct {
id string
errorCount int
waitFor time.Duration
}
// do performs the task
func (t task) do() <-chan error {
out := make(chan error)
go func() {
defer close(out)
time.Sleep(t.waitFor)
for i := 0; i < t.errorCount; i++ {
out <- fmt.Errorf("[task %s] error %d", t.id, i)
}
}()
return out
}
// TestMerge makes sure that the merged chan
// 1. Closes after all of its child chans close
// 2. Receives all error messages from the error chans
// 3. Stays open if one of its child chans never closes
func TestMerge(t *testing.T) {
t.Parallel()
maxTestDuration := time.Second
for _, test := range []struct {
description string
finishBefore time.Duration
expectedErrors []error
tasks []task
}{{
description: "Closes after all of its error chans close",
finishBefore: time.Second,
tasks: []task{{
id: "a",
waitFor: 250 * time.Millisecond,
}, {
id: "b",
waitFor: 500 * time.Millisecond,
}, {
id: "c",
waitFor: 750 * time.Millisecond,
}},
}, {
description: "Receives all errors from all of its error chans",
finishBefore: time.Second,
expectedErrors: []error{
errors.New("[task a] error 0"),
errors.New("[task c] error 0"),
errors.New("[task c] error 1"),
errors.New("[task c] error 2"),
errors.New("[task b] error 0"),
errors.New("[task b] error 1"),
},
tasks: []task{{
id: "a",
waitFor: 250 * time.Millisecond,
errorCount: 1,
}, {
id: "b",
waitFor: 750 * time.Millisecond,
errorCount: 2,
}, {
id: "c",
waitFor: 500 * time.Millisecond,
errorCount: 3,
}},
}, {
description: "Stays open if one of its chans never closes",
expectedErrors: []error{
errors.New("[task c] error 0"),
errors.New("[task b] error 0"),
errors.New("[task b] error 1"),
},
tasks: []task{{
id: "a",
waitFor: 2 * maxTestDuration,
// We shoud expect to 'never' receive this error, because it will emit after the maxTestDuration
errorCount: 1,
}, {
id: "b",
waitFor: 750 * time.Millisecond,
errorCount: 2,
}, {
id: "c",
waitFor: 500 * time.Millisecond,
errorCount: 1,
}},
}, {
description: "Single channel passes through",
expectedErrors: []error{
errors.New("[task a] error 0"),
errors.New("[task a] error 1"),
errors.New("[task a] error 2"),
},
tasks: []task{{
id: "a",
waitFor: 0,
// We shoud expect to 'never' receive this error, because it will emit after the maxTestDuration
errorCount: 3,
}},
}, {
description: "Closed channel returned",
expectedErrors: []error{},
tasks: []task{},
}} {
t.Run(test.description, func(t *testing.T) {
t.Parallel()
// Start doing all of the tasks
var errChans []<-chan error
for _, task := range test.tasks {
errChans = append(errChans, task.do())
}
// Merge all of their error channels together
var errs []error
merged := Merge[error](errChans...)
// Create the timeout
timeout := time.After(maxTestDuration)
if test.finishBefore > 0 {
timeout = time.After(test.finishBefore)
}
loop:
for {
select {
case i, ok := <-merged:
if !ok {
// The chan has closed
break loop
} else if err, ok := i.(error); ok {
errs = append(errs, err)
} else {
t.Errorf("'%+v' is not an error!", i)
return
}
case <-timeout:
if isExpected := test.finishBefore == 0; isExpected {
// We're testing that open channels cause a timeout
break loop
}
t.Error("timed out!")
}
}
// Check that all of the expected errors match the errors we received
lenErrs, lenExpectedErros := len(errs), len(test.expectedErrors)
for i, expectedError := range test.expectedErrors {
if i >= lenErrs {
t.Errorf("expectedErrors[%d]: '%s' != <nil>", i, expectedError)
} else if err := errs[i]; expectedError.Error() != err.Error() {
t.Errorf("expectedErrors[%d]: '%s' != %s", i, expectedError, err)
}
}
// Check that we have no additional error messages other than the ones we expected
if hasTooManyErrors := lenErrs > lenExpectedErros; hasTooManyErrors {
for _, err := range errs[lenExpectedErros-1:] {
t.Errorf("'%s' is unexpected!", err)
}
}
})
}
}