-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathfailover_test.go
More file actions
129 lines (98 loc) · 4.96 KB
/
failover_test.go
File metadata and controls
129 lines (98 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector"
import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/connector"
	"go.opentelemetry.io/collector/connector/connectortest"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pipeline"

	"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/metadata"
)
// TestFailoverRecovery verifies that the traces failover connector returns to
// higher-priority pipelines after their consumers stop returning errors.
//
// Four trace pipelines are configured as four separate priority levels; the
// subtest names count levels from 1, while the router indices used in the
// assertions count from 0. Every subtest builds its own connector, replaces
// selected consumers with error-returning ones to force a failover, then puts
// the original sinks back and checks which index the router ends up on.
func TestFailoverRecovery(t *testing.T) {
	tracesFirst := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/first")
	tracesSecond := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/second")
	tracesThird := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/third")
	tracesFourth := pipeline.NewIDWithName(pipeline.SignalTraces, "traces/fourth")

	cfg := &Config{
		PipelinePriority: [][]pipeline.ID{{tracesFirst}, {tracesSecond}, {tracesThird}, {tracesFourth}},
		// NOTE(review): presumably the period between attempts to move back to
		// a higher-priority level; kept well below the 3s polling budget used
		// by requireStableAt — confirm against the router implementation.
		RetryInterval: 50 * time.Millisecond,
	}
	tr := sampleTrace()

	// newTestConnector creates a fresh traces-to-traces failover connector
	// wired to four independent sinks (one per pipeline) and registers its
	// shutdown as test cleanup. It returns the connector, its internal router,
	// and the four sinks in priority order.
	newTestConnector := func(t *testing.T) (*tracesFailover, *tracesRouter, *consumertest.TracesSink, *consumertest.TracesSink, *consumertest.TracesSink, *consumertest.TracesSink) {
		t.Helper()

		var sinkFirst, sinkSecond, sinkThird, sinkFourth consumertest.TracesSink
		router := connector.NewTracesRouter(map[pipeline.ID]consumer.Traces{
			tracesFirst:  &sinkFirst,
			tracesSecond: &sinkSecond,
			tracesThird:  &sinkThird,
			tracesFourth: &sinkFourth,
		})

		conn, err := NewFactory().CreateTracesToTraces(t.Context(),
			connectortest.NewNopSettings(metadata.Type), cfg, router.(consumer.Traces))
		require.NoError(t, err)

		failoverConnector := conn.(*tracesFailover)
		t.Cleanup(func() {
			// t.Context() is canceled just before Cleanup callbacks run, so
			// using it here would hand Shutdown an already-canceled context.
			assert.NoError(t, failoverConnector.Shutdown(context.Background()))
		})

		return failoverConnector, failoverConnector.failover, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth
	}

	// requireStableAt polls consumeTracesAndCheckStable with the sample trace
	// until it reports true for index idx, failing the test after 3 seconds.
	requireStableAt := func(t *testing.T, r *tracesRouter, idx int) {
		t.Helper()
		require.Eventually(t, func() bool {
			return consumeTracesAndCheckStable(r, idx, tr)
		}, 3*time.Second, 5*time.Millisecond)
	}

	t.Run("single failover recovery to primary consumer: level 2 -> 1", func(t *testing.T) {
		failoverConnector, tRouter, sinkFirst, _, _, _ := newTestConnector(t)

		// Break the first level; consuming must still succeed and leave the
		// router on index 1.
		tRouter.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
		require.NoError(t, failoverConnector.ConsumeTraces(t.Context(), tr))
		require.Equal(t, 1, tRouter.TestGetCurrentConsumerIndex())

		// Restore the first level and wait for the router to return to it.
		tRouter.ModifyConsumerAtIndex(0, sinkFirst)
		requireStableAt(t, tRouter, 0)
	})

	t.Run("double failover recovery: level 3 -> 2 -> 1", func(t *testing.T) {
		_, tRouter, sinkFirst, sinkSecond, _, _ := newTestConnector(t)

		// Break the first two levels so traffic lands on index 2.
		tRouter.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
		tRouter.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
		requireStableAt(t, tRouter, 2)

		// Simulate recovery of the second level only.
		tRouter.ModifyConsumerAtIndex(1, sinkSecond)
		requireStableAt(t, tRouter, 1)

		// Simulate recovery of the first level.
		tRouter.ModifyConsumerAtIndex(0, sinkFirst)
		requireStableAt(t, tRouter, 0)
	})

	t.Run("multiple failover recovery: level 3 -> 2 -> 4 -> 3 -> 1", func(t *testing.T) {
		_, tRouter, sinkFirst, sinkSecond, sinkThird, _ := newTestConnector(t)

		// Break the first two levels so traffic lands on index 2.
		tRouter.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
		tRouter.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
		requireStableAt(t, tRouter, 2)

		// Simulate recovery of the second level.
		tRouter.ModifyConsumerAtIndex(1, sinkSecond)
		requireStableAt(t, tRouter, 1)

		// Break the second and third levels; with the first still failing,
		// only index 3 is left.
		tRouter.ModifyConsumerAtIndex(2, consumertest.NewErr(errTracesConsumer))
		tRouter.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))
		requireStableAt(t, tRouter, 3)

		// Simulate recovery of the third level.
		tRouter.ModifyConsumerAtIndex(2, sinkThird)
		requireStableAt(t, tRouter, 2)

		// Simulate recovery of the first level, skipping the still-broken
		// second level.
		tRouter.ModifyConsumerAtIndex(0, sinkFirst)
		requireStableAt(t, tRouter, 0)
	})
}