Searching with Elasticsearch, Part 2
An introduction to aggregation analysis in ES
In an alerting system it is not enough to query the data in ES through the _search endpoint; the data also has to be analyzed, e.g. by time or by business line: which days produced the most alerts, which day produced the fewest, and so on. These requirements call for a solid, flexible command of ES aggregation analysis.
I. Aggregation concepts
- What is ES aggregation analysis?
Aggregation is an essential database feature: it computes summary values over the documents matched by a query, e.g. the maximum or minimum of a field (or of a computed expression), its sum, its average, and so on. ES, being a search engine as well as a data store, provides equally powerful aggregation capabilities.
Aggregations that compute metrics such as max, min, sum or avg over a data set are called metric aggregations in ES.
Relational databases additionally offer GROUP BY, grouping the queried rows before applying aggregate functions per group. The ES counterpart of GROUP BY is called bucketing, i.e. bucket aggregations.
ES also provides matrix aggregations (matrix) and pipeline aggregations (pipeline).
- How to issue an aggregation query
Define the aggregation in the query request body under the aggregations node, with the following syntax:
"aggregations" : {
"<aggregation_name>" : { <!--聚合的名字 -->
"<aggregation_type>" : { <!--聚合的类型 -->
<aggregation_body> <!--聚合体:对哪些字段进行聚合 -->
}
[,"meta" : { [<meta_data_body>] } ]? <!--元 -->
[,"aggregations" : { [<sub_aggregation>]+ } ]? <!--在聚合里面在定义子聚合 -->
}
[,"<aggregation_name_2>" : { ... } ]*<!--聚合的名字 -->
}
- Where the aggregated values come from
The values being aggregated can be taken from a field or computed by a script.
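To make the two value sources concrete, here is a small Python sketch in which plain dicts stand in for the request bodies; the `script.source` form shown is the ES 5.6+/6.x syntax, and the field names are illustrative:

```python
# A max aggregation taking its values from a field vs. from a script.
def max_agg_from_field(field):
    """Build a max aggregation over a plain field."""
    return {"max": {"field": field}}

def max_agg_from_script(source):
    """Build a max aggregation over a script result (ES 6.x 'source' form)."""
    return {"max": {"script": {"source": source}}}

by_field = {"aggs": {"max_balance": max_agg_from_field("balance")}}
by_script = {"aggs": {"max_balance": max_agg_from_script("doc['balance'].value * 1.1")}}
```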
II. Metric aggregations
1. max / min / sum / avg. Example 1: find the maximum balance over all customers
POST /bank/_search
{
"size": 0,
"aggs": {
"masssbalance": { #聚合名
"max": {# 最大值 聚合类型 max
"field": "balance"#字段名()
}
}
}
}
Result:
{
"took": 2080,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"masssbalance": { #聚合名
"value": 49989 #filed的max值
}
}
}
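The aggregated value can be read straight out of the response body; a minimal sketch using the (abridged) response shown above:

```python
# 'resp' is an abridged copy of the response above; with "size": 0 the
# hits array is empty and only the aggregation result matters.
resp = {
    "hits": {"total": 1000, "hits": []},
    "aggregations": {"masssbalance": {"value": 49989}},
}

# Index into aggregations by the aggregation name chosen in the request.
max_balance = resp["aggregations"]["masssbalance"]["value"]
print(max_balance)  # 49989
```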
Example 2: find the maximum balance among customers aged 24
POST /bank/_search
{
"size": 2,
"query": {
"match": { #全文检索
"age": 24 #age 字段值为24
}
},
"sort": [
{
"balance": { # balance字段 排序
"order": "desc"
}
}
],
"aggs": {
"max_balance": { # 聚合名称
"max": { # max类型
"field": "balance" #聚合字段名称 是balance(余额)
}
}
}
}
Result 2:
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": { # query 结果
"total": 42,
"max_score": null,
"hits": [
{
"_index": "bank",
"_type": "_doc",
"_id": "697",
"_score": null,
"_source": {
"account_number": 697,
"balance": 48745,
"firstname": "Mallory",
"lastname": "Emerson",
"age": 24,
"gender": "F",
"address": "318 Dunne Court",
"employer": "Exoplode",
"email": "malloryemerson@exoplode.com",
"city": "Montura",
"state": "LA"
},
"sort": [
48745
]
},
{
"_index": "bank",
"_type": "_doc",
"_id": "917",
"_score": null,
"_source": {
"account_number": 917,
"balance": 47782,
"firstname": "Parks",
"lastname": "Hurst",
"age": 24,
"gender": "M",
"address": "933 Cozine Avenue",
"employer": "Pyramis",
"email": "parkshurst@pyramis.com",
"city": "Lindcove",
"state": "GA"
},
"sort": [
47782
]
}
]
},
"aggregations": { # query执行后的 聚合匹配结果
"max_balance": {
"value": 48745
}
}
}
2. Document count. Example 1: count the documents in the bank index with age 24
POST /bank/_doc/_count
{
"query": {
"match": {
"age" : 24
}
}
}
Result 1:
{
"count": 42,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
}
}
The alerting system's own index can be counted the same way:
GET 13.9.16.2:9223/alerts_es/alerts_es/_count
{
"count": 829227,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0
}
}
3. value_count: count the documents in which a field has a value. Example 1:
POST /bank/_search?size=0
{
"aggs": { # 聚合关键字
"age_count": { #聚合名称
"value_count": { #有值统计
"field": "age" #字段名
}
}
}
}
Result 1:
{
"took": 2022,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_count": {
"value": 1000
}
}
}
4. cardinality: distinct-value count
Example 1:
POST /bank/_search?size=0
{
"aggs": { # 关键字
"age_count": { #聚合名称
"cardinality": { #去重计数
"field": "age" #聚合字段
}
},
"state_count": { #聚合名称
"cardinality": { #去重计数
"field": "state.keyword" #聚合字段说明:state的使用它的keyword版
}
}
}
}
Result (two aggregations were requested, so two results come back):
{
"took": 2074,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"state_count": {
"value": 51
},
"age_count": {
"value": 21
}
}
}
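cardinality is an approximate distinct count (ES computes it with the HyperLogLog++ algorithm), but conceptually it is the size of the value set; a local illustration over a made-up sample:

```python
# Conceptually, cardinality("age") is the number of distinct age values.
ages = [20, 21, 21, 25, 31, 31, 31, 40]
distinct_ages = len(set(ages))
print(distinct_ages)  # 5
```

On real data the ES result may differ slightly from the exact count, since the algorithm trades accuracy for constant memory.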
5. stats: returns five values at once: count, max, min, avg and sum
Example 1:
POST /bank/_search?size=0
{
"aggs": {
"age_stats": {
"stats": { #包含五个关键值
"field": "age"
}
}
}
}
# Result
{
"took": 7,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_stats": {
"count": 1000,
"min": 20,
"max": 40,
"avg": 30.171,
"sum": 30171
}
}
}
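The five values stats returns can be reproduced locally; a sketch over a made-up sample of ages:

```python
# stats = count, min, max, avg, sum of the field's values.
ages = [20, 23, 25, 31, 31, 40]
stats = {
    "count": len(ages),
    "min": min(ages),
    "max": max(ages),
    "avg": sum(ages) / len(ages),
    "sum": sum(ages),
}
```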
6. extended_stats: extended statistics; adds four results on top of stats: the sum of squares, the variance, the standard deviation, and the interval avg plus/minus two standard deviations
Example 1
POST /bank/_search?size=0
{
"aggs": {
"age_stats": {
"extended_stats": {
"field": "age"
}
}
}
}
Result:
{
"took": 7,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_stats": {
"count": 1000,
"min": 20,
"max": 40,
"avg": 30.171,
"sum": 30171,
"sum_of_squares": 946393,
"variance": 36.10375899999996,
"std_deviation": 6.008640362012022,
"std_deviation_bounds": {
"upper": 42.18828072402404,
"lower": 18.153719275975956
}
}
}
}
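The four extra results follow from the basic statistics; a sketch that reproduces the arithmetic of the result above from count, sum and sum_of_squares (ES uses the population variance, i.e. E[x^2] - E[x]^2):

```python
import math

# Numbers taken from the extended_stats result above.
count, total, sum_of_squares = 1000, 30171, 946393
avg = total / count                           # 30.171
variance = sum_of_squares / count - avg ** 2  # population variance E[x^2] - E[x]^2
std_deviation = math.sqrt(variance)
# std_deviation_bounds is simply avg +/- 2 standard deviations.
bounds = {"upper": avg + 2 * std_deviation, "lower": avg - 2 * std_deviation}
```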
7. percentiles: values at given percentile ranks. For the specified field (or script), the values are accumulated from smallest to largest as a share of all matched documents, and the value at each requested percentile is returned. The defaults are the [1, 5, 25, 50, 75, 95, 99] percentiles. In the middle of the result below, read: 50% of the documents have age <= 31, or equivalently, documents with age <= 31 make up 50% of all matched documents.
Example 1
POST /bank/_search?size=0
{
"aggs": {
"age_percents": {
"percentiles": {
"field": "age"
}
}
}
}
Result 1:
{
"took": 87,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_percents": {
"values": {
"1.0": 20,
"5.0": 21,
"25.0": 25,
"50.0": 31,
"75.0": 35.00000000000001,
"95.0": 39,
"99.0": 40
}
}
}
}
Result notes:
# 50% of the documents have age <= 31; equivalently, documents with age <= 31 make up 50% of all matched documents
Specifying the percentiles explicitly:
POST /bank/_search?size=0
{
"aggs": {
"age_percents": {
"percentiles": {
"field": "age",
"percents" : [95, 99, 99.9]
}
}
}
}
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_percents": {
"values": {
"95.0": 39,
"99.0": 40,
"99.9": 40
}
}
}
}
8. percentile_ranks: for each given value, the percentage of documents whose field value is less than or equal to it. Example 1: the share of documents with age up to 25 and up to 30; the inverse of item 7
POST /bank/_search?size=0
{
"aggs": {
"gge_perc_rank": {
"percentile_ranks": {
"field": "age",
"values": [
25,
30
]
}
}
}
}
Result:
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"gge_perc_rank": {
"values": { #年龄小于25的文档占比为26.1%,年龄小于30的文档占比为49.2%,
"25.0": 26.1,
"30.0": 49.2
}
}
}
}
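percentile_ranks is conceptually the share of matched documents whose value is no greater than the given value (on real data ES interpolates and approximates); a simplified local sketch over made-up ages:

```python
def percentile_rank(values, v):
    """Percentage of values less than or equal to v (simplified; ES interpolates)."""
    return 100.0 * sum(1 for x in values if x <= v) / len(values)

ages = [20, 22, 24, 25, 28, 30, 33, 36, 38, 40]
print(percentile_rank(ages, 25))  # 40.0
print(percentile_rank(ages, 30))  # 60.0
```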
III. Bucket aggregations
1. Terms Aggregation: group into buckets by field value
size sets how many buckets are returned, e.g. "size": 20
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": { #分组
"field": "age",
"size": 10 #桶聚合分组个数
}
}
}
}
Result 1
{
"took": 2000,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 463,
"buckets": [
{
"key": 31,
"doc_count": 61
},
{
"key": 39,
"doc_count": 60
},
{
"key": 26,
"doc_count": 59
},
{
"key": 32,
"doc_count": 52
},
{
"key": 35,
"doc_count": 52
},
{
"key": 36,
"doc_count": 52
},
{
"key": 22,
"doc_count": 51
},
{
"key": 28,
"doc_count": 51
},
{
"key": 33,
"doc_count": 50
},
{
"key": 34,
"doc_count": 49
}
]
}
}
}
Result notes:
"doc_count_error_upper_bound": 0 — the maximum possible error in the returned document counts
"sum_other_doc_count": 463 — the number of documents in buckets that were not returned
By default the top 10 buckets, ordered by document count descending, are returned; "size" sets how many buckets come back.
Example 2: show the error bound on each bucket
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"size": 5,
"shard_size": 20,
"show_term_doc_count_error": true
}
}
}
}
Result
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 25,
"sum_other_doc_count": 716,
"buckets": [
{
"key": 31,
"doc_count": 61,
"doc_count_error_upper_bound": 0
},
{
"key": 39,
"doc_count": 60,
"doc_count_error_upper_bound": 0
},
{
"key": 26,
"doc_count": 59,
"doc_count_error_upper_bound": 0
},
{
"key": 32,
"doc_count": 52,
"doc_count_error_upper_bound": 0
},
{
"key": 36,
"doc_count": 52,
"doc_count_error_upper_bound": 0
}
]
}
}
}
Example 3: shard_size specifies how many buckets each shard returns
The default shard_size is: size for a single-shard index; size * 1.5 + 10 for multiple shards
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"size": 5,
"shard_size": 20
}
}
}
}
Result:
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 25,
"sum_other_doc_count": 716,
"buckets": [
{
"key": 31,
"doc_count": 61
},
{
"key": 39,
"doc_count": 60
},
{
"key": 26,
"doc_count": 59
},
{
"key": 32,
"doc_count": 52
},
{
"key": 36,
"doc_count": 52
}
]
}
}
}
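The shard_size default described above can be written as a small helper (a sketch of the documented rule, not ES source):

```python
def default_shard_size(size, number_of_shards):
    """Default shard_size: equals size on a single-shard index,
    otherwise size * 1.5 + 10."""
    if number_of_shards == 1:
        return size
    return int(size * 1.5) + 10
```

Requesting more candidates per shard than the final size reduces the doc_count_error_upper_bound, at the cost of more work on each shard.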
**order: specify how the buckets are sorted**
Example 4: sort by document count
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",
"order" : { "_count" : "asc" }
}
}
}
}
Result
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 584,
"buckets": [
{
"key": 29,
"doc_count": 35
},
{
"key": 27,
"doc_count": 39
},
{
"key": 38,
"doc_count": 39
},
{
"key": 23,
"doc_count": 42
},
{
"key": 24,
"doc_count": 42
},
{
"key": 25,
"doc_count": 42
},
{
"key": 37,
"doc_count": 42
},
{
"key": 20,
"doc_count": 44
},
{
"key": 40,
"doc_count": 45
},
{
"key": 21,
"doc_count": 46
}
]
}
}
}
Example 6: sort by the bucket key
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"terms": {
"field": "age",# 根据filed排序
"order" : { "_key" : "asc" }
}
}
}
}
Result
{
"took": 10,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 549,
"buckets": [
{
"key": 20,
"doc_count": 44
},
{
"key": 21,
"doc_count": 46
},
{
"key": 22,
"doc_count": 51
},
{
"key": 23,
"doc_count": 42
},
{
"key": 24,
"doc_count": 42
},
{
"key": 25,
"doc_count": 42
},
{
"key": 26,
"doc_count": 59
},
{
"key": 27,
"doc_count": 39
},
{
"key": 28,
"doc_count": 51
},
{
"key": 29,
"doc_count": 35
}
]
}
}
}
Example 7: sort buckets by a metric from a sub-aggregation
POST /bank/_search?size=0
{
"aggs": { # 关键字
"age_terms": { #聚合名称
"terms": { #桶排序 根据字段值项分组
"field": "age", #分组字段值
"order": {
"max_balance": "asc" #指标值排序,按max_balance 指定的field字段值 升序排序
}
},
"aggs": { # 关键字
"max_balance": { #聚合名称
"max": { #指标 最大
"field": "balance" #字段名
}
},
"min_balance": { #聚合名称
"min": {#指标 最小
"field": "balance"#字段名
}
}
}
}
}
}
Result
{
"took": 28,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 511,
"buckets": [
{
"key": 27,
"doc_count": 39,
"min_balance": {
"value": 1110
},
"max_balance": {
"value": 46868
}
},
{
"key": 39,
"doc_count": 60,
"min_balance": {
"value": 3589
},
"max_balance": {
"value": 47257
}
},
{
"key": 37,
"doc_count": 42,
"min_balance": {
"value": 1360
},
"max_balance": {
"value": 47546
}
},
{
"key": 32,
"doc_count": 52,
"min_balance": {
"value": 1031
},
"max_balance": {
"value": 48294
}
},
{
"key": 26,
"doc_count": 59,
"min_balance": {
"value": 1447
},
"max_balance": {
"value": 48466
}
},
{
"key": 33,
"doc_count": 50,
"min_balance": {
"value": 1314
},
"max_balance": {
"value": 48734
}
},
{
"key": 24,
"doc_count": 42,
"min_balance": {
"value": 1011
},
"max_balance": {
"value": 48745
}
},
{
"key": 31,
"doc_count": 61,
"min_balance": {
"value": 2384
},
"max_balance": {
"value": 48758
}
},
{
"key": 34,
"doc_count": 49,
"min_balance": {
"value": 3001
},
"max_balance": {
"value": 48997
}
},
{
"key": 29,
"doc_count": 35,
"min_balance": {
"value": 3596
},
"max_balance": {
"value": 49119
}
}
]
}
}
}
**Example 8: filter buckets with a regular expression**
GET /_search
{
"aggs" : {
"tags" : {
"terms" : {
"field" : "tags",
"include" : ".*sport.*",#包含
"exclude" : "water_.*"#排除
}
}
}
}
Example 9: filter buckets with an explicit value list
GET /_search
{
"aggs" : {
"JapaneseCars" : {
"terms" : {
"field" : "make",
"include" : ["mazda", "honda"]
}
},
"ActiveCarManufacturers" : {
"terms" : {
"field" : "make",
"exclude" : ["rover", "jensen"]
}
}
}
}
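include and exclude are full-string regular-expression matches against the candidate terms; a local illustration with Python's re module (fullmatch mirrors the full-string semantics; the tag values are made up):

```python
import re

# Candidate bucket terms, and the include/exclude patterns from Example 8.
terms = ["water_sports", "sports_car", "hiking", "motor_sports"]
include = re.compile(r".*sport.*")
exclude = re.compile(r"water_.*")

# A term survives if it matches include and does not match exclude.
kept = [t for t in terms if include.fullmatch(t) and not exclude.fullmatch(t)]
print(kept)  # ['sports_car', 'motor_sports']
```

Note that ES uses Lucene regular-expression syntax, which differs from Python's in some constructs; the anchoring-to-the-whole-term behavior is the same.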
2. Filter Aggregation: aggregate over the documents matching a filter
Among the documents matched by the query, only those that also match the filter are aggregated: filter first, then aggregate.
Example 1:
POST /bank/_search?size=0
{
"aggs": {
"age_terms": {
"filter": {"match":{"gender":"F"}},
"aggs": {
"avg_age": {
"avg": {
"field": "age"
}
}
}
}
}
}
Result
{
"took": 163,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_terms": {
"doc_count": 493,
"avg_age": {
"value": 30.3184584178499
}
}
}
}
3. Filters Aggregation: aggregate over several filter groups at once
Example 1:
GET logs/_search
{
"size": 0,
"aggs": {
"messages": {
"filters": {
"filters": {
"errors": {
"match": {
"body": "error"
}
},
"warnings": {
"match": {
"body": "warning"
}
}
}
}
}
}
}
Result
{
"took": 18,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"messages": {
"buckets": {
"errors": {
"doc_count": 1
},
"warnings": {
"doc_count": 2
}
}
}
}
}
Example 2: other_bucket_key adds an extra bucket that collects the documents matching none of the filters
GET logs/_search
{
"size": 0,
"aggs": {
"messages": {
"filters": {
"other_bucket_key": "other_messages",
"filters": {
"errors": {
"match": {
"body": "error"
}
},
"warnings": {
"match": {
"body": "warning"
}
}
}
}
}
}
}
Result: the unmatched documents land in the bucket named by other_bucket_key:
{
"took": 5,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"messages": {
"buckets": {
"errors": {
"doc_count": 1
},
"warnings": {
"doc_count": 2
},
"other_messages": {
"doc_count": 0
}
}
}
}
}
4. Range Aggregation: bucket by value ranges
Example 1:
POST /bank/_search?size=0
{
"aggs": {
"age_range": {
"range": {
"field": "age",
"ranges": [
{
"to": 25
},
{
"from": 25,
"to": 35
},
{
"from": 35
}
]
},
"aggs": {
"bmax": {
"max": {
"field": "balance"
}
}
}
}
}
}
Result
{
"took": 7,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_range": {
"buckets": [
{
"key": "*-25.0",
"to": 25,
"doc_count": 225,
"bmax": {
"value": 49587
}
},
{
"key": "25.0-35.0",
"from": 25,
"to": 35,
"doc_count": 485,
"bmax": {
"value": 49795
}
},
{
"key": "35.0-*",
"from": 35,
"doc_count": 290,
"bmax": {
"value": 49989
}
}
]
}
}
}
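Range buckets are from-inclusive and to-exclusive, which is why ages 25 and 35 are not double-counted in the result above; a sketch of the assignment rule:

```python
def bucket_of(value, ranges):
    """Return the key of the range bucket a value falls into:
    'from' is inclusive, 'to' is exclusive."""
    for r in ranges:
        lo, hi = r.get("from"), r.get("to")
        if (lo is None or value >= lo) and (hi is None or value < hi):
            return r["key"]

ranges = [{"key": "*-25", "to": 25},
          {"key": "25-35", "from": 25, "to": 35},
          {"key": "35-*", "from": 35}]
print(bucket_of(25, ranges))  # 25-35
```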
- Example 2: give each bucket a key
POST /bank/_search?size=0
{
"aggs": {
"age_range": {
"range": {
"field": "age",
"keyed": true,
"ranges": [
{
"to": 25,
"key": "Ld"
},
{
"from": 25,
"to": 35,
"key": "Md"
},
{
"from": 35,
"key": "Od"
}
]
}
}
}
}
Result
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"age_range": {
"buckets": {
"Ld": {
"to": 25,
"doc_count": 225
},
"Md": {
"from": 25,
"to": 35,
"doc_count": 485
},
"Od": {
"from": 35,
"doc_count": 290
}
}
}
}
}
5. Date Range Aggregation: bucket by date ranges
Example 1:
{
"aggs": {
"range": {
"date_range": {
"field": "date",
"format": "MM-yyy",
"ranges": [
{
"to": "now-10M/M"
},
{
"from": "now-10M/M"
}
]
}
}
}
}
Result
{
"took": 115,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"range": {
"buckets": [
{
"key": "*-2017-08-01T00:00:00.000Z",
"to": 1501545600000,
"to_as_string": "2017-08-01T00:00:00.000Z",
"doc_count": 0
},
{
"key": "2017-08-01T00:00:00.000Z-*",
"from": 1501545600000,
"from_as_string": "2017-08-01T00:00:00.000Z",
"doc_count": 0
}
]
}
}
}
6. Date Histogram Aggregation: time histogram aggregation
Aggregates by calendar interval: year (1y), quarter (1q), month (1M), week (1w), day (1d), hour (1h), minute (1m), second (1s), or any specified fixed interval.
POST /bank/_search?size=0
{
"aggs": {
"sales_over_time": {
"date_histogram": {
"field": "date",
"interval": "month"
}
}
}
}
Result (the bank index has no date field, so the buckets come back empty):
{
"took": 9,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": 0,
"hits": []
},
"aggregations": {
"sales_over_time": {
"buckets": []
}
}
}
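Conceptually, a month-interval date_histogram truncates each timestamp to the start of its month and counts documents per truncated key; a local sketch over made-up events:

```python
import datetime
from collections import Counter

def month_key(ts):
    """Truncate a timestamp to the start of its month (the bucket key)."""
    return ts.strftime("%Y-%m-01T00:00:00")

events = [datetime.datetime(2020, 7, 3), datetime.datetime(2020, 7, 20),
          datetime.datetime(2020, 8, 1)]
buckets = Counter(month_key(t) for t in events)
```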
7. Missing Aggregation: bucket the documents that lack a value for a field
POST /bank/_search?size=0
{
"aggs" : {
"account_without_a_age" : {
"missing" : { "field" : "age" }
}
}
}
8. Geo Distance Aggregation: bucket by geographic distance
Bucket aggregations are rarely used on their own; they are normally combined with metric aggregations, since after grouping you usually want statistics on each bucket. If no metric aggregation is given, ES applies a default value_count, i.e. the number of documents in each bucket.
IV. Nested Aggregation
A special single-bucket aggregation that aggregates over nested documents.
(1) Compute the average book price under each tag, then sort the tags by average price descending. Query:
GET book_shop/it_book/_search
{
"size": 0,
"aggs": {
"all_tags": {
"terms": {
"field": "tags.keyword",
"order": { "avg_price": "desc" } // 根据下述统计的结果排序
},
"aggs": {
"avg_price": {
"avg": { "field": "price" }
}
}
}
}
}
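The terms + avg + order combination can be mimicked over an in-memory sample (the books, tags and prices below are made up for illustration):

```python
from collections import defaultdict

# Each "document" has tags and a price, like the it_book example.
books = [{"tags": ["java"], "price": 80},
         {"tags": ["java", "web"], "price": 60},
         {"tags": ["web"], "price": 30}]

# Group prices by tag (a document lands in one bucket per tag it carries).
prices = defaultdict(list)
for b in books:
    for tag in b["tags"]:
        prices[tag].append(b["price"])

# Per-bucket avg, sorted descending, like "order": {"avg_price": "desc"}.
buckets = sorted(((t, sum(p) / len(p)) for t, p in prices.items()),
                 key=lambda kv: kv[1], reverse=True)
print(buckets)  # [('java', 70.0), ('web', 45.0)]
```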
More examples follow: implementing aggregation queries (grouped statistics) with Elasticsearch from an application.
A Python application example
class AggregationBuckets(object):
    def __init__(self, _from=None, size=None, index=None, **kwargs):
        """
        Aggregation query helper, using created_at as the time field.
        :param _from: ES query offset, used for paging;
        :param size: ES page size, used for paging;
        :param index: the ES indices to query, passed as a list;
        :param kwargs: the concrete query parameters;
        """
        self.start_num = _from
        self.end_num = size
        self.index = index or []  # e.g. ['alerts_es']; avoids a mutable default argument
        self.start_query_time = kwargs.get('start_query_time', None)
        self.end_query_time = kwargs.get('end_query_time', None)
        if not self.index or not self.start_query_time or not self.end_query_time:
            log.error('Setup1: start es aggregation: incomplete parameters')
            # __init__ may not return a value, so raise instead of returning an error tuple
            raise ValueError('Incomplete parameters')
        log.info('parameters: '
                 'indices: {0} start_query_time: {1} end_query_time: {2}'.format(
                     self.index, self.start_query_time, self.end_query_time))
        self.query_terms = kwargs.get('query_terms', None)  # dict
        self.query_string = kwargs.get('query_string', None)
        self.histogram_interval = kwargs.get('histogram_interval', None)  # str, e.g. 'day'
        self.histogram_field = kwargs.get('histogram_field', 'created_at')
        self.histogram_format = kwargs.get('histogram_format', 'yyyy-MM-dd HH:mm:ss')
        # date range aggregation
        self.range_date_field = kwargs.get('range_date_field', 'created_at')
        self.range_date_format = kwargs.get('range_date_format', 'yyyy-MM-dd HH:mm:ss')
        self.range_date_ranges = kwargs.get('range_date_ranges', None)  # list, e.g. [{"to": "now", "from": "now-1d"}]
        self.terms_field = kwargs.get('terms_field', None)  # list, e.g. ['ip']
        self.terms_size = kwargs.get('terms_size', 10)
        self.terms_order = kwargs.get('terms_order', {"_count": "desc"})  # dict, e.g. {"_count": "desc"}
        self.alert_id = kwargs.get('alerts_id', '')
        self.filter_aggs = kwargs.get('filter_aggs', None)
        self.pipeline_aggs = kwargs.get('pipeline_aggs', None)
        self.nested_aggs = kwargs.get('nested_aggs', None)
    def query(self):
        query_aggs = {
            "query": {
                # 'filtered' is ES 1.x/2.x syntax; on ES 5+ use a 'bool' query with a 'filter' clause
                "filtered": {
                    "filter": {
                        "range": {
                            "created_at": {"lt": "", "gt": ""}}}},
            },
            "aggs": {},
            "sort": {"created_at": {"order": "desc"}}
        }
        query_aggs['size'] = self.end_num
        query_aggs['from'] = self.start_num
        query_aggs['query']['filtered']['filter']['range']['created_at'] = \
            {"lte": self.end_query_time, "gte": self.start_query_time}
        if self.query_terms:
            query_aggs['query']['filtered'].update({'query': self.query_terms})
        if self.query_string:
            # note: this overwrites query_terms if both are supplied
            query_aggs['query']['filtered'].update({'query': self.query_string})
        if self.histogram_interval:
            histogram_aggs = {"date_histogram": {}}
            histogram_aggs['date_histogram']['field'] = self.histogram_field
            histogram_aggs['date_histogram']['interval'] = self.histogram_interval
            histogram_aggs['date_histogram']['format'] = self.histogram_format
            query_aggs['aggs']['histogram_aggs'] = histogram_aggs
            if self.pipeline_aggs:
                # pipeline aggregations attach to the histogram buckets
                query_aggs['aggs']['histogram_aggs']['aggs'] = self.pipeline_aggs
        if self.range_date_ranges:
            range_date_aggs = {"date_range": {}}
            range_date_aggs['date_range']['field'] = self.range_date_field
            range_date_aggs['date_range']['format'] = self.range_date_format
            range_date_aggs['date_range']['ranges'] = self.range_date_ranges
            query_aggs['aggs']['range_date_aggs'] = range_date_aggs
        if self.terms_field:
            for i in self.terms_field:
                if i:
                    terms_name = {"terms": {}}
                    terms_name['terms']['field'] = i
                    terms_name['terms']['size'] = self.terms_size
                    terms_name['terms']['order'] = self.terms_order
                    query_aggs['aggs'][i] = terms_name
        if self.filter_aggs:
            query_aggs['aggs']['filter_aggs'] = self.filter_aggs
        if self.nested_aggs:
            query_aggs.pop('sort')
            query_aggs['aggs'].update(self.nested_aggs)
        log.info('query data is {0}'.format(query_aggs))
        return buckets_aggregation_query(self.index[0], query_aggs)
    def query_other_by_alert_id(self, **kwargs):
        ids = kwargs.get('alert_id', None)
        if not ids:
            return (3, 'Incomplete parameters...')
        q_id = {'query': {'terms': {'alerts_id': ids}}}
        q_id["size"] = self.end_num - self.start_num
        q_id["from"] = 0
        return query(self.index[1], q_id)
def active_two_date(size=None, _from=None, index=['alerts_es', 'alerts_server', 'alerts_sended'], **kwargs):
    indexs = index
    # ---- alerts active over the last two days ----
    active_data = {'hits': []}
    values = {}
    values['start_query_time'] = 'now-1d'
    values['end_query_time'] = 'now'
    values['histogram_interval'] = 'day'
    values['range_date_ranges'] = [{"to": "now", "from": "now-1d"}]
    values['terms_field'] = ['ip.raw', '']
    log.info('running query active_two_date data ...')
    es_query = AggregationBuckets(size=size, _from=_from, index=index, **values)
    alerts_data = es_query.query()
    if alerts_data[0] != 5:
        return alerts_data
    log.info('Setup1: query {0} of active two day OK ...'.format(index[0]))
    ids = [int(i['_id']) for i in alerts_data[1]['hits']['hits'] if i['_id']]
    log.info('Setup2: start query {0}'.format(indexs[1]))
    if not ids:
        return (3, 'Alert ids must not be empty...')
    alerts_server = es_query.query_other_by_alert_id(alert_id=ids)
    if alerts_server[0] != 5:
        return alerts_server
    log.info('Setup2: query {0} of {1} OK ...'.format(indexs[1], ids))
    for i in alerts_data[1]['hits']['hits']:
        for j in alerts_server[1]['hits']:
            if int(i['_id']) == int(j['_source']['alerts_id']):
                i['_source']['status'] = u'问题'  # 'open'
                a = datetime.datetime.utcnow() - datetime.datetime.strptime(
                    i['_source']['created_at'], '%Y-%m-%d %H:%M:%S')
                i['_source']['up_time'] = a.__str__().split('.')[0]
                # _source is a dict, so test membership with 'in', not hasattr()
                if 'deleted_at' in i['_source']:
                    i['_source']['status'] = u'恢复'  # 'recovered'
                    a = datetime.datetime.strptime(
                        i['_source']['deleted_at'], '%Y-%m-%d %H:%M:%S') - \
                        datetime.datetime.strptime(
                            i['_source']['created_at'], '%Y-%m-%d %H:%M:%S')
                    i['_source']['up_time'] = a.__str__().split('.')[0]
                i['_source']['business'] = j['_source']['division'] + ' ' + \
                    j['_source']['product'] + ' ' + \
                    j['_source']['portal'] + ' ' + \
                    j['_source']['application']
                active_data['hits'].append(i)
    active_data['page_num'] = alerts_data[1]['hits']['total']
    active_data['aggregations'] = alerts_data[1]['aggregations']
    return (5, active_data)
# graph data function
def graph_data(size=None, _from=None, index=['alerts_es', 'alerts_server', 'alerts_sended'], **kwargs):
    nested_aggs = {'nested_aggs':
                   {"terms": {"field": "category.raw"},
                    "aggs": {
                        "histogram_category": {
                            "date_histogram": {
                                "field": kwargs.get('histogram_field', 'created_at'),
                                "interval": kwargs.get('histogram_interval', 'day'),
                                "format": kwargs.get('histogram_format', 'yyyy-MM-dd HH:mm:ss')
                            }
                        }
                    }}}
    # nested aggregation: terms buckets with a date_histogram per bucket
    values = {'nested_aggs': nested_aggs}
    values['start_query_time'] = kwargs.get('start_query_time', 'now-1y')
    values['end_query_time'] = kwargs.get('end_query_time', 'now')
    es_query = AggregationBuckets(size=0, _from=_from, index=index,
                                  **values)
    graph_aggs_data = es_query.query()
    if graph_aggs_data[0] != 5:
        return graph_aggs_data
    log.info('Get graph field OK ... values: {0}'.format(values))
    return (5, graph_aggs_data[1]['aggregations'])
def buckets_aggregation_query(indices, q_data):
    try:
        base64string = base64.b64encode(
            '%s:%s' % (options.es_user, options.es_pawd))
        req = urllib2.Request(options.elasticsearch_api + indices + "/_search", json.dumps(q_data))
        req.add_header("Authorization", "Basic %s" % base64string)
        response = urllib2.urlopen(req).read()
        ret = json.loads(response)
        return (5, ret)
    except Exception as e:
        log.error('Request to index {0} failed.. {1}'.format(indices, e))
        return (4, e)
When size=0 is set on an aggregation query, no hits are returned. Result:
"nested_aggs": {
"buckets": [
{
"histogram_category": {
"buckets": [
{
"key_as_string": "2020-07-01 12:00:00",
"key": 1593561600000,
"doc_count": 2924
},
{
"key_as_string": "2020-08-01 12:00:00",
"key": 1596240000000,
"doc_count": 7112
}
]
},
"key": "系统故障",
"doc_count": 10036
},
{
"histogram_category": {
"buckets": [
{
"key_as_string": "2020-07-01 12:00:00",
"key": 1593561600000,
"doc_count": 105
},
{
"key_as_string": "2020-08-01 12:00:00",
"key": 1596240000000,
"doc_count": 404
}
]
},
"key": "安全故障",
"doc_count": 509
},
{
"histogram_category": {
"buckets": [
{
"key_as_string": "2020-07-01 12:00:00",
"key": 1593561600000,
"doc_count": 107
},
{
"key_as_string": "2020-08-01 12:00:00",
"key": 1596240000000,
"doc_count": 226
}
]
},
"key": "网络故障",
"doc_count": 333
},
{
"histogram_category": {
"buckets": [
{
"key_as_string": "2020-08-01 12:00:00",
"key": 1596240000000,
"doc_count": 15
}
]
},
"key": "应用故障",
"doc_count": 15
},
{
"histogram_category": {
"buckets": [
{
"key_as_string": "2020-07-01 12:00:00",
"key": 1593561600000,
"doc_count": 1
}
]
},
"key": "数据库故障",
"doc_count": 1
}
],
"sum_other_doc_count": 0,
"doc_count_error_upper_bound": 0
}