Skip to content

Commit 7ae41db

Browse files
committed
Add labels as headers in gRPC connections
1 parent 4c52f40 commit 7ae41db

File tree

7 files changed

+86
-19
lines changed

7 files changed

+86
-19
lines changed

internal/command/command_plugin.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,21 @@ func (cp *CommandPlugin) Info() *bus.Info {
8686
}
8787

8888
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
89+
ctxWithMetadata := cp.config.NewContextWithLabels(ctx)
90+
8991
switch msg.Topic {
9092
case bus.ConnectionResetTopic:
91-
cp.processConnectionReset(ctx, msg)
93+
cp.processConnectionReset(ctxWithMetadata, msg)
9294
case bus.ResourceUpdateTopic:
93-
cp.processResourceUpdate(ctx, msg)
95+
cp.processResourceUpdate(ctxWithMetadata, msg)
9496
case bus.InstanceHealthTopic:
95-
cp.processInstanceHealth(ctx, msg)
97+
cp.processInstanceHealth(ctxWithMetadata, msg)
9698
case bus.DataPlaneHealthResponseTopic:
97-
cp.processDataPlaneHealth(ctx, msg)
99+
cp.processDataPlaneHealth(ctxWithMetadata, msg)
98100
case bus.DataPlaneResponseTopic:
99-
cp.processDataPlaneResponse(ctx, msg)
101+
cp.processDataPlaneResponse(ctxWithMetadata, msg)
100102
default:
101-
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
103+
slog.DebugContext(ctxWithMetadata, "Command plugin received unknown topic", "topic", msg.Topic)
102104
}
103105
}
104106

internal/config/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func ResolveConfig() (*Config, error) {
128128
}
129129

130130
checkCollectorConfiguration(collector, config)
131+
addLabelsAsOTelHeaders(collector, config.Labels)
131132

132133
slog.Debug("Agent config", "config", config)
133134
slog.Info("Excluded files from being watched for file changes", "exclude_files",
@@ -207,6 +208,22 @@ func defaultCollector(collector *Collector, config *Config) {
207208
}
208209
}
209210

211+
func addLabelsAsOTelHeaders(collector *Collector, labels map[string]any) {
212+
slog.Debug("Adding labels as headers to collector", "labels", labels)
213+
if collector.Extensions.HeadersSetter != nil {
214+
for key, value := range labels {
215+
valueString, ok := value.(string)
216+
if ok {
217+
collector.Extensions.HeadersSetter.Headers = append(collector.Extensions.HeadersSetter.Headers, Header{
218+
Action: "insert",
219+
Key: key,
220+
Value: valueString,
221+
})
222+
}
223+
}
224+
}
225+
}
226+
210227
func setVersion(version, commit string) {
211228
RootCommand.Version = version + "-" + commit
212229
viperInstance.SetDefault(VersionKey, version)

internal/config/config_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"errors"
1010
"os"
1111
"path"
12+
"sort"
1213
"strings"
1314
"testing"
1415
"time"
@@ -63,6 +64,10 @@ func TestResolveConfig(t *testing.T) {
6364

6465
actual, err := ResolveConfig()
6566
require.NoError(t, err)
67+
sort.Slice(actual.Collector.Extensions.HeadersSetter.Headers, func(i, j int) bool {
68+
headers := actual.Collector.Extensions.HeadersSetter.Headers
69+
return headers[i].Key < headers[j].Key
70+
})
6671
assert.Equal(t, createConfig(), actual)
6772
}
6873

@@ -1056,6 +1061,16 @@ func createConfig() *Config {
10561061
Key: "key",
10571062
Value: "value",
10581063
},
1064+
{
1065+
Action: "insert",
1066+
Key: "label1",
1067+
Value: "label 1",
1068+
},
1069+
{
1070+
Action: "insert",
1071+
Key: "label2",
1072+
Value: "new-value",
1073+
},
10591074
},
10601075
},
10611076
},

internal/config/types.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
package config
77

88
import (
9+
"context"
910
"errors"
1011
"fmt"
1112
"path/filepath"
1213
"strings"
1314
"time"
1415

16+
"google.golang.org/grpc/metadata"
17+
1518
"github.com/google/uuid"
1619
)
1720

@@ -411,6 +414,18 @@ func (c *Config) AreReceiversConfigured() bool {
411414
len(c.Collector.Receivers.TcplogReceivers) > 0
412415
}
413416

417+
func (c *Config) NewContextWithLabels(ctx context.Context) context.Context {
418+
md := metadata.Pairs()
419+
for key, value := range c.Labels {
420+
valueString, ok := value.(string)
421+
if ok {
422+
md.Set(key, valueString)
423+
}
424+
}
425+
426+
return metadata.NewOutgoingContext(ctx, md)
427+
}
428+
414429
func isAllowedDir(dir string, allowedDirs []string) bool {
415430
for _, allowedDirectory := range allowedDirs {
416431
if strings.HasPrefix(dir, allowedDirectory) {

internal/file/file_plugin.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,26 +62,28 @@ func (fp *FilePlugin) Info() *bus.Info {
6262
}
6363

6464
func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
65+
ctxWithMetadata := fp.config.NewContextWithLabels(ctx)
66+
6567
switch msg.Topic {
6668
case bus.ConnectionResetTopic:
67-
fp.handleConnectionReset(ctx, msg)
69+
fp.handleConnectionReset(ctxWithMetadata, msg)
6870
case bus.ConnectionCreatedTopic:
69-
slog.DebugContext(ctx, "File plugin received connection created message")
71+
slog.DebugContext(ctxWithMetadata, "File plugin received connection created message")
7072
fp.fileManagerService.SetIsConnected(true)
7173
case bus.NginxConfigUpdateTopic:
72-
fp.handleNginxConfigUpdate(ctx, msg)
74+
fp.handleNginxConfigUpdate(ctxWithMetadata, msg)
7375
case bus.ConfigUploadRequestTopic:
74-
fp.handleConfigUploadRequest(ctx, msg)
76+
fp.handleConfigUploadRequest(ctxWithMetadata, msg)
7577
case bus.ConfigApplyRequestTopic:
76-
fp.handleConfigApplyRequest(ctx, msg)
78+
fp.handleConfigApplyRequest(ctxWithMetadata, msg)
7779
case bus.ConfigApplyCompleteTopic:
78-
fp.handleConfigApplyComplete(ctx, msg)
80+
fp.handleConfigApplyComplete(ctxWithMetadata, msg)
7981
case bus.ConfigApplySuccessfulTopic:
80-
fp.handleConfigApplySuccess(ctx, msg)
82+
fp.handleConfigApplySuccess(ctxWithMetadata, msg)
8183
case bus.ConfigApplyFailedTopic:
82-
fp.handleConfigApplyFailedRequest(ctx, msg)
84+
fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg)
8385
default:
84-
slog.DebugContext(ctx, "File plugin received unknown topic", "topic", msg.Topic)
86+
slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic)
8587
}
8688
}
8789

test/mock/collector/nginx-agent.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ allowed_directories:
2626
- /usr/local/etc/nginx
2727
- /usr/share/nginx/modules
2828
- /var/run/nginx
29+
30+
labels:
31+
product-type: mock-product
32+
product-version: v1.0.0
2933

3034
client:
3135
http:

test/mock/grpc/mock_management_server.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ var (
5252
Time: keepAliveTime,
5353
Timeout: keepAliveTimeout,
5454
}
55+
56+
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
57+
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
5558
)
5659

5760
type MockManagementServer struct {
@@ -146,13 +149,15 @@ func serverOptions(agentConfig *config.Config) []grpc.ServerOption {
146149
opts = append(opts, grpc.ChainUnaryInterceptor(
147150
grpcvalidator.UnaryServerInterceptor(),
148151
protovalidateInterceptor.UnaryServerInterceptor(validator),
152+
logHeaders,
149153
),
150154
)
151155
} else {
152156
opts = append(opts, grpc.ChainUnaryInterceptor(
153157
grpcvalidator.UnaryServerInterceptor(),
154158
protovalidateInterceptor.UnaryServerInterceptor(validator),
155159
ensureValidToken,
160+
logHeaders,
156161
),
157162
)
158163
}
@@ -242,10 +247,6 @@ func reportHealth(healthcheck *health.Server, agentConfig *config.Config) {
242247
}
243248

244249
func ensureValidToken(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
245-
var (
246-
errMissingMetadata = status.Errorf(codes.InvalidArgument, "missing metadata")
247-
errInvalidToken = status.Errorf(codes.Unauthenticated, "invalid token")
248-
)
249250
md, ok := metadata.FromIncomingContext(ctx)
250251
if !ok {
251252
return nil, errMissingMetadata
@@ -270,3 +271,14 @@ func valid(authorization []string) bool {
270271
// for a token matching an arbitrary string.
271272
return token == "1234"
272273
}
274+
275+
func logHeaders(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
276+
md, ok := metadata.FromIncomingContext(ctx)
277+
if !ok {
278+
return nil, errMissingMetadata
279+
}
280+
281+
slog.InfoContext(ctx, "Request headers", "headers", md)
282+
283+
return handler(ctx, req)
284+
}

0 commit comments

Comments
 (0)