ES + Spring Boot, Done Right (ES Series, Part 3)

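Preface

In the previous posts we covered the basic concepts of ES and how to choose a data migration approach for different scenarios. In this post we look at how to integrate ES with Spring Boot, and at how to "translate SQL" so that the migration from MySQL to ES goes smoothly.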

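I. Integrating ES with Spring Boot

The first step is to integrate Spring Boot with ES. In Spring Boot there are two main options: the Java REST Client and spring-data-elasticsearch. I recommend the Java High Level REST Client provided officially by Elasticsearch, which also makes it easy to use Alibaba Cloud's managed ES service in production. The key version information is: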

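  • ES cluster: 7.3.0
  • ES dependencies: 7.3.0

Two points are worth noting:

  • The High Level Client is forward compatible: for example, version 7.3.0 of the Java High Level REST Client is guaranteed to communicate with Elasticsearch clusters of version 7.3.0 or later (per the official documentation, within the same major version). To make the most of the latest client features, keep the High Level Client version the same as the cluster version.

  • You may hit some pitfalls during integration, because the Spring Boot version, the ES cluster version and the High Level Client version are coupled to one another. When the demo does not run, the practical remedy is to try a few different High Level Client versions.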
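
The compatibility rule in the first point can be sketched as a small check. This is only an illustration: EsVersionCheck and its methods are hypothetical names, not part of the Elasticsearch client, and the rule encoded here (same major version, cluster minor version greater than or equal to the client's) is my reading of the official compatibility statement.

```java
/** Hypothetical helper (not part of the Elasticsearch client) encoding the assumed compatibility rule. */
final class EsVersionCheck {
    private EsVersionCheck() {}

    /** Parses a "major.minor.patch" string into three integers. */
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected major.minor.patch but got: " + version);
        }
        int[] numbers = new int[3];
        for (int i = 0; i < 3; i++) {
            numbers[i] = Integer.parseInt(parts[i]);
        }
        return numbers;
    }

    /**
     * Returns true when the cluster has the same major version as the client
     * and a minor version that is greater than or equal to the client's.
     */
    static boolean isSupported(String clientVersion, String clusterVersion) {
        int[] client = parse(clientVersion);
        int[] cluster = parse(clusterVersion);
        return client[0] == cluster[0] && cluster[1] >= client[1];
    }
}
```

For example, EsVersionCheck.isSupported("7.3.0", "7.9.2") is true under this rule, while a 7.3.0 client against a 7.1.0 or 6.8.0 cluster is not covered by the guarantee.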

1. pom dependencies

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>rank-eval-client</artifactId>
    <version>7.3.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>lang-mustache-client</artifactId>
    <version>7.3.0</version>
</dependency>

2. Initializing the client

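import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration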
public class EsConfig {

    @Value("${elasticsearch.host}")
    public String host;

    /**
     * The transport client used port 9300; the High Level Client talks HTTP on port 9200.
     */
    @Value("${elasticsearch.port:9200}")
    public int port;

    public static final String SCHEME = "http";

    @Value("${elasticsearch.username:admin}")
    public String username;

    @Value("${elasticsearch.authenticationPassword}")
    public String authenticationPassword;

    @Bean(name = "remoteHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username,
                authenticationPassword));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, SCHEME)).
                setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}

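Note that the username and authenticationPassword credentials in the code above are the ones set up in Kibana.

II. Java API

All of the snippets below run as unit tests (client in the snippets is the RestHighLevelClient bean configured above). Before running them, create a _template first; you can do this in the Dev Tools console that Kibana provides.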

PUT _template/hero_template
{
    "index_patterns":[
        "hero*"
    ],
    "mappings":{
        "properties":{
            "@timestamp":{
                "type":"date"
            },
            "id":{
                "type":"integer"
            },
            "name":{
                "type":"keyword"
            },
            "country":{
                "type":"keyword"
            },
            "birthday":{
                "type":"keyword"
            },
            "longevity":{
                "type":"integer"
            }
        }
    }
}

1. Creating an index

@Test
public void createIndex() throws IOException {
    IndexRequest request = new IndexRequest("hero");
    request.id("1");
    Map<String, String> map = new HashMap<>();
    map.put("id", "1");
    map.put("name", "曹操");
    map.put("country", "魏");
    map.put("birthday", "公元155年");
    map.put("longevity", "65");
    request.source(map);
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    long version = indexResponse.getVersion();
    assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
    assertEquals(1, version);
}

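In ES, an index is the logical unit in which we store and query data; from ES 7.0 onward it corresponds to the concept of a table in MySQL. The code above indexes a first document into an index named hero: we build a map holding the document's fields and set it as the source of the IndexRequest. Because the hero index does not exist yet, ES creates it on this first write, applying the hero* template defined earlier.

2. Bulk insert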

@Test
public void bulkRequestTest() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("hero").id("2")
            .source(XContentType.JSON,"id", "2", "name", "刘备", "country", "蜀", "birthday", "公元161年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("3")
            .source(XContentType.JSON,"id", "3", "name", "孙权", "country", "吴", "birthday", "公元182年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("4")
            .source(XContentType.JSON,"id", "4", "name", "诸葛亮", "country", "蜀", "birthday", "公元181年", "longevity", "53"));
    request.add(new IndexRequest("hero").id("5")
            .source(XContentType.JSON,"id", "5", "name", "司马懿", "country", "魏", "birthday", "公元179年", "longevity", "72"));
    request.add(new IndexRequest("hero").id("6")
            .source(XContentType.JSON,"id", "6", "name", "荀彧", "country", "魏", "birthday", "公元163年", "longevity", "49"));
    request.add(new IndexRequest("hero").id("7")
            .source(XContentType.JSON,"id", "7", "name", "关羽", "country", "蜀", "birthday", "公元160年", "longevity", "60"));
    request.add(new IndexRequest("hero").id("8")
            .source(XContentType.JSON,"id", "8", "name", "周瑜", "country", "吴", "birthday", "公元175年",  "longevity", "35"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    assertFalse(bulkResponse.hasFailures());
}

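[Figure: the inserted data as queried in Kibana]

All of the queries, updates and other operations that follow are based on this data.

3. Updating data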

@Test
public void updateTest() throws IOException {
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("country", "魏");
    UpdateRequest request = new UpdateRequest("hero", "7").doc(jsonMap);
    UpdateResponse updateResponse = client.update(request,  RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
}

Expressed as SQL, the code above looks like this:

> update hero set country='魏' where id=7;

4. Insert or update

@Test
public void insertOrUpdateOne(){
    Hero hero = new Hero();
    hero.setId(5);
    hero.setName("曹丕");
    hero.setCountry("魏");
    hero.setBirthday("公元187年");
    hero.setLongevity(39);
    IndexRequest request = new IndexRequest("hero");
    request.id(hero.getId().toString());
    request.source(JSON.toJSONString(hero), XContentType.JSON);
    try {
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);   //  1
        assertEquals(DocWriteResponse.Result.UPDATED, indexResponse.getResult());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

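Look at the line marked 1 in the code above: doesn't it look a lot like the index creation earlier? With the index() method we can create an index, insert data and update data all in one call: if the specified index does not exist it is created, if the document does not exist it is inserted, and if it already exists it is updated. Note that an "update" through index() replaces the whole document, unlike UpdateRequest.doc(), which merges in only the given fields.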
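
The Hero class used in the test above is not shown in the post. A minimal sketch of what it might look like follows, assuming plain getters and setters and field names that match the hero_template mapping; the exact types are a guess inferred from calls such as hero.getId().toString().

```java
/**
 * Assumed shape of the Hero bean used in the tests (the original class is not shown).
 * Field names mirror the hero_template mapping; in a real project this would be a
 * public class in its own file so that the JSON library can access it.
 */
class Hero {
    private Integer id;        // mapped as "integer" in the template
    private String name;       // mapped as "keyword"
    private String country;    // mapped as "keyword"
    private String birthday;   // mapped as "keyword", so kept as a plain string
    private Integer longevity; // mapped as "integer"

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }

    public String getBirthday() { return birthday; }
    public void setBirthday(String birthday) { this.birthday = birthday; }

    public Integer getLongevity() { return longevity; }
    public void setLongevity(Integer longevity) { this.longevity = longevity; }
}
```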

5. Deleting data

@Test
public void deleteByIdTest() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("hero");
    deleteRequest.id("1");
    DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
}

Above we deleted the document with id=1 that we created earlier. The corresponding SQL is:

> delete from hero where id=1;

Of course, in ES we are not limited to deleting by primary key; we can also delete by conditions on other fields.

@Test
public void deleteByQueryRequestTest() throws IOException {
    DeleteByQueryRequest request = new DeleteByQueryRequest("hero");
    request.setConflicts("proceed");
    request.setQuery(new TermQueryBuilder("country", "吴"));
    BulkByScrollResponse bulkResponse =
            client.deleteByQuery(request, RequestOptions.DEFAULT);
    assertEquals(0, bulkResponse.getBulkFailures().size());
}

The corresponding SQL:

> delete from hero where country='吴';

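6. Mixed operations

Each of the create, update and delete calls above performs only one type of operation at a time, but ES also lets us perform several types of operation in a single request, as in the code below: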

@Test
public void bulkDiffRequestTest() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new DeleteRequest("hero", "3"));
    request.add(new UpdateRequest("hero", "7")
            .doc(XContentType.JSON,"longevity", "70"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    BulkItemResponse[]  bulkItemResponses = bulkResponse.getItems();
    for (BulkItemResponse item : bulkItemResponses){
        DocWriteResponse itemResponse = item.getResponse();
        switch (item.getOpType()) {
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
        }
        assertEquals(RestStatus.OK, item.status());
    }
}

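We use a BulkRequest object, add both the DeleteRequest and the UpdateRequest to it, and then handle the returned BulkItemResponse[] array according to each item's operation type. As far as I know MySQL has no comparable syntax; if it does, please leave a comment to correct me.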
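
To see why mixing operation types is cheap for ES, it helps to look at the body the bulk endpoint receives: newline-delimited JSON in which each action line is optionally followed by a payload line. The sketch below builds such a body by hand for the delete and update above. BulkBodySketch is a hypothetical illustration class, not the client's own serializer, and it assumes the index names and ids need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the newline-delimited body sent to the _bulk endpoint. */
final class BulkBodySketch {
    private final List<String> lines = new ArrayList<>();

    /** A delete consists of a single action line. */
    BulkBodySketch delete(String index, String id) {
        lines.add(action("delete", index, id));
        return this;
    }

    /** An update is an action line followed by a line holding the partial document. */
    BulkBodySketch update(String index, String id, String partialDocJson) {
        lines.add(action("update", index, id));
        lines.add("{\"doc\":" + partialDocJson + "}");
        return this;
    }

    // Assumes index and id contain no characters that require JSON escaping.
    private static String action(String op, String index, String id) {
        return "{\"" + op + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}";
    }

    /** Joins all lines; every line, including the last, ends with a newline. */
    String build() {
        StringBuilder body = new StringBuilder();
        for (String line : lines) {
            body.append(line).append('\n');
        }
        return body.toString();
    }
}
```

Calling new BulkBodySketch().delete("hero", "3").update("hero", "7", "{\"longevity\":70}").build() yields three lines: the delete action, the update action, and the partial document for the update.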

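7. Queries

This is where the real focus lies. ES supports many kinds of queries, for example "exact" queries (which differ somewhat from their RDBMS counterparts), fuzzy queries, relevance queries, range queries, full-text search, pagination, sorting and aggregations. Most of the query features of MySQL can be implemented in ES. ES also lets us choose whether to run a query synchronously or asynchronously.

Single-condition query + limit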

@Test
public void selectByUserTest(){
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder builder = new SearchSourceBuilder();
    builder.query(new TermQueryBuilder("country", "魏"));
    // equivalent to LIMIT 1 in MySQL
    builder.size(1);
    request.source(builder);
    try {
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        assertEquals(1, hits.length);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

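In the unit test above we use country as the query condition and limit the number of rows returned. The equivalent SQL is:

> select * from hero where country='魏' limit 1;

Multi-condition query + sorting + pagination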

@Test
public void boolQueryTest(){
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    boolQueryBuilder.must(termQuery("country", "魏"));
    boolQueryBuilder.must(rangeQuery("longevity").gte(50));
    sourceBuilder.query(boolQueryBuilder);
    sourceBuilder.from(0).size(2);
    sourceBuilder.sort("longevity", SortOrder.DESC);
    request.source(sourceBuilder);
    SearchResponse response = null;
    try {
        response = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Query by Condition execution failed: {}", e.getMessage(), e);
    }
    assert response != null;
    assertEquals(0, response.getShardFailures().length);
    SearchHit[] hits = response.getHits().getHits();
    List<Hero> herosList = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        herosList.add(JSON.parseObject(hit.getSourceAsString(), Hero.class));
    }
    log.info("print info: {}, size: {}", herosList.toString(), herosList.size());
}

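The code above finds the heroes of the Cao Wei faction who lived to 50 or older, sorts them by longevity from highest to lowest, and takes only the first two. The corresponding SQL: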

> select * from hero where country='魏' and longevity >= 50 order by longevity DESC limit 2;
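
The from(0).size(2) call in the test plays the role of LIMIT with an offset. When translating page-based SQL, the offset has to be derived from the page number. Here is a small sketch of that arithmetic; PageWindow is a hypothetical helper, and the 10,000 ceiling reflects the default of the index.max_result_window setting, which your cluster may override.

```java
/** Hypothetical helper that turns a 1-based page number into from/size values. */
final class PageWindow {
    /** Default value of the index.max_result_window setting. */
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    final int from;
    final int size;

    private PageWindow(int from, int size) {
        this.from = from;
        this.size = size;
    }

    /** Computes from = (pageNumber - 1) * pageSize and rejects windows that exceed the default limit. */
    static PageWindow of(int pageNumber, int pageSize) {
        if (pageNumber < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNumber and pageSize must both be >= 1");
        }
        long from = (long) (pageNumber - 1) * pageSize; // long arithmetic avoids int overflow
        if (from + pageSize > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return new PageWindow((int) from, pageSize);
    }
}
```

The values would then be passed on as sourceBuilder.from(window.from).size(window.size). For pages beyond the limit, from/size paging is rejected by default, so a different mechanism such as scroll or search_after is needed.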

Note that when using multiple conditions with the API that ES provides, we need to wrap them in a BoolQueryBuilder object, which supports the following clause types:

private static final String MUSTNOT = "mustNot";
private static final String MUST_NOT = "must_not";
private static final String FILTER = "filter";
private static final String SHOULD = "should";
private static final String MUST = "must";

For details, see the official documentation.
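
As a rough guide when translating SQL, must behaves like AND, should like OR, must_not like NOT, and filter like an AND that does not contribute to the relevance score. The sketch below renders the JSON that a bool query boils down to, which can be handy for comparing against what you type into Kibana. BoolQueryJsonSketch is a hypothetical illustration, not the client's BoolQueryBuilder, and it assumes field names and values need no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the JSON structure behind a bool query. */
final class BoolQueryJsonSketch {
    private final List<String> must = new ArrayList<>();
    private final List<String> filter = new ArrayList<>();
    private final List<String> should = new ArrayList<>();
    private final List<String> mustNot = new ArrayList<>();

    /** Exact-match clause on a keyword field (no escaping is applied). */
    static String term(String field, String value) {
        return "{\"term\":{\"" + field + "\":\"" + value + "\"}}";
    }

    /** Range clause for "field >= value". */
    static String rangeGte(String field, long value) {
        return "{\"range\":{\"" + field + "\":{\"gte\":" + value + "}}}";
    }

    BoolQueryJsonSketch must(String clause) { must.add(clause); return this; }
    BoolQueryJsonSketch filter(String clause) { filter.add(clause); return this; }
    BoolQueryJsonSketch should(String clause) { should.add(clause); return this; }
    BoolQueryJsonSketch mustNot(String clause) { mustNot.add(clause); return this; }

    /** Emits only the clause types that were actually used. */
    String toJson() {
        List<String> parts = new ArrayList<>();
        addClauses(parts, "must", must);
        addClauses(parts, "filter", filter);
        addClauses(parts, "should", should);
        addClauses(parts, "must_not", mustNot);
        return "{\"bool\":{" + String.join(",", parts) + "}}";
    }

    private static void addClauses(List<String> parts, String name, List<String> clauses) {
        if (!clauses.isEmpty()) {
            parts.add("\"" + name + "\":[" + String.join(",", clauses) + "]");
        }
    }
}
```

Building the query from the test above, with two must clauses (a term on country and a range on longevity), produces a single bool object holding a must array of two entries. Since neither condition needs scoring here, moving both to filter would be a reasonable variation.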

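Summary

In this part we first showed how to integrate Spring Boot with ES, along with our recommended practice of building the API on the Java High Level REST Client, and then covered the required dependencies and how to initialize the client.

Next we used the High Level REST Client to create an index, bulk-insert data, update data, insert or update data, delete data and run mixed operations. Finally we implemented data queries with two simple examples. Many more query examples were not shown here; I suggest looking up the usage you need on the official site.

References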

Java High Level REST Client