From d2f2ed48da1c43e5954af4680989b5b020de3288 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Wed, 14 Jan 2026 17:44:20 +0100 Subject: [PATCH] Add CompletableFuture-based executeAsync overloads to fluent Async for simpler async composition. Provide an optional bounded default executor with configurable thread and queue limits. Keep existing Future-based API and default execution behavior unchanged. --- .../apache/hc/client5/http/fluent/Async.java | 228 +++++++++++++++++- .../fluent/FluentAsyncCompletableFuture.java | 71 ++++++ .../FluentAsyncCompletableFutureCallback.java | 89 +++++++ 3 files changed, 385 insertions(+), 3 deletions(-) create mode 100644 httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java create mode 100644 httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java diff --git a/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java b/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java index a6e13eb233..87df67d606 100644 --- a/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java +++ b/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java @@ -26,21 +26,44 @@ */ package org.apache.hc.client5.http.fluent; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.BasicFuture; +import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.io.HttpClientResponseHandler; +import org.apache.hc.core5.util.Args; /** * Asynchronous executor for {@link Request}s. * * @since 4.3 */ +@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) public class Async { + private static final int DEFAULT_MAX_THREADS = + Math.max(2, Math.min(32, Runtime.getRuntime().availableProcessors() * 2)); + + private static final int DEFAULT_QUEUE_CAPACITY = 1000; + + private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0); + private Executor executor; - private java.util.concurrent.Executor concurrentExec; + private volatile java.util.concurrent.Executor concurrentExec; + private volatile ExecutorService ownedConcurrentExec; + + private int maxThreads = DEFAULT_MAX_THREADS; + private int queueCapacity = DEFAULT_QUEUE_CAPACITY; public static Async newInstance() { return new Async(); @@ -48,6 +71,79 @@ public static Async newInstance() { Async() { super(); + // Keep legacy behavior by default. + } + + public Async maxThreads(final int maxThreads) { + Args.positive(maxThreads, "maxThreads"); + this.maxThreads = maxThreads; + rebuildOwnedExecutorIfActive(); + return this; + } + + public Async queueCapacity(final int queueCapacity) { + Args.positive(queueCapacity, "queueCapacity"); + this.queueCapacity = queueCapacity; + rebuildOwnedExecutorIfActive(); + return this; + } + + /** + * Enables an owned bounded default executor for asynchronous request execution using the + * current {@code maxThreads} and {@code queueCapacity} settings. + * + * @return this instance. + * @since 5.7 + */ + public Async useDefaultExecutor() { + return useDefaultExecutor(this.maxThreads, this.queueCapacity); + } + + /** + * Enables an owned bounded default executor for asynchronous request execution. + * + * @param maxThreads maximum number of threads. + * @param queueCapacity maximum number of queued tasks. + * @return this instance. + * @since 5.7 + */ + public Async useDefaultExecutor(final int maxThreads, final int queueCapacity) { + Args.positive(maxThreads, "maxThreads"); + Args.positive(queueCapacity, "queueCapacity"); + this.maxThreads = maxThreads; + this.queueCapacity = queueCapacity; + + shutdown(); + this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity); + this.concurrentExec = this.ownedConcurrentExec; + return this; + } + + private void rebuildOwnedExecutorIfActive() { + if (this.ownedConcurrentExec != null) { + shutdown(); + this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity); + this.concurrentExec = this.ownedConcurrentExec; + } + } + + private static ExecutorService createDefaultExecutor(final int maxThreads, final int queueCapacity) { + final int instanceId = INSTANCE_COUNT.incrementAndGet(); + final DefaultThreadFactory threadFactory = new DefaultThreadFactory( + "httpclient5-fluent-async-" + instanceId + "-", + true); + + final ThreadPoolExecutor exec = new ThreadPoolExecutor( + maxThreads, + maxThreads, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueCapacity), + threadFactory, + new ThreadPoolExecutor.CallerRunsPolicy()); + + exec.allowCoreThreadTimeOut(true); + return exec; } public Async use(final Executor executor) { @@ -57,9 +153,25 @@ public Async use(final Executor executor) { public Async use(final java.util.concurrent.Executor concurrentExec) { this.concurrentExec = concurrentExec; + shutdown(); return this; } + /** + * Shuts down resources owned by this instance, if any. + *

+ * This method never attempts to shut down executors supplied via {@link #use(java.util.concurrent.Executor)}. + * + * @since 5.7 + */ + public void shutdown() { + final ExecutorService exec = this.ownedConcurrentExec; + if (exec != null) { + this.ownedConcurrentExec = null; + exec.shutdown(); + } + } + static class ExecRunnable implements Runnable { private final BasicFuture future; @@ -100,8 +212,14 @@ public Future execute( request, this.executor != null ? this.executor : Executor.newInstance(), handler); - if (this.concurrentExec != null) { - this.concurrentExec.execute(runnable); + + final java.util.concurrent.Executor exec = this.concurrentExec; + if (exec != null) { + try { + exec.execute(runnable); + } catch (final RejectedExecutionException ex) { + future.failed(ex); + } } else { final Thread t = new Thread(runnable); t.setDaemon(true); @@ -122,4 +240,108 @@ public Future execute(final Request request) { return execute(request, new ContentResponseHandler(), null); } + /** + * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes + * when the response has been fully received and converted by the given response handler. + * + * @param request the request to execute. + * @param handler the response handler. + * @param the handler result type. + * @return a {@code CompletableFuture} producing the handler result. + * @since 5.7 + */ + public CompletableFuture executeAsync(final Request request, final HttpClientResponseHandler handler) { + final CompletableFuture cf = new CompletableFuture<>(); + execute(request, handler, new FutureCallback() { + + @Override + public void completed(final T result) { + cf.complete(result); + } + + @Override + public void failed(final Exception ex) { + cf.completeExceptionally(ex); + } + + @Override + public void cancelled() { + cf.cancel(false); + } + + }); + return cf; + } + + /** + * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes + * when the response has been fully received and converted by the given response handler. The given + * callback is invoked on completion, failure, or cancellation. + * + * @param request the request to execute. + * @param handler the response handler. + * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}. + * @param the handler result type. + * @return a {@code CompletableFuture} producing the handler result. + * @since 5.7 + */ + public CompletableFuture executeAsync( + final Request request, final HttpClientResponseHandler handler, final FutureCallback callback) { + final CompletableFuture cf = new CompletableFuture<>(); + execute(request, handler, new FutureCallback() { + + @Override + public void completed(final T result) { + if (callback != null) { + callback.completed(result); + } + cf.complete(result); + } + + @Override + public void failed(final Exception ex) { + if (callback != null) { + callback.failed(ex); + } + cf.completeExceptionally(ex); + } + + @Override + public void cancelled() { + if (callback != null) { + callback.cancelled(); + } + cf.cancel(false); + } + + }); + return cf; + } + + /** + * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes + * when the response has been fully received and converted to {@link Content}. + * + * @param request the request to execute. + * @return a {@code CompletableFuture} producing the response {@code Content}. + * @since 5.7 + */ + public CompletableFuture executeAsync(final Request request) { + return executeAsync(request, new ContentResponseHandler()); + } + + /** + * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes + * when the response has been fully received and converted to {@link Content}. The given callback + * is invoked on completion, failure, or cancellation. + * + * @param request the request to execute. + * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}. + * @return a {@code CompletableFuture} producing the response {@code Content}. + * @since 5.7 + */ + public CompletableFuture executeAsync(final Request request, final FutureCallback callback) { + return executeAsync(request, new ContentResponseHandler(), callback); + } + } diff --git a/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java new file mode 100644 index 0000000000..12f05eb305 --- /dev/null +++ b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java @@ -0,0 +1,71 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples.fluent; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.hc.client5.http.fluent.Async; +import org.apache.hc.client5.http.fluent.Request; + +/** + * This example demonstrates how the HttpClient fluent API can be used to execute multiple + * requests asynchronously using CompletableFuture. + */ +public class FluentAsyncCompletableFuture { + + public static void main(final String... args) throws Exception { + + final List requests = Arrays.asList( + Request.get("http://www.google.com/"), + Request.get("http://www.yahoo.com/"), + Request.get("http://www.apache.org/"), + Request.get("http://www.apple.com/") + ); + + final Async async = Async.newInstance().useDefaultExecutor(8, 500); + try { + + final CompletableFuture[] futures = requests.stream() + .map(r -> async.executeAsync(r) + .thenAccept(content -> System.out.println("Request completed: " + r)) + .exceptionally(ex -> { + System.out.println(ex.getMessage() + ": " + r); + return null; + })) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(futures).join(); + } finally { + async.shutdown(); + } + + System.out.println("Done"); + } + +} diff --git a/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java new file mode 100644 index 0000000000..b1a2374a73 --- /dev/null +++ b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java @@ -0,0 +1,89 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples.fluent; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.apache.hc.client5.http.fluent.Async; +import org.apache.hc.client5.http.fluent.Content; +import org.apache.hc.client5.http.fluent.Request; +import org.apache.hc.core5.concurrent.FutureCallback; + +/** + * This example demonstrates how the HttpClient fluent API can be used to execute multiple + * requests asynchronously using CompletableFuture while also receiving per-request callbacks. + */ +public class FluentAsyncCompletableFutureCallback { + + public static void main(final String... args) throws Exception { + + final List requests = Arrays.asList( + Request.get("http://www.google.com/"), + Request.get("http://www.yahoo.com/"), + Request.get("http://www.apache.org/"), + Request.get("http://www.apple.com/") + ); + + final Async async = Async.newInstance().maxThreads(8).queueCapacity(500).useDefaultExecutor(); + try { + + final CompletableFuture[] futures = requests.stream() + .map(request -> async.executeAsync(request, new FutureCallback() { + + @Override + public void completed(final Content content) { + System.out.println("Callback completed: " + request); + } + + @Override + public void failed(final Exception ex) { + System.out.println("Callback failed: " + ex.getMessage() + ": " + request); + } + + @Override + public void cancelled() { + System.out.println("Callback cancelled: " + request); + } + + }).thenAccept(content -> System.out.println("Future completed: " + request)) + .exceptionally(ex -> { + System.out.println("Future failed: " + ex.getMessage() + ": " + request); + return null; + })) + .toArray(CompletableFuture[]::new); + + CompletableFuture.allOf(futures).join(); + } finally { + async.shutdown(); + } + + System.out.println("Done"); + } + +}