Skip to content

Commit 3d4734a

Browse files
client_routes: add scoped pruning to Merge for stale entry removal
Add scopeConnectionIDs and scopeHostIDs parameters to Merge so it can prune stale entries before upserting: - Both non-empty (partial update): prune entries matching BOTH lists - Only scopeConnectionIDs (full refresh): prune all entries for those connections - Both empty: additive-only merge, no pruning updateHostPortMapping now passes connection/host IDs through to Merge, enabling automatic cleanup of decommissioned hosts.
1 parent 99fdb97 commit 3d4734a

2 files changed

Lines changed: 115 additions & 7 deletions

File tree

client_routes.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,24 @@ func (l *UnresolvedClientRouteList) Len() int {
135135
}
136136

137137
// Merge upserts entries from incoming into the list.
138-
// Existing entries matching by (ConnectionID, HostID) are updated in place;
139-
// new entries are appended.
140-
func (l *UnresolvedClientRouteList) Merge(incoming UnresolvedClientRouteList) {
138+
// Before upserting it prunes stale entries within the query scope defined by
139+
// scopeConnectionIDs and scopeHostIDs:
140+
// - Both non-empty (partial update): prune entries matching BOTH lists.
141+
// - Only scopeConnectionIDs (full refresh): prune all entries for those connections.
142+
// - Both empty / nil: additive-only merge, no pruning.
143+
func (l *UnresolvedClientRouteList) Merge(incoming UnresolvedClientRouteList, scopeConnectionIDs, scopeHostIDs []string) {
144+
switch {
145+
case len(scopeConnectionIDs) > 0 && len(scopeHostIDs) > 0:
146+
*l = slices.DeleteFunc(*l, func(r UnresolvedClientRoute) bool {
147+
return slices.Contains(scopeConnectionIDs, r.ConnectionID) &&
148+
slices.Contains(scopeHostIDs, r.HostID)
149+
})
150+
case len(scopeConnectionIDs) > 0:
151+
*l = slices.DeleteFunc(*l, func(r UnresolvedClientRoute) bool {
152+
return slices.Contains(scopeConnectionIDs, r.ConnectionID)
153+
})
154+
}
155+
141156
for _, inc := range incoming {
142157
found := false
143158
for id, existing := range *l {
@@ -355,7 +370,7 @@ func (p *ClientRoutesHandler) updateHostPortMapping(connectionIDs []string, host
355370
}
356371

357372
p.mu.Lock()
358-
p.routes.Merge(incoming)
373+
p.routes.Merge(incoming, connectionIDs, hostIDs)
359374
p.mu.Unlock()
360375

361376
return nil

client_routes_unit_test.go

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,15 @@ func TestMerge(t *testing.T) {
6666
// Same record: no change expected.
6767
list.Merge(UnresolvedClientRouteList{
6868
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
69-
})
69+
}, nil, nil)
7070
if len(list) != 1 {
7171
t.Fatalf("expected 1 entry, got %d", len(list))
7272
}
7373

7474
// Updated address: record should be replaced.
7575
list.Merge(UnresolvedClientRouteList{
7676
{ConnectionID: "c1", HostID: "h1", Address: "a2", CQLPort: 9043},
77-
})
77+
}, nil, nil)
7878
if list[0].Address != "a2" || list[0].CQLPort != 9043 {
7979
t.Fatalf("expected record to update")
8080
}
@@ -83,7 +83,7 @@ func TestMerge(t *testing.T) {
8383
list = UnresolvedClientRouteList{}
8484
list.Merge(UnresolvedClientRouteList{
8585
{ConnectionID: "c2", HostID: "h2", Address: "a3", CQLPort: 9044},
86-
})
86+
}, nil, nil)
8787
if len(list) != 1 {
8888
t.Fatalf("expected new record to be appended")
8989
}
@@ -226,6 +226,99 @@ func TestGetHostPortMappingFromClusterQuery(t *testing.T) {
226226
}
227227
}
228228

229+
func TestMerge_DeletedHost(t *testing.T) {
230+
list := UnresolvedClientRouteList{
231+
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
232+
{ConnectionID: "c1", HostID: "h2", Address: "a2", CQLPort: 9042},
233+
}
234+
235+
// Simulate event for (c1, h1) where query returned nothing → (c1,h1) should be removed.
236+
list.Merge(nil, []string{"c1"}, []string{"h1"})
237+
238+
if len(list) != 1 {
239+
t.Fatalf("expected 1 entry after pruning deleted host, got %d", len(list))
240+
}
241+
if list[0].HostID != "h2" {
242+
t.Fatalf("expected h2 to survive, got %s", list[0].HostID)
243+
}
244+
}
245+
246+
func TestMerge_UpdatedHost(t *testing.T) {
247+
list := UnresolvedClientRouteList{
248+
{ConnectionID: "c1", HostID: "h1", Address: "old-addr", CQLPort: 9042},
249+
{ConnectionID: "c2", HostID: "h1", Address: "old-addr2", CQLPort: 9042},
250+
{ConnectionID: "c1", HostID: "h2", Address: "keep", CQLPort: 9042},
251+
}
252+
253+
// h1 address changed; fresh query returns new data for h1. h2 is not affected.
254+
list.Merge(UnresolvedClientRouteList{
255+
{ConnectionID: "c1", HostID: "h1", Address: "new-addr", CQLPort: 9043},
256+
{ConnectionID: "c2", HostID: "h1", Address: "new-addr2", CQLPort: 9043},
257+
}, []string{"c1", "c2"}, []string{"h1"})
258+
259+
if len(list) != 3 {
260+
t.Fatalf("expected 3 entries, got %d", len(list))
261+
}
262+
for _, r := range list {
263+
if r.HostID == "h1" {
264+
if r.Address != "new-addr" && r.Address != "new-addr2" {
265+
t.Fatalf("expected h1 entries to have new addresses, got %s", r.Address)
266+
}
267+
}
268+
if r.HostID == "h2" && r.Address != "keep" {
269+
t.Fatalf("expected h2 entry to be preserved unchanged")
270+
}
271+
}
272+
}
273+
274+
func TestMerge_FullRefresh_PrunesAllStale(t *testing.T) {
275+
list := UnresolvedClientRouteList{
276+
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
277+
{ConnectionID: "c1", HostID: "h2", Address: "a2", CQLPort: 9042},
278+
{ConnectionID: "c1", HostID: "h3", Address: "a3", CQLPort: 9042},
279+
}
280+
281+
// Full refresh for connection c1: all entries for c1 are pruned, only h1 and h2 returned.
282+
list.Merge(UnresolvedClientRouteList{
283+
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
284+
{ConnectionID: "c1", HostID: "h2", Address: "a2", CQLPort: 9042},
285+
}, []string{"c1"}, nil)
286+
287+
if len(list) != 2 {
288+
t.Fatalf("expected 2 entries after full refresh prune, got %d", len(list))
289+
}
290+
for _, r := range list {
291+
if r.HostID == "h3" {
292+
t.Fatalf("expected h3 to be pruned")
293+
}
294+
}
295+
}
296+
229297
// TestUpdateHostPortMapping_FullRefresh_PrunesStaleEntries simulates the same
230298
// sequence of operations that updateHostPortMapping performs (lock → Merge → unlock)
231299
// to verify that a full refresh correctly prunes a host that disappeared.
300+
func TestUpdateHostPortMapping_FullRefresh_PrunesStaleEntries(t *testing.T) {
301+
// Existing routes: h1, h2, h3.
302+
routes := UnresolvedClientRouteList{
303+
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
304+
{ConnectionID: "c1", HostID: "h2", Address: "a2", CQLPort: 9042},
305+
{ConnectionID: "c1", HostID: "h3", Address: "a3", CQLPort: 9042},
306+
}
307+
308+
// Cluster now returns only h1 and h2 (h3 was decommissioned).
309+
incoming := UnresolvedClientRouteList{
310+
{ConnectionID: "c1", HostID: "h1", Address: "a1", CQLPort: 9042},
311+
{ConnectionID: "c1", HostID: "h2", Address: "a2", CQLPort: 9042},
312+
}
313+
314+
routes.Merge(incoming, []string{"c1"}, nil)
315+
316+
if len(routes) != 2 {
317+
t.Fatalf("expected 2 entries after full-refresh prune, got %d", len(routes))
318+
}
319+
for _, r := range routes {
320+
if r.HostID == "h3" {
321+
t.Fatalf("h3 should have been pruned by full refresh")
322+
}
323+
}
324+
}

0 commit comments

Comments
 (0)