Skip to content

Commit 0ca32f1

Browse files
authored
NETOBSERV-2489: reconnect timer (#854)
- Introduce a configurable reconnection timer + entropy on GRPC client - Add tests
1 parent 07634ba commit 0ca32f1

File tree

6 files changed

+239
-45
lines changed

6 files changed

+239
-45
lines changed

docs/config.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ The following environment variables are available to configure the NetObserv eBP
99
* `TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow or packet collector.
1010
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
1111
message. Messages larger than that number will be split and submitted sequentially.
12+
* `GRPC_RECONNECT_TIMER` specifies a period after which the GRPC connection is re-established. This is
13+
useful for load rebalancing across receivers. Disabled by default, which means
14+
connections are not actively re-established.
15+
* `GRPC_RECONNECT_TIMER_RANDOMIZATION` specifies how much `GRPC_RECONNECT_TIMER` should be randomized,
16+
to avoid several agents reconnecting all at the same time. The value must be lower than `GRPC_RECONNECT_TIMER`.
17+
For instance, if `GRPC_RECONNECT_TIMER` is 5m and `GRPC_RECONNECT_TIMER_RANDOMIZATION` is 30s,
18+
the randomization yields a value between 4m30s and 5m30s.
1219
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
1320
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
1421
address from in order to report it in the AgentIP field on each flow. Accepted values are:

pkg/agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func buildGRPCExporter(cfg *config.Agent, m *metrics.Metrics) (node.TerminalFunc
263263
return nil, fmt.Errorf("missing target host or port: %s:%d",
264264
cfg.TargetHost, cfg.TargetPort)
265265
}
266-
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.TargetTLSCACertPath, cfg.TargetTLSUserCertPath, cfg.TargetTLSUserKeyPath, cfg.GRPCMessageMaxFlows, m)
266+
grpcExporter, err := exporter.StartGRPCProto(cfg, m)
267267
if err != nil {
268268
return nil, err
269269
}

pkg/config/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ type Agent struct {
110110
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
111111
// larger than that number will be split and submitted sequentially.
112112
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
113+
// GRPCReconnectTimer specifies a period after which the GRPC connection is re-established. This is
114+
// useful for load rebalancing across receivers. Disabled by default, which means
115+
// connections are not actively re-established.
116+
GRPCReconnectTimer time.Duration `env:"GRPC_RECONNECT_TIMER"`
117+
// GRPCReconnectTimerRandomization specifies how much GRPCReconnectTimer should be randomized,
118+
// to avoid several agents reconnecting all at the same time. The value must be lower than GRPCReconnectTimer.
119+
// For instance, if GRPCReconnectTimer is 5m and GRPCReconnectTimerRandomization is 30s,
120+
// the randomization yields a value between 4m30s and 5m30s.
121+
GRPCReconnectTimerRandomization time.Duration `env:"GRPC_RECONNECT_TIMER_RANDOMIZATION"`
113122
// Interfaces contains the interface names from where flows will be collected. If empty, the agent
114123
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces.
115124
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,

pkg/exporter/grpc_proto.go

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package exporter
22

33
import (
44
"context"
5+
"math/rand/v2"
6+
"sync"
7+
"time"
58

9+
"github.com/netobserv/netobserv-ebpf-agent/pkg/config"
610
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
711
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
812
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
@@ -21,51 +25,120 @@ const componentGRPC = "grpc"
2125
// by its input channel, converts them to *pbflow.Records instances, and submits
2226
// them to the collector.
2327
type GRPCProto struct {
24-
hostIP string
25-
hostPort int
26-
clientConn *grpc.ClientConnection
28+
hostIP string
29+
hostPort int
30+
caCertPath string
31+
userCertPath string
32+
userKeyPath string
33+
m sync.RWMutex
34+
clientConn *grpc.ClientConnection
2735
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
2836
// If a message contains more flows than this number, the GRPC message will be split into
2937
// multiple messages.
3038
maxFlowsPerMessage int
39+
reconnectTimer time.Duration
3140
metrics *metrics.Metrics
32-
batchCounter prometheus.Counter
41+
batchCounterMetric prometheus.Counter
3342
}
3443

35-
func StartGRPCProto(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
36-
clientConn, err := grpc.ConnectClient(hostIP, hostPort, caPath, userCertPath, userKeyPath)
37-
if err != nil {
44+
func StartGRPCProto(cfg *config.Agent, m *metrics.Metrics) (*GRPCProto, error) {
45+
exporter := GRPCProto{
46+
hostIP: cfg.TargetHost,
47+
hostPort: cfg.TargetPort,
48+
caCertPath: cfg.TargetTLSCACertPath,
49+
userCertPath: cfg.TargetTLSUserCertPath,
50+
userKeyPath: cfg.TargetTLSUserKeyPath,
51+
maxFlowsPerMessage: cfg.GRPCMessageMaxFlows,
52+
reconnectTimer: randomizeTimer(cfg),
53+
metrics: m,
54+
batchCounterMetric: m.CreateBatchCounter(componentGRPC),
55+
}
56+
if err := exporter.reconnect(); err != nil {
3857
return nil, err
3958
}
40-
return &GRPCProto{
41-
hostIP: hostIP,
42-
hostPort: hostPort,
43-
clientConn: clientConn,
44-
maxFlowsPerMessage: maxFlowsPerMessage,
45-
metrics: m,
46-
batchCounter: m.CreateBatchCounter(componentGRPC),
47-
}, nil
59+
return &exporter, nil
60+
}
61+
62+
func (g *GRPCProto) reconnect() error {
63+
g.m.Lock()
64+
defer g.m.Unlock()
65+
if g.clientConn != nil {
66+
if err := g.clientConn.Close(); err != nil {
67+
return err
68+
}
69+
}
70+
clientConn, err := grpc.ConnectClient(g.hostIP, g.hostPort, g.caCertPath, g.userCertPath, g.userKeyPath)
71+
if err != nil {
72+
return err
73+
}
74+
g.clientConn = clientConn
75+
return nil
4876
}
4977

5078
// ExportFlows accepts slices of *model.Record by its input channel, converts them
5179
// to *pbflow.Records instances, and submits them to the collector.
5280
func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
5381
socket := utils.GetSocket(g.hostIP, g.hostPort)
5482
log := glog.WithField("collector", socket)
55-
for inputRecords := range input {
56-
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
57-
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) {
58-
log.Debugf("sending %d records", len(pbRecords.Entries))
59-
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
60-
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc()
61-
log.WithError(err).Error("couldn't send flow records to collector")
83+
84+
if g.reconnectTimer > 0 {
85+
ticker := time.NewTicker(g.reconnectTimer)
86+
log.Infof("Reconnect timer set to: %v", g.reconnectTimer)
87+
done := make(chan bool)
88+
defer func() {
89+
ticker.Stop()
90+
done <- true
91+
}()
92+
go func() {
93+
for {
94+
select {
95+
case <-done:
96+
return
97+
case <-ticker.C:
98+
// Re-establish the connection
99+
if err := g.reconnect(); err != nil {
100+
log.WithError(err).Warn("couldn't reconnect the GRPC export client")
101+
g.metrics.Errors.WithErrorName(componentGRPC, "CannotReconnectClient", metrics.HighSeverity).Inc()
102+
}
103+
}
62104
}
63-
g.batchCounter.Inc()
64-
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
65-
}
105+
}()
106+
}
107+
for inputRecords := range input {
108+
g.sendBatch(inputRecords, log)
66109
}
67110
if err := g.clientConn.Close(); err != nil {
68111
log.WithError(err).Warn("couldn't close flow export client")
69112
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient", metrics.MediumSeverity).Inc()
70113
}
71114
}
115+
116+
func (g *GRPCProto) sendBatch(batch []*model.Record, log *logrus.Entry) {
117+
g.m.RLock()
118+
defer g.m.RUnlock()
119+
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
120+
for _, pbRecords := range pbflow.FlowsToPB(batch, g.maxFlowsPerMessage) {
121+
log.Debugf("sending %d records", len(pbRecords.Entries))
122+
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
123+
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc()
124+
log.WithError(err).Error("couldn't send flow records to collector")
125+
}
126+
g.batchCounterMetric.Inc()
127+
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
128+
}
129+
}
130+
131+
func randomizeTimer(cfg *config.Agent) time.Duration {
132+
if cfg.GRPCReconnectTimer <= 0 {
133+
return 0
134+
}
135+
timer := cfg.GRPCReconnectTimer
136+
if cfg.GRPCReconnectTimerRandomization <= 0 || cfg.GRPCReconnectTimerRandomization >= timer {
137+
return timer
138+
}
139+
timer += time.Duration(rand.Int64N(2*int64(cfg.GRPCReconnectTimerRandomization)) - int64(cfg.GRPCReconnectTimerRandomization))
140+
if timer < 0 {
141+
return time.Second
142+
}
143+
return timer
144+
}

pkg/exporter/grpc_proto_test.go

Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
package exporter
22

33
import (
4+
"context"
5+
"fmt"
46
"net"
57
"testing"
68
"time"
79

10+
"github.com/netobserv/netobserv-ebpf-agent/pkg/config"
11+
grpcflow "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
812
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
913
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
1014
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
1115
test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test"
1216

1317
"github.com/mariomac/guara/pkg/test"
1418
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
15-
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
1619
"github.com/stretchr/testify/assert"
1720
"github.com/stretchr/testify/require"
21+
"google.golang.org/grpc/peer"
1822
)
1923

2024
const timeout = 2 * time.Second
@@ -24,12 +28,17 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
2428
port, err := test.FreeTCPPort()
2529
require.NoError(t, err)
2630
serverOut := make(chan *pbflow.Records)
27-
coll, err := grpc.StartCollector(port, serverOut)
31+
coll, err := grpcflow.StartCollector(port, serverOut)
2832
require.NoError(t, err)
2933
defer coll.Close()
3034

3135
// Start GRPCProto exporter stage
32-
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", 1000, metrics.NoOp())
36+
cfg := config.Agent{
37+
TargetHost: "127.0.0.1",
38+
TargetPort: port,
39+
GRPCMessageMaxFlows: 1000,
40+
}
41+
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
3342
require.NoError(t, err)
3443

3544
// Send some flows to the input of the exporter stage
@@ -66,12 +75,17 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
6675
port, err := test.FreeTCPPort()
6776
require.NoError(t, err)
6877
serverOut := make(chan *pbflow.Records)
69-
coll, err := grpc.StartCollector(port, serverOut)
78+
coll, err := grpcflow.StartCollector(port, serverOut)
7079
require.NoError(t, err)
7180
defer coll.Close()
7281

7382
// Start GRPCProto exporter stage
74-
exporter, err := StartGRPCProto("::1", port, "", "", "", 1000, metrics.NoOp())
83+
cfg := config.Agent{
84+
TargetHost: "::1",
85+
TargetPort: port,
86+
GRPCMessageMaxFlows: 1000,
87+
}
88+
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
7589
require.NoError(t, err)
7690

7791
// Send some flows to the input of the exporter stage
@@ -108,13 +122,17 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
108122
port, err := test.FreeTCPPort()
109123
require.NoError(t, err)
110124
serverOut := make(chan *pbflow.Records)
111-
coll, err := grpc.StartCollector(port, serverOut)
125+
coll, err := grpcflow.StartCollector(port, serverOut)
112126
require.NoError(t, err)
113127
defer coll.Close()
114128

115-
const msgMaxLen = 10000
116129
// Start GRPCProto exporter stage
117-
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", msgMaxLen, metrics.NoOp())
130+
cfg := config.Agent{
131+
TargetHost: "127.0.0.1",
132+
TargetPort: port,
133+
GRPCMessageMaxFlows: 10_000,
134+
}
135+
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
118136
require.NoError(t, err)
119137

120138
// Send a message much longer than the limit length
@@ -130,9 +148,9 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
130148

131149
// expect that the submitted message is split in chunks no longer than msgMaxLen
132150
rs := test2.ReceiveTimeout(t, serverOut, timeout)
133-
assert.Len(t, rs.Entries, msgMaxLen)
151+
assert.Len(t, rs.Entries, cfg.GRPCMessageMaxFlows)
134152
rs = test2.ReceiveTimeout(t, serverOut, timeout)
135-
assert.Len(t, rs.Entries, msgMaxLen)
153+
assert.Len(t, rs.Entries, cfg.GRPCMessageMaxFlows)
136154
rs = test2.ReceiveTimeout(t, serverOut, timeout)
137155
assert.Len(t, rs.Entries, 5000)
138156

@@ -144,3 +162,91 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
144162
// ok!
145163
}
146164
}
165+
166+
type collectorAPI struct {
167+
pbflow.UnimplementedCollectorServer
168+
recordForwarder chan<- *pbflow.Records
169+
lastClient string
170+
clientOccurrences []int
171+
}
172+
173+
func (c *collectorAPI) Send(ctx context.Context, records *pbflow.Records) (*pbflow.CollectorReply, error) {
174+
if p, ok := peer.FromContext(ctx); ok {
175+
addr := p.Addr.String()
176+
if len(c.clientOccurrences) == 0 || addr != c.lastClient {
177+
c.clientOccurrences = append(c.clientOccurrences, 1)
178+
c.lastClient = addr
179+
} else {
180+
c.clientOccurrences[len(c.clientOccurrences)-1]++
181+
}
182+
}
183+
c.recordForwarder <- records
184+
return &pbflow.CollectorReply{}, nil
185+
}
186+
187+
func TestConnectionReset(t *testing.T) {
188+
// start remote ingestor
189+
port, err := test.FreeTCPPort()
190+
require.NoError(t, err)
191+
serverOut := make(chan *pbflow.Records)
192+
193+
api := collectorAPI{
194+
recordForwarder: serverOut,
195+
clientOccurrences: []int{},
196+
}
197+
coll, err := grpcflow.StartCollectorWithAPI(port, &api)
198+
require.NoError(t, err)
199+
defer coll.Close()
200+
201+
// Start GRPCProto exporter stage
202+
cfg := config.Agent{
203+
TargetHost: "127.0.0.1",
204+
TargetPort: port,
205+
GRPCMessageMaxFlows: 1000,
206+
GRPCReconnectTimer: 500 * time.Millisecond,
207+
}
208+
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
209+
require.NoError(t, err)
210+
211+
// Send some flows to the input of the exporter stage
212+
nFlows := 5
213+
flows := make(chan []*model.Record, nFlows+1)
214+
go exporter.ExportFlows(flows)
215+
go func() {
216+
for i := range nFlows {
217+
flows <- []*model.Record{
218+
{AgentIP: net.ParseIP(fmt.Sprintf("10.9.8.%d", i)), Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}},
219+
}
220+
time.Sleep(300 * time.Millisecond)
221+
}
222+
}()
223+
224+
for i := 0; i < nFlows; i++ {
225+
rs := test2.ReceiveTimeout(t, serverOut, timeout)
226+
assert.Len(t, rs.Entries, 1)
227+
r := rs.Entries[0]
228+
assert.EqualValues(t, 0x0a090800+i, r.GetAgentIp().GetIpv4())
229+
}
230+
231+
// Expect flows sent at +0ms, +300ms | +600ms, +900ms | +1200ms ("|" means reconnect event / new client detected)
232+
// If it ends up too flaky, we can increase the durations, e.g. over 5s, or relax the assertion
233+
assert.Equal(t, []int{2, 2, 1}, api.clientOccurrences)
234+
235+
select {
236+
case rs := <-serverOut:
237+
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
238+
default:
239+
// ok!
240+
}
241+
}
242+
243+
func TestRandomizeMaxMessages(t *testing.T) {
244+
for i := 0; i < 1000; i++ {
245+
v := randomizeTimer(&config.Agent{
246+
GRPCReconnectTimer: 5 * time.Minute,
247+
GRPCReconnectTimerRandomization: 30 * time.Second,
248+
})
249+
assert.GreaterOrEqual(t, v, 270*time.Second /*4m30s*/)
250+
assert.LessOrEqual(t, v, 330*time.Second /*5m30s*/)
251+
}
252+
}

0 commit comments

Comments
 (0)