diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java index c41d0e979b9..8541625e0ab 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroup.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.linecorp.armeria.client.Endpoint; @@ -45,6 +46,8 @@ import com.linecorp.armeria.common.util.ShutdownHooks; import com.linecorp.armeria.internal.common.util.ReentrantShortLock; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerPort; import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.NodeList; import io.fabric8.kubernetes.api.model.Pod; @@ -60,19 +63,30 @@ import io.fabric8.kubernetes.client.WatcherException; /** - * A {@link DynamicEndpointGroup} that fetches a node IP and a node port for each Pod from Kubernetes. + * A {@link DynamicEndpointGroup} that discovers endpoints from Kubernetes pods. * - *

Note that the Kubernetes service must have a type of NodePort - * or 'LoadBalancer' - * to expose a node port for client side load balancing. + *

Two endpoint discovery modes are supported via {@link KubernetesEndpointMode}: + *

* - *

{@link KubernetesEndpointGroup} gets and watches the nodes, services and pods in the Kubernetes cluster - * and updates the endpoints, so the credentials in the {@link Config} used to create {@link KubernetesClient} - * should have permission to {@code get}, {@code list} and {@code watch} {@code services}, {@code nodes} and - * {@code pods}. Otherwise, the {@link KubernetesEndpointGroup} will not be able to fetch the endpoints. + *

{@link KubernetesEndpointGroup} watches the services and pods (and nodes in + * {@link KubernetesEndpointMode#NODE_PORT} mode) in the Kubernetes cluster and updates the endpoints, + * so the credentials in the {@link Config} used to create {@link KubernetesClient} + * should have the appropriate permissions. Otherwise, the {@link KubernetesEndpointGroup} will not be + * able to fetch the endpoints. * *

For instance, the following RBAC - * configuration is required: + * configuration is required for {@link KubernetesEndpointMode#NODE_PORT} mode: *

{@code
  * apiVersion: rbac.authorization.k8s.io/v1
  * kind: ClusterRole
@@ -84,10 +98,21 @@
  *   verbs: ["get", "list", "watch"]
  * }
* + *

For {@link KubernetesEndpointMode#POD} mode, only {@code pods} and {@code services} are required: + *

{@code
+ * apiVersion: rbac.authorization.k8s.io/v1
+ * kind: ClusterRole
+ * metadata:
+ *   name: my-cluster-role
+ * rules:
+ * - apiGroups: [""]
+ *   resources: ["pods", "services"]
+ *   verbs: ["get", "list", "watch"]
+ * }
+ * *

Example: *

{@code
- * // Create a KubernetesEndpointGroup that fetches the endpoints of the 'my-service' service in the 'default'
- * // namespace. The Kubernetes client will be created with the default configuration in the $HOME/.kube/config.
+ * // NODE_PORT mode (default): uses nodeIP:nodePort
  * KubernetesClient kubernetesClient = new KubernetesClientBuilder().build();
  * KubernetesEndpointGroup
  *   .builder(kubernetesClient)
@@ -95,17 +120,13 @@
  *   .serviceName("my-service")
  *   .build();
  *
- * // If you want to use a custom configuration, you can create a KubernetesEndpointGroup as follows:
- * // The custom configuration would be useful when you want to access Kubernetes from outside the cluster.
- * Config config =
- *   new ConfigBuilder()
- *     .withMasterUrl("https://my-k8s-master")
- *     .withOauthToken("my-token")
- *     .build();
+ * // POD mode: uses podIP:containerPort for true client-side load balancing
  * KubernetesEndpointGroup
- *   .builder(config)
- *   .namespace("my-namespace")
+ *   .builder(kubernetesClient)
+ *   .namespace("default")
  *   .serviceName("my-service")
+ *   .mode(KubernetesEndpointMode.POD)
+ *   .portName("http")
  *   .build();
  * }
*/ @@ -224,13 +245,20 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { @Nullable private volatile Watch podWatch; + private final KubernetesEndpointMode mode; + + // NODE_PORT mode maps private final Map podToNode = new NonBlockingHashMap<>(); private final Map nodeToIp = new NonBlockingHashMap<>(); @Nullable - private volatile Service service; - @Nullable private volatile Integer nodePort; + // POD mode map + private final Map podToEndpoint = new NonBlockingHashMap<>(); + + @Nullable + private volatile Service service; + private final ReentrantShortLock schedulerLock = new ReentrantShortLock(); @GuardedBy("schedulerLock") @Nullable @@ -250,7 +278,8 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { KubernetesEndpointGroup(KubernetesClient client, @Nullable String namespace, String serviceName, @Nullable String portName, Function nodeIpExtractor, - boolean autoClose, EndpointSelectionStrategy selectionStrategy, + boolean autoClose, KubernetesEndpointMode mode, + EndpointSelectionStrategy selectionStrategy, boolean allowEmptyEndpoints, long selectionTimeoutMillis, long maxWatchAgeMillis) { super(selectionStrategy, allowEmptyEndpoints, selectionTimeoutMillis); this.client = client; @@ -259,6 +288,7 @@ public static KubernetesEndpointGroupBuilder builder(Config kubeConfig) { this.portName = portName; this.nodeIpExtractor = nodeIpExtractor; this.autoClose = autoClose; + this.mode = mode; this.maxWatchAgeMillis = maxWatchAgeMillis == Long.MAX_VALUE ? 0 : maxWatchAgeMillis; executeJob(() -> start(true)); } @@ -293,12 +323,12 @@ private boolean doStart(boolean initial) { updateId++; nodeToIp.clear(); podToNode.clear(); + podToEndpoint.clear(); } finally { updateLock.unlock(); } final Service service; - final NodeList nodes; final PodList pods; try { logger.info("[{}/{}] Fetching the service...", namespace, serviceName); @@ -317,28 +347,54 @@ private boolean doStart(boolean initial) { String.format("[%s/%s] NodePort not found.", namespace, serviceName)); } - logger.info("[{}/{}] Fetching the nodes ...", namespace, serviceName); - nodes = client.nodes().list(); - for (Node node : nodes.getItems()) { - updateNode(Action.ADDED, node); - } - final Map selector = service.getSpec().getSelector(); - logger.info("[{}/{}] Fetching the pods with the selector: {}", namespace, serviceName, selector); - if (namespace == null) { - pods = client.pods().withLabels(selector).list(); - } else { - pods = client.pods().inNamespace(namespace).withLabels(selector).list(); - } - for (Pod pod : pods.getItems()) { - updatePod(Action.ADDED, pod); - } - // Initialize the endpoints. - maybeUpdateEndpoints(); + switch (mode) { + case NODE_PORT: + logger.info("[{}/{}] Fetching the nodes ...", namespace, serviceName); + final NodeList nodes = client.nodes().list(); + for (Node node : nodes.getItems()) { + updateNode(Action.ADDED, node); + } + // Node watcher is started after pod fetching below. + // Save the resource version for the node watcher. + final String nodeResourceVersion = nodes.getMetadata().getResourceVersion(); + + logger.info("[{}/{}] Fetching the pods with the selector: {}", + namespace, serviceName, selector); + if (namespace == null) { + pods = client.pods().withLabels(selector).list(); + } else { + pods = client.pods().inNamespace(namespace).withLabels(selector).list(); + } + for (Pod pod : pods.getItems()) { + updatePod(Action.ADDED, pod); + } + maybeUpdateEndpoints(); + + watchService(service.getMetadata().getResourceVersion()); + watchNode(updateId, nodeResourceVersion); + watchPod(updateId, pods.getMetadata().getResourceVersion()); + break; + case POD: + // POD mode: skip node fetch/watch + logger.info("[{}/{}] Fetching the pods with the selector: {}", + namespace, serviceName, selector); + if (namespace == null) { + pods = client.pods().withLabels(selector).list(); + } else { + pods = client.pods().inNamespace(namespace).withLabels(selector).list(); + } + for (Pod pod : pods.getItems()) { + updatePod(Action.ADDED, pod); + } + maybeUpdateEndpoints(); - watchService(service.getMetadata().getResourceVersion()); - watchNode(updateId, nodes.getMetadata().getResourceVersion()); - watchPod(updateId, pods.getMetadata().getResourceVersion()); + watchService(service.getMetadata().getResourceVersion()); + watchPod(updateId, pods.getMetadata().getResourceVersion()); + break; + default: + throw new Error("unknown mode: " + mode); + } } catch (Exception e) { logger.warn("[{}/{}] Failed to start {}. (initial: {})", namespace, serviceName, this, initial, e); if (initial) { @@ -435,6 +491,13 @@ public void onClose() { } private boolean updateService(Service service) { + this.service = service; + + if (mode == KubernetesEndpointMode.POD) { + // In POD mode, we only need the service for its selector. No nodePort needed. + return true; + } + final List ports = service.getSpec().getPorts(); final Integer nodePort0 = ports.stream() @@ -455,7 +518,6 @@ private boolean updateService(Service service) { } return false; } - this.service = service; nodePort = nodePort0; return true; } @@ -516,14 +578,29 @@ private boolean updatePod(Action action, Pod resource) { return false; } final String podName = resource.getMetadata().getName(); + if (podName == null) { + logger.debug("[{}/{}] Pod name is null.", namespace, serviceName); + return false; + } + + switch (mode) { + case NODE_PORT: + return updatePodForNodePortMode(action, podName, resource); + case POD: + return updatePodForPodMode(action, podName, resource); + default: + throw new Error("unknown mode: " + mode); + } + } + + private boolean updatePodForNodePortMode(Action action, String podName, Pod resource) { final String nodeName = resource.getSpec().getNodeName(); logger.debug("[{}/{}] Pod event received. action: {}, pod: {}, node: {}, resource version: {}", namespace, serviceName, action, podName, nodeName, resource.getMetadata().getResourceVersion()); - if (podName == null || nodeName == null) { - logger.debug("[{}/{}] Pod or node name is null. pod: {}, node: {}", - namespace, serviceName, podName, nodeName); + if (nodeName == null) { + logger.debug("[{}/{}] Node name is null. pod: {}", namespace, serviceName, podName); return false; } @@ -540,6 +617,67 @@ private boolean updatePod(Action action, Pod resource) { return true; } + private boolean updatePodForPodMode(Action action, String podName, Pod resource) { + logger.debug("[{}/{}] Pod event received (POD mode). action: {}, pod: {}, resource version: {}", + namespace, serviceName, action, podName, + resource.getMetadata().getResourceVersion()); + + switch (action) { + case ADDED: + case MODIFIED: + final Endpoint endpoint = extractPodEndpoint(resource); + if (endpoint != null) { + podToEndpoint.put(podName, endpoint); + } else { + podToEndpoint.remove(podName); + } + break; + case DELETED: + podToEndpoint.remove(podName); + break; + default: + } + return true; + } + + @Nullable + private Endpoint extractPodEndpoint(Pod pod) { + final String podIp = pod.getStatus() != null ? pod.getStatus().getPodIP() : null; + if (podIp == null) { + // Pod is likely in a pending state. This is expected and not an error. + return null; + } + final Integer containerPort = findContainerPort(pod); + if (containerPort == null) { + logger.warn("[{}/{}] No matching container port found in pod: {}", + namespace, serviceName, pod.getMetadata().getName()); + return null; + } + return Endpoint.of(podIp, containerPort); + } + + @Nullable + private Integer findContainerPort(Pod pod) { + if (pod.getSpec() == null || pod.getSpec().getContainers() == null) { + return null; + } + for (Container container : pod.getSpec().getContainers()) { + if (container.getPorts() == null) { + continue; + } + for (ContainerPort port : container.getPorts()) { + if (portName == null) { + // If no portName is specified, return the first container port. + return port.getContainerPort(); + } + if (portName.equals(port.getName())) { + return port.getContainerPort(); + } + } + } + return null; + } + private void watchNode(long updateId, String resourceVersion) { logger.info("[{}/{}] Start the node watcher... (resource version: {})", namespace, serviceName, resourceVersion); @@ -684,28 +822,38 @@ private void maybeUpdateEndpoints() { return; } - if (nodeToIp.isEmpty()) { - // No event received for the nodes yet. - return; - } + final List endpoints; + switch (mode) { + case NODE_PORT: + if (nodeToIp.isEmpty()) { + // No event received for the nodes yet. + return; + } - if (podToNode.isEmpty()) { - // No event received for the pods yet. - return; - } + if (podToNode.isEmpty()) { + // No event received for the pods yet. + return; + } - assert nodePort != null; - final List endpoints = - podToNode.values().stream() - .map(nodeName -> { - final String nodeIp = nodeToIp.get(nodeName); - if (nodeIp == null) { - return null; - } - return Endpoint.of(nodeIp, nodePort); - }) - .filter(Objects::nonNull) - .collect(toImmutableList()); + assert nodePort != null; + endpoints = + podToNode.values().stream() + .map(nodeName -> { + final String nodeIp = nodeToIp.get(nodeName); + if (nodeIp == null) { + return null; + } + return Endpoint.of(nodeIp, nodePort); + }) + .filter(Objects::nonNull) + .collect(toImmutableList()); + break; + case POD: + endpoints = ImmutableList.copyOf(podToEndpoint.values()); + break; + default: + throw new Error("unknown mode: " + mode); + } setEndpoints(endpoints); } diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java index 0b307232a38..c95816a8f6a 100644 --- a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupBuilder.java @@ -33,6 +33,7 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; +import io.fabric8.kubernetes.api.model.ContainerPort; import io.fabric8.kubernetes.api.model.Node; import io.fabric8.kubernetes.api.model.NodeAddress; import io.fabric8.kubernetes.client.KubernetesClient; @@ -70,6 +71,8 @@ public final class KubernetesEndpointGroupBuilder private Function nodeIpExtractor = toNodeIpExtractor(DEFAULT_NODE_ADDRESS_FILTER); + private KubernetesEndpointMode mode = KubernetesEndpointMode.NODE_PORT; + private long maxWatchAgeMillis = DEFAULT_MAX_WATCH_AGE_MILLIS; KubernetesEndpointGroupBuilder(KubernetesClient kubernetesClient, boolean autoClose) { @@ -98,20 +101,39 @@ public KubernetesEndpointGroupBuilder serviceName(String serviceName) { } /** - * Sets the name of the port - * from which NodePort - * should be fetched from. If not set, the first node port will be used. + * Sets the port name used to select the target port. + * + *

In {@link KubernetesEndpointMode#NODE_PORT} mode, the port name is matched against the + * ServicePort + * name to determine the + * NodePort. + * If not set, the first node port will be used. + * + *

In {@link KubernetesEndpointMode#POD} mode, the port name is matched against the + * {@link ContainerPort#getName()} to determine the container port. + * If not set, the first container port will be used. */ public KubernetesEndpointGroupBuilder portName(String portName) { this.portName = requireNonNull(portName, "portName"); return this; } + /** + * Sets the {@link KubernetesEndpointMode} for endpoint discovery. + * If unspecified, {@link KubernetesEndpointMode#NODE_PORT} is used. + */ + public KubernetesEndpointGroupBuilder mode(KubernetesEndpointMode mode) { + this.mode = requireNonNull(mode, "mode"); + return this; + } + /** * Sets the {@link Predicate} to filter the addresses * of a Kubernetes node. * The first selected {@link NodeAddress} of a node will be used to create the {@link Endpoint}. * If unspecified, the default is to select an {@code InternalIP} address that is not empty. + * + *

This option is ignored when {@link KubernetesEndpointMode#POD} is used. */ public KubernetesEndpointGroupBuilder nodeAddressFilter(Predicate nodeAddressFilter) { requireNonNull(nodeAddressFilter, "nodeAddressFilter"); @@ -126,6 +148,8 @@ public KubernetesEndpointGroupBuilder nodeAddressFilter(PredicateNote that this method is mutually exclusive with {@link #nodeAddressFilter(Predicate)}. If both * methods are called, the last one will take precedence. + * + *

This option is ignored when {@link KubernetesEndpointMode#POD} is used. */ public KubernetesEndpointGroupBuilder nodeIpExtractor( Function nodeIpExtractor) { @@ -176,7 +200,7 @@ public KubernetesEndpointGroupBuilder maxWatchAge(Duration maxWatchAge) { public KubernetesEndpointGroup build() { checkState(serviceName != null, "serviceName not set"); return new KubernetesEndpointGroup(kubernetesClient, namespace, serviceName, portName, - nodeIpExtractor, autoClose, + nodeIpExtractor, autoClose, mode, selectionStrategy, shouldAllowEmptyEndpoints(), selectionTimeoutMillis(), maxWatchAgeMillis); } diff --git a/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointMode.java b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointMode.java new file mode 100644 index 00000000000..b831f0ae394 --- /dev/null +++ b/kubernetes/src/main/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointMode.java @@ -0,0 +1,45 @@ +/* + * Copyright 2026 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.kubernetes.endpoints; + +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Specifies how {@link KubernetesEndpointGroup} discovers endpoints from Kubernetes. + */ +@UnstableApi +public enum KubernetesEndpointMode { + + /** + * Uses {@code nodeIP:nodePort} for endpoints. This is the default mode that relies on kube-proxy + * for traffic routing. The Kubernetes service must have a type of + * NodePort + * or LoadBalancer. + * + *

This mode requires RBAC permissions for {@code pods}, {@code services}, and {@code nodes}. + */ + NODE_PORT, + + /** + * Uses {@code podIP:containerPort} for endpoints. This mode enables true client-side load balancing + * by connecting directly to pod IPs, bypassing kube-proxy. + * + *

This mode is intended for Armeria clients running inside the Kubernetes cluster and requires + * RBAC permissions for {@code pods} and {@code services} only (no {@code nodes} permission needed). + */ + POD +} diff --git a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java index 736015cfba5..6e5ad65b053 100644 --- a/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java +++ b/kubernetes/src/test/java/com/linecorp/armeria/client/kubernetes/endpoints/KubernetesEndpointGroupMockServerTest.java @@ -35,6 +35,7 @@ import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPort; import io.fabric8.kubernetes.api.model.ContainerPortBuilder; import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; @@ -50,6 +51,8 @@ import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpec; import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder; import io.fabric8.kubernetes.api.model.Service; @@ -377,6 +380,193 @@ void shouldUseAnnotationsToGetNodeIp() { } } + @Test + void podModeBasic() { + // In POD mode, endpoints are podIP:containerPort (no nodes needed). + final Deployment deployment = newDeployment(); + final Service service = newService(null, "nginx", false); + // Set ClusterIP type since NodePort is not required in POD mode. + service.getSpec().setType("ClusterIP"); + + final Pod pod1 = newPodWithIp(deployment.getSpec().getTemplate(), "pod-1", "10.0.0.1"); + final Pod pod2 = newPodWithIp(deployment.getSpec().getTemplate(), "pod-2", "10.0.0.2"); + + client.pods().resource(pod1).create(); + client.pods().resource(pod2).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + try (KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client, false) + .serviceName("nginx-service") + .mode(KubernetesEndpointMode.POD) + .build()) { + endpointGroup.whenReady().join(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.1", 8080), + Endpoint.of("10.0.0.2", 8080)); + }); + + // Add a new pod + final Pod pod3 = newPodWithIp(deployment.getSpec().getTemplate(), "pod-3", "10.0.0.3"); + client.pods().resource(pod3).create(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.1", 8080), + Endpoint.of("10.0.0.2", 8080), + Endpoint.of("10.0.0.3", 8080)); + }); + + // Remove a pod + client.pods().resource(pod1).delete(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.2", 8080), + Endpoint.of("10.0.0.3", 8080)); + }); + } + } + + @Test + void podModeWithPortName() { + // In POD mode, portName matches ContainerPort.name. + final PodTemplateSpec template = newPodTemplateWithNamedPorts("nginx"); + final Deployment deployment = newDeployment(); + // Replace the template with one that has named ports. + deployment.getSpec().setTemplate(template); + + final Service service = newService(null, "nginx", false); + service.getSpec().setType("ClusterIP"); + + final Pod pod1 = newPodWithIpAndNamedPorts("pod-1", "10.0.0.1", "nginx"); + client.pods().resource(pod1).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + // Select the "https" port + try (KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client, false) + .serviceName("nginx-service") + .mode(KubernetesEndpointMode.POD) + .portName("https") + .build()) { + endpointGroup.whenReady().join(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.1", 8443)); + }); + } + + // Select the "http" port + try (KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client, false) + .serviceName("nginx-service") + .mode(KubernetesEndpointMode.POD) + .portName("http") + .build()) { + endpointGroup.whenReady().join(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.1", 8080)); + }); + } + } + + @Test + void podModeSkipsPodWithNullPodIp() { + // A pending pod without a podIP should be excluded. + final Deployment deployment = newDeployment(); + final Service service = newService(null, "nginx", false); + service.getSpec().setType("ClusterIP"); + + final Pod podWithIp = newPodWithIp(deployment.getSpec().getTemplate(), "pod-ready", "10.0.0.1"); + // Pod without podIP (pending state) + final Pod podWithoutIp = newPod(deployment.getSpec().getTemplate(), "node-1"); + + client.pods().resource(podWithIp).create(); + client.pods().resource(podWithoutIp).create(); + client.apps().deployments().resource(deployment).create(); + client.services().resource(service).create(); + + try (KubernetesEndpointGroup endpointGroup = + KubernetesEndpointGroup.builder(client, false) + .serviceName("nginx-service") + .mode(KubernetesEndpointMode.POD) + .build()) { + endpointGroup.whenReady().join(); + await().untilAsserted(() -> { + assertThat(endpointGroup.endpoints()).containsExactlyInAnyOrder( + Endpoint.of("10.0.0.1", 8080)); + }); + } + } + + private static Pod newPodWithIp(PodTemplateSpec template, String podName, String podIp) { + final PodSpec spec = template.getSpec() + .toBuilder() + .withNodeName("dummy-node") + .build(); + final ObjectMeta metadata = template.getMetadata() + .toBuilder() + .withName(podName) + .build(); + final PodStatus status = new PodStatusBuilder() + .withPodIP(podIp) + .build(); + return new PodBuilder() + .withMetadata(metadata) + .withSpec(spec) + .withStatus(status) + .build(); + } + + private static PodTemplateSpec newPodTemplateWithNamedPorts(String selectorName) { + final ObjectMeta metadata = new ObjectMetaBuilder() + .withLabels(ImmutableMap.of("app", selectorName)) + .build(); + final ContainerPort httpPort = new ContainerPortBuilder() + .withName("http") + .withContainerPort(8080) + .build(); + final ContainerPort httpsPort = new ContainerPortBuilder() + .withName("https") + .withContainerPort(8443) + .build(); + final Container container = new ContainerBuilder() + .withName("nginx") + .withImage("nginx:1.14.2") + .withPorts(httpPort, httpsPort) + .build(); + final PodSpec spec = new PodSpecBuilder() + .withContainers(container) + .build(); + return new PodTemplateSpecBuilder() + .withMetadata(metadata) + .withSpec(spec) + .build(); + } + + private static Pod newPodWithIpAndNamedPorts(String podName, String podIp, String selectorName) { + final PodTemplateSpec template = newPodTemplateWithNamedPorts(selectorName); + final PodSpec spec = template.getSpec() + .toBuilder() + .withNodeName("dummy-node") + .build(); + final ObjectMeta metadata = template.getMetadata() + .toBuilder() + .withName(podName) + .build(); + final PodStatus status = new PodStatusBuilder() + .withPodIP(podIp) + .build(); + return new PodBuilder() + .withMetadata(metadata) + .withSpec(spec) + .withStatus(status) + .build(); + } + private static Node newNode(String ip, String type) { final NodeAddress nodeAddress = new NodeAddressBuilder() .withType(type)