Skip to content

Commit 8be3320

Browse files
authored
(DynamicSelector) fixups for dynamic resource attributes (#2498)
1 parent 835a071 commit 8be3320

5 files changed

Lines changed: 78 additions & 52 deletions

File tree

pkg/instrumenter/opts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Option func(info *global.ContextInfo)
2121
// Root AddPIDs/RemovePIDs preserve the legacy behavior and apply to all supported signals.
2222
// Service name and resource attributes set via AddPID or SetPID are shared across all signals.
2323
// SetPID updates live FileInfo for instrumented PIDs; traces and app metrics read it at export time.
24-
// Network and stats metrics decorate flow records by pod IP when the PID is in that signal view.
24+
// Network and stats metrics decorate flow records with service name/namespace by pod IP.
2525
func WithDynamicPIDSelector(sel *discover.DynamicPIDSelector) Option {
2626
return func(info *global.ContextInfo) {
2727
info.DynamicPIDSelector = sel

pkg/internal/appolly/appolly.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *obi.Config) (
108108
// When sel is nil, finder gets nil: config target_pids are used as static criteria (FindingCriteria(cfg, false)).
109109
if sel != nil {
110110
sel.SetOnFileInfoUpdated(func(fi *exec.FileInfo) {
111-
processEventsInput.Send(exec.ProcessEvent{Type: exec.ProcessEventCreated, File: fi})
111+
processEventsInput.SendCtx(ctx, exec.ProcessEvent{Type: exec.ProcessEventCreated, File: fi})
112112
})
113113
}
114114
instr := &Instrumenter{

pkg/selection/dynamic_app_ips.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ func (d *DynamicAppIPs) decrementIPsLocked(ips []string) {
124124
}
125125

126126
// ResolveContainerIPs returns pod IPs for a PID when a Kubernetes store is available.
127+
// It registers the PID in store via AddProcess; callers that invoke it outside
128+
// DynamicAppIPs must call store.DeleteProcess when the PID is no longer needed.
127129
func ResolveContainerIPs(store *kube.Store, pid app.PID) []string {
128130
if store == nil {
129131
return nil

pkg/selection/dynamic_flow_attrs.go

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,28 @@ import (
1616
type flowIPDecoration struct {
1717
serviceName string
1818
serviceNamespace string
19-
resourceAttrs map[attr.Name]string
2019
}
2120

22-
// DynamicFlowAttrs maps dynamically selected application IPs to service identity and resource
23-
// attributes for NetO11y and StatsO11y decoration.
21+
// DynamicFlowAttrs maps dynamically selected application IPs to service identity for NetO11y
22+
// and StatsO11y decoration.
2423
type DynamicFlowAttrs struct {
2524
multiSel MultiSignalPIDSelector
2625
signalSel PIDSelector
2726
store *kube.Store
2827

29-
mu sync.RWMutex
30-
ipDecor map[string]flowIPDecoration
28+
mu sync.RWMutex
29+
ipDecor map[string]flowIPDecoration
30+
registeredPIDs map[app.PID]struct{}
3131
}
3232

3333
// NewDynamicFlowAttrs creates a tracker for the given signal view and optional Kubernetes store.
3434
func NewDynamicFlowAttrs(multiSel MultiSignalPIDSelector, signalSel PIDSelector, store *kube.Store) *DynamicFlowAttrs {
3535
return &DynamicFlowAttrs{
36-
multiSel: multiSel,
37-
signalSel: signalSel,
38-
store: store,
39-
ipDecor: map[string]flowIPDecoration{},
36+
multiSel: multiSel,
37+
signalSel: signalSel,
38+
store: store,
39+
ipDecor: map[string]flowIPDecoration{},
40+
registeredPIDs: map[app.PID]struct{}{},
4041
}
4142
}
4243

@@ -83,31 +84,40 @@ func (d *DynamicFlowAttrs) loopAttrs(ctx context.Context) {
8384

8485
func (d *DynamicFlowAttrs) rebuild() {
8586
pids, ok := d.signalSel.GetPIDs()
86-
if !ok {
87-
d.mu.Lock()
88-
d.ipDecor = map[string]flowIPDecoration{}
89-
d.mu.Unlock()
90-
return
91-
}
9287

9388
next := map[string]flowIPDecoration{}
94-
for _, pid := range pids {
95-
entry, ok := d.multiSel.GetPID(uint32(pid))
96-
if !ok {
97-
continue
98-
}
99-
dec := decorationFromEntry(entry)
100-
if dec.isEmpty() {
101-
continue
102-
}
103-
for _, ip := range ResolveContainerIPs(d.store, pid) {
104-
next[ip] = dec
89+
storeRegisteredPIDs := map[app.PID]struct{}{}
90+
if ok && d.store != nil {
91+
for _, pid := range pids {
92+
entry, found := d.multiSel.GetPID(uint32(pid))
93+
if !found {
94+
continue
95+
}
96+
dec := decorationFromEntry(entry)
97+
if dec.isEmpty() {
98+
continue
99+
}
100+
for _, ip := range ResolveContainerIPs(d.store, pid) {
101+
next[ip] = dec
102+
}
103+
storeRegisteredPIDs[pid] = struct{}{}
105104
}
106105
}
107106

108107
d.mu.Lock()
108+
defer d.mu.Unlock()
109+
for pid := range d.registeredPIDs {
110+
if _, still := storeRegisteredPIDs[pid]; !still {
111+
if d.store != nil {
112+
d.store.DeleteProcess(pid)
113+
}
114+
delete(d.registeredPIDs, pid)
115+
}
116+
}
117+
for pid := range storeRegisteredPIDs {
118+
d.registeredPIDs[pid] = struct{}{}
119+
}
109120
d.ipDecor = next
110-
d.mu.Unlock()
111121
}
112122

113123
func (d *DynamicFlowAttrs) Apply(a *pipe.CommonAttrs) {
@@ -133,21 +143,14 @@ func (d *DynamicFlowAttrs) Apply(a *pipe.CommonAttrs) {
133143
}
134144

135145
func decorationFromEntry(entry DynamicPIDEntry) flowIPDecoration {
136-
dec := flowIPDecoration{
146+
return flowIPDecoration{
137147
serviceName: entry.ServiceName,
138148
serviceNamespace: entry.ServiceNamespace,
139149
}
140-
if len(entry.ResourceAttributes) > 0 {
141-
dec.resourceAttrs = make(map[attr.Name]string, len(entry.ResourceAttributes))
142-
for k, v := range entry.ResourceAttributes {
143-
dec.resourceAttrs[attr.Name(k)] = v
144-
}
145-
}
146-
return dec
147150
}
148151

149152
func (d flowIPDecoration) isEmpty() bool {
150-
return d.serviceName == "" && d.serviceNamespace == "" && len(d.resourceAttrs) == 0
153+
return d.serviceName == "" && d.serviceNamespace == ""
151154
}
152155

153156
func applyFlowDecoration(dec flowIPDecoration, a *pipe.CommonAttrs, src bool) {
@@ -158,15 +161,12 @@ func applyFlowDecoration(dec flowIPDecoration, a *pipe.CommonAttrs, src bool) {
158161
if dec.serviceNamespace != "" {
159162
a.Metadata[attr.ServiceNamespace] = dec.serviceNamespace
160163
}
161-
} else {
162-
if dec.serviceName != "" {
163-
a.Metadata[attr.ServicePeerName] = dec.serviceName
164-
}
165-
if dec.serviceNamespace != "" {
166-
a.Metadata[attr.ServicePeerNamespace] = dec.serviceNamespace
167-
}
164+
return
165+
}
166+
if dec.serviceName != "" {
167+
a.Metadata[attr.ServicePeerName] = dec.serviceName
168168
}
169-
for k, v := range dec.resourceAttrs {
170-
a.Metadata[k] = v
169+
if dec.serviceNamespace != "" {
170+
a.Metadata[attr.ServicePeerNamespace] = dec.serviceNamespace
171171
}
172172
}

pkg/selection/dynamic_flow_attrs_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ func TestDynamicFlowAttrs_Apply_srcAndDst(t *testing.T) {
6262
PID: 42,
6363
ServiceName: "payments",
6464
ServiceNamespace: "prod",
65-
ResourceAttributes: map[string]string{
66-
"team": "checkout",
67-
},
6865
},
6966
},
7067
}
@@ -81,7 +78,6 @@ func TestDynamicFlowAttrs_Apply_srcAndDst(t *testing.T) {
8178
require.NotNil(t, srcFlow.Metadata)
8279
assert.Equal(t, "payments", srcFlow.Metadata[attr.ServiceName])
8380
assert.Equal(t, "prod", srcFlow.Metadata[attr.ServiceNamespace])
84-
assert.Equal(t, "checkout", srcFlow.Metadata[attr.Name("team")])
8581

8682
dstFlow := &pipe.CommonAttrs{
8783
SrcAddr: pipe.IPAddr(net.ParseIP("10.0.0.9")),
@@ -94,15 +90,43 @@ func TestDynamicFlowAttrs_Apply_srcAndDst(t *testing.T) {
9490
}
9591

9692
func TestDynamicFlowAttrs_rebuild_clearsWhenEmpty(t *testing.T) {
97-
sel := &stubMultiPIDSelector{stubPIDSelector: stubPIDSelector{}}
93+
sel := &stubMultiPIDSelector{
94+
stubPIDSelector: stubPIDSelector{pids: []app.PID{1}},
95+
entries: map[app.PID]DynamicPIDEntry{
96+
1: {PID: 1, ServiceName: "a"},
97+
},
98+
}
9899
tracker := NewDynamicFlowAttrs(sel, sel, nil)
99100
tracker.mu.Lock()
100101
tracker.ipDecor["10.0.0.1"] = flowIPDecoration{serviceName: "old"}
102+
tracker.registeredPIDs[1] = struct{}{}
101103
tracker.mu.Unlock()
102104

105+
sel.pids = nil
103106
tracker.rebuild()
104107

105108
tracker.mu.RLock()
106109
assert.Empty(t, tracker.ipDecor)
110+
assert.Empty(t, tracker.registeredPIDs)
111+
tracker.mu.RUnlock()
112+
}
113+
114+
func TestDynamicFlowAttrs_rebuild_doesNotTrackPIDWithoutDecoration(t *testing.T) {
115+
sel := &stubMultiPIDSelector{
116+
stubPIDSelector: stubPIDSelector{pids: []app.PID{1, 2}},
117+
entries: map[app.PID]DynamicPIDEntry{
118+
1: {PID: 1, ServiceName: "a"},
119+
2: {PID: 2},
120+
},
121+
}
122+
tracker := NewDynamicFlowAttrs(sel, sel, nil)
123+
tracker.mu.Lock()
124+
tracker.registeredPIDs[2] = struct{}{}
125+
tracker.mu.Unlock()
126+
127+
tracker.rebuild()
128+
129+
tracker.mu.RLock()
130+
assert.Empty(t, tracker.registeredPIDs)
107131
tracker.mu.RUnlock()
108132
}

0 commit comments

Comments
 (0)