Skip to content

Commit c5ca2cb

Browse files
tigrannajaryan authored and johnleslie committed
[stefreceiver] Correctly handle Shutdown request (open-telemetry#40082)
#### Description

Previously the receiver would get stuck in Shutdown if there were open connections. Now we terminate open connections when Shutdown is requested and make sure Shutdown proceeds.

#### Testing

Added a unit test and tested manually.
1 parent 6e1b658 commit c5ca2cb

File tree

4 files changed

+115
-25
lines changed

4 files changed

+115
-25
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: stefreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Correctly handle Shutdown request
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40082]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: The receiver now correctly shuts down even if there are active connections
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/stefreceiver/e2e_test.go

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,50 +12,87 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"go.opentelemetry.io/collector/component/componenttest"
1414
"go.opentelemetry.io/collector/consumer/consumertest"
15+
"go.opentelemetry.io/collector/exporter"
1516
"go.opentelemetry.io/collector/exporter/exportertest"
1617
"go.opentelemetry.io/collector/pdata/pcommon"
1718
"go.opentelemetry.io/collector/pdata/pmetric"
19+
"go.opentelemetry.io/collector/receiver"
1820
"go.opentelemetry.io/collector/receiver/receivertest"
1921
"go.uber.org/zap"
2022

2123
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter"
24+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
2225
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stefreceiver/internal/metadata"
2326
)
2427

25-
func TestRoundtrip(t *testing.T) {
28+
func genMetrics() pmetric.Metrics {
29+
data := pmetric.NewMetrics()
30+
metricPoint := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
31+
metricPoint.SetName("foo")
32+
gauge := metricPoint.SetEmptyGauge()
33+
dp := gauge.DataPoints().AppendEmpty()
34+
dp.SetIntValue(1)
35+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
36+
return data
37+
}
38+
39+
func createReceiver(t *testing.T, endpoint string) (receiver.Metrics, *consumertest.MetricsSink) {
2640
sink := &consumertest.MetricsSink{}
2741
settings := receivertest.NewNopSettings(metadata.Type)
2842
settings.Logger, _ = zap.NewDevelopment()
29-
m, err := NewFactory().CreateMetrics(context.Background(), settings, createDefaultConfig(), sink)
30-
t.Cleanup(func() {
31-
err = m.Shutdown(context.Background())
32-
require.NoError(t, err)
33-
})
43+
rcvCfg := createDefaultConfig()
44+
rcvCfg.(*Config).NetAddr.Endpoint = endpoint
45+
m, err := NewFactory().CreateMetrics(context.Background(), settings, rcvCfg, sink)
3446
require.NoError(t, m.Start(context.Background(), componenttest.NewNopHost()))
3547
require.NoError(t, err)
48+
return m, sink
49+
}
50+
51+
func createExporter(t *testing.T, endpoint string) exporter.Metrics {
3652
cfg := stefexporter.NewFactory().CreateDefaultConfig().(*stefexporter.Config)
37-
cfg.Endpoint = "localhost:4320"
53+
cfg.Endpoint = endpoint
3854
cfg.TLSSetting.Insecure = true
39-
exporterSettings := exportertest.NewNopSettings(metadata.Type)
40-
exporterSettings.Logger, _ = zap.NewDevelopment()
41-
exporter, err := stefexporter.NewFactory().CreateMetrics(context.Background(), exporterSettings, cfg)
55+
settings := exportertest.NewNopSettings(metadata.Type)
56+
settings.Logger, _ = zap.NewDevelopment()
57+
exp, err := stefexporter.NewFactory().CreateMetrics(context.Background(), settings, cfg)
4258
require.NoError(t, err)
43-
t.Cleanup(func() {
44-
err = exporter.Shutdown(context.Background())
45-
require.NoError(t, err)
46-
})
59+
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
60+
return exp
61+
}
4762

48-
require.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
49-
data := pmetric.NewMetrics()
50-
metricPoint := data.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
51-
metricPoint.SetName("foo")
52-
gauge := metricPoint.SetEmptyGauge()
53-
dp := gauge.DataPoints().AppendEmpty()
54-
dp.SetIntValue(1)
55-
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
56-
err = exporter.ConsumeMetrics(context.Background(), data)
63+
func TestRoundtrip(t *testing.T) {
64+
endpoint := testutil.GetAvailableLocalAddress(t)
65+
m, sink := createReceiver(t, endpoint)
66+
t.Cleanup(func() { require.NoError(t, m.Shutdown(context.Background())) })
67+
68+
exporter := createExporter(t, endpoint)
69+
t.Cleanup(func() { require.NoError(t, exporter.Shutdown(context.Background())) })
70+
71+
err := exporter.ConsumeMetrics(context.Background(), genMetrics())
5772
require.NoError(t, err)
5873
assert.EventuallyWithT(t, func(tt *assert.CollectT) {
5974
assert.Len(tt, sink.AllMetrics(), 1)
6075
}, 1*time.Minute, 10*time.Millisecond)
6176
}
77+
78+
func TestShutdownWhenConnected(t *testing.T) {
79+
endpoint := testutil.GetAvailableLocalAddress(t)
80+
receiver, sink := createReceiver(t, endpoint)
81+
exporter := createExporter(t, endpoint)
82+
83+
require.NoError(t, exporter.ConsumeMetrics(context.Background(), genMetrics()))
84+
85+
assert.EventuallyWithT(
86+
t, func(tt *assert.CollectT) {
87+
assert.Len(tt, sink.AllMetrics(), 1)
88+
}, 1*time.Minute, 10*time.Millisecond,
89+
)
90+
91+
// Try shutdown receiver before shutting down exporter.
92+
// This means there is an active connection at the receiver.
93+
// Previously we had a bug causing the receiver Shutdown to hang forever
94+
// in this situation.
95+
require.NoError(t, receiver.Shutdown(context.Background()))
96+
97+
require.NoError(t, exporter.Shutdown(context.Background()))
98+
}

receiver/stefreceiver/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.0
44

55
require (
66
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter v0.126.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.126.0
78
github.com/splunk/stef/go/grpc v0.0.6
89
github.com/splunk/stef/go/otel v0.0.6
910
github.com/splunk/stef/go/pdata v0.0.6
@@ -18,6 +19,7 @@ require (
1819
go.opentelemetry.io/collector/consumer v1.32.0
1920
go.opentelemetry.io/collector/consumer/consumererror v0.126.0
2021
go.opentelemetry.io/collector/consumer/consumertest v0.126.0
22+
go.opentelemetry.io/collector/exporter v0.126.0
2123
go.opentelemetry.io/collector/exporter/exportertest v0.126.0
2224
go.opentelemetry.io/collector/pdata v1.32.0
2325
go.opentelemetry.io/collector/receiver v1.32.0
@@ -63,7 +65,6 @@ require (
6365
go.opentelemetry.io/collector/config/configopaque v1.32.0 // indirect
6466
go.opentelemetry.io/collector/config/configretry v1.32.0 // indirect
6567
go.opentelemetry.io/collector/consumer/xconsumer v0.126.0 // indirect
66-
go.opentelemetry.io/collector/exporter v0.126.0 // indirect
6768
go.opentelemetry.io/collector/exporter/xexporter v0.126.0 // indirect
6869
go.opentelemetry.io/collector/extension v1.32.0 // indirect
6970
go.opentelemetry.io/collector/extension/extensionauth v1.32.0 // indirect

receiver/stefreceiver/stef.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"context"
77
"errors"
88
"net"
9+
"sync/atomic"
10+
"time"
911

1012
stefgrpc "github.com/splunk/stef/go/grpc"
1113
"github.com/splunk/stef/go/grpc/stef_proto"
@@ -32,11 +34,14 @@ type stefReceiver struct {
3234
nextMetricsConsumer consumer.Metrics
3335
settings receiver.Settings
3436

35-
eg errgroup.Group
37+
stopping atomic.Bool
38+
eg errgroup.Group
3639
}
3740

3841
// Start runs the STEF gRPC receiver.
3942
func (r *stefReceiver) Start(ctx context.Context, host component.Host) error {
43+
r.stopping.Store(false)
44+
4045
var err error
4146
if r.serverGRPC, err = r.cfg.ToServer(ctx, host, r.settings.TelemetrySettings); err != nil {
4247
return err
@@ -75,8 +80,22 @@ func (r *stefReceiver) Start(ctx context.Context, host component.Host) error {
7580

7681
// Shutdown is a method to turn off receiving.
7782
func (r *stefReceiver) Shutdown(_ context.Context) error {
83+
r.stopping.Store(true)
84+
7885
if r.serverGRPC != nil {
86+
r.settings.Logger.Info("Stopping STEF/gRPC server", zap.String("endpoint", r.cfg.NetAddr.Endpoint))
87+
88+
// Give graceful stop a second to finish.
89+
timer := time.AfterFunc(
90+
1*time.Second, func() {
91+
r.settings.Logger.Info("STEF/gRPC server couldn't stop gracefully in time. Doing force stop.")
92+
r.serverGRPC.Stop()
93+
},
94+
)
95+
defer timer.Stop()
96+
7997
r.serverGRPC.GracefulStop()
98+
r.settings.Logger.Debug("STEF/gRPC server stopped.")
8099
}
81100

82101
return r.eg.Wait()
@@ -100,6 +119,12 @@ func (r *stefReceiver) onStream(grpcReader stefgrpc.GrpcReader, stream stefgrpc.
100119

101120
// Read, decode, convert the incoming data and push it to the next consumer.
102121
for {
122+
if r.stopping.Load() {
123+
// The receiver is shutting down. Close the connection.
124+
r.settings.Logger.Debug("Shutdown requested. Closing STEF/gRPC connection.")
125+
return nil
126+
}
127+
103128
respError := resp.LastError()
104129
if respError != nil {
105130
// We had problem sending responses. Can't continue using this connection since

0 commit comments

Comments
 (0)