Reactive Elasticsearch With Quarkus

By Triphon Penakov · Jan. 16, 20 · Tutorial

I recently implemented a service with Quarkus as the main framework and Elasticsearch as the data store. While working on it, I decided to write an article on how to bind Quarkus to Elasticsearch's Java High Level REST Client in a reactive manner.

I started taking notes for the article and moved the common Elasticsearch-related code into a shared library (the otaibe-commons-quarkus-elasticsearch module), which is stored on GitHub. It then took me a few hours to assemble a simple example project (also on GitHub) in the style of the guides on the Quarkus Guides page. At the moment, an Elasticsearch guide is missing there.

Let's continue with a more detailed explanation of how to connect Quarkus with Elasticsearch.

Creating a Quarkus Project

Shell

mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
    -DprojectGroupId=org.otaibe.quarkus.elasticsearch.example \
    -DprojectArtifactId=otaibe-quarkus-elasticsearch-example \
    -DclassName="org.otaibe.quarkus.elasticsearch.example.web.controller.FruitResource" \
    -Dpath="/fruits" \
    -Dextensions="resteasy-jackson,elasticsearch-rest-client"




Maven Settings

As you can see, Quarkus ships an elasticsearch-rest-client extension; however, this is the Elasticsearch Java Low Level REST Client. To use the Elasticsearch Java High Level REST Client, we simply add it as a dependency in the pom.xml file:

XML

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.0</version>
</dependency>



Please make sure that the version of the Elasticsearch Java Low Level REST Client matches the version of the Elasticsearch Java High Level REST Client.

Since we are using Elasticsearch in a reactive way, I prefer to use Project Reactor. We have to add its BOM to the dependency management section:

XML

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bom</artifactId>
    <version>Dysprosium-SR2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>



We also have to add reactor-core as a dependency:

XML

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>



I’ve separated the common code into a library, so we have to add that library to our example project. For this purpose, we will use JitPack, an awesome service: you point it at your GitHub project, and it builds an artifact for it. Here is how I am using it:

XML

<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-core</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>
<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-elasticsearch</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>
<dependency>
    <groupId>com.github.tpenakov.otaibe-commons-quarkus</groupId>
    <artifactId>otaibe-commons-quarkus-rest</artifactId>
    <version>elasticsearch-example.02</version>
</dependency>



Start Elasticsearch Through Docker

We also need a running Elasticsearch instance. The easiest way to get one is to run it through Docker:

Shell

docker run -it --rm=true --name elasticsearch_quarkus_test \
    -p 11027:9200 -p 11028:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.4.0



Connecting to Elasticsearch

Let's start by connecting our service to Elasticsearch. The implementation in the example project is simple. It listens for the Quarkus startup and shutdown events and initializes or closes the connections:

Java

package org.otaibe.quarkus.elasticsearch.example.service;

import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.otaibe.commons.quarkus.elasticsearch.client.service.AbstractElasticsearchService;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
@Getter
@Setter
@Slf4j
public class ElasticsearchService extends AbstractElasticsearchService {

    public void init(@Observes StartupEvent event) {
        log.info("init started");
        super.init();
        log.info("init completed");
    }

    public void shutdown(@Observes ShutdownEvent event) {
        log.info("shutdown started");
        super.shutdown();
        log.info("shutdown completed");
    }
}



The actual work of connecting to Elasticsearch is done in AbstractElasticsearchService:

Java

public abstract class AbstractElasticsearchService {
    @ConfigProperty(name = "service.elastic-search.hosts")
    String[] hosts;
    @ConfigProperty(name = "service.elastic-search.num-threads", defaultValue = "10")
    Optional<Integer> numThreads;

    private RestHighLevelClient restClient;
    private Sniffer sniffer;

    @PostConstruct
    public void init() {
        log.info("init started");
        List<HttpHost> httpHosts = Arrays.stream(hosts)
                .map(s -> StringUtils.split(s, ':'))
                .map(strings -> new HttpHost(strings[0], Integer.valueOf(strings[1])))
                .collect(Collectors.toList());
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
        getNumThreads().ifPresent(integer ->
                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultIOReactorConfig(
                        IOReactorConfig
                                .custom()
                                .setIoThreadCount(integer)
                                .build())
                ));

        restClient = new RestHighLevelClient(builder);
        sniffer = Sniffer.builder(getRestClient().getLowLevelClient()).build();
        log.info("init completed");
    }

}



As you can see, the connection is set up in the way suggested by the Elasticsearch documentation. My implementation depends on two config properties. The first one is required:

Properties files

service.elastic-search.hosts=localhost:11027



This is the address of the Elasticsearch instance started through Docker above, where the container's port 9200 is published on host port 11027.

The second, optional property is:

Properties files

service.elastic-search.num-threads



This is the number of I/O threads used by the client. As the @ConfigProperty annotation above shows, it defaults to 10.
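The init() method above expects every entry of service.elastic-search.hosts to have the form host:port; an entry without a port would fail on strings[1]. The following is a minimal, JDK-only sketch of a more forgiving parser. HostPort and HostsParser are hypothetical names that are not part of the library (HostPort stands in for Apache's HttpHost so that the example runs without extra dependencies), and falling back to port 9200 is my assumption, not behavior of the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.http.HttpHost, so the sketch runs on the JDK alone.
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

// Hypothetical helper, not part of otaibe-commons-quarkus-elasticsearch.
final class HostsParser {
    // Assumption: fall back to 9200, the standard Elasticsearch HTTP port.
    static final int DEFAULT_PORT = 9200;

    // Turns entries such as "localhost:11027" into host/port pairs.
    static List<HostPort> parse(String[] hosts) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : hosts) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries, e.g. caused by a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                // no port given: use the assumed default instead of failing
                result.add(new HostPort(trimmed, DEFAULT_PORT));
            } else {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new HostPort(trimmed.substring(0, colon), port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse(new String[]{"localhost:11027", "es-node-2"})) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```

In the real service, each pair would be passed to new HttpHost(host, port) exactly as in the init() method shown earlier.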

Creating POJO

Now, let’s create our domain object (Fruit):

Java

package org.otaibe.quarkus.elasticsearch.example.domain;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class Fruit {

    public static final String ID = "id";
    public static final String EXT_REF_ID = "ext_ref_id";
    public static final String NAME = "name";
    public static final String DESCRIPTION = "description";
    public static final String VERSION = "version";

    @JsonProperty(ID)
    public String id;
    @JsonProperty(EXT_REF_ID)
    public String extRefId;
    @JsonProperty(NAME)
    public String name;
    @JsonProperty(DESCRIPTION)
    public String description;
    @JsonProperty(VERSION)
    public Long version;
}



Creating and Implementing DAO

Creating the Index

Let's create FruitDaoImpl. It is a concrete class that extends AbstractElasticsearchReactiveDaoImplementation and implements the required business logic. The other important task here is to create an index for the Fruit class:

Java

@Override
protected Mono<Boolean> createIndex() {
    CreateIndexRequest request = new CreateIndexRequest(getTableName());
    Map<String, Object> mapping = new HashMap<>();
    Map<String, Object> propsMapping = new HashMap<>();
    // ids are matched exactly, name and description are analyzed as English text
    propsMapping.put(Fruit.ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.EXT_REF_ID, getKeywordTextAnalizer());
    propsMapping.put(Fruit.NAME, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.DESCRIPTION, getTextAnalizer(ENGLISH));
    propsMapping.put(Fruit.VERSION, getLongFieldType());
    mapping.put(PROPERTIES, propsMapping);
    request.mapping(mapping);

    return createIndex(request);
}
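To make the shape of that mapping more concrete, here is a JDK-only sketch of the nested map such a request might carry. The helper names (field, analyzedText, fruitMapping) are hypothetical, and I am assuming that the library helpers produce the usual Elasticsearch field definitions (keyword, text with the english analyzer, and long); the library's actual output may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; the real helpers live in AbstractElasticsearchReactiveDaoImplementation.
final class FruitMappingSketch {

    // A field definition that only declares its type, e.g. {"type": "keyword"}.
    static Map<String, Object> field(String type) {
        Map<String, Object> definition = new LinkedHashMap<>();
        definition.put("type", type);
        return definition;
    }

    // A full-text field with a language analyzer, e.g. {"type": "text", "analyzer": "english"}.
    static Map<String, Object> analyzedText(String analyzer) {
        Map<String, Object> definition = field("text");
        definition.put("analyzer", analyzer);
        return definition;
    }

    // Assumed equivalent of the mapping built in createIndex().
    static Map<String, Object> fruitMapping() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("id", field("keyword"));
        props.put("ext_ref_id", field("keyword"));
        props.put("name", analyzedText("english"));
        props.put("description", analyzedText("english"));
        props.put("version", field("long"));

        Map<String, Object> mapping = new LinkedHashMap<>();
        mapping.put("properties", props);
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println(fruitMapping());
    }
}
```

The english analyzer applies stemming, which is why the tests later in the article can find "Apples" when searching for "apple".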



The actual create-index call to Elasticsearch is implemented in the parent class (AbstractElasticsearchReactiveDaoImplementation):

Java

protected Mono<Boolean> createIndex(CreateIndexRequest request) {
    return Flux.<Boolean>create(fluxSink -> getRestClient().indices().createAsync(request, RequestOptions.DEFAULT, new ActionListener<CreateIndexResponse>() {
        @Override
        public void onResponse(CreateIndexResponse createIndexResponse) {
            log.info("CreateIndexResponse: {}", createIndexResponse);
            fluxSink.next(createIndexResponse.isAcknowledged());
            fluxSink.complete();
        }

        @Override
        public void onFailure(Exception e) {
            log.error("unable to create index", e);
            fluxSink.error(new RuntimeException(e));
        }
    }))
            .next();
}
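The pattern in this method, adapting a callback-style API to a reactive type, is not specific to Reactor. As a rough illustration, the sketch below does the same with the JDK's CompletableFuture. Listener, CallbackBridge, and fakeCreateIndexAsync are hypothetical names invented for this example: Listener only mirrors the shape of Elasticsearch's ActionListener, and the fake operation stands in for indices().createAsync(...).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical callback interface mirroring the shape of Elasticsearch's ActionListener.
interface Listener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

final class CallbackBridge {

    // Adapts a callback-style asynchronous call into a CompletableFuture.
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response); // success path, like fluxSink.next + complete
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(new RuntimeException(e)); // like fluxSink.error
            }
        });
        return future;
    }

    // Hypothetical operation standing in for indices().createAsync(...).
    static void fakeCreateIndexAsync(String index, Listener<Boolean> listener) {
        if (index == null || index.isEmpty()) {
            listener.onFailure(new IllegalArgumentException("index name is required"));
        } else {
            listener.onResponse(Boolean.TRUE);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> created =
                CallbackBridge.<Boolean>toFuture(l -> fakeCreateIndexAsync("fruits", l));
        System.out.println("acknowledged: " + created.join());
    }
}
```

With Reactor, the same bridge is what Flux.create (or Mono.create) provides: the sink plays the role of the future, and nothing blocks while the request is in flight.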



Playing With the DAO

Most of the CRUD operations are implemented in the AbstractElasticsearchReactiveDaoImplementation.

It has save, update, findById, and deleteById public methods. It also has findByExactMatch and findByMatch protected methods. The findBy* methods are very helpful in descendant classes when the business logic needs to be filled in.

The business find methods are implemented in the FruitDaoImpl class:

Java

public Flux<Fruit> findByExternalRefId(String value) {
    return findByMatch(Fruit.EXT_REF_ID, value);
}

public Flux<Fruit> findByName(String value) {
    return findByMatch(Fruit.NAME, value);
}

public Flux<Fruit> findByDescription(String value) {
    // search the description field (the original snippet queried Fruit.NAME here)
    return findByMatch(Fruit.DESCRIPTION, value);
}

public Flux<Fruit> findByNameOrDescription(String value) {
    Map<String, Object> query = new HashMap<>();
    query.put(Fruit.NAME, value);
    query.put(Fruit.DESCRIPTION, value);
    return findByMatch(query);
}



Encapsulating DAO in the Service Class

FruitDaoImpl is encapsulated in the FruitService:

Java

@ApplicationScoped
@Getter
@Setter
@Slf4j
public class FruitService {

    @Inject
    FruitDaoImpl dao;

    public Mono<Fruit> save(Fruit entity) {
        return getDao().save(entity);
    }

    public Mono<Fruit> findById(Fruit entity) {
        return getDao().findById(entity);
    }

    public Mono<Fruit> findById(String id) {
        return Mono.just(Fruit.of(id, null, null, null, null))
                .flatMap(entity -> findById(entity))
                ;
    }

    public Flux<Fruit> findByExternalRefId(String value) {
        return getDao().findByExternalRefId(value);
    }

    public Flux<Fruit> findByName(String value) {
        return getDao().findByName(value);
    }

    public Flux<Fruit> findByDescription(String value) {
        return getDao().findByDescription(value);
    }

    public Flux<Fruit> findByNameOrDescription(String value) {
        return getDao().findByNameOrDescription(value);
    }

    public Mono<Boolean> delete(Fruit entity) {
        // justOrEmpty tolerates a null id; Mono.just(null) would throw a NullPointerException
        return Mono.justOrEmpty(entity.getId())
                .filter(s -> StringUtils.isNotBlank(s))
                .flatMap(s -> getDao().deleteById(entity))
                .defaultIfEmpty(false);
    }
}



Testing the FruitService

FruitServiceTests covers the basic functionality. It also verifies that the Fruit class fields are properly indexed and that the full-text search works as expected:

Java

@Test
public void manageFruitTest() {
    Fruit apple = getTestUtils().createApple();

    Fruit apple1 = getFruitService().save(apple).block();
    Assertions.assertNotNull(apple1.getId());
    Assertions.assertTrue(apple1.getVersion() > 0);
    log.info("saved result: {}", getJsonUtils().toStringLazy(apple1));

    List<Fruit> fruitList = getFruitService().findByExternalRefId(TestUtils.EXT_REF_ID).collectList().block();
    Assertions.assertTrue(fruitList.size() > 0);

    List<Fruit> fruitList1 = getFruitService().findByNameOrDescription("bulgaria").collectList().block();
    Assertions.assertTrue(fruitList1.size() > 0);

    //Ensure that the full text search is working - it is 'Apples' in description
    List<Fruit> fruitList2 = getFruitService().findByDescription("apple").collectList().block();
    Assertions.assertTrue(fruitList2.size() > 0);

    //Ensure that the full text search is working - it is 'Apple' in name
    List<Fruit> fruitList3 = getFruitService().findByName("apples").collectList().block();
    Assertions.assertTrue(fruitList3.size() > 0);

    Boolean deleteAppleResult = getFruitService().getDao().deleteById(apple1).block();
    Assertions.assertTrue(deleteAppleResult);
}



Adding REST Endpoints

Because this is an example project, full CRUD functionality is not exposed through REST. Only save and findById are added as REST endpoints, in FruitResource. The methods there return CompletionStage<Response>, so no request thread has to block while waiting for Elasticsearch.
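The idea behind returning a CompletionStage can be sketched with the JDK alone. The code below is not the FruitResource from the example project: SimpleResponse is a hypothetical stand-in for javax.ws.rs.core.Response, and findName is a hypothetical lookup playing the role of FruitService.findById. It only illustrates how an eventual result is mapped to a 200 or 404 response without blocking.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical, minimal stand-in for javax.ws.rs.core.Response.
final class SimpleResponse {
    final int status;
    final Object body;

    SimpleResponse(int status, Object body) {
        this.status = status;
        this.body = body;
    }
}

final class NonBlockingEndpoint {

    // Hypothetical asynchronous lookup; only the id "1" exists in this sketch.
    static CompletableFuture<Optional<String>> findName(String id) {
        return CompletableFuture.supplyAsync(() ->
                "1".equals(id) ? Optional.of("Apple") : Optional.<String>empty());
    }

    // Maps the eventual result to 200 or 404; the calling thread never waits here.
    static CompletionStage<SimpleResponse> findById(String id) {
        return findName(id).thenApply(found -> found
                .map(name -> new SimpleResponse(200, name))
                .orElseGet(() -> new SimpleResponse(404, null)));
    }

    public static void main(String[] args) {
        // join() is used only in this demo to print the outcome
        System.out.println(findById("1").toCompletableFuture().join().status);
        System.out.println(findById("42").toCompletableFuture().join().status);
    }
}
```

When the service returns a Reactor Mono, as FruitService does, Mono.toFuture() yields a CompletableFuture that can be returned from the resource method in the same way.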

Testing REST Endpoints

FruitResourceTest is added in order to test the REST endpoints:

Java

package org.otaibe.quarkus.elasticsearch.example.web.controller;

import io.quarkus.test.junit.QuarkusTest;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.otaibe.commons.quarkus.core.utils.JsonUtils;
import org.otaibe.quarkus.elasticsearch.example.domain.Fruit;
import org.otaibe.quarkus.elasticsearch.example.service.FruitService;
import org.otaibe.quarkus.elasticsearch.example.utils.TestUtils;

import javax.inject.Inject;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Optional;

import static io.restassured.RestAssured.given;

@QuarkusTest
@Getter(value = AccessLevel.PROTECTED)
@Slf4j
public class FruitResourceTest {

    @ConfigProperty(name = "service.http.host")
    Optional<URI> host;

    @Inject
    TestUtils testUtils;
    @Inject
    JsonUtils jsonUtils;
    @Inject
    FruitService service;

    @Test
    public void restEndpointsTest() {
        log.info("restEndpointsTest start");
        Fruit apple = getTestUtils().createApple();

        Fruit savedApple = given()
                .when()
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
                .body(apple)
                .post(getUri(FruitResource.ROOT_PATH))
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);
        String id = savedApple.getId();
        Assertions.assertTrue(StringUtils.isNotBlank(id));

        URI findByIdPath = UriBuilder.fromPath(FruitResource.ROOT_PATH)
                .path(id)
                .build();

        Fruit foundApple = given()
                .when().get(getUri(findByIdPath.getPath()).getPath())
                .then()
                .statusCode(200)
                .extract()
                .as(Fruit.class);

        Assertions.assertEquals(savedApple, foundApple);

        Boolean deleteResult = getService().delete(foundApple).block();
        Assertions.assertTrue(deleteResult);

        given()
                .when().get(findByIdPath.getPath())
                .then()
                .statusCode(Response.Status.NOT_FOUND.getStatusCode())
                ;

        log.info("restEndpointsTest end");
    }

    private URI getUri(String path) {
        return getUriBuilder(path)
                .build();
    }

    private UriBuilder getUriBuilder(String path) {
        return getHost()
                .map(uri -> UriBuilder.fromUri(uri))
                .map(uriBuilder -> uriBuilder.path(path))
                .orElse(UriBuilder
                        .fromPath(path)
                );
    }

}



Building a Native Executable

Before building a native executable, we have to register our Fruit domain object for reflection. Because FruitResource returns CompletionStage<Response>, the actual entity type cannot be determined at build time, so we have to register it explicitly. There are at least two ways to do that in Quarkus:

  1. Via @RegisterForReflection annotation.

  2. Via reflection-config.json.

I personally prefer the second method because the classes that you want to register might be in a third-party library, and it would be impossible to put the @RegisterForReflection there.

Now, the reflection-config.json looks like this:

JSON

[
  {
    "name" : "org.otaibe.quarkus.elasticsearch.example.domain.Fruit",
    "allDeclaredConstructors" : true,
    "allPublicConstructors" : true,
    "allDeclaredMethods" : true,
    "allPublicMethods" : true,
    "allDeclaredFields" : true,
    "allPublicFields" : true
  }
]



The next step is to make Quarkus aware of the reflection-config.json file. Add this line to the native profile in your pom.xml file:

XML

<quarkus.native.additional-build-args>-H:ReflectionConfigurationFiles=${project.basedir}/src/main/resources/reflection-config.json</quarkus.native.additional-build-args>



You can now build your native application:

Shell

mvn clean package -Pnative



And start it:

Shell

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner



The service will be available at http://localhost:11025 because that port is explicitly specified in application.properties:

Properties files

quarkus.http.port=11025



Testing the Native Build

The FruitResourceTest expects the following optional property:

Properties files

service.http.host



If it is present, the test requests will hit the specified host. If you start the native executable:

Shell

./target/otaibe-quarkus-elasticsearch-example-1.0-SNAPSHOT-runner



and run the build, including its tests, with the following command:

Shell

mvn package -D%test.service.http.host=http://localhost:11025



the tests will run against the native build.

Conclusion

I was pleasantly surprised that Elasticsearch works out of the box with Quarkus and can be compiled to native code. Combined with a reactive implementation via Project Reactor, this should keep the footprint of the application very small.

 

Further Reading

  • Introduction to Elasticsearch and the ELK Stack, Part 1.
  • Thoughts on Quarkus.