diff --git a/util/src/main/java/io/kubernetes/client/util/WaitUtils.java b/util/src/main/java/io/kubernetes/client/util/WaitUtils.java new file mode 100644 index 0000000000..bdf34e1027 --- /dev/null +++ b/util/src/main/java/io/kubernetes/client/util/WaitUtils.java @@ -0,0 +1,368 @@ +/* +Copyright 2024 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.util; + +import io.kubernetes.client.common.KubernetesObject; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.util.generic.GenericKubernetesApi; +import io.kubernetes.client.util.generic.KubernetesApiResponse; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Utilities for waiting on Kubernetes resources to reach desired conditions. + * Provides fabric8-style waitUntilReady and waitUntilCondition functionality. + * + *

Example usage: + *

{@code
+ * // Wait for a Pod to be ready
+ * V1Pod pod = WaitUtils.waitUntilReady(
+ *     () -> coreV1Api.readNamespacedPod("my-pod", "default").execute(),
+ *     Duration.ofMinutes(5),
+ *     Duration.ofSeconds(1)
+ * );
+ *
+ * // Wait for a custom condition
+ * V1Pod runningPod = WaitUtils.waitUntilCondition(
+ *     () -> coreV1Api.readNamespacedPod("my-pod", "default").execute(),
+ *     pod -> "Running".equals(pod.getStatus().getPhase()),
+ *     Duration.ofMinutes(5),
+ *     Duration.ofSeconds(1)
+ * );
+ *
+ * // Using GenericKubernetesApi
+ * V1Pod readyPod = WaitUtils.waitUntilReady(
+ *     podApi,
+ *     "default",
+ *     "my-pod",
+ *     Duration.ofMinutes(5)
+ * );
+ * }
+ */ +public class WaitUtils { + + private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(1); + + private WaitUtils() { + // Utility class + } + + /** + * Waits until the resource is ready using the Readiness utility. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param timeout maximum time to wait + * @param pollInterval time between checks + * @return the ready resource + * @throws TimeoutException if the resource doesn't become ready within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilReady( + Supplier resourceSupplier, + Duration timeout, + Duration pollInterval) throws TimeoutException, InterruptedException { + return waitUntilCondition(resourceSupplier, Readiness::isReady, timeout, pollInterval); + } + + /** + * Waits until the resource is ready using the Readiness utility with default poll interval. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param timeout maximum time to wait + * @return the ready resource + * @throws TimeoutException if the resource doesn't become ready within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilReady( + Supplier resourceSupplier, + Duration timeout) throws TimeoutException, InterruptedException { + return waitUntilReady(resourceSupplier, timeout, DEFAULT_POLL_INTERVAL); + } + + /** + * Waits until the resource satisfies the given condition. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param condition predicate that tests if the condition is met + * @param timeout maximum time to wait + * @param pollInterval time between checks + * @return the resource that satisfies the condition + * @throws TimeoutException if the condition is not met within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilCondition( + Supplier resourceSupplier, + Predicate condition, + Duration timeout, + Duration pollInterval) throws TimeoutException, InterruptedException { + + Objects.requireNonNull(resourceSupplier, "resourceSupplier must not be null"); + Objects.requireNonNull(condition, "condition must not be null"); + Objects.requireNonNull(timeout, "timeout must not be null"); + Objects.requireNonNull(pollInterval, "pollInterval must not be null"); + + CompletableFuture future = new CompletableFuture<>(); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + try { + ScheduledFuture scheduledTask = executor.scheduleAtFixedRate(() -> { + try { + T resource = resourceSupplier.get(); + if (resource != null && condition.test(resource)) { + future.complete(resource); + } + } catch (Exception e) { + // Log but don't fail - resource might not exist yet + // We'll keep polling until timeout + } + }, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS); + + try { + return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } finally { + scheduledTask.cancel(true); + } + } catch (ExecutionException e) { + throw new RuntimeException("Unexpected error while waiting", e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + /** + * Waits until the resource satisfies the given condition with default poll interval. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param condition predicate that tests if the condition is met + * @param timeout maximum time to wait + * @return the resource that satisfies the condition + * @throws TimeoutException if the condition is not met within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilCondition( + Supplier resourceSupplier, + Predicate condition, + Duration timeout) throws TimeoutException, InterruptedException { + return waitUntilCondition(resourceSupplier, condition, timeout, DEFAULT_POLL_INTERVAL); + } + + /** + * Waits until the resource is deleted (returns null or throws 404). + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param timeout maximum time to wait + * @param pollInterval time between checks + * @throws TimeoutException if the resource is not deleted within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static void waitUntilDeleted( + Supplier resourceSupplier, + Duration timeout, + Duration pollInterval) throws TimeoutException, InterruptedException { + + Objects.requireNonNull(resourceSupplier, "resourceSupplier must not be null"); + Objects.requireNonNull(timeout, "timeout must not be null"); + Objects.requireNonNull(pollInterval, "pollInterval must not be null"); + + CompletableFuture future = new CompletableFuture<>(); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + try { + ScheduledFuture scheduledTask = executor.scheduleAtFixedRate(() -> { + try { + T resource = resourceSupplier.get(); + if (resource == null) { + future.complete(null); + } + } catch (Exception e) { + // Treat any exception as deleted (typically 404) + future.complete(null); + } + }, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS); + + try { + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } finally { + scheduledTask.cancel(true); + } + } catch (ExecutionException e) { + throw new RuntimeException("Unexpected error while waiting for deletion", e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + /** + * Waits until the resource is deleted with default poll interval. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param timeout maximum time to wait + * @throws TimeoutException if the resource is not deleted within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static void waitUntilDeleted( + Supplier resourceSupplier, + Duration timeout) throws TimeoutException, InterruptedException { + waitUntilDeleted(resourceSupplier, timeout, DEFAULT_POLL_INTERVAL); + } + + /** + * Waits until a resource is ready using GenericKubernetesApi. + * + * @param the resource type + * @param the list type + * @param api the GenericKubernetesApi + * @param namespace the namespace (null for cluster-scoped resources) + * @param name the resource name + * @param timeout maximum time to wait + * @return the ready resource + * @throws TimeoutException if the resource doesn't become ready within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilReady( + GenericKubernetesApi api, + String namespace, + String name, + Duration timeout) throws TimeoutException, InterruptedException { + + return waitUntilCondition( + () -> { + KubernetesApiResponse response = namespace != null + ? api.get(namespace, name) + : api.get(name); + return response.isSuccess() ? response.getObject() : null; + }, + Readiness::isReady, + timeout, + DEFAULT_POLL_INTERVAL + ); + } + + /** + * Waits until a resource satisfies the given condition using GenericKubernetesApi. + * + * @param the resource type + * @param the list type + * @param api the GenericKubernetesApi + * @param namespace the namespace (null for cluster-scoped resources) + * @param name the resource name + * @param condition predicate that tests if the condition is met + * @param timeout maximum time to wait + * @return the resource that satisfies the condition + * @throws TimeoutException if the condition is not met within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilCondition( + GenericKubernetesApi api, + String namespace, + String name, + Predicate condition, + Duration timeout) throws TimeoutException, InterruptedException { + + return waitUntilCondition( + () -> { + KubernetesApiResponse response = namespace != null + ? api.get(namespace, name) + : api.get(name); + return response.isSuccess() ? response.getObject() : null; + }, + condition, + timeout, + DEFAULT_POLL_INTERVAL + ); + } + + /** + * Waits until a cluster-scoped resource is ready. + * + * @param the resource type + * @param the list type + * @param api the GenericKubernetesApi + * @param name the resource name + * @param timeout maximum time to wait + * @return the ready resource + * @throws TimeoutException if the resource doesn't become ready within the timeout + * @throws InterruptedException if the thread is interrupted + */ + public static T waitUntilReady( + GenericKubernetesApi api, + String name, + Duration timeout) throws TimeoutException, InterruptedException { + return waitUntilReady(api, null, name, timeout); + } + + /** + * Asynchronously waits until the resource is ready. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param timeout maximum time to wait + * @param pollInterval time between checks + * @return CompletableFuture that completes with the ready resource + */ + public static CompletableFuture waitUntilReadyAsync( + Supplier resourceSupplier, + Duration timeout, + Duration pollInterval) { + return waitUntilConditionAsync(resourceSupplier, Readiness::isReady, timeout, pollInterval); + } + + /** + * Asynchronously waits until the resource satisfies the given condition. + * + * @param the resource type + * @param resourceSupplier supplier that fetches the current state of the resource + * @param condition predicate that tests if the condition is met + * @param timeout maximum time to wait + * @param pollInterval time between checks + * @return CompletableFuture that completes with the resource or exceptionally with TimeoutException + */ + public static CompletableFuture waitUntilConditionAsync( + Supplier resourceSupplier, + Predicate condition, + Duration timeout, + Duration pollInterval) { + + CompletableFuture result = new CompletableFuture<>(); + + CompletableFuture.runAsync(() -> { + try { + T resource = waitUntilCondition(resourceSupplier, condition, timeout, pollInterval); + result.complete(resource); + } catch (TimeoutException e) { + result.completeExceptionally(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + result.completeExceptionally(e); + } + }); + + return result; + } +} diff --git a/util/src/test/java/io/kubernetes/client/util/WaitUtilsTest.java b/util/src/test/java/io/kubernetes/client/util/WaitUtilsTest.java new file mode 100644 index 0000000000..b5b9f3a016 --- /dev/null +++ b/util/src/test/java/io/kubernetes/client/util/WaitUtilsTest.java @@ -0,0 +1,263 @@ +/* +Copyright 2024 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodCondition; +import io.kubernetes.client.openapi.models.V1PodStatus; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; + +class WaitUtilsTest { + + @Test + void waitUntilReady_alreadyReady_returnsImmediately() throws Exception { + V1Pod readyPod = createReadyPod(); + Supplier supplier = () -> readyPod; + + V1Pod result = WaitUtils.waitUntilReady(supplier, Duration.ofSeconds(5)); + + assertThat(result).isEqualTo(readyPod); + } + + @Test + void waitUntilReady_becomesReady_returnsWhenReady() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + V1Pod readyPod = createReadyPod(); + V1Pod notReadyPod = createNotReadyPod(); + + Supplier supplier = () -> { + if (callCount.incrementAndGet() >= 3) { + return readyPod; + } + return notReadyPod; + }; + + V1Pod result = WaitUtils.waitUntilReady(supplier, Duration.ofSeconds(5), Duration.ofMillis(100)); + + assertThat(result).isEqualTo(readyPod); + assertThat(callCount.get()).isGreaterThanOrEqualTo(3); + } + + @Test + void waitUntilReady_timesOut_throwsTimeoutException() { + V1Pod notReadyPod = createNotReadyPod(); + Supplier supplier = () -> notReadyPod; + + assertThatThrownBy(() -> + WaitUtils.waitUntilReady(supplier, Duration.ofMillis(200), Duration.ofMillis(50))) + .isInstanceOf(TimeoutException.class); + } + + @Test + void waitUntilCondition_conditionMet_returnsResource() throws Exception { + V1Pod pod = new V1Pod() + .metadata(new V1ObjectMeta().name("test")) + .status(new V1PodStatus().phase("Running")); + + V1Pod result = WaitUtils.waitUntilCondition( + () -> pod, + p -> "Running".equals(p.getStatus().getPhase()), + Duration.ofSeconds(5), + Duration.ofMillis(100)); + + assertThat(result).isEqualTo(pod); + } + + @Test + void waitUntilCondition_becomesTrue_returnsWhenConditionMet() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + Supplier supplier = () -> { + V1Pod pod = new V1Pod().metadata(new V1ObjectMeta().name("test")); + if (callCount.incrementAndGet() >= 3) { + pod.setStatus(new V1PodStatus().phase("Running")); + } else { + pod.setStatus(new V1PodStatus().phase("Pending")); + } + return pod; + }; + + V1Pod result = WaitUtils.waitUntilCondition( + supplier, + p -> "Running".equals(p.getStatus().getPhase()), + Duration.ofSeconds(5), + Duration.ofMillis(100)); + + assertThat(result.getStatus().getPhase()).isEqualTo("Running"); + } + + @Test + void waitUntilCondition_nullResource_keepsPolling() { + AtomicInteger callCount = new AtomicInteger(0); + Supplier supplier = () -> { + callCount.incrementAndGet(); + return null; + }; + + assertThatThrownBy(() -> + WaitUtils.waitUntilCondition( + supplier, + p -> true, + Duration.ofMillis(300), + Duration.ofMillis(50))) + .isInstanceOf(TimeoutException.class); + + assertThat(callCount.get()).isGreaterThan(1); + } + + @Test + void waitUntilCondition_supplierThrows_keepsPolling() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + V1Pod readyPod = createReadyPod(); + + Supplier supplier = () -> { + if (callCount.incrementAndGet() < 3) { + throw new RuntimeException("Simulated error"); + } + return readyPod; + }; + + V1Pod result = WaitUtils.waitUntilCondition( + supplier, + p -> true, + Duration.ofSeconds(5), + Duration.ofMillis(100)); + + assertThat(result).isEqualTo(readyPod); + } + + @Test + void waitUntilDeleted_resourceGone_returns() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + Supplier supplier = () -> { + if (callCount.incrementAndGet() >= 3) { + return null; + } + return createReadyPod(); + }; + + WaitUtils.waitUntilDeleted(supplier, Duration.ofSeconds(5), Duration.ofMillis(100)); + + assertThat(callCount.get()).isGreaterThanOrEqualTo(3); + } + + @Test + void waitUntilDeleted_supplierThrows_treatsAsDeleted() throws Exception { + AtomicInteger callCount = new AtomicInteger(0); + + Supplier supplier = () -> { + if (callCount.incrementAndGet() >= 2) { + throw new RuntimeException("Not found"); + } + return createReadyPod(); + }; + + WaitUtils.waitUntilDeleted(supplier, Duration.ofSeconds(5), Duration.ofMillis(100)); + + assertThat(callCount.get()).isGreaterThanOrEqualTo(2); + } + + @Test + void waitUntilDeleted_notDeleted_timesOut() { + Supplier supplier = () -> createReadyPod(); + + assertThatThrownBy(() -> + WaitUtils.waitUntilDeleted(supplier, Duration.ofMillis(200), Duration.ofMillis(50))) + .isInstanceOf(TimeoutException.class); + } + + @Test + void waitUntilReadyAsync_returnsCompletableFuture() throws Exception { + V1Pod readyPod = createReadyPod(); + Supplier supplier = () -> readyPod; + + CompletableFuture future = WaitUtils.waitUntilReadyAsync( + supplier, Duration.ofSeconds(5), Duration.ofMillis(100)); + + assertThat(future).isNotNull(); + assertThat(future.get()).isEqualTo(readyPod); + } + + @Test + void waitUntilConditionAsync_returnsCompletableFuture() throws Exception { + V1Pod pod = new V1Pod() + .metadata(new V1ObjectMeta().name("test")) + .status(new V1PodStatus().phase("Running")); + + CompletableFuture future = WaitUtils.waitUntilConditionAsync( + () -> pod, + p -> "Running".equals(p.getStatus().getPhase()), + Duration.ofSeconds(5), + Duration.ofMillis(100)); + + assertThat(future).isNotNull(); + assertThat(future.get()).isEqualTo(pod); + } + + @Test + void waitUntilCondition_nullSupplier_throwsNullPointerException() { + assertThatThrownBy(() -> + WaitUtils.waitUntilCondition(null, p -> true, Duration.ofSeconds(1), Duration.ofMillis(100))) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("resourceSupplier"); + } + + @Test + void waitUntilCondition_nullCondition_throwsNullPointerException() { + assertThatThrownBy(() -> + WaitUtils.waitUntilCondition(() -> createReadyPod(), null, Duration.ofSeconds(1), Duration.ofMillis(100))) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("condition"); + } + + @Test + void waitUntilCondition_nullTimeout_throwsNullPointerException() { + assertThatThrownBy(() -> + WaitUtils.waitUntilCondition(() -> createReadyPod(), p -> true, null, Duration.ofMillis(100))) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("timeout"); + } + + private V1Pod createReadyPod() { + return new V1Pod() + .metadata(new V1ObjectMeta().name("test-pod")) + .status(new V1PodStatus() + .phase("Running") + .conditions(List.of( + new V1PodCondition() + .type("Ready") + .status("True")))); + } + + private V1Pod createNotReadyPod() { + return new V1Pod() + .metadata(new V1ObjectMeta().name("test-pod")) + .status(new V1PodStatus() + .phase("Pending") + .conditions(List.of( + new V1PodCondition() + .type("Ready") + .status("False")))); + } +}