Skip to content

Commit 91ed054

Browse files
authored
[processor/k8sattributes] Use podUID instead of podName to determine which pods should be deleted from cache (#43008)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description When creating deleteQueue entries, rely on the pod UID as the unique identifier, because a pod name isn't always unique (a pod recreated under the same name gets a new UID). <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #42978 <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: odubajDT <ondrej.dubaj@dynatrace.com>
1 parent 5356cc8 commit 91ed054

File tree

4 files changed

+196
-56
lines changed

4 files changed

+196
-56
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: processor/k8sattributes
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Use podUID instead of podName to determine which pods should be deleted from cache"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42978]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -643,41 +643,44 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) {
643643
for {
644644
select {
645645
case <-time.After(interval):
646-
var cutoff int
647-
now := time.Now()
648-
c.deleteMut.Lock()
649-
for i := range c.deleteQueue {
650-
d := c.deleteQueue[i]
651-
if d.ts.Add(gracePeriod).After(now) {
652-
break
653-
}
654-
cutoff = i + 1
655-
}
656-
toDelete := c.deleteQueue[:cutoff]
657-
c.deleteQueue = c.deleteQueue[cutoff:]
658-
c.deleteMut.Unlock()
659-
660-
c.m.Lock()
661-
for i := range toDelete {
662-
d := toDelete[i]
663-
if p, ok := c.Pods[d.id]; ok {
664-
// Sanity check: make sure we are deleting the same pod
665-
// and the underlying state (ip<>pod mapping) has not changed.
666-
if p.Name == d.podName {
667-
delete(c.Pods, d.id)
668-
}
669-
}
670-
}
671-
podTableSize := len(c.Pods)
672-
c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
673-
c.m.Unlock()
674-
646+
c.deleteLoopProcessing(gracePeriod)
675647
case <-c.stopCh:
676648
return
677649
}
678650
}
679651
}
680652

653+
func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) {
654+
var cutoff int
655+
now := time.Now()
656+
c.deleteMut.Lock()
657+
for i := range c.deleteQueue {
658+
d := c.deleteQueue[i]
659+
if d.ts.Add(gracePeriod).After(now) {
660+
break
661+
}
662+
cutoff = i + 1
663+
}
664+
toDelete := c.deleteQueue[:cutoff]
665+
c.deleteQueue = c.deleteQueue[cutoff:]
666+
c.deleteMut.Unlock()
667+
668+
c.m.Lock()
669+
for i := range toDelete {
670+
d := toDelete[i]
671+
if p, ok := c.Pods[d.id]; ok {
672+
// Sanity check: make sure we are deleting the same pod
673+
// and the underlying state (ip<>pod mapping) has not changed.
674+
if p.PodUID == d.podUID {
675+
delete(c.Pods, d.id)
676+
}
677+
}
678+
}
679+
podTableSize := len(c.Pods)
680+
c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
681+
c.m.Unlock()
682+
}
683+
681684
// GetPod takes an IP address or Pod UID and returns the pod the identifier is associated with.
682685
func (c *WatchClient) GetPod(identifier PodIdentifier) (*Pod, bool) {
683686
c.m.RLock()
@@ -1463,18 +1466,18 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
14631466
id := identifiers[i]
14641467
p, ok := c.GetPod(id)
14651468

1466-
if ok && p.Name == pod.Name {
1467-
c.appendDeleteQueue(id, pod.Name)
1469+
if ok && p.PodUID == string(pod.UID) {
1470+
c.appendDeleteQueue(id, p.PodUID)
14681471
}
14691472
}
14701473
}
14711474

1472-
func (c *WatchClient) appendDeleteQueue(podID PodIdentifier, podName string) {
1475+
func (c *WatchClient) appendDeleteQueue(podID PodIdentifier, podUID string) {
14731476
c.deleteMut.Lock()
14741477
c.deleteQueue = append(c.deleteQueue, deleteRequest{
1475-
id: podID,
1476-
podName: podName,
1477-
ts: time.Now(),
1478+
id: podID,
1479+
podUID: podUID,
1480+
ts: time.Now(),
14781481
})
14791482
c.deleteMut.Unlock()
14801483
}

processor/k8sattributesprocessor/internal/kube/client_test.go

Lines changed: 128 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/apimachinery/pkg/fields"
2626
"k8s.io/apimachinery/pkg/labels"
2727
"k8s.io/apimachinery/pkg/selection"
28+
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/client-go/kubernetes"
2930
"k8s.io/client-go/kubernetes/fake"
3031
"k8s.io/client-go/tools/cache"
@@ -455,6 +456,7 @@ func TestPodDelete(t *testing.T) {
455456
c.deleteQueue = c.deleteQueue[:0]
456457
pod = &api_v1.Pod{}
457458
pod.Status.PodIP = "1.1.1.1"
459+
pod.UID = "aaaaaaaa-bbbb-cccc-dddd"
458460
c.handlePodDelete(pod)
459461
got = c.Pods[newPodIdentifier("connection", "k8s.pod.ip", "1.1.1.1")]
460462
assert.Len(t, c.Pods, 5)
@@ -472,7 +474,6 @@ func TestPodDelete(t *testing.T) {
472474
assert.Len(t, c.deleteQueue, 3)
473475
deleteRequest := c.deleteQueue[0]
474476
assert.Equal(t, newPodIdentifier("connection", "k8s.pod.ip", "1.1.1.1"), deleteRequest.id)
475-
assert.Equal(t, "podB", deleteRequest.podName)
476477
assert.False(t, deleteRequest.ts.Before(tsBeforeDelete))
477478
assert.False(t, deleteRequest.ts.After(time.Now()))
478479

@@ -488,12 +489,12 @@ func TestPodDelete(t *testing.T) {
488489
assert.Len(t, c.deleteQueue, 5)
489490
deleteRequest = c.deleteQueue[0]
490491
assert.Equal(t, newPodIdentifier("connection", "k8s.pod.ip", "2.2.2.2"), deleteRequest.id)
491-
assert.Equal(t, "podC", deleteRequest.podName)
492+
assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", deleteRequest.podUID)
492493
assert.False(t, deleteRequest.ts.Before(tsBeforeDelete))
493494
assert.False(t, deleteRequest.ts.After(time.Now()))
494495
deleteRequest = c.deleteQueue[1]
495496
assert.Equal(t, newPodIdentifier("resource_attribute", "k8s.pod.uid", "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"), deleteRequest.id)
496-
assert.Equal(t, "podC", deleteRequest.podName)
497+
assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", deleteRequest.podUID)
497498
assert.False(t, deleteRequest.ts.Before(tsBeforeDelete))
498499
assert.False(t, deleteRequest.ts.After(time.Now()))
499500
}
@@ -568,21 +569,6 @@ func TestNodeDelete(t *testing.T) {
568569
assert.Empty(t, c.Nodes)
569570
}
570571

571-
func TestDeleteQueue(t *testing.T) {
572-
c, _ := newTestClient(t)
573-
podAddAndUpdateTest(t, c, c.handlePodAdd)
574-
assert.Len(t, c.Pods, 5)
575-
assert.Equal(t, "1.1.1.1", c.Pods[newPodIdentifier("connection", "k8s.pod.ip", "1.1.1.1")].Address)
576-
577-
// delete pod
578-
pod := &api_v1.Pod{}
579-
pod.Name = "podB"
580-
pod.Status.PodIP = "1.1.1.1"
581-
c.handlePodDelete(pod)
582-
assert.Len(t, c.Pods, 5)
583-
assert.Len(t, c.deleteQueue, 3)
584-
}
585-
586572
func TestDeleteLoop(t *testing.T) {
587573
// go c.deleteLoop(time.Second * 1)
588574
c, _ := newTestClient(t)
@@ -1392,6 +1378,130 @@ func TestNamespaceExtractionRules(t *testing.T) {
13921378
}
13931379
}
13941380

1381+
func TestDeleteQueue(t *testing.T) {
1382+
makePodIdentifiers := func(pod *api_v1.Pod) []PodIdentifier {
1383+
return []PodIdentifier{
1384+
newPodIdentifier("resource_attribute", "k8s.pod.uid", string(pod.UID)),
1385+
{
1386+
{
1387+
Source: AssociationSource{
1388+
From: "resource_attribute",
1389+
Name: "k8s.namespace.name",
1390+
},
1391+
Value: "ns",
1392+
},
1393+
{
1394+
Source: AssociationSource{
1395+
From: "resource_attribute",
1396+
Name: "k8s.pod.name",
1397+
},
1398+
Value: "abc-0",
1399+
},
1400+
},
1401+
newPodIdentifier("resource_attribute", "k8s.pod.ip", "10.0.0.1"),
1402+
newPodIdentifier("connection", "", "10.0.0.1"),
1403+
}
1404+
}
1405+
1406+
makePod := func(podUid string) *api_v1.Pod {
1407+
pod1 := &api_v1.Pod{
1408+
ObjectMeta: meta_v1.ObjectMeta{
1409+
Name: "abc-0",
1410+
Namespace: "ns",
1411+
UID: types.UID(podUid),
1412+
},
1413+
Status: api_v1.PodStatus{
1414+
PodIP: "10.0.0.1",
1415+
},
1416+
}
1417+
return pod1
1418+
}
1419+
1420+
c, _ := newTestClient(t)
1421+
doAssertions := func(pod *api_v1.Pod) {
1422+
podIdentifiers := makePodIdentifiers(pod)
1423+
for _, id := range podIdentifiers {
1424+
foundPod, ok := c.Pods[id]
1425+
assert.True(t, ok, "Pod should be present in c.Pods for identifier %v", id)
1426+
assert.Equal(t, pod.UID, types.UID(foundPod.PodUID))
1427+
assert.Equal(t, "ns", foundPod.Namespace)
1428+
assert.Equal(t, "abc-0", foundPod.Name)
1429+
assert.Equal(t, "10.0.0.1", foundPod.Address)
1430+
}
1431+
}
1432+
1433+
// Clear the pods map to start fresh...
1434+
c.Pods = make(map[PodIdentifier]*Pod)
1435+
// Set associations to match what we have configured for our OpenTelemetry Collector.
1436+
c.Associations = []Association{
1437+
{
1438+
Sources: []AssociationSource{
1439+
{
1440+
From: "resource_attribute",
1441+
Name: "k8s.pod.uid",
1442+
},
1443+
},
1444+
},
1445+
{
1446+
Sources: []AssociationSource{
1447+
{
1448+
From: "resource_attribute",
1449+
Name: "k8s.namespace.name",
1450+
},
1451+
{
1452+
From: "resource_attribute",
1453+
Name: "k8s.pod.name",
1454+
},
1455+
},
1456+
},
1457+
{
1458+
Sources: []AssociationSource{
1459+
{
1460+
From: "resource_attribute",
1461+
Name: "k8s.pod.ip",
1462+
},
1463+
},
1464+
},
1465+
{
1466+
Sources: []AssociationSource{
1467+
{
1468+
From: "connection",
1469+
},
1470+
},
1471+
},
1472+
}
1473+
1474+
// Add a pod and verify that we can find it by all identifiers.
1475+
pod1 := makePod("12345678-1234-1234-1234-123456789abc")
1476+
c.handlePodAdd(pod1)
1477+
doAssertions(pod1)
1478+
assert.Len(t, c.Pods, 4)
1479+
1480+
// Delete a pod and verify that we can still find it by all identifiers (removal is deferred to the delete loop)
1481+
c.handlePodDelete(pod1)
1482+
doAssertions(pod1)
1483+
assert.Len(t, c.Pods, 4)
1484+
1485+
// Add a pod with the same values as pod1 except for UID, and verify that we can find it by all identifiers.
1486+
pod2 := makePod("87654321-4321-4321-4321-cba987654321")
1487+
c.handlePodAdd(pod2)
1488+
doAssertions(pod2)
1489+
assert.Len(t, c.Pods, 5) // 4 from pod2 + 1 from pod1 (the pod UID identifier)
1490+
1491+
c.deleteLoopProcessing(0 * time.Second)
1492+
assert.Len(t, c.Pods, 4) // Only mappings for pod2 remain
1493+
doAssertions(pod2)
1494+
1495+
// Delete pod2 and verify that it gets removed after the next delete loop housekeeping.
1496+
c.handlePodDelete(pod2)
1497+
assert.Len(t, c.Pods, 4) // Only mappings for pod2 remain
1498+
doAssertions(pod2)
1499+
1500+
// Delete loop processing should remove mappings for pod2.
1501+
c.deleteLoopProcessing(0 * time.Second)
1502+
assert.Empty(t, c.Pods) // No more mappings
1503+
}
1504+
13951505
func TestNodeExtractionRules(t *testing.T) {
13961506
c, _ := newTestClientWithRulesAndFilters(t, Filters{})
13971507

processor/k8sattributesprocessor/internal/kube/kube.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,9 @@ type Node struct {
181181
type deleteRequest struct {
182182
// id is identifier (IP address or Pod UID) of pod to remove from pods map
183183
id PodIdentifier
184-
// name contains name of pod to remove from pods map
185-
podName string
186-
ts time.Time
184+
// podUID contains the UID of the pod to remove from pods map; the UID is used because pod names are not always unique
185+
podUID string
186+
ts time.Time
187187
}
188188

189189
// Filters is used to instruct the client on how to filter out k8s pods.

0 commit comments

Comments
 (0)