Skip to content

Commit d04f023

Browse files
authored
fix: issue with not discovering addresses when multiple endpointslices exist for single service (#13)
* chore: reproduce issue with multiple endpointslices * feat: fix issue with having multiple endpoint slices for single service * fix: use refresh method when starting to do not miss acquiring semaphore
1 parent 5f7e6f8 commit d04f023

File tree

4 files changed

+131
-26
lines changed

4 files changed

+131
-26
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ jobs:
3838
distribution: 'temurin'
3939
- name: Setup Minikube
4040
uses: medyagh/setup-minikube@latest
41+
with:
42+
extra-config: 'controller-manager.max-endpoints-per-slice=2'
4143
- name: Setup Gradle
4244
uses: gradle/actions/setup-gradle@v4
4345
- name: Run Integration Tests

integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void teardown() throws IOException {
5151
@DisplayName("should continuously resolve all addresses of deployment behind a service")
5252
@ParameterizedTest(name = "replicas changes = {0}")
5353
@MethodSource(value = "testCases")
54-
void continuouslyResolveAllAddressesTest(List<Integer> replicasHistory) throws IOException, InterruptedException {
54+
void continuouslyResolveAllAddressesTest(List<Integer> replicasHistory) {
5555
for (Integer replicas : replicasHistory) {
5656
log.info("Scaling server to {} replicas", replicas);
5757
final var serverIPs = manager.scaleServer(replicas);

integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,10 @@ public List<Endpoint> awaitScaledReadyDeployment(String deploymentName, int repl
6666
.list()
6767
.getItems()
6868
.stream()
69-
.findAny()
7069
.map(EndpointSlice::getEndpoints)
71-
.stream()
70+
.filter(Objects::nonNull)
7271
.flatMap(Collection::stream)
73-
.filter(e -> e.getConditions().getReady())
72+
.filter(e -> e != null && e.getConditions() != null && e.getConditions().getReady())
7473
.toList(),
7574
readyEndpoints -> (long) readyEndpoints.size() == replicasCount
7675
);

lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java

Lines changed: 126 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@
33
import java.io.IOException;
44
import java.net.InetSocketAddress;
55
import java.net.SocketAddress;
6+
import java.util.ArrayList;
67
import java.util.List;
8+
import java.util.Map;
79
import java.util.Optional;
810
import java.util.Set;
11+
import java.util.concurrent.ConcurrentHashMap;
912
import java.util.concurrent.Executor;
1013
import java.util.concurrent.ExecutorService;
1114
import java.util.concurrent.Executors;
1215
import java.util.concurrent.Semaphore;
1316
import java.util.logging.Level;
1417
import java.util.logging.Logger;
18+
import java.util.stream.Collectors;
1519

1620
import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher;
1721
import io.github.lothar1998.kuberesolver.kubernetes.InClusterEndpointSliceWatcher;
@@ -116,7 +120,7 @@ public KubernetesNameResolver(Executor executor, ResolverTarget params) throws I
116120
@Override
117121
public void start(Listener listener) {
118122
this.listener = listener;
119-
resolve();
123+
refresh();
120124
}
121125

122126
/**
@@ -143,6 +147,8 @@ private void resolve() {
143147
*/
144148
private void watch() {
145149
watcher.watch(params.service(), new EndpointSliceWatcher.Subscriber() {
150+
private final Map<String, List<Set<SocketAddress>>> endpoints = new ConcurrentHashMap<>();
151+
146152
@Override
147153
public void onEvent(Event event) {
148154
// watch event occurred
@@ -152,21 +158,48 @@ public void onEvent(Event event) {
152158
return;
153159
}
154160

155-
if (event.type().equals(EventType.DELETED)) {
156-
LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted",
157-
new Object[]{event.endpointSlice().metadata().name()});
161+
if (event.endpointSlice() == null) {
162+
LOGGER.log(Level.FINE, "No EndpointSlice found in watch event");
158163
return;
159164
}
160165

161-
if (event.endpointSlice() == null) {
162-
LOGGER.log(Level.FINE, "No EndpointSlice found in watch event");
166+
if (event.endpointSlice().metadata() == null || event.endpointSlice().metadata().name() == null) {
167+
LOGGER.log(Level.FINE, "No EndpointSlice name found in watch event metadata");
168+
return;
169+
}
170+
171+
if (event.type().equals(EventType.DELETED)) {
172+
LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted",
173+
new Object[]{event.endpointSlice().metadata().name()});
174+
endpoints.remove(event.endpointSlice().metadata().name());
163175
return;
164176
}
165177

166178
LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[]{params.service()});
167-
buildAddresses(event.endpointSlice()).ifPresentOrElse(a -> listener.onAddresses(a, Attributes.EMPTY),
168-
() -> LOGGER.log(Level.FINE, "No usable addresses found for Kubernetes service {0}",
169-
new Object[]{params.service()}));
179+
var endpointSliceAddresses = buildAddresses(event.endpointSlice());
180+
if (endpointSliceAddresses.isEmpty()) {
181+
LOGGER.log(Level.FINE, "No usable addresses found for service {0} in EndpointSlice {1}",
182+
new Object[]{params.service(), event.endpointSlice().metadata().name()});
183+
} else {
184+
LOGGER.log(Level.FINEST,
185+
() -> String.format(
186+
"Resolved addresses for service %s from EndpointSlice %s: %s",
187+
params.service(),
188+
event.endpointSlice().metadata().name(),
189+
addressGroupsToString(endpointSliceAddresses.get())
190+
));
191+
endpoints.put(event.endpointSlice().metadata().name(), endpointSliceAddresses.get());
192+
193+
var allAddresses = endpoints.values().stream()
194+
.flatMap(List::stream)
195+
.distinct()
196+
.toList();
197+
198+
LOGGER.log(Level.FINEST, () -> String.format(
199+
"All resolved addresses for service %s: %s",
200+
params.service(), addressGroupsToString(allAddresses)));
201+
listener.onAddresses(toEquivalentAddressGroups(allAddresses), Attributes.EMPTY);
202+
}
170203
}
171204

172205
@Override
@@ -208,17 +241,28 @@ public String getServiceAuthority() {
208241
}
209242

210243
/**
211-
* Builds a list of gRPC {@link EquivalentAddressGroup} from the given
212-
* {@link EndpointSlice}.
244+
* Extracts and processes network addresses from a Kubernetes {@link EndpointSlice}.
245+
* <p>
246+
* This method performs several key steps in the address resolution process:
247+
* <ol>
248+
* <li>Finds the appropriate port to use from the EndpointSlice</li>
249+
* <li>Filters for endpoints that are in the "ready" condition</li>
250+
* <li>Maps each endpoint's IP addresses to socket addresses using the resolved port</li>
251+
* </ol>
252+
* <p>
253+
* If no suitable port can be found or if the EndpointSlice contains no ready endpoints,
254+
* an empty Optional will be returned.
213255
*
214-
* @param endpointSlice the EndpointSlice to process
215-
* @return an optional list of resolved addresses
256+
* @param endpointSlice the Kubernetes EndpointSlice containing endpoint information
257+
* @return an Optional containing a list of socket address sets for ready endpoints,
258+
* or an empty Optional if no addresses could be resolved
216259
*/
217-
private Optional<List<EquivalentAddressGroup>> buildAddresses(EndpointSlice endpointSlice) {
260+
private Optional<List<Set<SocketAddress>>> buildAddresses(EndpointSlice endpointSlice) {
218261
return findPort(endpointSlice.ports())
219262
.map(port -> endpointSlice.endpoints().stream()
220263
.filter(endpoint -> endpoint.conditions().isReady())
221264
.map(endpoint -> buildAddressGroup(endpoint.addresses(), port))
265+
.filter(group -> !group.isEmpty())
222266
.toList());
223267
}
224268

@@ -246,17 +290,77 @@ private Optional<Integer> findPort(List<EndpointPort> ports) {
246290
}
247291

248292
/**
249-
* Builds a gRPC {@link EquivalentAddressGroup} from the given addresses and
250-
* port.
293+
* Builds a set of socket addresses from a list of IP addresses and a port number.
294+
* This method converts each IP address into an {@link InetSocketAddress} using the given port,
295+
* which represents one endpoint in a Kubernetes EndpointSlice.
296+
* <p>
297+
* The resulting set of addresses is used in the name resolution process to provide
298+
* gRPC clients with possible connection endpoints for the target service.
251299
*
252-
* @param addresses the list of addresses
253-
* @param port the port number
254-
* @return an {@link EquivalentAddressGroup} containing the resolved addresses
300+
* @param addresses the list of IP addresses from a Kubernetes endpoint
301+
* @param port the port number to use for all addresses
302+
* @return a set of {@link SocketAddress} objects representing the endpoint addresses
255303
*/
256-
private EquivalentAddressGroup buildAddressGroup(List<String> addresses, int port) {
257-
var socketAddresses = addresses.stream()
304+
private Set<SocketAddress> buildAddressGroup(List<String> addresses, int port) {
305+
return addresses.stream()
258306
.map(address -> (SocketAddress) new InetSocketAddress(address, port))
307+
.collect(Collectors.toSet());
308+
}
309+
310+
/**
311+
* Converts a list of socket address sets into a list of {@link EquivalentAddressGroup} objects.
312+
* Each set of socket addresses is transformed into a single {@link EquivalentAddressGroup},
313+
* which gRPC uses to represent a group of equivalent addresses for load balancing.
314+
*
315+
* @param addressGroups the list of socket address sets to convert
316+
* @return a list of {@link EquivalentAddressGroup} objects, each representing one set of addresses
317+
*/
318+
private List<EquivalentAddressGroup> toEquivalentAddressGroups(List<Set<SocketAddress>> addressGroups) {
319+
return addressGroups.stream()
320+
.map(group -> new EquivalentAddressGroup(new ArrayList<>(group)))
259321
.toList();
260-
return new EquivalentAddressGroup(socketAddresses, Attributes.EMPTY);
322+
}
323+
324+
/**
325+
* Converts a list of socket address sets into a human-readable string representation.
326+
* The format is a nested structure like: [(addr1, addr2), (addr3), (addr4, addr5)]
327+
* where each set of addresses is represented as a group in parentheses.
328+
*
329+
* @param addressGroups the list of socket address sets to convert to string
330+
* @return a string representation of the address groups
331+
*/
332+
private String addressGroupsToString(List<Set<SocketAddress>> addressGroups) {
333+
if (addressGroups == null || addressGroups.isEmpty()) {
334+
return "[]";
335+
}
336+
337+
var result = new StringBuilder("[");
338+
339+
result.append("(");
340+
boolean firstAddr = true;
341+
for (SocketAddress address : addressGroups.get(0)) {
342+
if (!firstAddr) {
343+
result.append(", ");
344+
}
345+
result.append(address);
346+
firstAddr = false;
347+
}
348+
result.append(")");
349+
350+
for (int i = 1; i < addressGroups.size(); i++) {
351+
result.append(", (");
352+
firstAddr = true;
353+
for (SocketAddress address : addressGroups.get(i)) {
354+
if (!firstAddr) {
355+
result.append(", ");
356+
}
357+
result.append(address);
358+
firstAddr = false;
359+
}
360+
result.append(")");
361+
}
362+
363+
result.append("]");
364+
return result.toString();
261365
}
262366
}

0 commit comments

Comments
 (0)