Introduction
Today I would like to share with you how to wrap an Elasticsearch client
response into Java’s CompletableFuture with two different clients in
Elasticsearch 7: the Transport Client and the Java High Level REST Client. The
motivation for doing this is quite simple: being able to perform tasks
concurrently in the “standard” Java way. For example, it’s useful when you want
to send multiple requests to Elasticsearch concurrently, when you want to query
multiple Elasticsearch clusters, or when you want to build your own client that
interacts with Elasticsearch and exposes asynchronous APIs to callers.
After reading this article, you will understand:
- How to wrap transport client calls into CompletableFuture?
- How to choose an executor (thread pool)?
- How to handle timeouts?
- How to do the same with the Java High Level REST Client?
- How to go further from here?
This article is written in Java 11. Now, let’s get started!
Note that Elasticsearch has a pull request to “Replace custom Future implementations by CompletableFuture”, made by Yannick Welsch, but this change won’t be available before v8.0.0.
Action Listener Wrapping
The easiest way to wrap the response into CompletableFuture is to do it via
ActionListener. The Elasticsearch Transport Client provides an asynchronous API
that accepts an action listener as input, so you can control the
CompletableFuture completion inside the listener:
var cf = new CompletableFuture<ClusterStateResponse>(); // 1
client
.admin()
.cluster()
.prepareState()
.execute(ActionListener.wrap(cf::complete, cf::completeExceptionally)); // 2
var stateFuture = cf.thenApply(ClusterStateResponse::getState); // 3
In this code snippet:
- (1) The completable future cf is created at the beginning, but it is not yet complete.
- (2) cf is wrapped into an action listener. When the cluster state response is received, the completable future is completed via cf::complete. If any error occurs, it is completed exceptionally via cf::completeExceptionally. How the action listener is hooked into the client is handled by the Elasticsearch transport client itself.
- (3) You can manipulate the ClusterStateResponse downstream as you wish: retrieve the cluster state via thenApply, add a log, return the completable future as the method output, etc. It’s up to you.
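Because this pattern only needs a callback with a success method and a failure method, it can be tried without a running cluster. The sketch below is a minimal, self-contained illustration that assumes a hypothetical Listener interface mimicking the shape of ActionListener (including a wrap(onSuccess, onError) factory) and a hypothetical FakeClient whose fetchStateAsync simulates an asynchronous call on another thread. Neither is part of Elasticsearch; only the three steps (create, wrap, transform) mirror the snippet above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for Elasticsearch's ActionListener<T>; not the real interface.
interface Listener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    // Mimics the shape of ActionListener.wrap(onResponse, onFailure).
    static <T> Listener<T> wrap(Consumer<T> onSuccess, Consumer<Exception> onError) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                onSuccess.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onError.accept(e);
            }
        };
    }
}

// Hypothetical client: answers asynchronously on a separate thread.
final class FakeClient {
    static void fetchStateAsync(boolean fail, Listener<String> listener) {
        new Thread(() -> {
            if (fail) {
                listener.onFailure(new IllegalStateException("cluster unavailable"));
            } else {
                listener.onResponse("cluster-state");
            }
        }).start();
    }
}

final class WrapDemo {
    // Same three steps as above: create the future, wrap it, transform downstream.
    static CompletableFuture<Integer> stateLength(boolean fail) {
        var cf = new CompletableFuture<String>(); // 1
        FakeClient.fetchStateAsync(fail, Listener.wrap(cf::complete, cf::completeExceptionally)); // 2
        return cf.thenApply(String::length); // 3
    }

    public static void main(String[] args) {
        System.out.println(stateLength(false).join());
        // Downstream stages see the failure wrapped in a CompletionException.
        System.out.println(stateLength(true).handle((value, error) -> error).join());
    }
}
```

Note that cf::complete and cf::completeExceptionally return a boolean, which is simply discarded when they are used as Consumer instances.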
Custom Action Listener
A slightly different approach is to create a custom action listener as an anonymous class:
var cf = new CompletableFuture<ClusterStateResponse>();
client
.admin()
.cluster()
.prepareState()
.execute(new ActionListener<>() {
@Override
public void onResponse(ClusterStateResponse response) {
cf.complete(response); // 1
}
@Override
public void onFailure(Exception e) {
cf.completeExceptionally(e); // 2
}
});
Compared to the previous approach, we have more flexibility over how the
completable future is completed. We can transform the response, use variables
from outside of the anonymous class, etc. This applies to both points 1 and 2.
Personally, I prefer the previous approach because it’s less verbose and makes
the code easier to read. As for transformations, they can be done on the
completable future anyway, via thenApply or other thenXxx methods.
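One thing to watch when transforming inside the listener: if the transformation throws, the exception escapes onResponse and, depending on how the caller of onResponse handles it, the future might never complete. The sketch below guards the transformation with try/catch so that both outcomes end up in the CompletableFuture. It assumes a hypothetical ResponseListener stand-in, a hypothetical fetchHealthAsync call and a hypothetical parse transformation; none of them are Elasticsearch APIs.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Elasticsearch's ActionListener<T>; not the real interface.
interface ResponseListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

final class CustomListenerDemo {
    // Hypothetical async call delivering a health status such as "green" or "red".
    static void fetchHealthAsync(String status, ResponseListener<String> listener) {
        new Thread(() -> listener.onResponse(status)).start();
    }

    static CompletableFuture<Boolean> isGreen(String status) {
        var cf = new CompletableFuture<Boolean>();
        fetchHealthAsync(status, new ResponseListener<String>() {
            @Override
            public void onResponse(String response) {
                try {
                    cf.complete(parse(response)); // transform inside the listener (point 1)
                } catch (RuntimeException e) {
                    cf.completeExceptionally(e); // a failed transformation still completes cf
                }
            }

            @Override
            public void onFailure(Exception e) {
                cf.completeExceptionally(e); // point 2
            }
        });
        return cf;
    }

    // Hypothetical transformation that throws on unexpected input.
    static boolean parse(String status) {
        switch (status) {
            case "green":
                return true;
            case "yellow":
            case "red":
                return false;
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        System.out.println(isGreen("green").join());
        System.out.println(isGreen("purple").handle((value, error) -> error).join());
    }
}
```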
Thread Pool
I’m neither an Elasticsearch expert nor a Java concurrency expert, so please leave me a comment if you think the analysis below is wrong.
Since communicating with Elasticsearch is a blocking I/O operation, you may
want to handle the response carefully and avoid blocking the common fork-join
pool, which CompletableFuture uses by default for its async methods. The two
ActionListener-based approaches mentioned above should handle this well,
because our code does not block waiting for the Elasticsearch response before
returning. A third approach is to use the CompletableFuture.supplyAsync(...)
method with an executor (thread pool):
// I don't think you need this, `client` has a thread pool already
var cf = CompletableFuture.supplyAsync(
() -> client.admin().cluster().prepareState().get(), executor);
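But in my opinion, this is a waste: the Elasticsearch client already uses a separate thread pool to handle its requests (source code), and the blocking get() call additionally occupies one thread of your executor for the whole round trip. Using yet another thread pool is not a good idea.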
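To make the “don’t block” argument more concrete: with the listener approach, no thread sits waiting for the response. A dependent stage such as thenApply, registered before the response arrives, runs on whichever thread completes the future, which in the real client would typically be one of its internal threads (an assumption about client internals, not verified here). If downstream work is heavy, thenApplyAsync with your own executor moves it off those threads. The sketch below simulates this with two hypothetical single-thread executors named "client-io" and "app-worker".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallbackThreadDemo {
    // Returns the name of the thread that ran the dependent stage.
    static String dependentThread(boolean moveToAppPool) {
        // Hypothetical stand-in for the client's internal thread pool.
        ExecutorService clientPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "client-io"));
        // Hypothetical application executor for heavier downstream work.
        ExecutorService appPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "app-worker"));
        try {
            var cf = new CompletableFuture<String>();
            // Registered before completion: nothing is blocked while the "request" is in flight.
            CompletableFuture<String> dependent = moveToAppPool
                    ? cf.thenApplyAsync(response -> Thread.currentThread().getName(), appPool)
                    : cf.thenApply(response -> Thread.currentThread().getName());
            // Simulate the client delivering the response from its own thread.
            clientPool.execute(() -> cf.complete("response"));
            return dependent.join(); // joining only to observe the result in this demo
        } finally {
            clientPool.shutdown();
            appPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(dependentThread(false));
        System.out.println(dependentThread(true));
    }
}
```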
Timeout
Setting the timeout is important. It lets you bound how long you wait for an
action and avoid waiting too long. When using the Elasticsearch Java clients
synchronously, you can use the ActionFuture#actionGet(...) methods to set the
timeout. But you cannot do this anymore with CompletableFuture. With
CompletableFuture, there are mainly two ways to set a timeout: get(...), or
orTimeout(...) since Java 9.
In Java 8+, you can use get(long timeout, TimeUnit unit) to set the timeout at
the end of the completion stages (at the end of the completable future chain).
However, this approach has some problems: 1) when you have multiple dependents
downstream of the Elasticsearch response, you need to set the timeout on all of
them, although only the Elasticsearch response timeout actually matters; 2) the
timeout is not accurate: it does not measure the response time of the
Elasticsearch call, but that of the entire completable future chain instead;
3) the method throws checked exceptions, so you have to handle or re-throw them.
var response = cf.get(3000, TimeUnit.MILLISECONDS);
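Because get(long, TimeUnit) throws checked exceptions, it usually ends up wrapped in a small helper. The sketch below shows one possible way, assuming a hypothetical getWithin helper that maps a timeout to an empty Optional and rethrows the other failures unchecked. Note that timing out here only stops waiting; it does not cancel the future.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingTimeoutDemo {
    // Hypothetical helper: waits at most timeoutMillis; an empty Optional means "timed out".
    // A null response would also map to empty because of Optional.ofNullable.
    static <T> Optional<T> getWithin(CompletableFuture<T> cf, long timeoutMillis) {
        try {
            return Optional.ofNullable(cf.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // only stops waiting; cf itself is not cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The future failed: surface the original cause.
            throw new IllegalStateException("request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithin(new CompletableFuture<String>(), 100));
        System.out.println(getWithin(CompletableFuture.completedFuture("green"), 100));
    }
}
```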
In Java 9+, you can use orTimeout(long timeout, TimeUnit unit) instead. It
completes this future exceptionally with a TimeoutException if the future is
not completed before the given timeout. It also solves the problems mentioned
above: we set the timeout only once; we control the future of the response
explicitly instead of its dependents; the timeout is accurate; and how to
handle the exception is up to you.
var responseFuture = cf.orTimeout(3000, TimeUnit.MILLISECONDS);
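The behaviour of orTimeout can be checked without a cluster. In the sketch below (Java 9+), a hypothetical withFallback helper applies orTimeout to the response future and then adds a dependent stage. Two details are worth noting: orTimeout completes the future it is called on and returns that same future, and stages downstream of it receive the TimeoutException wrapped in a CompletionException, hence the unwrap step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class OrTimeoutDemo {
    // Hypothetical helper: "timeout" or "error" on failure, otherwise the transformed response.
    static CompletableFuture<String> withFallback(CompletableFuture<String> response, long timeoutMillis) {
        return response
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // completes `response` itself and returns it
                .thenApply(String::toUpperCase) // a downstream dependent
                .exceptionally(e -> unwrap(e) instanceof TimeoutException ? "timeout" : "error");
    }

    // Dependent stages receive the failure wrapped in a CompletionException.
    static Throwable unwrap(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    public static void main(String[] args) {
        System.out.println(withFallback(new CompletableFuture<>(), 100).join());
        System.out.println(withFallback(CompletableFuture.completedFuture("green"), 100).join());
    }
}
```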
If you are using Java Time (java.time.Duration) or Elasticsearch’s TimeValue, here are two snippets showing how to use them to control the timeout:
// java.time.Duration
var timeout = Duration.ofSeconds(3);
var responseFuture = cf.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);

// org.elasticsearch.common.unit.TimeValue
var timeout = TimeValue.timeValueSeconds(3);
var responseFuture = cf.orTimeout(timeout.millis(), TimeUnit.MILLISECONDS);
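If timeouts are passed around as java.time.Duration values, a tiny helper keeps the conversion in one place. This is a sketch with a hypothetical withTimeout helper: converting via toNanos() with TimeUnit.NANOSECONDS avoids the truncation of toMillis() for sub-millisecond values, and rejecting non-positive durations is simply a design choice of this sketch.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class DurationTimeoutDemo {
    // Hypothetical helper: applies a java.time.Duration timeout to the response future.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> cf, Duration timeout) {
        if (timeout.isNegative() || timeout.isZero()) {
            throw new IllegalArgumentException("timeout must be positive: " + timeout);
        }
        // toNanos() keeps sub-millisecond precision; it throws ArithmeticException on overflow
        // (only for durations of roughly 292 years or more).
        return cf.orTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) {
        var never = new CompletableFuture<String>();
        System.out.println(withTimeout(never, Duration.ofMillis(100)).handle((value, error) -> error).join());
    }
}
```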
Java High Level REST Client
As you may know, the Java transport client was deprecated in Elasticsearch 7.0.0 in favour of the Java High Level REST Client and will be removed in Elasticsearch 8.0. So I believe it is worth mentioning how to do the same thing with the new REST client as well, i.e. creating a completable future from the Elasticsearch response. The code looks very similar to the previous snippets:
var cf = new CompletableFuture<ClusterHealthResponse>();
restClient
.cluster()
.healthAsync(
new ClusterHealthRequest(),
RequestOptions.DEFAULT,
ActionListener.wrap(cf::complete, cf::completeExceptionally));
// TODO: implement your logic about completable future here
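As one example of what could go in place of the TODO, the sketch below sends several health requests concurrently (for instance, to different clusters) and combines them with CompletableFuture.allOf. To stay self-contained, it uses a hypothetical HealthListener stand-in and a hypothetical healthAsync method that simulates the asynchronous call; with the real client, the wrapping step would be the healthAsync(..., ActionListener.wrap(cf::complete, cf::completeExceptionally)) call shown above. If any request fails or times out, the combined future completes exceptionally.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical stand-in for Elasticsearch's ActionListener<T>; not the real interface.
interface HealthListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    static <T> HealthListener<T> wrap(Consumer<T> onSuccess, Consumer<Exception> onError) {
        return new HealthListener<T>() {
            @Override
            public void onResponse(T response) {
                onSuccess.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onError.accept(e);
            }
        };
    }
}

final class MultiClusterDemo {
    // Hypothetical async call standing in for restClient.cluster().healthAsync(...).
    static void healthAsync(String cluster, HealthListener<String> listener) {
        new Thread(() -> listener.onResponse(cluster + ":green")).start();
    }

    // Wrap one request into a CompletableFuture, with its own timeout (Java 9+).
    static CompletableFuture<String> health(String cluster) {
        var cf = new CompletableFuture<String>();
        healthAsync(cluster, HealthListener.wrap(cf::complete, cf::completeExceptionally));
        return cf.orTimeout(3, TimeUnit.SECONDS);
    }

    // Send all requests concurrently and collect the responses in input order.
    static CompletableFuture<List<String>> healthOfAll(List<String> clusters) {
        List<CompletableFuture<String>> futures = clusters.stream()
                .map(MultiClusterDemo::health)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // all futures are complete at this point
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(healthOfAll(List.of("cluster-a", "cluster-b")).join());
    }
}
```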
Going Further
How to go further from here?
- To learn more about the Elasticsearch Transport Client, read the official
documentation: “Transport Client (7.x)”
https://www.elastic.co/guide/en/elasticsearch/client/java-api/7.x/transport-client.html
- To learn more about the Elasticsearch Java High Level REST Client, read the
official documentation: “Java High Level REST Client (7.x)”
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high.html
If you want to see the source code of this article, it is available on GitHub as CompletableFutureTransportClientTest and CompletableFutureRestClientIT.
Conclusion
In this article, we discussed how to wrap a response from the Elasticsearch
Transport Client or the Java High Level REST Client into a CompletableFuture,
via an action listener or the completable future’s supplyAsync. We saw
different ways to control the timeout of the completable future, and how to go
further by reading the official Elasticsearch documentation.
Interested in learning more? You can subscribe to the feed of my blog, or
follow me on Twitter or GitHub. I hope you enjoyed this article, see you next
time!