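Preface
In the previous posts we covered the basic concepts of ES and how to choose a data-migration approach for different scenarios. In this post we look at how to integrate ES with Spring Boot. We also look at how to "translate SQL" so that we can migrate smoothly from MySQL to ES.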
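I. Integrating ES with Spring Boot
The first step is to integrate ES with Spring Boot. Spring Boot offers two main options: the Java REST Client and spring-data-elasticsearch. I recommend the Java High Level REST Client provided by Elastic. It also makes it easy to use Alibaba Cloud's Elasticsearch service in production. The key versions are:
- ES cluster: 7.3.0
- ES dependencies: 7.3.0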
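Two things to note:
- The High Level Client is forward compatible. The 7.3.0 Java High Level REST Client is guaranteed to communicate with any Elasticsearch cluster of the same major version (7.x) running 7.3.0 or later. To make full use of the newest client features, it is still best to keep the High Level Client version identical to the cluster version.
- You may run into pitfalls during integration, because the versions of Spring Boot, the ES cluster and the High Level Client depend on one another. When the demo will not run, the practical remedy is to try a few different High Level Client versions.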
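The compatibility rule in the first note can be written down as a small check. This is a sketch of the rule as stated above, not code from the Elasticsearch client. VersionCompat and isCompatible are hypothetical names, and patch versions are ignored.

```java
/**
 * Sketch of the High Level Client compatibility rule: same major version,
 * and the cluster's minor version must be >= the client's minor version.
 * VersionCompat is a hypothetical helper; patch numbers are ignored.
 */
final class VersionCompat {
    private VersionCompat() {}

    // Parses "major.minor.patch" into numbers, e.g. "7.3.0" -> {7, 3, 0}
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        int[] result = new int[parts.length];
        for (int i = 0; i < parts.length; i++) {
            result[i] = Integer.parseInt(parts[i]);
        }
        return result;
    }

    static boolean isCompatible(String clientVersion, String clusterVersion) {
        int[] client = parse(clientVersion);
        int[] cluster = parse(clusterVersion);
        // Same major version, and the cluster is at least as new as the client
        return client[0] == cluster[0] && cluster[1] >= client[1];
    }
}
```

For example, a 7.3.0 client passes this check against a 7.10.2 cluster, but fails it against 7.2.1 or 6.8.0.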
1. POM dependencies
```xml
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>rank-eval-client</artifactId>
    <version>7.3.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>lang-mustache-client</artifactId>
    <version>7.3.0</version>
</dependency>
```
2. Initializing the client
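```java
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {

    @Value("${elasticsearch.host}")
    private String host;

    /**
     * The old transport client used port 9300; the High Level REST Client talks HTTP on port 9200.
     */
    @Value("${elasticsearch.port:9200}")
    private int port;

    public static final String SCHEME = "http";

    @Value("${elasticsearch.username:admin}")
    private String username;

    @Value("${elasticsearch.authenticationPassword}")
    private String authenticationPassword;

    // RestHighLevelClient is Closeable; Spring infers close() as the bean's destroy method
    @Bean(name = "remoteHighLevelClient")
    public RestHighLevelClient restHighLevelClient() {
        // Basic-auth credentials for the cluster
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, authenticationPassword));
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, SCHEME))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}
```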
In the code above, note that the username and authenticationPassword credentials are the ones configured in Kibana.
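For orientation, these credentials end up in a standard HTTP Basic Authorization header. The sketch below builds that header value by hand. BasicAuthHeader is a hypothetical helper for illustration only, since the client's CredentialsProvider already does this for you.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Sketch of the HTTP Basic "Authorization" header value derived from a
 * username/password pair. BasicAuthHeader is hypothetical, not part of the ES client.
 */
final class BasicAuthHeader {
    private BasicAuthHeader() {}

    static String of(String username, String password) {
        // Basic auth is "Basic " + base64("username:password")
        String raw = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }
}
```

Because Base64 is an encoding rather than encryption, the SCHEME of "http" exposes these credentials on the network. In production an https endpoint is the safer choice.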
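II. Java API
All the snippets below run as unit tests. They assume a field named client that holds the RestHighLevelClient bean configured above. Before running them, we first create an _template, for example in Kibana's Dev Tools: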
```
PUT _template/hero_template
{
  "index_patterns": [
    "hero*"
  ],
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "keyword"
      },
      "country": {
        "type": "keyword"
      },
      "birthday": {
        "type": "keyword"
      },
      "longevity": {
        "type": "integer"
      }
    }
  }
}
```
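The template applies to every newly created index whose name matches one of its index_patterns. This is why the hero index used below picks up these mappings. The matcher below is a hypothetical stand-in that supports only the '*' wildcard. It illustrates which names "hero*" covers, not how Elasticsearch implements matching.

```java
/**
 * Sketch of '*' wildcard matching for index patterns such as "hero*".
 * IndexPattern is hypothetical; '*' matches any sequence, including an empty one.
 */
final class IndexPattern {
    private IndexPattern() {}

    static boolean matches(String pattern, String name) {
        return matches(pattern, 0, name, 0);
    }

    private static boolean matches(String p, int pi, String s, int si) {
        if (pi == p.length()) {
            // Pattern exhausted: match only if the name is exhausted too
            return si == s.length();
        }
        if (p.charAt(pi) == '*') {
            // '*' matches nothing, or swallows one more character of the name
            return matches(p, pi + 1, s, si)
                    || (si < s.length() && matches(p, pi, s, si + 1));
        }
        // Literal character must match exactly
        return si < s.length()
                && p.charAt(pi) == s.charAt(si)
                && matches(p, pi + 1, s, si + 1);
    }
}
```

With this rule, "hero" and "hero_2020" are covered by "hero*", while "superhero" is not, because the pattern is anchored at the start of the name.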
1. Creating the index
```java
@Test
public void createIndex() throws IOException {
    IndexRequest request = new IndexRequest("hero");
    request.id("1");
    Map<String, String> map = new HashMap<>();
    map.put("id", "1");
    map.put("name", "曹操");
    map.put("country", "魏");
    map.put("birthday", "公元155年");
    map.put("longevity", "65");
    request.source(map);
    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
    long version = indexResponse.getVersion();
    assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
    assertEquals(1, version);
}
```
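In ES, an index is the logical unit in which we store and query data. Since ES 7.0 removed mapping types, an index corresponds to a table in MySQL. The code above indexes a first document, with id 1, into an index named hero. The index does not exist yet, so ES creates it automatically, and because its name matches hero*, the mappings from hero_template apply. We build a map as the first record and set it as the source of the IndexRequest. The map holds strings such as "65". Fields mapped as integer still accept them, because Elasticsearch coerces numeric strings by default.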
2. Bulk insert
```java
@Test
public void bulkRequestTest() throws IOException {
    BulkRequest request = new BulkRequest();
    request.add(new IndexRequest("hero").id("2")
            .source(XContentType.JSON, "id", "2", "name", "刘备", "country", "蜀", "birthday", "公元161年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("3")
            .source(XContentType.JSON, "id", "3", "name", "孙权", "country", "吴", "birthday", "公元182年", "longevity", "61"));
    request.add(new IndexRequest("hero").id("4")
            .source(XContentType.JSON, "id", "4", "name", "诸葛亮", "country", "蜀", "birthday", "公元181年", "longevity", "53"));
    request.add(new IndexRequest("hero").id("5")
            .source(XContentType.JSON, "id", "5", "name", "司马懿", "country", "魏", "birthday", "公元179年", "longevity", "72"));
    request.add(new IndexRequest("hero").id("6")
            .source(XContentType.JSON, "id", "6", "name", "荀彧", "country", "魏", "birthday", "公元163年", "longevity", "49"));
    request.add(new IndexRequest("hero").id("7")
            .source(XContentType.JSON, "id", "7", "name", "关羽", "country", "蜀", "birthday", "公元160年", "longevity", "60"));
    request.add(new IndexRequest("hero").id("8")
            .source(XContentType.JSON, "id", "8", "name", "周瑜", "country", "吴", "birthday", "公元175年", "longevity", "35"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    assertFalse(bulkResponse.hasFailures());
}
```
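The data as queried in Kibana:
[Figure: the hero documents shown in Kibana]
All later queries, updates and other operations are based on this data.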
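On the wire, a BulkRequest is sent to the _bulk endpoint as newline-delimited JSON (NDJSON). Each operation gets an action line, each index action is followed by a source line, and the body ends with a trailing newline. Bulk items succeed or fail independently, which is why the test checks hasFailures(). The sketch below renders one index operation in that format. BulkBody is a hypothetical helper that does no JSON escaping and is meant only to show the format.

```java
import java.util.Map;

/**
 * Sketch of the NDJSON lines for one index operation in a _bulk body.
 * BulkBody is hypothetical; keys and values are assumed to need no JSON escaping.
 */
final class BulkBody {
    private BulkBody() {}

    static String indexAction(String index, String id, Map<String, String> source) {
        StringBuilder sb = new StringBuilder();
        // Action/metadata line
        sb.append("{\"index\":{\"_index\":\"").append(index)
          .append("\",\"_id\":\"").append(id).append("\"}}\n");
        // Source line: a flat JSON object of string fields
        sb.append('{');
        boolean first = true;
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        // Every line, including the last one, ends with '\n'
        sb.append("}\n");
        return sb.toString();
    }
}
```

Concatenating one such pair per hero gives the body of the bulk request above.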
3. Updating data
```java
@Test
public void updateTest() throws IOException {
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("country", "魏");
    UpdateRequest request = new UpdateRequest("hero", "7").doc(jsonMap);
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult());
}
```
Expressed in SQL, the code above is:
> update hero set country='魏' where id=7;
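Unlike index(), an update with doc() is a partial update. The given fields are merged into the stored document and the other fields are kept. Elasticsearch also detects no-op updates by default, so running updateTest a second time would return NOOP rather than UPDATED. The toy model below mimics both behaviors on a flat map. PartialUpdate is hypothetical, and the real merge happens on the server (recursively for object fields).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a partial-document update with no-op detection.
 * PartialUpdate is hypothetical and only handles top-level fields.
 */
final class PartialUpdate {
    private PartialUpdate() {}

    // Merges partialDoc into stored (in place) and reports the outcome
    static String apply(Map<String, Object> stored, Map<String, Object> partialDoc) {
        Map<String, Object> before = new HashMap<>(stored);
        // Fields in partialDoc overwrite, all other stored fields are kept
        stored.putAll(partialDoc);
        // Nothing changed -> NOOP, otherwise UPDATED
        return before.equals(stored) ? "NOOP" : "UPDATED";
    }
}
```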
4. Insert or update
```java
@Test
public void insertOrUpdateOne() {
    Hero hero = new Hero();
    hero.setId(5);
    hero.setName("曹丕");
    hero.setCountry("魏");
    hero.setBirthday("公元187年");
    hero.setLongevity(39);
    IndexRequest request = new IndexRequest("hero");
    request.id(hero.getId().toString());
    request.source(JSON.toJSONString(hero), XContentType.JSON);
    try {
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); // 1
        assertEquals(DocWriteResponse.Result.UPDATED, indexResponse.getResult());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```
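Look at the line marked 1 in the code above: doesn't it look a lot like the create-index example? With index() a single call handles index creation, insertion and update. If the index does not exist, it is created. If the document does not exist, it is inserted, and if it exists, it is updated. Note that such an update replaces the whole document rather than merging fields into it.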
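To make the difference from update() concrete, here is a toy in-memory model of index(). The index is created on first write, and a new id yields CREATED with version 1. An existing id yields UPDATED, bumps the version and replaces the whole document. ToyIndexStore is hypothetical and ignores mappings, shards and refresh.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy in-memory model of index() semantics: auto-created index, CREATED vs
 * UPDATED results, per-document versions, full-document replacement.
 * ToyIndexStore is hypothetical.
 */
final class ToyIndexStore {
    private final Map<String, Map<String, Map<String, Object>>> indices = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    String index(String index, String id, Map<String, Object> source) {
        // Auto-create the index on the first write
        Map<String, Map<String, Object>> docs = indices.computeIfAbsent(index, k -> new HashMap<>());
        // Every write bumps the document version: 1, 2, 3, ...
        versions.merge(index + "/" + id, 1L, Long::sum);
        // put() replaces any previous document entirely (no field merge)
        Map<String, Object> previous = docs.put(id, new HashMap<>(source));
        return previous == null ? "CREATED" : "UPDATED";
    }

    long version(String index, String id) {
        return versions.getOrDefault(index + "/" + id, 0L);
    }

    Map<String, Object> get(String index, String id) {
        Map<String, Map<String, Object>> docs = indices.get(index);
        return docs == null ? null : docs.get(id);
    }
}
```

In this model, indexing 曹丕 over the existing id 5 leaves no trace of the previous document's fields. That matches what insertOrUpdateOne does to 司马懿.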
5. Deleting data
```java
@Test
public void deleteByIdTest() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("hero");
    deleteRequest.id("1");
    DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
    assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
}
```
Above we deleted the document with id=1 that we created earlier. The corresponding SQL is:
> delete from hero where id=1;
Of course, in ES we are not limited to deleting by primary key. We can also delete by conditions on other fields.
```java
@Test
public void deleteByQueryRequestTest() throws IOException {
    DeleteByQueryRequest request = new DeleteByQueryRequest("hero");
    // On version conflicts, count them and keep going instead of aborting
    request.setConflicts("proceed");
    // Delete every document whose country is exactly "吴"
    request.setQuery(new TermQueryBuilder("country", "吴"));
    BulkByScrollResponse bulkResponse =
            client.deleteByQuery(request, RequestOptions.DEFAULT);
    assertEquals(0, bulkResponse.getBulkFailures().size());
}
```
The corresponding SQL:
> delete from hero where country='吴';
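Delete-by-query works on a snapshot of the matching documents and deletes them in batches. With conflicts set to "proceed", documents that changed in the meantime are counted as version conflicts instead of aborting the request. Ignoring those details, its effect resembles the filter-and-remove below. DeleteByQueryModel is hypothetical. In the real API, the number of deleted documents is available from BulkByScrollResponse.getDeleted().

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Toy model of delete-by-query: remove every document matching the query
 * and report how many were deleted. DeleteByQueryModel is hypothetical.
 */
final class DeleteByQueryModel {
    private DeleteByQueryModel() {}

    static int deleteWhere(List<Map<String, String>> docs, Predicate<Map<String, String>> query) {
        int before = docs.size();
        // Equivalent of DELETE ... WHERE <query>
        docs.removeIf(query);
        return before - docs.size();
    }
}
```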
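6. Mixed operations
Each of the create, update and delete calls above handles only one kind of operation per request. ES also lets us combine several kinds of operations in a single request, as in the following code: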
```java
@Test
public void bulkDiffRequestTest() throws IOException {
    BulkRequest request = new BulkRequest();
    // Delete document 3 and update document 7 in a single round trip
    request.add(new DeleteRequest("hero", "3"));
    request.add(new UpdateRequest("hero", "7")
            .doc(XContentType.JSON, "longevity", "70"));
    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    for (BulkItemResponse item : bulkResponse.getItems()) {
        // Items succeed or fail independently, so check each one
        assertFalse(item.isFailed());
        DocWriteResponse itemResponse = item.getResponse();
        switch (item.getOpType()) {
            case UPDATE:
                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                // handle the update result here
                break;
            case DELETE:
                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                // handle the delete result here
                break;
            default:
                break;
        }
        // Deleting a missing document returns NOT_FOUND (404), so this assumes
        // document 3 has not already been removed by an earlier test
        assertEquals(RestStatus.OK, item.status());
    }
}
```
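We use a BulkRequest object and add a DeleteRequest and an UpdateRequest to it. We then handle the returned BulkItemResponse[] array item by item according to each operation type. As far as I know, MySQL currently has no comparable syntax. If it does, please correct me in the comments.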
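Note that a bulk request is not a transaction. Each item succeeds or fails on its own, so one failed item does not roll back the others. The request above corresponds to the NDJSON body built below. A delete needs only an action line, while an update needs an action line followed by a {"doc": ...} line. MixedBulkBody is a hypothetical helper with no JSON escaping.

```java
/**
 * Sketch of the NDJSON _bulk body for a mix of delete and update operations.
 * MixedBulkBody is hypothetical; field names and values are not JSON-escaped.
 */
final class MixedBulkBody {
    private final StringBuilder body = new StringBuilder();

    // A delete is a single action line with no source line
    MixedBulkBody delete(String index, String id) {
        body.append(action("delete", index, id));
        return this;
    }

    // An update is an action line followed by a partial-document line
    MixedBulkBody updateField(String index, String id, String field, String value) {
        body.append(action("update", index, id));
        body.append("{\"doc\":{\"").append(field).append("\":\"").append(value).append("\"}}\n");
        return this;
    }

    private static String action(String op, String index, String id) {
        return "{\"" + op + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}\n";
    }

    String build() {
        return body.toString();
    }
}
```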
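7. Queries
This is where the real focus lies. ES supports many kinds of queries, such as **"exact" queries (which differ from exact matching in an RDBMS), fuzzy queries, relevance queries, range queries, full-text search, pagination, sorting and aggregations**. Most query features of MySQL can be reproduced in ES. ES also lets us run queries either synchronously (search()) or asynchronously (searchAsync()).
Single-condition query + limit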
```java
@Test
public void selectByUserTest() {
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder builder = new SearchSourceBuilder();
    builder.query(new TermQueryBuilder("country", "魏"));
    // Equivalent to "limit 1" in MySQL
    builder.size(1);
    request.source(builder);
    try {
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        assertEquals(1, hits.length);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```
In the unit test above, we use country as the query condition and limit the number of returned hits. The equivalent SQL is:
> select * from hero where country='魏' limit 1;
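A term query on a keyword field matches the whole stored value exactly. As a result, country "魏" matches only documents whose country is exactly "魏". The in-memory analogue below mirrors term query + size (or WHERE + LIMIT). TermQueryModel is hypothetical. Without an explicit sort, Elasticsearch does not guarantee which hit comes back first, whereas this sketch simply keeps list order.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * In-memory analogue of a term query with a size limit on a keyword field.
 * TermQueryModel is hypothetical.
 */
final class TermQueryModel {
    private TermQueryModel() {}

    static List<Map<String, String>> search(List<Map<String, String>> docs,
                                            String field, String value, int size) {
        return docs.stream()
                .filter(doc -> value.equals(doc.get(field))) // exact, whole-value match
                .limit(size)                                  // like builder.size(size) / LIMIT size
                .collect(Collectors.toList());
    }
}
```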
Multi-condition query + sorting + pagination
```java
@Test
public void boolQueryTest() {
    SearchRequest request = new SearchRequest("hero");
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    // termQuery / rangeQuery are static imports from QueryBuilders
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    boolQueryBuilder.must(termQuery("country", "魏"));
    boolQueryBuilder.must(rangeQuery("longevity").gte(50));
    sourceBuilder.query(boolQueryBuilder);
    // Pagination: first page, two hits per page
    sourceBuilder.from(0).size(2);
    sourceBuilder.sort("longevity", SortOrder.DESC);
    request.source(sourceBuilder);
    SearchResponse response = null;
    try {
        response = client.search(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        log.error("Query by Condition execution failed: {}", e.getMessage(), e);
    }
    assertNotNull(response);
    assertEquals(0, response.getShardFailures().length);
    SearchHit[] hits = response.getHits().getHits();
    List<Hero> herosList = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
        herosList.add(JSON.parseObject(hit.getSourceAsString(), Hero.class));
    }
    log.info("print info: {}, size: {}", herosList.toString(), herosList.size());
}
```
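The query above finds the heroes of the Cao Wei faction who lived to 50 or older and sorts them by longevity from highest to lowest. It keeps only the first two. The corresponding SQL is: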
> select * from hero where country='魏' and longevity >= 50 order by longevity DESC limit 2;
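The same pipeline of filter, sort and from/size can be mirrored with Java streams. This may help when checking that a translated query returns what the SQL did. The sketch assumes the eight records from the create-index and bulk-insert steps, before any updates or deletes. HeroRow and BoolQueryModel are hypothetical. Keep in mind that from + size is capped by index.max_result_window (10,000 by default). Deep pagination therefore needs search_after or scroll instead.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical minimal row type holding the fields the query touches. */
final class HeroRow {
    final String name;
    final String country;
    final int longevity;

    HeroRow(String name, String country, int longevity) {
        this.name = name;
        this.country = country;
        this.longevity = longevity;
    }
}

/** Hypothetical stream analogue of bool(term + range) + sort desc + from/size. */
final class BoolQueryModel {
    private BoolQueryModel() {}

    static List<String> query(List<HeroRow> rows, String country, int minLongevity,
                              int from, int size) {
        return rows.stream()
                .filter(h -> country.equals(h.country))       // term: country = ?
                .filter(h -> h.longevity >= minLongevity)     // range: longevity >= ?
                .sorted(Comparator.comparingInt((HeroRow h) -> h.longevity).reversed()) // ORDER BY longevity DESC
                .skip(from)                                   // from (OFFSET)
                .limit(size)                                  // size (LIMIT)
                .map(h -> h.name)
                .collect(Collectors.toList());
    }
}
```

With the original data, query(rows, "魏", 50, 0, 2) yields 司马懿 (72) and then 曹操 (65). 荀彧 (49) falls below the range.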
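Note that to combine several conditions through the ES API, we wrap them in a BoolQueryBuilder object. Its source defines the following clause names. mustNot and must_not name the same clause, and the corresponding Java methods are must(), filter(), should() and mustNot().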
```java
private static final String MUSTNOT = "mustNot";
private static final String MUST_NOT = "must_not";
private static final String FILTER = "filter";
private static final String SHOULD = "should";
private static final String MUST = "must";
```
For details, see the official documentation.
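As a rough guide, the clauses differ as modeled below, with scoring ignored:
- must and filter clauses must all match. filter does not contribute to the score and can be cached.
- must_not clauses must not match.
- should clauses need only one match when the query has no must or filter clause. Otherwise they only influence scoring.

In the example above, the range condition could therefore go into filter(), since it does not need to affect relevance. ToyBool is a hypothetical model, and minimum_should_match changes the should rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Toy model of how bool query clauses decide whether a document matches.
 * Scoring is ignored, so must and filter behave identically here.
 * ToyBool is hypothetical and assumes the default minimum_should_match.
 */
final class ToyBool<T> {
    private final List<Predicate<T>> must = new ArrayList<>();
    private final List<Predicate<T>> filter = new ArrayList<>();
    private final List<Predicate<T>> should = new ArrayList<>();
    private final List<Predicate<T>> mustNot = new ArrayList<>();

    ToyBool<T> must(Predicate<T> p) { must.add(p); return this; }
    ToyBool<T> filter(Predicate<T> p) { filter.add(p); return this; }
    ToyBool<T> should(Predicate<T> p) { should.add(p); return this; }
    ToyBool<T> mustNot(Predicate<T> p) { mustNot.add(p); return this; }

    boolean matches(T doc) {
        // must and filter: every clause has to match (AND)
        for (Predicate<T> p : must) {
            if (!p.test(doc)) return false;
        }
        for (Predicate<T> p : filter) {
            if (!p.test(doc)) return false;
        }
        // must_not: no clause may match (NOT)
        for (Predicate<T> p : mustNot) {
            if (p.test(doc)) return false;
        }
        // should: at least one has to match only when there is no must/filter clause
        if (!should.isEmpty() && must.isEmpty() && filter.isEmpty()) {
            return should.stream().anyMatch(p -> p.test(doc));
        }
        return true;
    }
}
```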
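Summary
In this part we first showed how to integrate Spring Boot with ES and recommended the Java High Level REST Client as the basis of our API. We then covered the dependencies and how to initialize the client.
Next we used the High Level REST Client to create an index, bulk insert, update, insert-or-update, delete and perform mixed operations. Finally, we implemented two simple queries. Many more query types are not shown here, so check the official documentation for the methods your own requirements call for.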