From fdf9b25d6f56a117bc9e1977a5857a3ec4281054 Mon Sep 17 00:00:00 2001 From: Piotr Kuglin Date: Wed, 25 Jun 2025 15:37:39 +0200 Subject: [PATCH 1/3] chore: reproduce issue with multiple endpointslices --- .github/workflows/test.yml | 2 ++ .../kuberesolver/integration/test/KuberesolverTest.java | 2 +- .../kuberesolver/integration/test/KubernetesManager.java | 5 ++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e764f22..32a998a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,6 +38,8 @@ jobs: distribution: 'temurin' - name: Setup Minikube uses: medyagh/setup-minikube@latest + with: + extra-config: 'controller-manager.max-endpoints-per-slice=2' - name: Setup Gradle uses: gradle/actions/setup-gradle@v4 - name: Run Integration Tests diff --git a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java index 05b92b4..14a8107 100644 --- a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java +++ b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KuberesolverTest.java @@ -51,7 +51,7 @@ void teardown() throws IOException { @DisplayName("should continuously resolve all addresses of deployment behind a service") @ParameterizedTest(name = "replicas changes = {0}") @MethodSource(value = "testCases") - void continuouslyResolveAllAddressesTest(List replicasHistory) throws IOException, InterruptedException { + void continuouslyResolveAllAddressesTest(List replicasHistory) { for (Integer replicas : replicasHistory) { log.info("Scaling server to {} replicas", replicas); final var serverIPs = manager.scaleServer(replicas); diff --git a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java index d2ee2ce..394b731 100644 --- a/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java +++ b/integration/test/src/test/java/io/github/lothar1998/kuberesolver/integration/test/KubernetesManager.java @@ -66,11 +66,10 @@ public List awaitScaledReadyDeployment(String deploymentName, int repl .list() .getItems() .stream() - .findAny() .map(EndpointSlice::getEndpoints) - .stream() + .filter(Objects::nonNull) .flatMap(Collection::stream) - .filter(e -> e.getConditions().getReady()) + .filter(e -> e != null && e.getConditions() != null && e.getConditions().getReady()) .toList(), readyEndpoints -> (long) readyEndpoints.size() == replicasCount ); From c0c85686873b0ea0e5daee6cda346325faac174a Mon Sep 17 00:00:00 2001 From: Piotr Kuglin Date: Thu, 26 Jun 2025 14:48:05 +0200 Subject: [PATCH 2/3] feat: fix issue with having multiple endpoint slices for single service --- .../kuberesolver/KubernetesNameResolver.java | 146 +++++++++++++++--- 1 file changed, 125 insertions(+), 21 deletions(-) diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java index 6c7343b..b573065 100644 --- a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java @@ -3,15 +3,19 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher; import io.github.lothar1998.kuberesolver.kubernetes.InClusterEndpointSliceWatcher; @@ -143,6 +147,8 @@ private void resolve() { */ private void watch() { watcher.watch(params.service(), new EndpointSliceWatcher.Subscriber() { + private final Map>> endpoints = new ConcurrentHashMap<>(); + @Override public void onEvent(Event event) { // watch event occurred @@ -152,21 +158,48 @@ public void onEvent(Event event) { return; } - if (event.type().equals(EventType.DELETED)) { - LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted", - new Object[]{event.endpointSlice().metadata().name()}); + if (event.endpointSlice() == null) { + LOGGER.log(Level.FINE, "No EndpointSlice found in watch event"); return; } - if (event.endpointSlice() == null) { - LOGGER.log(Level.FINE, "No EndpointSlice found in watch event"); + if (event.endpointSlice().metadata() == null || event.endpointSlice().metadata().name() == null) { + LOGGER.log(Level.FINE, "No EndpointSlice name found in watch event metadata"); + return; + } + + if (event.type().equals(EventType.DELETED)) { + LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted", + new Object[]{event.endpointSlice().metadata().name()}); + endpoints.remove(event.endpointSlice().metadata().name()); return; } LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[]{params.service()}); - buildAddresses(event.endpointSlice()).ifPresentOrElse(a -> listener.onAddresses(a, Attributes.EMPTY), - () -> LOGGER.log(Level.FINE, "No usable addresses found for Kubernetes service {0}", - new Object[]{params.service()})); + var endpointSliceAddresses = buildAddresses(event.endpointSlice()); + if (endpointSliceAddresses.isEmpty()) { + LOGGER.log(Level.FINE, "No usable addresses found for service {0} in EndpointSlice {1}", + new Object[]{params.service(), event.endpointSlice().metadata().name()}); + } else { + LOGGER.log(Level.FINEST, + () -> String.format( + "Resolved addresses for service %s from EndpointSlice %s: %s", + params.service(), + event.endpointSlice().metadata().name(), + addressGroupsToString(endpointSliceAddresses.get()) + )); + endpoints.put(event.endpointSlice().metadata().name(), endpointSliceAddresses.get()); + + var allAddresses = endpoints.values().stream() + .flatMap(List::stream) + .distinct() + .toList(); + + LOGGER.log(Level.FINEST, () -> String.format( + "All resolved addresses for service %s: %s", + params.service(), addressGroupsToString(allAddresses))); + listener.onAddresses(toEquivalentAddressGroups(allAddresses), Attributes.EMPTY); + } } @Override @@ -208,17 +241,28 @@ public String getServiceAuthority() { } /** - * Builds a list of gRPC {@link EquivalentAddressGroup} from the given - * {@link EndpointSlice}. + * Extracts and processes network addresses from a Kubernetes {@link EndpointSlice}. + *

+ * This method performs several key steps in the address resolution process: + *

    + *
  1. Finds the appropriate port to use from the EndpointSlice
  2. + *
  3. Filters for endpoints that are in the "ready" condition
  4. + *
  5. Maps each endpoint's IP addresses to socket addresses using the resolved port
  6. + *
+ *

+ * If no suitable port can be found or if the EndpointSlice contains no ready endpoints, + * an empty Optional will be returned. * - * @param endpointSlice the EndpointSlice to process - * @return an optional list of resolved addresses + * @param endpointSlice the Kubernetes EndpointSlice containing endpoint information + * @return an Optional containing a list of socket address sets for ready endpoints, + * or an empty Optional if no addresses could be resolved */ - private Optional> buildAddresses(EndpointSlice endpointSlice) { + private Optional>> buildAddresses(EndpointSlice endpointSlice) { return findPort(endpointSlice.ports()) .map(port -> endpointSlice.endpoints().stream() .filter(endpoint -> endpoint.conditions().isReady()) .map(endpoint -> buildAddressGroup(endpoint.addresses(), port)) + .filter(group -> !group.isEmpty()) .toList()); } @@ -246,17 +290,77 @@ private Optional findPort(List ports) { } /** - * Builds a gRPC {@link EquivalentAddressGroup} from the given addresses and - * port. + * Builds a set of socket addresses from a list of IP addresses and a port number. + * This method converts each IP address into an {@link InetSocketAddress} using the given port, + * which represents one endpoint in a Kubernetes EndpointSlice. + *

+ * The resulting set of addresses is used in the name resolution process to provide + * gRPC clients with possible connection endpoints for the target service. * - * @param addresses the list of addresses - * @param port the port number - * @return an {@link EquivalentAddressGroup} containing the resolved addresses + * @param addresses the list of IP addresses from a Kubernetes endpoint + * @param port the port number to use for all addresses + * @return a set of {@link SocketAddress} objects representing the endpoint addresses */ - private EquivalentAddressGroup buildAddressGroup(List addresses, int port) { - var socketAddresses = addresses.stream() + private Set buildAddressGroup(List addresses, int port) { + return addresses.stream() .map(address -> (SocketAddress) new InetSocketAddress(address, port)) + .collect(Collectors.toSet()); + } + + /** + * Converts a list of socket address sets into a list of {@link EquivalentAddressGroup} objects. + * Each set of socket addresses is transformed into a single {@link EquivalentAddressGroup}, + * which gRPC uses to represent a group of equivalent addresses for load balancing. + * + * @param addressGroups the list of socket address sets to convert + * @return a list of {@link EquivalentAddressGroup} objects, each representing one set of addresses + */ + private List toEquivalentAddressGroups(List> addressGroups) { + return addressGroups.stream() + .map(group -> new EquivalentAddressGroup(new ArrayList<>(group))) .toList(); - return new EquivalentAddressGroup(socketAddresses, Attributes.EMPTY); + } + + /** + * Converts a list of socket address sets into a human-readable string representation. + * The format is a nested structure like: [(addr1, addr2), (addr3), (addr4, addr5)] + * where each set of addresses is represented as a group in parentheses. + * + * @param addressGroups the list of socket address sets to convert to string + * @return a string representation of the address groups + */ + private String addressGroupsToString(List> addressGroups) { + if (addressGroups == null || addressGroups.isEmpty()) { + return "[]"; + } + + var result = new StringBuilder("["); + + result.append("("); + boolean firstAddr = true; + for (SocketAddress address : addressGroups.get(0)) { + if (!firstAddr) { + result.append(", "); + } + result.append(address); + firstAddr = false; + } + result.append(")"); + + for (int i = 1; i < addressGroups.size(); i++) { + result.append(", ("); + firstAddr = true; + for (SocketAddress address : addressGroups.get(i)) { + if (!firstAddr) { + result.append(", "); + } + result.append(address); + firstAddr = false; + } + result.append(")"); + } + + result.append("]"); + return result.toString(); } } From 1fdd44eb995ff0929b6eb47f5db614b4f6b14337 Mon Sep 17 00:00:00 2001 From: Piotr Kuglin Date: Thu, 26 Jun 2025 14:48:19 +0200 Subject: [PATCH 3/3] fix: use refresh method when starting to do not miss acquiring semaphore --- .../github/lothar1998/kuberesolver/KubernetesNameResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java index b573065..4a82ad1 100644 --- a/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java +++ b/lib/src/main/java/io/github/lothar1998/kuberesolver/KubernetesNameResolver.java @@ -120,7 +120,7 @@ public KubernetesNameResolver(Executor executor, ResolverTarget params) throws I @Override public void start(Listener listener) { this.listener = listener; - resolve(); + refresh(); } /**