升级改造报警系统(3-2)

386 阅读13分钟

使用elasticsearch进行搜索 2


ES的聚合分析简介

在报警系统中,不仅需要通过 _search endpoint 查询 ES 中的数据,还需要对数据进行分析,例如按时间、按业务进行统计,找出报警最多或最少的日期等。基于这些需求,需要熟悉并灵活运用 ES 的聚合分析。

一、聚合分析的概念

  1. ES聚合分析是什么

聚合分析是数据库中重要的功能特性,完成对一个查询的数据集中数据的聚合计算,如:找出某字段(或计算表达式的结果)的最大值、最小值,计算和、平均值等。ES作为搜索引擎兼数据库,同样提供了强大的聚合分析能力。

对一个数据集求最大、最小、和、平均值等指标的聚合,在ES中称为指标聚合 metric

而关系型数据库中除了有聚合函数外,还可以对查询出的数据进行分组group by,再在组上进行指标聚合。在 ES 中group by 称为分桶,桶聚合 bucketing

ES中还提供了矩阵聚合(matrix)、管道聚合(pipeline)

  2. ES聚合分析查询的方法

在查询请求体中以aggregations节点按如下语法定义聚合分析:

"aggregations" : {
    "<aggregation_name>" : { <!--聚合的名字 -->
        "<aggregation_type>" : { <!--聚合的类型 -->
            <aggregation_body> <!--聚合体:对哪些字段进行聚合 -->
        }
        [,"meta" : {  [<meta_data_body>] } ]? <!--元 -->
        [,"aggregations" : { [<sub_aggregation>]+ } ]? <!--在聚合里面再定义子聚合 -->
    }
    [,"<aggregation_name_2>" : { ... } ]*<!--聚合的名字 -->
}
  3. 聚合分析的值来源

聚合计算的值可以取字段的值,也可是脚本计算的结果。

二、指标聚合

1. max min sum avg 示例1:查询所有客户中余额的最大值

POST /bank/_search?
{
  "size": 0, 
  "aggs": {
    "masssbalance": { #聚合名
      "max": {# 最大值 聚合类型 max
        "field": "balance"#字段名()
      }
    }
  }
}
结果:
{
  "took": 2080,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "masssbalance": { #聚合名
      "value": 49989 #field的max值
    }
  }
}

示例2:查询年龄为24岁的客户中的余额最大值

POST /bank/_search?
{
  "size": 2, 
  "query": {
    "match": { #全文检索
      "age": 24 #age 字段值为24
    }
  },
  "sort": [
    {
      "balance": { # balance字段 排序
        "order": "desc"
      }
    }
  ],
  "aggs": {
    "max_balance": { # 聚合名称
      "max": { # max类型
        "field": "balance" #聚合字段名称 是balance(余额)
      }
    }
  }
}


 结果2:
 
 {
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": { # query 结果
    "total": 42,
    "max_score": null,
    "hits": [
      {
        "_index": "bank",
        "_type": "_doc",
        "_id": "697",
        "_score": null,
        "_source": {
          "account_number": 697,
          "balance": 48745,
          "firstname": "Mallory",
          "lastname": "Emerson",
          "age": 24,
          "gender": "F",
          "address": "318 Dunne Court",
          "employer": "Exoplode",
          "email": "malloryemerson@exoplode.com",
          "city": "Montura",
          "state": "LA"
        },
        "sort": [
          48745
        ]
      },
      {
        "_index": "bank",
        "_type": "_doc",
        "_id": "917",
        "_score": null,
        "_source": {
          "account_number": 917,
          "balance": 47782,
          "firstname": "Parks",
          "lastname": "Hurst",
          "age": 24,
          "gender": "M",
          "address": "933 Cozine Avenue",
          "employer": "Pyramis",
          "email": "parkshurst@pyramis.com",
          "city": "Lindcove",
          "state": "GA"
        },
        "sort": [
          47782
        ]
      }
    ]
  },
  "aggregations": { # query执行后的 聚合匹配结果
    "max_balance": {
      "value": 48745
    }
  }
}

2. 文档计数count 示例1:统计银行索引bank下年龄为24的文档数量

POST /bank/_doc/_count
{
  "query": {
    "match": {
      "age" : 24
    }
  }
}

 结果1:
 {
  "count": 42,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  }
}
GET 13.9.16.2:9223/alerts_es/alerts_es/_count

{
  "count": 829227,
  "_shards": {
    "total": 1,
    "successful": 1,
    "failed": 0
  }
}

3. Value count 统计某字段有值的文档数 示例1:

POST /bank/_search?size=0
{
  "aggs": { # 聚合关键字 
    "age_count": { #聚合名称
      "value_count": { #有值统计
        "field": "age" #字段名
      }
    }
  }
}

结果1:
{
  "took": 2022,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_count": {
      "value": 1000
    }
  }
}

4. cardinality 值去重计数

示例1:

POST /bank/_search?size=0
{
  "aggs": { # 关键字
    "age_count": { #聚合名称
      "cardinality": { #去重计数
        "field": "age" #聚合字段
      }
    },
    "state_count": { #聚合名称
      "cardinality": { #去重计数
        "field": "state.keyword" #聚合字段说明:state的使用它的keyword版
      }
    }
  }
}

结果 两个字段聚合 可得两个聚合返回:

{
  "took": 2074,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "state_count": {
      "value": 51
    },
    "age_count": {
      "value": 21
    }
  }
}

5. stats 统计 count max min avg sum 5个值

示例1:

POST /bank/_search?size=0
{
 "aggs": {
   "age_stats": {
     "stats": { #包含五个关键值
       "field": "age"
     }
   }
 }
}

#结果

{
 "took": 7,
 "timed_out": false,
 "_shards": {
   "total": 5,
   "successful": 5,
   "skipped": 0,
   "failed": 0
 },
 "hits": {
   "total": 1000,
   "max_score": 0,
   "hits": []
 },
 "aggregations": {
   "age_stats": {
     "count": 1000,
     "min": 20,
     "max": 40,
     "avg": 30.171,
     "sum": 30171
   }
 }
}

6. Extended stats 高级统计,比stats多4个统计结果: 平方和、方差、标准差、平均值加/减两个标准差的区间

示例1

POST /bank/_search?size=0
{
 "aggs": {
   "age_stats": {
     "extended_stats": {
       "field": "age"
     }
   }
 }
}

结果:

{
 "took": 7,
 "timed_out": false,
 "_shards": {
   "total": 5,
   "successful": 5,
   "skipped": 0,
   "failed": 0
 },
 "hits": {
   "total": 1000,
   "max_score": 0,
   "hits": []
 },
 "aggregations": {
   "age_stats": {
     "count": 1000,
     "min": 20,
     "max": 40,
     "avg": 30.171,
     "sum": 30171,
     "sum_of_squares": 946393,
     "variance": 36.10375899999996,
     "std_deviation": 6.008640362012022,
     "std_deviation_bounds": {
       "upper": 42.18828072402404,
       "lower": 18.153719275975956
     }
   }
 }
}

7. Percentiles 占比百分位对应的值统计 对指定字段(脚本)的值按从小到大累计每个值对应的文档数的占比(占所有命中文档数的百分比),返回指定占比比例对应的值。默认返回[ 1, 5, 25, 50, 75, 95, 99 ]分位上的值。如下中间的结果,可以理解为:占比为50%的文档的age值 <= 31,或反过来:age<=31的文档数占总命中文档数的50%

示例1

POST /bank/_search?size=0
{
  "aggs": {
    "age_percents": {
      "percentiles": {
        "field": "age"
      }
    }
  }
}

结果1:
{
  "took": 87,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_percents": {
      "values": {
        "1.0": 20,
        "5.0": 21,
        "25.0": 25,
        "50.0": 31,
        "75.0": 35.00000000000001,
        "95.0": 39,
        "99.0": 40
      }
    }
  }
}

结果说明:

#占比为50%的文档的age值 <= 31,或反过来:age<=31的文档数占总命中文档数的50%

指定分位值

POST /bank/_search?size=0
{
  "aggs": {
    "age_percents": {
      "percentiles": {
        "field": "age",
        "percents" : [95, 99, 99.9] 
      }
    }
  }
}

{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_percents": {
      "values": {
        "95.0": 39,
        "99.0": 40,
        "99.9": 40
      }
    }
  }
}

8. Percentile ranks 统计值小于等于指定值的文档占比 示例1:统计年龄小于等于25和小于等于30的文档的占比,和第7项相反

POST /bank/_search?size=0
{
 "aggs": {
   "gge_perc_rank": {
     "percentile_ranks": {
       "field": "age",
       "values": [
         25,
         30
       ]
     }
   }
 }
}

结果2:

{
 "took": 8,
 "timed_out": false,
 "_shards": {
   "total": 5,
   "successful": 5,
   "skipped": 0,
   "failed": 0
 },
 "hits": {
   "total": 1000,
   "max_score": 0,
   "hits": []
 },
 "aggregations": {
   "gge_perc_rank": {
     "values": { #年龄小于等于25的文档占比为26.1%,年龄小于等于30的文档占比为49.2%
       "25.0": 26.1,
       "30.0": 49.2
     }
   }
 }
}

三、桶聚合buckets

1. Terms Aggregation 根据字段值项分组聚合

size 指定返回多少个分组(桶),例如 "size": 10 表示只返回文档数最多的前10个分组:

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "terms": { #分组
        "field": "age",
        "size": 10 #桶聚合分组个数
      }
    }
  }
}

结果1

{
  "took": 2000,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 463,
      "buckets": [
        {
          "key": 31,
          "doc_count": 61
        },
        {
          "key": 39,
          "doc_count": 60
        },
        {
          "key": 26,
          "doc_count": 59
        },
        {
          "key": 32,
          "doc_count": 52
        },
        {
          "key": 35,
          "doc_count": 52
        },
        {
          "key": 36,
          "doc_count": 52
        },
        {
          "key": 22,
          "doc_count": 51
        },
        {
          "key": 28,
          "doc_count": 51
        },
        {
          "key": 33,
          "doc_count": 50
        },
        {
          "key": 34,
          "doc_count": 49
        }
      ]
    }
  }
}

结果说明:

"doc_count_error_upper_bound": 0:文档计数的最大偏差值

"sum_other_doc_count": 463:未返回的其他项的文档数

默认情况下返回按文档计数从高到低的前10个分组,"size": 可指定返回的buckets分组个数

示例2:每个分组上显示偏差值

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "terms": {
        "field": "age",
        "size": 5,
        "shard_size": 20,
        "show_term_doc_count_error": true
      }
    }
  }
}

结果

{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 25,
      "sum_other_doc_count": 716,
      "buckets": [
        {
          "key": 31,
          "doc_count": 61,
          "doc_count_error_upper_bound": 0
        },
        {
          "key": 39,
          "doc_count": 60,
          "doc_count_error_upper_bound": 0
        },
        {
          "key": 26,
          "doc_count": 59,
          "doc_count_error_upper_bound": 0
        },
        {
          "key": 32,
          "doc_count": 52,
          "doc_count_error_upper_bound": 0
        },
        {
          "key": 36,
          "doc_count": 52,
          "doc_count_error_upper_bound": 0
        }
      ]
    }
  }
}

示例3:shard_size 指定每个分片上返回多少个分组

shard_size 的默认值:索引只有一个分片时等于 size;多分片时为 size * 1.5 + 10

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "terms": {
        "field": "age",
        "size": 5,
        "shard_size": 20
      }
    }
  }
}

结果3:

{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 25,
      "sum_other_doc_count": 716,
      "buckets": [
        {
          "key": 31,
          "doc_count": 61
        },
        {
          "key": 39,
          "doc_count": 60
        },
        {
          "key": 26,
          "doc_count": 59
        },
        {
          "key": 32,
          "doc_count": 52
        },
        {
          "key": 36,
          "doc_count": 52
        }
      ]
    }
  }
}

**order 指定分组的排序**

示例4:根据文档计数排序

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "terms": {
        "field": "age",
        "order" : { "_count" : "asc" }
      }
    }
  }
}

结果

{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 584,
      "buckets": [
        {
          "key": 29,
          "doc_count": 35
        },
        {
          "key": 27,
          "doc_count": 39
        },
        {
          "key": 38,
          "doc_count": 39
        },
        {
          "key": 23,
          "doc_count": 42
        },
        {
          "key": 24,
          "doc_count": 42
        },
        {
          "key": 25,
          "doc_count": 42
        },
        {
          "key": 37,
          "doc_count": 42
        },
        {
          "key": 20,
          "doc_count": 44
        },
        {
          "key": 40,
          "doc_count": 45
        },
        {
          "key": 21,
          "doc_count": 46
        }
      ]
    }
  }
}

示例6:根据分组值排序

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "terms": {
        "field": "age", # 分组字段;下一行的 order 按分组值(_key)升序排序
        "order" : { "_key" : "asc" }
      }
    }
  }
}

result

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 549,
      "buckets": [
        {
          "key": 20,
          "doc_count": 44
        },
        {
          "key": 21,
          "doc_count": 46
        },
        {
          "key": 22,
          "doc_count": 51
        },
        {
          "key": 23,
          "doc_count": 42
        },
        {
          "key": 24,
          "doc_count": 42
        },
        {
          "key": 25,
          "doc_count": 42
        },
        {
          "key": 26,
          "doc_count": 59
        },
        {
          "key": 27,
          "doc_count": 39
        },
        {
          "key": 28,
          "doc_count": 51
        },
        {
          "key": 29,
          "doc_count": 35
        }
      ]
    }
  }
}

示例7:取分组指标值排序

POST /bank/_search?size=0
{
  "aggs": { # 关键字
    "age_terms": { #聚合名称
      "terms": { #桶排序 根据字段值项分组
        "field": "age", #分组字段值
        "order": {
          "max_balance": "asc" #指标值排序,按max_balance 指定的field字段值 升序排序
        }
      },
      "aggs": { # 关键字
        "max_balance": { #聚合名称
          "max": { #指标 最大
            "field": "balance" #字段名
          }
        },
        "min_balance": { #聚合名称
          "min": {#指标 最小
            "field": "balance"#字段名
          }
        }
      }
    }
  }
}

结果

{
  "took": 28,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 511,
      "buckets": [
        {
          "key": 27,
          "doc_count": 39,
          "min_balance": {
            "value": 1110
          },
          "max_balance": {
            "value": 46868
          }
        },
        {
          "key": 39,
          "doc_count": 60,
          "min_balance": {
            "value": 3589
          },
          "max_balance": {
            "value": 47257
          }
        },
        {
          "key": 37,
          "doc_count": 42,
          "min_balance": {
            "value": 1360
          },
          "max_balance": {
            "value": 47546
          }
        },
        {
          "key": 32,
          "doc_count": 52,
          "min_balance": {
            "value": 1031
          },
          "max_balance": {
            "value": 48294
          }
        },
        {
          "key": 26,
          "doc_count": 59,
          "min_balance": {
            "value": 1447
          },
          "max_balance": {
            "value": 48466
          }
        },
        {
          "key": 33,
          "doc_count": 50,
          "min_balance": {
            "value": 1314
          },
          "max_balance": {
            "value": 48734
          }
        },
        {
          "key": 24,
          "doc_count": 42,
          "min_balance": {
            "value": 1011
          },
          "max_balance": {
            "value": 48745
          }
        },
        {
          "key": 31,
          "doc_count": 61,
          "min_balance": {
            "value": 2384
          },
          "max_balance": {
            "value": 48758
          }
        },
        {
          "key": 34,
          "doc_count": 49,
          "min_balance": {
            "value": 3001
          },
          "max_balance": {
            "value": 48997
          }
        },
        {
          "key": 29,
          "doc_count": 35,
          "min_balance": {
            "value": 3596
          },
          "max_balance": {
            "value": 49119
          }
        }
      ]
    }
  }
}

**示例8:筛选分组-正则表达式匹配值**

GET /_search
{
    "aggs" : {
        "tags" : {
            "terms" : {
                "field" : "tags",
                "include" : ".*sport.*",#包含
                "exclude" : "water_.*"#排除
            }
        }
    }
}

示例9:筛选分组-指定值列表

GET /_search
{
    "aggs" : {
        "JapaneseCars" : {
             "terms" : {
                 "field" : "make",
                 "include" : ["mazda", "honda"]
             }
         },
        "ActiveCarManufacturers" : {
             "terms" : {
                 "field" : "make",
                 "exclude" : ["rover", "jensen"]
             }
         }
    }
}

2. filter Aggregation 对满足过滤查询的文档进行聚合计算

在查询命中的文档中选取符合过滤条件的文档进行聚合,先过滤再聚合

示例1:(o゜▽゜)o☆[BINGO!]

POST /bank/_search?size=0
{
  "aggs": {
    "age_terms": {
      "filter": {"match":{"gender":"F"}},
      "aggs": {
        "avg_age": {
          "avg": {
            "field": "age"
          }
        }
      }
    }
  }
}

结果

{
  "took": 163,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_terms": {
      "doc_count": 493,
      "avg_age": {
        "value": 30.3184584178499
      }
    }
  }
}

3. Filters Aggregation 多个过滤组聚合计算

示例1:

GET logs/_search
{
  "size": 0,
  "aggs": {
    "messages": {
      "filters": {
        "filters": {
          "errors": {
            "match": {
              "body": "error"
            }
          },
          "warnings": {
            "match": {
              "body": "warning"
            }
          }
        }
      }
    }
  }
}

result

{
  "took": 18,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "messages": {
      "buckets": {
        "errors": {
          "doc_count": 1
        },
        "warnings": {
          "doc_count": 2
        }
      }
    }
  }
}
GET logs/_search
{
  "size": 0,
  "aggs": {
    "messages": {
      "filters": {
        "other_bucket_key": "other_messages",
        "filters": {
          "errors": {
            "match": {
              "body": "error"
            }
          },
          "warnings": {
            "match": {
              "body": "warning"
            }
          }
        }
      }
    }
  }
}

结果(通过 other_bucket_key 为未命中任何过滤条件的文档指定一个桶名):

{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "messages": {
      "buckets": {
        "errors": {
          "doc_count": 1
        },
        "warnings": {
          "doc_count": 2
        },
        "other_messages": {
          "doc_count": 0
        }
      }
    }
  }
}

4. Range Aggregation 范围分组聚合

(o゜▽゜)o☆[BINGO!]

示例1:

POST /bank/_search?size=0
{
  "aggs": {
    "age_range": {
      "range": {
        "field": "age",
        "ranges": [
          {
            "to": 25
          },
          {
            "from": 25,
            "to": 35
          },
          {
            "from": 35
          }
        ]
      },
      "aggs": {
        "bmax": {
          "max": {
            "field": "balance"
          }
        }
      }
    }
  }
}

result

{
  "took": 7,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_range": {
      "buckets": [
        {
          "key": "*-25.0",
          "to": 25,
          "doc_count": 225,
          "bmax": {
            "value": 49587
          }
        },
        {
          "key": "25.0-35.0",
          "from": 25,
          "to": 35,
          "doc_count": 485,
          "bmax": {
            "value": 49795
          }
        },
        {
          "key": "35.0-*",
          "from": 35,
          "doc_count": 290,
          "bmax": {
            "value": 49989
          }
        }
      ]
    }
  }
}
  • 示例2:为组指定key (o゜▽゜)o☆[BINGO!]
POST /bank/_search?size=0
{
  "aggs": {
    "age_range": {
      "range": {
        "field": "age",
        "keyed": true,
        "ranges": [
          {
            "to": 25,
            "key": "Ld"
          },
          {
            "from": 25,
            "to": 35,
            "key": "Md"
          },
          {
            "from": 35,
            "key": "Od"
          }
        ]
      }
    }
  }
}

result

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "age_range": {
      "buckets": {
        "Ld": {
          "to": 25,
          "doc_count": 225
        },
        "Md": {
          "from": 25,
          "to": 35,
          "doc_count": 485
        },
        "Od": {
          "from": 35,
          "doc_count": 290
        }
      }
    }
  }
}

5. Date Range Aggregation 时间范围分组聚合

(o゜▽゜)o☆[BINGO!]

示例1:

{
  "aggs": {
    "range": {
      "date_range": {
        "field": "date",
        "format": "MM-yyy",
        "ranges": [
          {
            "to": "now-10M/M"
          },
          {
            "from": "now-10M/M"
          }
        ]
      }
    }
  }
}

result

{
  "took": 115,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "range": {
      "buckets": [
        {
          "key": "*-2017-08-01T00:00:00.000Z",
          "to": 1501545600000,
          "to_as_string": "2017-08-01T00:00:00.000Z",
          "doc_count": 0
        },
        {
          "key": "2017-08-01T00:00:00.000Z-*",
          "from": 1501545600000,
          "from_as_string": "2017-08-01T00:00:00.000Z",
          "doc_count": 0
        }
      ]
    }
  }
}

6. Date Histogram Aggregation 时间直方图(柱状)聚合

(o゜▽゜)o☆[BINGO!] 就是按天、月、年等进行聚合统计。可按 year (1y), quarter (1q), month (1M), week (1w), day (1d), hour (1h), minute (1m), second (1s) 间隔聚合或指定的时间间隔聚合。

POST /bank/_search?size=0
{
  "aggs": {
    "sales_over_time": {
      "date_histogram": {
        "field": "date",
        "interval": "month"
      }
    }
  }
}

result

{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "sales_over_time": {
      "buckets": []
    }
  }
}

7. Missing Aggregation 缺失值的桶聚合

POST /bank/_search?size=0
{
    "aggs" : {
        "account_without_a_age" : {
            "missing" : { "field" : "age" }
        }
    }
}

8. Geo Distance Aggregation 地理距离分区聚合

聚合分析(指标聚合、桶聚合)

桶聚合一般不单独使用,通常配合指标聚合一起使用:对数据分组之后往往还要统计桶内数据。在ES中如果没有显式指定指标聚合,每个桶默认只返回 doc_count,即桶内文档总数(效果类似 Value Count)。

四、嵌套聚合 Nested Aggregation


Nested Aggregation

一个特殊的single bucket聚合,可以聚合嵌套的文档

(1) 计算每个tag下图书的平均价格, 再按平均价格降序排序, 查询语法:

GET book_shop/it_book/_search
{
    "size": 0,
    "aggs": {
        "all_tags": {
            "terms": {
                "field": "tags.keyword", 
                "order": { "avg_price": "desc" } // 根据下述统计的结果排序
            },
            "aggs": {
                "avg_price": {
                    "avg": { "field": "price" }
                }
            }
        }
    }
}

更多案例如下:

通过 Elasticsearch 实现聚合检索 (分组统计) 目录

Python应用实例

class AggregationBuckets(object):
    """Build and run an Elasticsearch search request that carries bucket aggregations.

    The request always filters on a ``created_at`` time range and can
    optionally add date-histogram, date-range, terms, filter, pipeline and
    caller-supplied ("nested") aggregations.

    Results follow the status-tuple convention used by the helpers this class
    calls: ``(5, payload)`` on success, ``(3, message)`` for missing
    parameters, anything else is passed through from the helper.
    """

    def __init__(self, _from=None, size=None, index=None, **kwargs):
        """
        Collect paging and aggregation parameters.

        :param _from: ES "from" offset, used for paging.
        :param size: ES "size" (number of hits to return), used for paging.
        :param index: list of ES index names; ``index[0]`` is searched by
            ``query()``, ``index[1]`` by ``query_other_by_alert_id()``.
        :param kwargs: query parameters; ``start_query_time`` and
            ``end_query_time`` are required, everything else is optional.

        ``__init__`` cannot return a status tuple (Python raises TypeError for
        any non-None return), so a missing required parameter is recorded in
        ``self.error`` and returned later by the query methods.
        """
        self.start_num = _from
        self.end_num = size
        # A fresh list per instance instead of a shared mutable default.
        self.index = index if index is not None else []

        self.start_query_time = kwargs.get('start_query_time', None)
        self.end_query_time = kwargs.get('end_query_time', None)

        self.error = None
        if not self.index or not self.start_query_time or not self.end_query_time:
            log.error(
                'Setup1: start es aggregation Incomplete parameter ')
            self.error = (3, 'Incomplete parameter...')
        else:
            log.info('parameter'
                     'indices: {0} start_query_time: {1} end_query_time: {2}'.format(
                        self.index, self.start_query_time, self.end_query_time))

        self.query_terms = kwargs.get('query_terms', None)  # dict
        self.query_string = kwargs.get('query_string', None)
        self.histogram_interval = kwargs.get('histogram_interval', None)  # str, e.g. 'day'
        self.histogram_field = kwargs.get('histogram_field', 'created_at')
        # 'HH' is the 24-hour field; the former 'hh' (12-hour clock) rendered
        # midnight buckets as "12:00:00".
        self.histogram_format = kwargs.get('histogram_format', 'yyyy-MM-dd HH:mm:ss')

        # date_range aggregation
        self.range_date_field = kwargs.get('range_date_field', 'created_at')
        self.range_date_format = kwargs.get('range_date_format', 'yyyy-MM-dd HH:mm:ss')
        self.range_date_ranges = kwargs.get('range_date_ranges', None)  # list, e.g. [{"to": "now", "from": "now-1d"}]

        self.terms_field = kwargs.get('terms_field', None)  # list, e.g. ['ip']
        self.terms_size = kwargs.get('terms_size', 10)
        self.terms_order = kwargs.get('terms_order', {"_count": "desc"})  # dict

        # NOTE(review): read from the 'alerts_id' key but stored as alert_id;
        # kept as-is, confirm against callers.
        self.alert_id = kwargs.get('alerts_id', '')
        self.filter_aggs = kwargs.get('filter_aggs', None)
        self.pipeline_aggs = kwargs.get('pipeline_aggs', None)
        self.nested_aggs = kwargs.get('nested_aggs', None)

    def query(self):
        """Assemble the search body and send it to ``self.index[0]``.

        :return: ``(3, 'Incomplete parameter...')`` when a required
            constructor parameter was missing, otherwise whatever
            ``buckets_aggregation_query`` returns.
        """
        if self.error:
            return self.error

        query_aggs = {
            "query": {
                "filtered": {
                    "filter": {
                        "range": {
                            "created_at": {"lte": self.end_query_time,
                                           "gte": self.start_query_time}}}},
            },
            "aggs": {},
            "sort": {"created_at": {"order": "desc"}}
        }

        # Only send paging keys that were actually supplied, so the request
        # never carries a JSON null for "size"/"from".
        if self.end_num is not None:
            query_aggs['size'] = self.end_num
        if self.start_num is not None:
            query_aggs['from'] = self.start_num

        # When both are supplied, query_string replaces query_terms.
        if self.query_terms:
            query_aggs['query']['filtered'].update({'query': self.query_terms})
        if self.query_string:
            query_aggs['query']['filtered'].update({'query': self.query_string})

        if self.histogram_interval:
            histogram_aggs = {
                "date_histogram": {
                    'field': self.histogram_field,
                    'interval': self.histogram_interval,
                    'format': self.histogram_format,
                }
            }
            if self.pipeline_aggs:
                # Pipeline aggregations are attached as children of the histogram.
                histogram_aggs['aggs'] = self.pipeline_aggs
            query_aggs['aggs']['histogram_aggs'] = histogram_aggs

        if self.range_date_ranges:
            query_aggs['aggs']['range_date_aggs'] = {
                "date_range": {
                    'field': self.range_date_field,
                    'format': self.range_date_format,
                    'ranges': self.range_date_ranges,
                }
            }

        if self.terms_field:
            for field_name in self.terms_field:
                if not field_name:
                    continue  # callers may pass '' placeholders
                # Each terms aggregation is named after the field it groups on.
                query_aggs['aggs'][field_name] = {
                    "terms": {
                        'field': field_name,
                        'size': self.terms_size,
                        'order': self.terms_order,
                    }
                }

        if self.filter_aggs:
            query_aggs['aggs']['filter_aggs'] = self.filter_aggs
        if self.nested_aggs:
            # Caller-supplied aggregations are merged in and the sort is dropped.
            query_aggs.pop('sort')
            query_aggs['aggs'].update(self.nested_aggs)

        log.info('query date is {0}'.format(query_aggs))
        return buckets_aggregation_query(self.index[0], query_aggs)

    def query_other_by_alert_id(self, **kwargs):
        """Look up documents in ``self.index[1]`` whose ``alerts_id`` is in a list.

        :param kwargs: ``alert_id`` - list of alert ids to match.
        :return: ``(3, 'Incomplete parameter...')`` when ids, constructor
            parameters or the second index are missing, otherwise the result
            of the module-level ``query`` helper.
        """
        ids = kwargs.get('alert_id', None)
        if not ids:
            return (3, 'Incomplete parameter...')
        if self.error:
            return self.error
        if len(self.index) < 2:
            return (3, 'Incomplete parameter...')

        q_id = {'query': {'terms': {'alerts_id': ids}}}
        # The id list comes from one page of at most `size` alerts, so the
        # lookup needs the page size itself. The former `size - _from` raised
        # TypeError for unset paging and asked for too few (or a negative
        # number of) documents on every page after the first.
        if self.end_num is not None:
            q_id["size"] = self.end_num
        q_id["from"] = 0
        return query(self.index[1], q_id)
def active_two_date(size=None, _from=None, index=['alerts_es', 'alerts_server', 'alerts_sended'], **kwargs):
    """Return the alerts of the last day, enriched with status, duration and business info.

    Runs a one-day aggregation query against ``index[0]``, looks up the
    matching documents in ``index[1]`` by alert id, and for every alert that
    has a match fills in ``status``, ``up_time`` and ``business``.

    :param size: page size forwarded to the ES query.
    :param _from: page offset forwarded to the ES query.
    :param index: list of index names; the default list is only read, never mutated.
    :param kwargs: accepted for interface compatibility; not used here.
    :return: ``(5, {'hits': [...], 'page_num': total, 'aggregations': {...}})``
        on success, ``(3, message)`` when no alert ids were found, or the
        failing status tuple from either query.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    active_data = {'hits': []}
    values = {
        'start_query_time': 'now-1d',
        'end_query_time': 'now',
        'histogram_interval': 'day',
        'range_date_ranges': [{"to": "now", "from": "now-1d"}],
        'terms_field': ['ip.raw', ''],
    }

    log.info('running query active_two_date data ...')

    es_query = AggregationBuckets(size=size, _from=_from, index=index, **values)
    alerts_data = es_query.query()
    if alerts_data[0] != 5:
        return alerts_data
    log.info('Setup1: query {0} of active two day OK ...'.format(index[0]))

    alert_hits = alerts_data[1]['hits']['hits']
    ids = [int(hit['_id']) for hit in alert_hits if hit['_id']]
    log.info('Setup2: start query {0}'.format(index[1]))
    if not ids:
        return (3, 'Alert id not empty...')
    alerts_server = es_query.query_other_by_alert_id(alert_id=ids)
    if alerts_server[0] != 5:
        return alerts_server
    log.info('Setup2: query {0} of {1} OK ...'.format(index[1], ids))

    server_hits = alerts_server[1]['hits']
    # NOTE(review): the shape returned by the `query` helper is not visible
    # here. The original iterated `['hits']` directly; also accept the raw ES
    # envelope ({'total': ..., 'hits': [...]}) so both shapes work.
    if isinstance(server_hits, dict):
        server_hits = server_hits.get('hits', [])

    for hit in alert_hits:
        source = hit['_source']
        for server_hit in server_hits:
            server = server_hit['_source']
            if int(hit['_id']) != int(server['alerts_id']):
                continue
            created_at = datetime.datetime.strptime(source['created_at'], time_format)
            # `_source` is a dict, so the former hasattr(..., 'deleted_at')
            # was always False and the recovered branch could never run.
            if source.get('deleted_at'):
                source['status'] = u'恢复'
                elapsed = datetime.datetime.strptime(
                    source['deleted_at'], time_format) - created_at
            else:
                source['status'] = u'问题'
                elapsed = datetime.datetime.utcnow() - created_at
            # Drop the microseconds part of the timedelta text.
            source['up_time'] = str(elapsed).split('.')[0]
            source['business'] = server['division'] + ' ' + \
                                 server['product'] + ' ' + \
                                 server['portal'] + ' ' + \
                                 server['application']
            active_data['hits'].append(hit)
    active_data['page_num'] = alerts_data[1]['hits']['total']
    active_data['aggregations'] = alerts_data[1]['aggregations']
    return (5, active_data)

# 获取函数    
def graph_data(size=None, _from=None, index=['alerts_es', 'alerts_server', 'alerts_sended'], **kwargs):
    """Return per-category date-histogram aggregations for charting.

    Groups documents by ``category.raw`` and, inside every category, buckets
    them with a date histogram.

    :param size: accepted for interface compatibility; the query is always
        sent with ``size=0`` because only the aggregations are returned.
    :param _from: paging offset forwarded to the ES query.
    :param index: list of index names; the first one is searched.
    :param kwargs: optional ``histogram_field``, ``histogram_interval``,
        ``histogram_format``, ``start_query_time`` and ``end_query_time``.
    :return: ``(5, aggregations_dict)`` on success, otherwise the failing
        status tuple from the query.
    """
    date_histogram = {
        "field": kwargs.get('histogram_field', 'created_at'),
        "interval": kwargs.get('histogram_interval', 'day'),
        # 'HH' is the 24-hour field; the former 'hh' (12-hour clock) rendered
        # midnight buckets as "12:00:00".
        "format": kwargs.get('histogram_format', 'yyyy-MM-dd HH:mm:ss'),
    }
    nested_aggs = {
        'nested_aggs': {
            "terms": {"field": "category.raw"},
            "aggs": {"histogram_category": {"date_histogram": date_histogram}},
        }
    }

    # NOTE(review): the aggregation dict is wrapped once more under the
    # 'nested_aggs' keyword, exactly as before, because AggregationBuckets
    # merges the value of that keyword into the request's "aggs".
    values = {
        'nested_aggs': nested_aggs,
        'start_query_time': kwargs.get('start_query_time', 'now-1y'),
        'end_query_time': kwargs.get('end_query_time', 'now'),
    }
    es_query = AggregationBuckets(size=0, _from=_from, index=index,
                                  **values)
    graph_aggs_data = es_query.query()
    if graph_aggs_data[0] != 5:
        return graph_aggs_data
    log.info('Get graph field OK ... values: {0}'.format(
        values))

    return (5, graph_aggs_data[1]['aggregations'])
        
        
def buckets_aggregation_query(indices, q_data):
    """POST a search body to ``<elasticsearch_api><indices>/_search`` with HTTP Basic auth.

    :param indices: index name appended to ``options.elasticsearch_api``.
    :param q_data: JSON-serialisable search body.
    :return: ``(5, parsed_response)`` on success, ``(4, exception)`` when the
        request or the JSON decoding fails.
    """
    try:
        base64string = base64.b64encode(
            '%s:%s' % (options.es_user, options.es_pawd))

        req = urllib2.Request(options.elasticsearch_api + indices + "/_search", json.dumps(q_data))
        req.add_header("Authorization", "Basic %s" % base64string)
        # The body is JSON; Elasticsearch 6.0+ rejects bodies sent without an
        # explicit content type.
        req.add_header("Content-Type", "application/json")
        response = urllib2.urlopen(req)
        try:
            ret = json.loads(response.read())
        finally:
            # Release the connection even when reading or decoding fails.
            response.close()
        return (5, ret)

    except Exception as e:
        log.error('Request Index {0} request failed.. {1}'.format(indices, e))
        return (4, e)

做聚合(aggregation)查询时,size = 0 表示不返回 hits;结果:

"nested_aggs": {
      "buckets": [
        {
          "histogram_category": {
            "buckets": [
              {
                "key_as_string": "2020-07-01 12:00:00",
                "key": 1593561600000,
                "doc_count": 2924
              },
              {
                "key_as_string": "2020-08-01 12:00:00",
                "key": 1596240000000,
                "doc_count": 7112
              }
            ]
          },
          "key": "系统故障",
          "doc_count": 10036
        },
        {
          "histogram_category": {
            "buckets": [
              {
                "key_as_string": "2020-07-01 12:00:00",
                "key": 1593561600000,
                "doc_count": 105
              },
              {
                "key_as_string": "2020-08-01 12:00:00",
                "key": 1596240000000,
                "doc_count": 404
              }
            ]
          },
          "key": "安全故障",
          "doc_count": 509
        },
        {
          "histogram_category": {
            "buckets": [
              {
                "key_as_string": "2020-07-01 12:00:00",
                "key": 1593561600000,
                "doc_count": 107
              },
              {
                "key_as_string": "2020-08-01 12:00:00",
                "key": 1596240000000,
                "doc_count": 226
              }
            ]
          },
          "key": "网络故障",
          "doc_count": 333
        },
        {
          "histogram_category": {
            "buckets": [
              {
                "key_as_string": "2020-08-01 12:00:00",
                "key": 1596240000000,
                "doc_count": 15
              }
            ]
          },
          "key": "应用故障",
          "doc_count": 15
        },
        {
          "histogram_category": {
            "buckets": [
              {
                "key_as_string": "2020-07-01 12:00:00",
                "key": 1593561600000,
                "doc_count": 1
              }
            ]
          },
          "key": "数据库故障",
          "doc_count": 1
        }
      ],
      "sum_other_doc_count": 0,
      "doc_count_error_upper_bound": 0
    }