diff --git a/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java b/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java
index a6e13eb233..87df67d606 100644
--- a/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java
+++ b/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Async.java
@@ -26,21 +26,44 @@
*/
package org.apache.hc.client5.http.fluent;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
+import org.apache.hc.core5.util.Args;
/**
* Asynchronous executor for {@link Request}s.
*
* @since 4.3
*/
+@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public class Async {
+ private static final int DEFAULT_MAX_THREADS =
+ Math.max(2, Math.min(32, Runtime.getRuntime().availableProcessors() * 2));
+
+ private static final int DEFAULT_QUEUE_CAPACITY = 1000;
+
+ private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
+
private Executor executor;
- private java.util.concurrent.Executor concurrentExec;
+ private volatile java.util.concurrent.Executor concurrentExec;
+ private volatile ExecutorService ownedConcurrentExec;
+
+ private int maxThreads = DEFAULT_MAX_THREADS;
+ private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
public static Async newInstance() {
return new Async();
@@ -48,6 +71,79 @@ public static Async newInstance() {
Async() {
super();
+ // Keep legacy behavior by default.
+ }
+
+ public Async maxThreads(final int maxThreads) {
+ Args.positive(maxThreads, "maxThreads");
+ this.maxThreads = maxThreads;
+ rebuildOwnedExecutorIfActive();
+ return this;
+ }
+
+ public Async queueCapacity(final int queueCapacity) {
+ Args.positive(queueCapacity, "queueCapacity");
+ this.queueCapacity = queueCapacity;
+ rebuildOwnedExecutorIfActive();
+ return this;
+ }
+
+ /**
+ * Enables an owned bounded default executor for asynchronous request execution using the
+ * current {@code maxThreads} and {@code queueCapacity} settings.
+ *
+ * @return this instance.
+ * @since 5.7
+ */
+ public Async useDefaultExecutor() {
+ return useDefaultExecutor(this.maxThreads, this.queueCapacity);
+ }
+
+ /**
+ * Enables an owned bounded default executor for asynchronous request execution.
+ *
+ * @param maxThreads maximum number of threads.
+ * @param queueCapacity maximum number of queued tasks.
+ * @return this instance.
+ * @since 5.7
+ */
+ public Async useDefaultExecutor(final int maxThreads, final int queueCapacity) {
+ Args.positive(maxThreads, "maxThreads");
+ Args.positive(queueCapacity, "queueCapacity");
+ this.maxThreads = maxThreads;
+ this.queueCapacity = queueCapacity;
+
+ shutdown();
+ this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity);
+ this.concurrentExec = this.ownedConcurrentExec;
+ return this;
+ }
+
+ private void rebuildOwnedExecutorIfActive() {
+ if (this.ownedConcurrentExec != null) {
+ shutdown();
+ this.ownedConcurrentExec = createDefaultExecutor(this.maxThreads, this.queueCapacity);
+ this.concurrentExec = this.ownedConcurrentExec;
+ }
+ }
+
+ private static ExecutorService createDefaultExecutor(final int maxThreads, final int queueCapacity) {
+ final int instanceId = INSTANCE_COUNT.incrementAndGet();
+ final DefaultThreadFactory threadFactory = new DefaultThreadFactory(
+ "httpclient5-fluent-async-" + instanceId + "-",
+ true);
+
+ final ThreadPoolExecutor exec = new ThreadPoolExecutor(
+ maxThreads,
+ maxThreads,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueCapacity),
+ threadFactory,
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ exec.allowCoreThreadTimeOut(true);
+ return exec;
}
public Async use(final Executor executor) {
@@ -57,9 +153,25 @@ public Async use(final Executor executor) {
public Async use(final java.util.concurrent.Executor concurrentExec) {
this.concurrentExec = concurrentExec;
+ shutdown();
return this;
}
+ /**
+ * Shuts down resources owned by this instance, if any.
+ *
+ * This method never attempts to shut down executors supplied via {@link #use(java.util.concurrent.Executor)}.
+ *
+ * @since 5.7
+ */
+ public void shutdown() {
+ final ExecutorService exec = this.ownedConcurrentExec;
+ if (exec != null) {
+ this.ownedConcurrentExec = null;
+ exec.shutdown();
+ }
+ }
+
static class ExecRunnable implements Runnable {
private final BasicFuture future;
@@ -100,8 +212,14 @@ public Future execute(
request,
this.executor != null ? this.executor : Executor.newInstance(),
handler);
- if (this.concurrentExec != null) {
- this.concurrentExec.execute(runnable);
+
+ final java.util.concurrent.Executor exec = this.concurrentExec;
+ if (exec != null) {
+ try {
+ exec.execute(runnable);
+ } catch (final RejectedExecutionException ex) {
+ future.failed(ex);
+ }
} else {
final Thread t = new Thread(runnable);
t.setDaemon(true);
@@ -122,4 +240,108 @@ public Future execute(final Request request) {
return execute(request, new ContentResponseHandler(), null);
}
+ /**
+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
+ * when the response has been fully received and converted by the given response handler.
+ *
+ * @param request the request to execute.
+ * @param handler the response handler.
+ * @param the handler result type.
+ * @return a {@code CompletableFuture} producing the handler result.
+ * @since 5.7
+ */
+ public CompletableFuture executeAsync(final Request request, final HttpClientResponseHandler handler) {
+ final CompletableFuture cf = new CompletableFuture<>();
+ execute(request, handler, new FutureCallback() {
+
+ @Override
+ public void completed(final T result) {
+ cf.complete(result);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ cf.completeExceptionally(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ cf.cancel(false);
+ }
+
+ });
+ return cf;
+ }
+
+ /**
+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
+ * when the response has been fully received and converted by the given response handler. The given
+ * callback is invoked on completion, failure, or cancellation.
+ *
+ * @param request the request to execute.
+ * @param handler the response handler.
+ * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
+ * @param the handler result type.
+ * @return a {@code CompletableFuture} producing the handler result.
+ * @since 5.7
+ */
+ public CompletableFuture executeAsync(
+ final Request request, final HttpClientResponseHandler handler, final FutureCallback callback) {
+ final CompletableFuture cf = new CompletableFuture<>();
+ execute(request, handler, new FutureCallback() {
+
+ @Override
+ public void completed(final T result) {
+ if (callback != null) {
+ callback.completed(result);
+ }
+ cf.complete(result);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ if (callback != null) {
+ callback.failed(ex);
+ }
+ cf.completeExceptionally(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ if (callback != null) {
+ callback.cancelled();
+ }
+ cf.cancel(false);
+ }
+
+ });
+ return cf;
+ }
+
+ /**
+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
+ * when the response has been fully received and converted to {@link Content}.
+ *
+ * @param request the request to execute.
+ * @return a {@code CompletableFuture} producing the response {@code Content}.
+ * @since 5.7
+ */
+ public CompletableFuture executeAsync(final Request request) {
+ return executeAsync(request, new ContentResponseHandler());
+ }
+
+ /**
+ * Executes the given request asynchronously and returns a {@link CompletableFuture} that completes
+ * when the response has been fully received and converted to {@link Content}. The given callback
+ * is invoked on completion, failure, or cancellation.
+ *
+ * @param request the request to execute.
+ * @param callback the callback to invoke on completion, failure, or cancellation; may be {@code null}.
+ * @return a {@code CompletableFuture} producing the response {@code Content}.
+ * @since 5.7
+ */
+ public CompletableFuture executeAsync(final Request request, final FutureCallback callback) {
+ return executeAsync(request, new ContentResponseHandler(), callback);
+ }
+
}
diff --git a/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java
new file mode 100644
index 0000000000..12f05eb305
--- /dev/null
+++ b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFuture.java
@@ -0,0 +1,71 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples.fluent;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hc.client5.http.fluent.Async;
+import org.apache.hc.client5.http.fluent.Request;
+
+/**
+ * This example demonstrates how the HttpClient fluent API can be used to execute multiple
+ * requests asynchronously using CompletableFuture.
+ */
+public class FluentAsyncCompletableFuture {
+
+ public static void main(final String... args) throws Exception {
+
+ final List requests = Arrays.asList(
+ Request.get("http://www.google.com/"),
+ Request.get("http://www.yahoo.com/"),
+ Request.get("http://www.apache.org/"),
+ Request.get("http://www.apple.com/")
+ );
+
+ final Async async = Async.newInstance().useDefaultExecutor(8, 500);
+ try {
+
+ final CompletableFuture>[] futures = requests.stream()
+ .map(r -> async.executeAsync(r)
+ .thenAccept(content -> System.out.println("Request completed: " + r))
+ .exceptionally(ex -> {
+ System.out.println(ex.getMessage() + ": " + r);
+ return null;
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFuture.allOf(futures).join();
+ } finally {
+ async.shutdown();
+ }
+
+ System.out.println("Done");
+ }
+
+}
diff --git a/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java
new file mode 100644
index 0000000000..b1a2374a73
--- /dev/null
+++ b/httpclient5-fluent/src/test/java/org/apache/hc/client5/http/examples/fluent/FluentAsyncCompletableFutureCallback.java
@@ -0,0 +1,89 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples.fluent;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hc.client5.http.fluent.Async;
+import org.apache.hc.client5.http.fluent.Content;
+import org.apache.hc.client5.http.fluent.Request;
+import org.apache.hc.core5.concurrent.FutureCallback;
+
+/**
+ * This example demonstrates how the HttpClient fluent API can be used to execute multiple
+ * requests asynchronously using CompletableFuture while also receiving per-request callbacks.
+ */
+public class FluentAsyncCompletableFutureCallback {
+
+ public static void main(final String... args) throws Exception {
+
+ final List requests = Arrays.asList(
+ Request.get("http://www.google.com/"),
+ Request.get("http://www.yahoo.com/"),
+ Request.get("http://www.apache.org/"),
+ Request.get("http://www.apple.com/")
+ );
+
+ final Async async = Async.newInstance().maxThreads(8).queueCapacity(500).useDefaultExecutor();
+ try {
+
+ final CompletableFuture>[] futures = requests.stream()
+ .map(request -> async.executeAsync(request, new FutureCallback() {
+
+ @Override
+ public void completed(final Content content) {
+ System.out.println("Callback completed: " + request);
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ System.out.println("Callback failed: " + ex.getMessage() + ": " + request);
+ }
+
+ @Override
+ public void cancelled() {
+ System.out.println("Callback cancelled: " + request);
+ }
+
+ }).thenAccept(content -> System.out.println("Future completed: " + request))
+ .exceptionally(ex -> {
+ System.out.println("Future failed: " + ex.getMessage() + ": " + request);
+ return null;
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ CompletableFuture.allOf(futures).join();
+ } finally {
+ async.shutdown();
+ }
+
+ System.out.println("Done");
+ }
+
+}