SpringBoot连接Elasticsearch实战总结

4,169 阅读9分钟

记一次线上的elasticsearch查询采坑

第一次使用elasticsearch,于是从网上找轮子复制粘贴。早好轮子测试完毕,上线。可是几天下来发现接口响应时间一直都偏高(默认的超时时间是500ms),所以就不停的对代码优化,缩短时间。但是到最后代码已经不能再优化了,响应时间依然没有明显的下降趋势,甚至在高峰期会严重超时。接下来会慢慢讲解elasticsearch使用优化。

Spring Boot添加elasticsearch依赖

有很多种方案可以选择,1)添加spring的data依赖。2)使用elasticsearch提供的client依赖。3)使用jestClient依赖。前两种并没有什么区别,第三种是通过http请求访问elasticsearch的。

使用elasticsearch官方依赖

使用IDE初始化Springboot时勾选elasticsearch即可,或者你也可以直接添加如下依赖:

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  <version>{elasticserch.version}</version>
		</dependency>

或者到maven网站查找对应elasticsearch版本的依赖:

<dependency>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
			<version>{elasticserch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.elasticsearch.client</groupId>
			<artifactId>transport</artifactId>
			<version>{elasticserch.version}</version>
		</dependency>

需要注意的是,一定要使用与你的elasticsearch版本一直的依赖,否则可能会出错

elasticsearch的配置

@Configuration
public class ElasticsearchConfig {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);

    @Value("${elasticsearch.port}")
    private String port;
    @Value("${elasticsearch.cluster.name}")
    private String clusterName;
    @Value("${elasticsearch.pool}")
    private String poolSize;
    @Value("${elasticsearch.ip}")
    private String esHost;

    @Bean(name = "transportClient")
    public TransportClient transportClient() {
        logger.info("Elasticsearch初始化开始。。。。。");
        TransportClient transportClient = null;
        try {
            // 配置信息
            Settings esSetting = Settings.builder()
                    //集群名字
                    .put("cluster.name", clusterName)
                    //增加嗅探机制,找到ES集群
                    .put("client.transport.sniff", true)
//                    .put("client.transport.ignore_cluster_name", true)
                    //增加线程池个数,暂时设为5
                    .put("thread_pool.search.size", Integer.parseInt(poolSize))
                    .build();
            //配置信息Settings自定义
            transportClient = new PreBuiltTransportClient(esSetting);
            TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(esHost), Integer.valueOf(port));
            transportClient.addTransportAddresses(transportAddress);
            logger.info("连接elasticsearch");
        } catch (Exception e) {
            logger.error("elasticsearch TransportClient create error!!", e);
        }
        return transportClient;
    }
}

低版本的elasticsearch在配置setting自定义内容时会不一样。使用elasticsearch节点连接的端口是9300。

简单的使用:

@Component
public class ElasticsearchUtils {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchUtils.class);

    @Resource(name = "transportClient")
    private TransportClient transportClient;

    private static TransportClient client;

    @PostConstruct
    public void init() {
        client = this.transportClient;
    }

    /**
     * @author xiaosen
     * @description 判断索引是否存在
     * @date 2019/1/23
     * @param
     * @return
     */
    public static boolean isIndexExist(String index) {
        IndicesExistsResponse inExistsResponse = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet();
        if (inExistsResponse.isExists()) {
            logger.info("索引:{}存在", index);
        } else {
            logger.info("索引:{}不存在", index);
        }
        return inExistsResponse.isExists();
    }
  
   public static List<Map<String, Object>> searchListData(String index, String type, long startTime, long endTime, Integer size, String fields, String sortField, boolean matchPhrase, String highlightField, String matchStr) {

        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index);
        if (StringUtils.isNotEmpty(type)) {
            searchRequestBuilder.setTypes(type.split(","));
        }
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        if (startTime > 0 && endTime > 0) {
            boolQuery.must(QueryBuilders.rangeQuery("processTime")
                    .format("epoch_millis")
                    .from(startTime)
                    .to(endTime)
                    .includeLower(true)
                    .includeUpper(true));
        }

        //搜索的的字段
        if (StringUtils.isNotEmpty(matchStr)) {
            for (String s : matchStr.split(",")) {
                String[] ss = s.split("=");
                if (ss.length > 1) {
                    if (matchPhrase == Boolean.TRUE) {
                        boolQuery.must(QueryBuilders.matchPhraseQuery(s.split("=")[0], s.split("=")[1]));
                    } else {
                        boolQuery.must(QueryBuilders.matchQuery(s.split("=")[0], s.split("=")[1]));
                    }
                }

            }
        }

        // 高亮(xxx=111,aaa=222)
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            // 设置高亮字段
            highlightBuilder.field(highlightField);
            searchRequestBuilder.highlighter(highlightBuilder);
        }


        searchRequestBuilder.setQuery(boolQuery);

        if (StringUtils.isNotEmpty(fields)) {
            searchRequestBuilder.setFetchSource(fields.split(","), null);
        }
        searchRequestBuilder.setFetchSource(true);

        if (StringUtils.isNotEmpty(sortField)) {
            searchRequestBuilder.addSort(sortField, SortOrder.DESC);
        }

        if (size != null && size > 0) {
            searchRequestBuilder.setSize(size);
        }
        SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();

        long totalHits = searchResponse.getHits().totalHits;
        long length = searchResponse.getHits().getHits().length;

        if (searchResponse.status().getStatus() == 200) {
            // 解析对象
            return setSearchResponse(searchResponse, highlightField);
        }

        return null;

    }

使用JestClient

添加maven依赖(这里的elasticsearch版本比较低,而且还没有开放9300端口,只能使用http请求)

 <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>6.3.1</version>
        </dependency>

io.searchbox是操作elasticsearch的依赖,使用其9200端口。

配置文件就比较简单了:

@Configuration
@RefreshScope
public class ElasticsearchConfigure {
    private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfigure.class);

    @Value("${elasticsearch.ip}")
    private String hostAndPort;

    @Bean(name = "elasticsearchClient")
    public JestClient getJestClient() throws Exception{
        JestClientFactory factory = new JestClientFactory();
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (X509Certificate[] arg0, String arg1) -> true).build();
      // http配置
        factory.setHttpClientConfig(new HttpClientConfig.Builder("http://"+hostAndPort).connTimeout(2000)
                .readTimeout(2000).plainSocketFactory(PlainConnectionSocketFactory.getSocketFactory())
                .sslSocketFactory(new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE))
                .multiThreaded(true).maxTotalConnection(100).defaultMaxTotalConnectionPerRoute(4).build());
        return factory.getObject();
    }
}

创建一个JestClientFactory并配置httpClient。

简单的一个例子:

@Resource(name = "elasticsearchClient")
    private JestClient jestClient;

public static void main(String[] args){
  FilterBuilder filterBuilder = FilterBuilders.boolFilter()
                    .must(FilterBuilders.geoDistanceRangeFilter("location")
                            .point(lat, lon).from(Constants.MIN_RADIUS).to(Constants.MAX_RADIUS))
                    .should(FilterBuilders.termFilter("status", 200), FilterBuilders.termFilter("status", 201));
  FilteredQueryBuilder filteredQueryBuilder = new FilteredQueryBuilder(null, filterBuilder);
            // 按在线时间排序,先按时间再按距离排序
            FieldSortBuilder sortBuilderField = SortBuilders.fieldSort("time").order(SortOrder.DESC);
            // 按距离排序,为返回客户端距离,返回的单位:米
            GeoDistanceSortBuilder sortBuilderDis = SortBuilders.geoDistanceSort("location").point(lat, lon).unit(DistanceUnit.KILOMETERS).order(SortOrder.ASC).geoDistance(GeoDistance.SLOPPY_ARC);
   SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  searchSourceBuilder.query(filteredQueryBuilder).sort(sortBuilderField).sort(sortBuilderDis)
                    .from((queryNearbyDto.getCurrentPage()-1)*queryNearbyDto.getPageSize())
                    .size(queryNearbyDto.getPageSize()).fetchSource(Constants.QUERY_FIELD_TTID, null);
            String query = searchSourceBuilder.toString();
  result = search(jestClient, index, Constants.ES_NEARBY_TYPE, query);
  
}



private List<Map<String, Object>> search(JestClient jestClient, String indexName, String typeName, String query) throws Exception {
        Search search = new Search.Builder(query).setSearchType(SearchType.QUERY_THEN_FETCH)
                .addIndex(indexName)
                .addType(typeName)
                .build();
        SearchResult jr = jestClient.execute(search);
        if (!jr.isSucceeded()||jr.getResponseCode()!=200){
            return null;
        }
        Long total = jr.getTotal();
        List<SearchResult.Hit<Map, Void>> maps = jr.getHits(Map.class, false);
        List<Map<String, Object>> sourceList = maps.stream().map(source -> {
            source.source.put("sort", Double.valueOf(source.sort.get(1)));
            return (Map<String, Object>)source.source;
        }).collect(Collectors.toList());
        return sourceList;
    }

其中的变量query是查询的elasticsearch的语句,如果你知道elasticsearch的语法也可以直接写一个json代替。

距离排序

在jestClient中有一个按距离和时间排序的例子,是先按时间排序再按距离排序,目的是返回距离。es是可以按多个字段排序的,靠前的为优先匹配排序,最后的排序结果会在返回的sort数组中返回,数组中的位置即排序的匹配位置,我这里将返回的距离提取出来放到map中。

5.2的elasticsearch的api的距离排序方法如下:

GeoDistanceSortBuilder sortBuilderDis = SortBuilders.geoDistanceSort("location", lat, lon).point(lat, lon).unit(DistanceUnit.METERS).order(SortOrder.ASC).geoDistance(GeoDistance.ARC);

这里如果不想让elasticsearch计算距离也可以用他提供的方法自己计算,前提知道二者的经纬度,调用GeoDistance的calculate方法,具体使用的精确度可以按照业务要求选择,不过我有做过测试,自己计算距离和elasticsearch计算的耗时几乎相差不多,如果是额外的计算距离可以不再查一遍elasticsearch减少io消耗。

分页

对于elasticsearch不太熟悉的同学,分页也是一个坑。

浅分页

elasticsearch的的浅分页from&size,from是查询的索引位置,size是每页数量,优点类似于mysql的limit和start。

现在我们可以假设在一个有 5 个主分片的索引中搜索。 当我们请求结果的第一页(结果从 1 到 10 ),每一个分片产生前 10 的结果,并且返回给 协调节点 ,协调节点对 50 个结果排序得到全部结果的前 10 个。现在假设我们请求第 1000 页--结果从 10001 到 10010 。所有都以相同的方式工作除了每个分片不得不产生前10010个结果以外。 然后协调节点对全部 50050 个结果排序最后丢弃掉这些结果中的 50040 个结果。可以看到,在分布式系统中,对结果排序的成本随分页的深度成指数上升。这就是 web 搜索引擎对任何查询都不要返回超过 1000 个结果的原因。你翻页的时候,翻的越深,每个 Shard 返回的数据就越多,而且协调节点处理的时间越长,非常坑爹。所以用 ES 做分页的时候,你会发现越翻到后面,就越是慢。我们之前也是遇到过这个问题,用 ES 作分页,前几页就几十毫秒,翻到 10 页或者几十页的时候,基本上就要 5~10 秒才能查出来一页数据了。

使用from&size的最大查询量是10000条数据,这个值可以在elasticsearch中配置文件中设置。

scroll 深分页

为了解决上面的问题,elasticsearch提出了一个scroll滚动的方式。scroll 类似于sql中的cursor,使用scroll,每次只能获取一页的内容,然后会返回一个scroll_id。根据返回的这个scroll_id可以不断地获取下一页的内容,所以scroll并不适用于有跳页的情景.

POST /twitter/_search?scroll=1m
{
    "size": 100,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}
  1. scroll=1m表示设置scroll_id保留1分钟可用。
  2. 使用scroll必须要将from设置为0。
  3. size决定后面每次调用_search搜索返回的数量
POST /_search/scroll 
{
    "scroll" : "1m", 
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" 
}

然后我们可以通过数据返回的_scroll_id读取下一页内容,每次请求将会读取下10条数据,直到数据读取完毕或者scroll_id保留时间截止。请求的接口不再使用索引名了,而是 _search/scroll,其中GET和POST方法都可以使用。

search_after

Scroll 被推荐用于深度查询,但是contexts的代价是昂贵的,不推荐用于实时用户请求,而更适用于后台批处理任务,比如群发。search_after 提供了一个实时的光标来避免深度分页的问题,其思想是使用前一页的结果来帮助检索下一页。search_after不能自由跳到一个随机页面,只能按照 sort values 跳转到下一页。使用 search_after 参数的时候,from参数必须被设置成 0 或 -1 (当然你也可以不设置这个from参数)

search_after 需要使用一个唯一值的字段作为排序字段,否则不能使用search_after方法 推荐使用_uid 作为唯一值的排序字段。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}      
    ]
}

在下一次查询的时候讲返回的最后的一条数据的sort的数组放放到search_after中。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

总结

  • 深度分页不管是关系型数据库还是Elasticsearch还是其他搜索引擎,都会带来巨大性能开销,特别是在分布式情况下。
  • 有些问题可以考业务解决而不是靠技术解决,比如很多业务都对页码有限制,google 搜索,往后翻到一定页码就不行了。
  • scroll 并不适合用来做实时搜索,而更适用于后台批处理任务,比如群发。
  • search_after不能自由跳到一个随机页面,只能按照 sort values 跳转到下一页。

排序与相关性

默认情况下,返回的结果是按照 相关性 进行排序的——最相关的文档排在最前。每个文档都有相关性评分,用一个正浮点数字段 _score 来表示 。 _score 的评分越高,相关性越高。

查询语句会为每个文档生成一个 _score 字段。评分的计算方式取决于查询类型 不同的查询语句用于不同的目的: fuzzy 查询会计算与关键词的拼写相似程度,terms 查询会计算 找到的内容与关键词组成部分匹配的百分比,但是通常我们说的 relevance 是我们用来计算全文本字段的值相对于全文本检索词相似程度的算法。

具体score算法可以到官网查询。

在代码中设置:

// 设置是否按查询匹配度排序
searchRequestBuilder.setExplain(true);

注意:

相关项排序消耗资源非常大,如果不是对文本精确度要求特别高的情况下,生产环境不建议按相关性排序。

参考:

www.elastic.co/guide/en/el…

blog.csdn.net/yiyiholic/a…

www.souyunku.com/2017/11/06/…

www.cnblogs.com/yangzhenlon…

www.elastic.co/guide/cn/el…


欢迎关注公众号:

公众号微信