Skip to content

Commit b1751fa

Browse files
committed
Sanitize gRPC metadata headers built from resource attributes
grpc-go hard-fails the entire RPC when an outgoing metadata pair fails its charset/printability validation, unlike the CE-extension path where an invalid entry is just dropped. Add SanitizeMetadataHeaders (reusing SanitizeExtensionName for keys, plus a printable-ASCII value sanitizer) and apply it once when building the static header provider, so a malformed resource-attribute key or value can no longer take down chip-ingress emission for the node.
1 parent 51940a5 commit b1751fa

3 files changed

Lines changed: 166 additions & 1 deletion

File tree

pkg/beholder/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
217217
opts = append(opts, chipingress.WithTracerProvider(tracerProvider))
218218

219219
if len(resourceAttrs) > 0 {
220-
opts = append(opts, chipingress.WithHeaderProvider(chipingress.NewStaticHeaderProvider(resourceAttrs)))
220+
opts = append(opts, chipingress.WithHeaderProvider(chipingress.NewStaticHeaderProvider(chipingress.SanitizeMetadataHeaders(resourceAttrs))))
221221
}
222222

223223
chipIngressClient, err = chipingress.NewClient(cfg.ChipIngressEmitterGRPCEndpoint, opts...)

pkg/chipingress/header_provider.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"maps"
11+
"sort"
1112
"sync"
1213
"sync/atomic"
1314
"time"
@@ -115,6 +116,58 @@ func NewStaticHeaderProvider(headers map[string]string) HeaderProvider {
115116
return newStaticHeaderProvider(headers, false)
116117
}
117118

119+
// SanitizeMetadataValue replaces any byte outside the printable ASCII range [0x20-0x7E]
120+
// with '?'. grpc-go hard-fails the entire RPC when an outgoing metadata value fails this
121+
// check (unlike the CE-extension path, where an invalid entry is simply dropped), so
122+
// values headed for gRPC metadata must be normalized before being sent.
123+
func SanitizeMetadataValue(val string) string {
124+
b := []byte(val)
125+
out := make([]byte, len(b))
126+
for i, c := range b {
127+
if c >= 0x20 && c <= 0x7E {
128+
out[i] = c
129+
} else {
130+
out[i] = '?'
131+
}
132+
}
133+
return string(out)
134+
}
135+
136+
// SanitizeMetadataHeaders sanitizes a map of resource-attribute headers for use as outgoing
137+
// gRPC metadata (e.g. via NewStaticHeaderProvider). Keys are sanitized with
138+
// SanitizeExtensionName — the same strict [a-z0-9] charset used for CloudEvent extensions —
139+
// which is a subset of grpc's allowed metadata-key charset, so a sanitized key can never trip
140+
// grpc's key validation or the reserved "-bin" suffix, and produces the same key stem as the
141+
// corresponding CE extension (differing only by the CloudEvents Kafka binding's "ce_" prefix
142+
// once on the wire). Values are sanitized via SanitizeMetadataValue, since grpc-go fails the
143+
// whole RPC on a non-printable value. Entries that sanitize to an empty key, or that collide
144+
// with a reserved extension name (see reservedExtensionNames), are skipped. Keys are applied
145+
// in sorted order so duplicate sanitized keys resolve deterministically (first in sorted order
146+
// wins), matching WithResourceAttributeExtensions' collision handling.
147+
func SanitizeMetadataHeaders(in map[string]string) map[string]string {
148+
keys := make([]string, 0, len(in))
149+
for k := range in {
150+
keys = append(keys, k)
151+
}
152+
sort.Strings(keys)
153+
154+
out := make(map[string]string, len(in))
155+
for _, k := range keys {
156+
name := SanitizeExtensionName(k)
157+
if name == "" {
158+
continue
159+
}
160+
if _, reserved := reservedExtensionNames[name]; reserved {
161+
continue
162+
}
163+
if _, exists := out[name]; exists {
164+
continue
165+
}
166+
out[name] = SanitizeMetadataValue(in[k])
167+
}
168+
return out
169+
}
170+
118171
// newRotatingHeaderProvider returns a HeaderProvider that refreshes its
119172
// headers every ttl using signer. initialHeaders, if non-empty, are served
120173
// until the first rotation occurs.

pkg/chipingress/header_provider_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,18 @@ import (
44
"context"
55
"crypto/ed25519"
66
"encoding/hex"
7+
"net"
78
"testing"
89
"time"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/codes"
15+
"google.golang.org/grpc/status"
1216

1317
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
18+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
1419
)
1520

1621
// fakeSigner is a minimal chipingress.Signer used by tests that need to
@@ -277,3 +282,110 @@ func TestNewStaticHeaderProvider(t *testing.T) {
277282
require.True(t, ok)
278283
assert.False(t, tlsReq.RequireTransportSecurity())
279284
}
285+
286+
func TestSanitizeMetadataValue(t *testing.T) {
287+
tests := []struct {
288+
name string
289+
in string
290+
want string
291+
}{
292+
{name: "printable ASCII is unchanged", in: "chain-1_prod.v2", want: "chain-1_prod.v2"},
293+
{name: "empty", in: "", want: ""},
294+
{name: "control character replaced", in: "value\nwith\tcontrol", want: "value?with?control"},
295+
{name: "non-ASCII UTF-8 replaced byte-wise", in: "café", want: "caf??"},
296+
}
297+
for _, tt := range tests {
298+
t.Run(tt.name, func(t *testing.T) {
299+
assert.Equal(t, tt.want, chipingress.SanitizeMetadataValue(tt.in))
300+
})
301+
}
302+
}
303+
304+
func TestSanitizeMetadataHeaders(t *testing.T) {
305+
t.Run("standard OTel-style keys are sanitized to the same stem as CE extensions", func(t *testing.T) {
306+
in := map[string]string{
307+
"service.name": "beholder",
308+
"chain_id": "1",
309+
"node-operator": "acme",
310+
}
311+
got := chipingress.SanitizeMetadataHeaders(in)
312+
assert.Equal(t, map[string]string{
313+
"servicename": "beholder",
314+
"chainid": "1",
315+
"nodeoperator": "acme",
316+
}, got)
317+
})
318+
319+
t.Run("empty-after-sanitize keys are dropped", func(t *testing.T) {
320+
got := chipingress.SanitizeMetadataHeaders(map[string]string{"---": "value"})
321+
assert.Empty(t, got)
322+
})
323+
324+
t.Run("reserved names are dropped", func(t *testing.T) {
325+
got := chipingress.SanitizeMetadataHeaders(map[string]string{chipingress.IdempotencyKeyAttr: "should-not-appear", "subject": "should-not-appear"})
326+
assert.Empty(t, got)
327+
})
328+
329+
t.Run("non-printable values are sanitized", func(t *testing.T) {
330+
got := chipingress.SanitizeMetadataHeaders(map[string]string{"chain_id": "1\n2"})
331+
assert.Equal(t, "1?2", got["chainid"])
332+
})
333+
334+
t.Run("duplicate sanitized keys resolve deterministically to sorted-first key", func(t *testing.T) {
335+
got := chipingress.SanitizeMetadataHeaders(map[string]string{"service.name": "from-dotted", "service_name": "from-snake"})
336+
// sorted order: "service.name" < "service_name" ('.' < '_' in ASCII), so the dotted key wins.
337+
assert.Equal(t, "from-dotted", got["servicename"])
338+
})
339+
}
340+
341+
// pingServer is a minimal ChipIngressServer that always answers Ping successfully.
342+
type pingServer struct {
343+
pb.UnimplementedChipIngressServer
344+
}
345+
346+
func (pingServer) Ping(context.Context, *pb.EmptyRequest) (*pb.PingResponse, error) {
347+
return &pb.PingResponse{}, nil
348+
}
349+
350+
// TestSanitizeMetadataHeaders_AvoidsRPCFailure is a regression/guard test for the core reason
351+
// SanitizeMetadataHeaders exists: grpc-go hard-fails an entire RPC (codes.Internal) when an
352+
// outgoing metadata pair fails its charset validation. An unsanitized resource-attribute key or
353+
// value (dots, non-printable characters) reproduces that failure; running it through
354+
// SanitizeMetadataHeaders first must not.
355+
func TestSanitizeMetadataHeaders_AvoidsRPCFailure(t *testing.T) {
356+
lis, err := net.Listen("tcp", "127.0.0.1:0")
357+
require.NoError(t, err)
358+
defer lis.Close()
359+
360+
srv := grpc.NewServer()
361+
pb.RegisterChipIngressServer(srv, pingServer{})
362+
go func() { _ = srv.Serve(lis) }()
363+
defer srv.Stop()
364+
365+
dirty := map[string]string{"k8s.pod.name": "pod-\x01abc"}
366+
367+
t.Run("unsanitized headers fail the RPC", func(t *testing.T) {
368+
client, err := chipingress.NewClient(lis.Addr().String(),
369+
chipingress.WithInsecureConnection(),
370+
chipingress.WithHeaderProvider(chipingress.NewStaticHeaderProvider(dirty)),
371+
)
372+
require.NoError(t, err)
373+
defer client.Close() //nolint:errcheck
374+
375+
_, err = client.Ping(t.Context(), &chipingress.EmptyRequest{})
376+
require.Error(t, err)
377+
assert.Equal(t, codes.Internal, status.Code(err))
378+
})
379+
380+
t.Run("sanitized headers succeed", func(t *testing.T) {
381+
client, err := chipingress.NewClient(lis.Addr().String(),
382+
chipingress.WithInsecureConnection(),
383+
chipingress.WithHeaderProvider(chipingress.NewStaticHeaderProvider(chipingress.SanitizeMetadataHeaders(dirty))),
384+
)
385+
require.NoError(t, err)
386+
defer client.Close() //nolint:errcheck
387+
388+
_, err = client.Ping(t.Context(), &chipingress.EmptyRequest{})
389+
require.NoError(t, err)
390+
})
391+
}

0 commit comments

Comments
 (0)