阅读 200

Elasticsearch7.1中文文档-第五章-文档API

本节首先简要介绍Elasticsearch的数据复制模型,然后会对CRUD API进行详细说明:

单文档API
多文档API

所有CRUD API都是单索引API。 index参数接受单个索引名称或指向单个索引的alias

读写文档

介绍

在Elasticsearch中,每个索引都会被分成分片,每个分片会有多个副本分片。这些副本称为复制组(replication group),在添加或删除文档时会保持同步。如果我们同步,从一个副本中读取将导致与从另一个副本中读取的结果截然不同的结果。保持副本同步和读取数据提供服务的过程称为数据复制模型。

Elasticsearch的是数据复制模型是基于主从模型的,并在微软研究院的PacificA论文中有更好的描述。主分片是从复制组中复制的一个副本。其他副本则被称为副本分片。主节点是所有索引操作的主要入口点,它负责验证它们并确保它们是正确的。一旦索引操作被主节点接受,主节点还负责将该操作复制到其他副本。

本节的目的是对Elasticsearch复制模型进行更深的概述,并讨论它对写操作和读操作之间的各种交互的影响。

基础的写模型

Elasticsearch中的每个索引操作首先使用路由(通常基于文档ID)解析到一个复制组。一旦确定了复制组,操作将在内部转发到该组的当前主分片。主分片负责验证操作并将其转发到其他副本。由于副本可以脱机,所以不需要主分片复制到所有副本。相反,Elasticsearch维护一个应该接收操作的分片副本列表。这个列表称为同步副本,由主节点维护,因此必须将所有操作复制到这个副本列表中的每个副本。顾名思义,分片副本保证已经处理了所有已向用户确认的索引和删除操作。

主分片遵循以下基本流程:

  1. 验证传入操作并在结构无效时拒绝它,换言之,需要传入符合结构的参数(例如:在需要一个数字的对象字段中)
  2. 在本地执行操作,即索引或删除相关文档。这还将验证字段的内容,不符要求会拒绝(例如:关键字值太长,无法在Lucene中进行索引)。
  3. 将操作转发到当前同步复制集中的每个副本,如果有多个副本,则并行执行。
  4. 一旦所有副本都成功地执行了操作并响应了主副本,主副本就会向客户端确认请求成功完成。
失败处理

在索引期间有很多事情可以导致出错——磁盘可能会损坏,节点可能会相互断开连接,或者某些配置错误可能导致副本上的操作失败,尽管它在主服务器上成功。 这些很少见,但主要必须回应它们。

在主节点本身发生故障的情况下,托管主节点的节点将向主请求发送有关它的消息。 索引操作将等待(默认情况下最多1分钟),主节点将其中一个副本提升为新的主节点。 然后,该操作将被转发到新的主节点处理。 请注意,主节点还会监控节点的运行状况,并可能决定主动降级主节点。 当通过网络问题将拥有主节点的节点与群集隔离时,通常会发生这种情况。 有关详细信息,请参见此处

一旦在主节点上成功执行了操作,在副本分片上执行该操作时,主节点必须处理潜在的问题。 这可能是由副本上的实际故障或由于网络问题导致操作无法到达副本(或阻止副本响应)引起的。 所有这些都具有相同的最终结果:作为同步副本列表的一部分的副本错过了即将被确认的操作。 为了避免违反规则,主节点向主请求发送消息,请求从同步副本列表中删除有问题的分片。 只有在主请求确认删除了分片后,主请求才会确认操作。 请注意,主节点还将指示另一个节点开始构建新的分片副本,以便将系统恢复到正常状态。

在将操作转发到副本时,主节点将使用副本来验证它仍然是活动的主节点。如果主节点由于网络分区(或长时间GC)而被隔离,它可能会在意识到已降级之前继续处理传入索引操作。来自旧主节点的操作将被副本拒绝。当主节点收到来自副本的响应,拒绝了它的请求(因为它不再是主节点),那么主节点就会向主服务器发出请求,并知道它已经被替换了。然后将操作路由到新的主节点。

如果没有副本会发生什么?

我们只使用数据的单一副本运行,物理硬件问题可能会导致数据丢失。

基础的读模型

Elasticsearch中的读取可以是ID非常轻量级的查找,也可以是具有复杂聚合的大量搜索请求,这些聚合会占用非常珍贵的CPU能力。 主从模型的一个优点是它使所有分片副本保持一致(除了正在进行的操作)。 因此,单个同步副本足以提供读取请求。

当节点接收到读请求时,该节点负责将其转发给包含相关分片的节点、整理并响应客户端。我们将该节点称为请求的协调节点。基本流程如下:

  1. 将读请求解析到相关分片。注意,由于大多数搜索将被发送到一个或多个索引,因此它们通常需要从多个分片中读取,每个分片表示数据的不同子集。
  2. 从分片复制组中选择每个相关分片的活动副本。这可以是主分片的,也可以是副本。默认情况下,Elasticsearch只是在分片副本之间进行循环。
  3. 将分片级别的读请求发送到所选副本。
  4. 整合结果并做出响应。注意,在通过ID查找的情况下,一定是只有一个结果,可以跳过这一步。
(请求)分片失败

当分片无法响应读请求时,协调节点将请求发送到相同复制组中的另一个分片副本。重复失败会导致没有可用的分片副本。

为了确保快速响应,如果一个或多个分片失败,以下api将响应部分结果:

包含部分结果的响应仍然提供一个200 OK HTTP状态码。分片故障由响应头的timed_out_shards字段表示。

一些简单的含义

这些基本流程中的每一个都决定了Elasticsearch作为读写系统的行为。此外,由于读写请求可以并发执行,所以这两个基本流彼此交互。这有一些固有的含义:

高效读

在正常操作下,对每个相关复制组执行一次读取操作。 只有在失败条件下,同一个分片的多个副本才会执行相同的搜索。

读未应答

由于主节点的第一个索引在本地然后复制请求,因此并发读取可能在确认之前已经看到了更改。

由于主节点的第一个索引是本地索引,然后复制请求,所以并发读取可能在确认更改之前就已经看到了更改。

默认两个分片

这样可以提高容错,同时保留两个数据副本。

失败

以下情况可能导致失败:

单个分片可以降低索引速度

由于主节点在每次操作期间都要等待同步副本列表中的所有副本,因此一个慢速副本可能会减慢整个复制组的速度。这是我们为上面提到的读取效率所付出的代价。当然,一个单一的慢分片也会减慢已经被路由到它的搜索。

脏读

一个独立的主节点暴露写入,写入是不会确认的。这是因为独立主节点只有在向其副本发送请求或向主节点发出请求时才会意识到它是隔离的。此时,操作已经被索引到主节点中,并且可以并发读取。Elasticsearch每秒钟(默认情况下)ping一次主节点,如果不知道主节点,则拒绝索引操作,从而降低了这种风险。

冰山一角

本文档提供了更高级的Elasticsearch如何处理数据的预览,当然,表面之下ES做了很多事情。比如,集群状态发布,主节点选举等在保证系统正常运行方面发挥了不小的作用。这个文档没有涵盖已知的和重要的bug(关闭的和打开的),为了帮助人们知道并了解,我们在我们的网站上保持一个专门的页面。我们强烈建议阅读它。

Index API

添加或更新JSON文档到指定索引,使该文档可搜索,下面的例子演示了插入一个文档到JSON文档到twitter索引中,并且id为1:

curl -X PUT "localhost:9200/twitter/_doc/1" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码

上面索引的操作结果是:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    },
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "1",
    "_version" : 1,
    "_seq_no" : 0,
    "_primary_term" : 1,
    "result" : "created"
}
复制代码

_shard提供了关于索引操作的复制过程的信息。

total表示多少分片需要被执行该索引操作(包含主分片和副本分片)。

successful表示所有分片中有多少成功的。

failed表示有多少分片是执行失败的。

如果索引操作成功,则successful至少为1。

当一个索引操作返回成功(默认情况下,只有主分片是必须的,但是此行为是可以改变的 ),副本分片可能没有全部开始。在上面的例子中,total将等于number_of_replicas设置的总分片数,successful将等于已开始的执行的分片数(主+副),如果没有失败,则failed是0.

自动创建索引

如果索引不存在,创建索引操作将会创建一个索引,并应用预配置index模板。如果映射不存在,创建索引操作还会动态创建索引。默认的,如果需要,新字段和对象将会自动的添加到映射定义。有关映射定义的更多信息,请查看映射部分;有关手动更新映射的信息,请参阅Put Mapping API

自动创建索引是通过action.auto_create_index设置来控制的,这个设置默认是true,意味着索引总是会自动创建。只有匹配特定模式的索引才允许自动创建索引,方法是将此设置的值更改为这些模式的逗号分隔列表。还可以明确的在列表中使用+或者-来指定允许或者禁止,要彻底的禁用可以将其设置为false

curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
    	##只允许自动创建twitter,index10或者以ind开头的索引,禁止创建以index1开头的索引
        "action.auto_create_index": "twitter,index10,-index1*,+ind*" 
    }
}
'
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
    	##完全禁用自动创建索引
        "action.auto_create_index": "false" 
    }
}
'
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
    "persistent": {
    	##允许自动创建所有索引
        "action.auto_create_index": "true" 
    }
}
'
复制代码
操作类型

索引操作还接受op_type参数,用于强制create操作,允许省略。当使用了create,在索引中如果索引中已存在该ID的文档,索引操作将失败。

下面有一个例子使用了op_type参数:

curl -X PUT "localhost:9200/twitter/_doc/1?op_type=create" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码

指定选项为create使用下面的URI:

curl -X PUT "localhost:9200/twitter/_create/1" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码
自动生成ID

索引操作没有指定id也可以被执行,下面的例子ID将会自动生成。此外,op_type将会自动的被设置为create,请看下面的例子(注意,是POST而不是PUT):

curl -X POST "localhost:9200/twitter/_doc/" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码

上面索引操作的结果是:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    },
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "W0tpsmIBdwcYyG50zbta",
    "_version" : 1,
    "_seq_no" : 0,
    "_primary_term" : 1,
    "result": "created"
}
复制代码
乐观并发控制

索引操作可以是有条件的,只有在为文档的最后一次修改分配了if_seq_noif_primary_term参数指定的序列号和主要术语时才能执行索引操作。 如果检测到不匹配,则操作将导致VersionConflictException和状态代码409.有关详细信息,请参阅乐观并发控制

路由

默认情况下,分片放置? 还是路由? 通过使用文档的id值的哈希来控制。 为了更明确地控制,可以使用路由参数在每个操作的基础上直接指定馈入路由器使用的散列函数的值。 例如:

curl -X POST "localhost:9200/twitter/_doc?routing=kimchy" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码

在上面的例子中,_doc文档是被路由到routing提供的参数“kimchy”中,当明确的设置映射,__route字段用来直接的索引操作从自身文档中来提取路由值,这是以额外的文档解析传递(非常小)的代价实现的。如果routing映射被定义,并且设置为required,如果没有路由值提供或者可以提取,这个索引操作将失败。

分发

索引操作根据其路由指向主分片,并在包含此分片的实际节点上执行。主分片完成操作后,如果需要,更新将分发到可应用的副本。

等待活动分片

为了提高系统写入的容错性,索引操作可以配置为在继续操作之前等待一定数量的活动分片副本。如果必要数量的活动分片副本不可用,则写入操作必须等待并重试,直到必要的分片副本已启动或发生超时。默认的,写入操作在处理之前,会等待主分片启动(例如:wait_for_active_shards=1)。这个默认值可以通过index.write.wait_for_active_shards来动态的修改此设置。要更改每个操作的此行为,可以使用wait_for_active_shards请求参数。

有效值可以是all也可以是在索引中配置副本节点的每个分片的总数的任意正整数(即number_of_replicas + 1),指定负数,或者大于该分片副本数的值将引发错误。

例如,假设我们有三个节点形成的集群,A,B和C,创建一个索引index副本数量设置为3(产生4个分片副本,比节点多一个副本)。如果我们尝试一个索引操作,默认的在处理之前,操作只会确保每个分片的主副本可用。这意味着,即使B和C挂掉了,A托管了主分片副本,索引操作仍将仅使用一个数据副本。 如果wait_for_active_shards被该请求设置为3(和所有3个节点是正常的),索引操作在处理之前将需要3个活动分片副本,应该满足这个需求,因为集群中有3个活动节点,每个节点都持有切分的副本。但是,如果我们将wait_for_active_shards设置为all(或者设置为4,两者是一样的),索引操作将不会进行,因为索引中我们并没有每个活动分片的4个副本。除非集群中出现一个新节点来承载分片的第四个副本,否则操作将超时。

需要注意的是,这个设置极大地降低了写操作未写入到所需分片副本数量的机会,但它并没有完全消除这种可能性,因为这个检查发生在写操作开始之前。在执行写操作之后,仍然有可能在任意数量的碎片副本上复制失败,但在主副本上仍然成功。写操作的响应的_shards部分显示复制成功/失败的shard副本的数量。

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    }
}
复制代码
刷新

控制何时可以搜索到此请求所做的更改。详情请查看刷新

Noop更新

当使用索引API更新文档时,即使文档没有更改,也总是会创建文档的新版本。如果您不能接受这种结果,请使用_update API,并将detect_noop设置为true。此选项在索引API上不可用,因为索引API不会获取旧源,也无法将其与新源进行比较。

关于何时不接受noop更新,没有一条硬性规定。 它是许多因素的组合,例如您的数据源发送实际noops更新的频率以及Elasticsearch在接收更新的分片上运行的每秒查询数。

超时

在执行索引操作时,分配给执行索引操作的主分片可能不可用。其中的一些原因可能是主分片目前正在从网关中恢复或正在进行重新定位。默认情况下,索引操作将等待主分片可用最多1分钟,然后失败并响应错误。可以使用timeout参数显式地指定它等待的时间。下面是一个设置为5分钟的例子:

curl -X PUT "localhost:9200/twitter/_doc/1?timeout=5m" -H 'Content-Type: application/json' -d'
{
    "user" : "kimchy",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "trying out Elasticsearch"
}
'
复制代码
版本控制

每个索引文档都有一个版本号,默认的,每一次更新或者删除内部版本使用从1自增的数字,当然也可以使用一个外部的值(例如,维护在数据库中的版本号),要启用这个功能,设置version_typeexternal,这个值必须是一个数字,大于或等于0,小于9.2e+18。

当使用外部的版本类型,系统将检查传递给索引请求的版本号是否大于当前存储文档的版本,如果是大于,则文档将被索引,并且生成一个新的版本号来使用。如果提供的值是小于或者等于存储的文档版本号,会发生版本冲突,并且索引操作将失败,例如:

curl -X PUT "localhost:9200/twitter/_doc/1?version=2&version_type=external" -H 'Content-Type: application/json' -d'
{
    "message" : "elasticsearch now has versioning support, double cool!"
}
'
复制代码

版本控制是完全实时的,并且不受搜索操作的近实时方面的影响。如果没有提供版本,接下来的执行操作不会进行版本检查。

上面的操作时成功的,因为提供了版本号为2,并大于当前的版本号1,如果文档已经被更新,版本号设置为2或者更高,索引命令将失败并导致冲突(409 http状态码)。

一个很好的副作用是,只要使用源数据库的版本号,就不需要维护由于更改源数据库而执行的异步索引操作的严格顺序。如果使用外部版本控制,即使使用来自数据库的数据更新Elasticsearch索引的简单情况也会简化,因为如果索引操作由于某种原因出现顺序错误,则只使用最新版本。

版本类型

除了上面解释的external版本类型之外,Elasticsearch还支持针对特定用例的其他类型。下面是不同版本类型及其语义的概述。

internal

​ 只有当给定的版本与存储的文档版本相同时,才索引文档。

external or external_gt

​ 只有当给定的版本严格高于存储文档的版本,或者没有现有文档时,才对文档进行索引。给定的版本将作为新版本使用,并与新文档一起存储。所提供的版本必须是非负的long number。

external_gte

​ 只有当给定的版本等于或高于存储文档的版本时,才对文档进行索引。如果没有现有文档,操作也将成功。给定的版本将作为新版本使用,并与新文档一起存储。所提供的版本必须是非负的long number。

external_gte版本类型用于特殊的用例,应该谨慎使用。如果使用不当,可能会导致数据丢失。还有另一个选项force,它是不推荐的,因为它可能导致主f分片和复制分片出现不一致。

GET API

get API允许根据索引的id从索引中获取JSON文档。下面的例子展示了从一个索引叫做twitter的并且id等于0的索引中获取JSON文档。

curl -X GET "localhost:9200/twitter/_doc/0"
复制代码

上面的get操作结果是:

{
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "0",
    "_version" : 1,
    "_seq_no" : 10,
    "_primary_term" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}
复制代码

上面的结果包括我们希望检索的文档的index、id和version,如果可以找到文档的实际_source,则包含它(如响应中的found字段所示)。

该API还可以使用head请求来检查文档是否存在,例如:

curl -I "localhost:9200/twitter/_doc/0"
复制代码
实时

默认的,get API是实时的,它不受index的刷新率影响(当数据在搜索中可见时),如果文档已经更新完成但还没有刷新,get API将发出一个刷新调用,使文档可见,这也将使上次刷新后其他文档发生变化。为了禁用实时GET,可以将realtime参数设置为false

Source过滤

默认情况下,get操作会返回_source字段的内容,除非你使用了stroed_fields参数或者_source字段是被禁用的。你可以使用_source参数来关闭_source检索。

curl -X GET "localhost:9200/twitter/_doc/0?_source=false"
复制代码

如果你只需要从完整的_source中的一个或者两个字段,您可以使用_source_include_source_exclude参数来包含或过滤掉您需要的部分。这对于部分检索可以节省网络开销的大型文档尤其有用。这两个参数都采用逗号分隔的字段列表或通配符表达式。例如:

curl -X GET "localhost:9200/twitter/_doc/0?_source_includes=*.id&_source_excludes=entities"
复制代码

如果你只指定包含,你可以使用一个更短的符号:

curl -X GET "localhost:9200/twitter/_doc/0?_source=*.id,retweeted"
复制代码
存储字段

get操作允许指定一组存储字段,这些字段将通过传递stored_fields参数返回。如果未存储所请求的字段,则将忽略它们。例如:

curl -X PUT "localhost:9200/twitter" -H 'Content-Type: application/json' -d'
{
   "mappings": {
       "properties": {
          "counter": {
             "type": "integer",
             "store": false
          },
          "tags": {
             "type": "keyword",
             "store": true
          }
       }
   }
}
'
复制代码

现在没问可以添加一个文档:

curl -X PUT "localhost:9200/twitter/_doc/1" -H 'Content-Type: application/json' -d'
{
    "counter" : 1,
    "tags" : ["red"]
}
'
复制代码

接下来尝试检索它:

curl -X GET "localhost:9200/twitter/_doc/1?stored_fields=tags,counter"
复制代码

上面的结果是:

{
   "_index": "twitter",
   "_type": "_doc",
   "_id": "1",
   "_version": 1,
   "_seq_no" : 22,
   "_primary_term" : 1,
   "found": true,
   "fields": {
      "tags": [
         "red"
      ]
   }
}
复制代码

从文档本身获取的字段值总是作为数组返回。由于未存储counter字段,因此在尝试获取stored_field时,get请求会忽略它。

也可以检索像_routing字段这样的元数据字段:

curl -X PUT "localhost:9200/twitter/_doc/2?routing=user1" -H 'Content-Type: application/json' -d'
{
    "counter" : 1,
    "tags" : ["white"]
}
'
复制代码
curl -X GET "localhost:9200/twitter/_doc/2?routing=user1&stored_fields=tags,counter"
复制代码

上面的操作结果是:

{
   "_index": "twitter",
   "_type": "_doc",
   "_id": "2",
   "_version": 1,
   "_seq_no" : 13,
   "_primary_term" : 1,
   "_routing": "user1",
   "found": true,
   "fields": {
      "tags": [
         "white"
      ]
   }
}
复制代码

此外,只能通过stored_field选项返回leaf字段。 因此无法返回对象字段,此类请求将失败。

直接获取_source

使用/ {index} / _ source / {id}只获取文档的_source字段,而不包含任何其他内容。 例如:

curl -X GET "localhost:9200/twitter/_source/1"
复制代码

你还可以使用source过滤参数来控制哪些_source部分将会被返回:

curl -X GET "localhost:9200/twitter/_source/1/?_source_includes=*.id&_source_excludes=entities"
复制代码

注意,_source还有一个HEAD变量,用于有效地测试文档_source是否存在。如果在映射中禁用现有文档的_source,则该文档将没有_source。

curl -I "localhost:9200/twitter/_source/1"
复制代码
路由

使用控制路由的能力进行索引时,为了获取文档,还应提供路由值。 例如:

curl -X GET "localhost:9200/twitter/_doc/2?routing=user1"
复制代码

上面的将获得id为2的tweet,根据用户路由。注意,在没有正确路由的情况下发出get将导致无法获取文档。

偏好

控制哪个分片副本执行get请求的偏好。默认情况下,操作是在分片副本间是随机的。

preference 能被设置成以下值:

_local

​ 如果可能,该操作更喜欢在本地分配的分片上执行。

自定义值

​ 将使用自定义值来保证相同的分片将用于相同的自定义值。 当在不同的刷新状态下命中不同的分片时,这可以帮助“jumping values”。 示例值可以是Web会话ID或用户名。

刷新

可以将refresh参数设置为true,以便在get操作之前刷新相关分片并使其可搜索。将其设置为true应该在仔细考虑并验证这不会给系统带来沉重的负载(并降低索引速度)之后执行。

分布式

get操作被散列到一个特定的shard id中,然后被重定向到该shard id中的一个副本,并返回结果。副本是主分片及其在该shard id组中的副本。

版本支持

只有当文档的当前版本等于指定的版本时,才可以使用version参数检索文档。这种行为对于所有版本类型都是相同的,除了总是检索文档的版本类型FORCE之外。注意,不推荐使用FORCE版本类型。

在内部,Elasticsearch将旧文档标记为已删除,并添加了一个全新的文档。旧版本的文档不会立即消失,尽管您无法访问它。当您继续索引更多数据时,Elasticsearch将清理后台已删除的文档。

Delete API

delete API允许从指定的索引中用索引id删除JSON文档。下面的例子展示了删除一个json文档从索引叫做twitter并且ID为1:

curl -X DELETE "localhost:9200/twitter/_doc/1"
复制代码

上面的操作结果是:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 2
    },
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "1",
    "_version" : 2,
    "_primary_term": 1,
    "_seq_no": 5,
    "result": "deleted"
}
复制代码
乐观并发控制

删除操作可以是有条件的,只有在为文档的最后一次修改分配了if_seq_noif_primary_term参数指定的序列号和主要术语时才能执行。 如果检测到不匹配,则操作将导致VersionConflictException和状态代码409.有关详细信息,请参阅乐观并发控制

版本控制

索引的每个文档都是版本化的。 删除文档时,可以指定版本以确保我们尝试删除的相关文档实际上已被删除,并且在此期间没有更改。 对文档执行的每个写入操作(包括删除)都会导致其版本递增。 删除文档的版本号在删除后仍可短时间使用,以便控制并发操作。 已删除文档的版本保持可用的时间长度由index.gc_deletes索引设置确定,默认为60秒。

路由

当索引使用控制路由的能力时,为了删除文档,还应该提供路由值。例如:

curl -X DELETE "localhost:9200/twitter/_doc/1?routing=kimchy"
复制代码

以上将删除id为1的tweet ,根据用户进行路由。 请注意,在没有正确路由的情况下发出删除将导致文档不被删除。

_routing映射根据需要设置且未指定路由值时,delete API将抛出RoutingMissingException并拒绝该请求。

自动创建索引

如果使用外部版本控制变体(variant),则删除操作会自动创建索引(如果之前尚未创建)(请查看create index API以手动创建索引)。

分布式

删除操作将散列为特定的分片ID。 然后它被重定向到该id组中的主分片,并复制(如果需要)到该id组内的分片副本。

等待活动分片

在发出删除请求时,可以设置wait_for_active_shards参数,以要求在开始处理删除请求之前激活最少数量的分片副本。有关详细信息和使用示例,请参见此处

刷新

控制何时可以搜索到此请求所做的更改。详情参见?refresh

超时

执行删除操作时,分配用于执行删除操作的主分片可能不可用。 造成这种情况的一些原因可能是主分片当前正在从仓库恢复或正在进行重定位。 默认情况下,删除操作将在主分片上等待最多1分钟,然后失败并响应错误。 timeout参数可用于显式指定等待的时间。 以下是将其设置为5分钟的示例:

curl -X DELETE "localhost:9200/twitter/_doc/1?timeout=5m"
复制代码

通过Query API来删除

_delete_by_query的最简单用法是对查询匹配到的文档执行删除操作。这是API:

curl -X POST "localhost:9200/twitter/_delete_by_query" -H 'Content-Type: application/json' -d'
{
	#查询必须传递一个值给`query`,方法与SearchAPI相同,你也可以使用q参数。
  "query": { 
    "match": {
      "message": "some message"
    }
  }
}
'
复制代码

响应结果为:

{
  "took" : 147,
  "timed_out": false,
  "deleted": 119,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 119,
  "failures" : [ ]
}
复制代码

_delete_by_query启动并删除使用内部版本控制找到的内容时,它会得到一个索引的快照,这意味着,如果在捕获快照和处理删除请求之间的文档发生更改,您将会得到版本冲突。只有文档匹配的时候,才会删除文档。

因为``internal版本控制不支持0作为有效版本号,版本等于0的文档不能使用_delete_by_query来删除,并且请求会失败。

在执行_delete_by_query期间,多个搜索请求依次执行,以便找到所有要删除的匹配文档。每次找到一批文档时,都会执行相应的批量请求来删除这些文档。方式搜索或者删除被拒绝,_delete_by_query依赖一个默认策略来重试被拒绝的请求(最多10次)。到达最大的重试限制,将导致_delete_by_query终止,失败信息将在响应中返回。已经执行的操作仍然保持不变。换言之,这个过程是不可逆的,只能终止。在第一次失败导致终止,失败的批量请求返回的所有故障都在failure元素中返回,因此,有可能存在相当多的失败实体。

如果你想计算版本冲突,而不是终止原因,可以在url中设置conflicts=proceed或者在请求体中添加"conflicts": "proceed"

回到API格式,这将从twitter索引中删除tweet:

curl -X POST "localhost:9200/twitter/_delete_by_query?conflicts=proceed" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
复制代码

也可以同时删除多个索引的文档,就像搜索API:

curl -X POST "localhost:9200/twitter,blog/_delete_by_query" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match_all": {}
  }
}
'
复制代码

如果提供路由,则将路由复制到滚动查询,将处理过程分配给匹配该路由值的分片:

curl -X POST "localhost:9200/twitter/_delete_by_query?routing=1" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range" : {
        "age" : {
           "gte" : 10
        }
    }
  }
}
'
复制代码

默认情况下,_delete_by_query使用1000的滚动批次。您可以使用scroll_size URL参数更改批量大小:

curl -X POST "localhost:9200/twitter/_delete_by_query?scroll_size=5000" -H 'Content-Type: application/json' -d'
{
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}
'
复制代码
URL参数

除了像pretty这样的标准参数之外,查询API的删除还支持refreshwait_for_completionwait_for_active_shardstimeoutscroll

发送refresh将在请求完成后刷新查询删除中涉及的所有分片。 这与delete API的refresh参数不同,后者只会刷新收到删除请求的分片。 与delete API不同,它不支持wait_for

如果请求包含wait_for_completion = false,则Elasticsearch将执行一些预检查,启动请求,然后返回可与Tasks API一起使用的任务,以取消或获取任务的状态。 Elasticsearch还将在.tasks / task / $ {taskId}中创建此任务的记录作为文档。 这是你的保留或删除你认为合适的。 完成后将会删除它,以便Elasticsearch可以回收它使用的空间。

wait_for_active_shards控制在继续请求之前必须激活分片的副本数。 timeout指示每个写入请求等待不可用分片可用的时间。 两者都完全适用于Bulk API中的工作方式。 由于_delete_by_query使用滚动搜索,您还可以指定scroll参数来控制search context保持活动的时间,例如?scroll=10m。 默认情况下,它是5分钟。

requests_per_second可以设置为任何正十进制数(1.4,6,1000等),并通过在等待时间内填充每个批次来限制查询删除发出批量删除操作的速率。 可以通过将requests_per_second设置为-1来禁用限制。

节流是通过在批之间等待来完成的,这样就可以给_delete_by_query内部使用的滚动设置一个考虑填充的超时。填充时间是批大小除以requests_per_second和写入时间之间的差额。默认情况下批处理大小为1000,所以如果requests_per_second被设置为500:

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
复制代码

由于批处理是作为单个_bulk请求发出的,因此较大的批处理将导致Elasticsearch创建许多请求,然后等待一段时间再启动下一个请求集。这是突发的而不是平滑的。默认值是-1。

响应体
{
  "took" : 147,
  "timed_out": false,
  "total": 119,
  "deleted": 119,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures" : [ ]
}
复制代码

took

​ 整个操作从开始到结束的毫秒数。

time_out

​ 如果在delete by query执行期间执行的任何请求超时,则将此标志设置为true。

total

​ 成功执行的文档总数

deleted

​ 成功删除的文档总数

batches

​ 通过查询删除回滚的滚动响应数。

version_conflicts

​ 按查询删除所命中的版本冲突数。

noops

​ 对于delete by query,这个字段总是等于零。它的存在只是为了让delete by query、update by query和reindex api返回具有相同结构的响应。

retries

​ 按查询删除所尝试的重试次数。bulk是重试的批量操作的数量,search是重试的搜索操作的数量。

throttled_millis

​ 符合requests_per_second的请求毫秒数

requests_per_second

​ 在delete by query期间每秒有效执行的请求数。

throttled_until_millis

​ 在_delete_by_query响应中,此字段应始终等于零。 它只在使用Task API时有意义,它表示下一次(自epoch以来的毫秒数),为了符合requests_per_second,将再次执行受限制的请求。

failures

​ 如果流程中存在任何不可恢复的错误,则会出现一系列故障。如果这不是空的,那么请求就会因为这些失败而中止。使用batch实现Delete by query,任何失败都会导致整个进程中止,但是当前batch中的所有失败都会收集到数组中。您可以使用conflicts选项来防止reindex在版本冲突上中止。

使用Task API

你可以使用Task API来获取运行delete by query请求的状态:

curl -X GET "localhost:9200/_tasks?detailed=true&actions=*/delete/byquery"
复制代码

响应结果:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/delete/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 0,
            "created" : 0,
            "deleted" : 3500,
            "batches" : 36,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": 0,
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}
复制代码

该对象包含实际状态。 它就像响应JSON一样,增加了total 字段。 totalreindex期望执行的操作总数。 您可以通过添加updatedcreateddeleted的字段来估计进度。 当它们的总和等于total字段时,请求将结束。

通过Task id可以直接查看Task:

curl -X GET "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619"
复制代码

此API的优点是它与wait_for_completion = false集成,透明地返回已完成任务的状态。 如果任务完成并且在其上设置了wait_for_completion = false,那么它将返回resulterror 字段。 此功能的代价是wait_for_completion = false.tasks / task / $ {taskId}创建的文档。 您可以删除该文档。

使用Cancel Tash API

所有的delete by query能通过task cancel API被关闭:

curl -X POST "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel"
复制代码

Task ID可以通过task API来找到。

取消操作应该快速的发生,但是有时可能会几秒。上面的任务状态API将继续按查询任务列出delete,直到该任务检查它是否已被取消并自行终止。

Rethrottling

可以使用_rethrottle API通过查询在正在运行的删除时更改requests_per_second的值:

curl -X POST "localhost:9200/_delete_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1"
复制代码

通过Task id可以直接查看Task.

就像在查询API的删除中设置它一样,requests_per_second可以是-1来禁用限制,也可以是1.7或12之类的任何十进制数来限制到该级别。 加速查询的Rethrottling会立即生效,但重新启动会减慢查询速度,这将在完成当前批处理后生效。 这可以防止滚动超时。

切分请求

按查询删除支持分割滚动以并行化删除过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

手动切分

通过对每个请求提供一个切分id和切分的总数来手动的切分delete by query

curl -X POST "localhost:9200/twitter/_delete_by_query" -H 'Content-Type: application/json' -d'
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}
'
curl -X POST "localhost:9200/twitter/_delete_by_query" -H 'Content-Type: application/json' -d'
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}
'
复制代码

你可以验证任务:

curl -X GET "localhost:9200/_refresh"
curl -X POST "localhost:9200/twitter/_search?size=0&filter_path=hits.total" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}
'
复制代码

结果是这样一个合理的total数:

{
  "hits": {
    "total" : {
        "value": 0,
        "relation": "eq"
    }
  }
}
复制代码

自动切分

您还可以让delete-by-query使用切片滚动自动并行化以在_id上切片。 使用切片指定要使用的切片数:

curl -X POST "localhost:9200/twitter/_delete_by_query?refresh&slices=5" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}
'
复制代码

你可以验证任务:

curl -X POST "localhost:9200/twitter/_search?size=0&filter_path=hits.total" -H 'Content-Type: application/json' -d'
{
  "query": {
    "range": {
      "likes": {
        "lt": 10
      }
    }
  }
}
'
复制代码

结果是这样一个合理的total数:

{
  "hits": {
    "total" : {
        "value": 0,
        "relation": "eq"
    }
  }
}
复制代码

slices设置为auto将允许Elasticsearch选择要使用的切分数。 此设置将使用每个分片一个切片,达到一定限制。 如果有多个源索引,它将根据具有最小分片数的索引选择切片数。

slices 添加到_delete_by_query只会自动执行上一节中使用的手动过程,创建子请求,这意味着它有一点奇怪:

  • 你可以在Task API中查看这些请求。这些子请求切分请求的任务的“子”任务。
  • 为切分的请求获取任务状态只包含已完成片的状态。
  • 这些子请求可以单独寻址,用于取消和重新节流等操作。
  • slices重新节流请求将按比例重新节流未完成的子请求。
  • slices取消请求将取消每个子请求。
  • 由于slices的性质,每个子请求不会得到均匀的处理分布。所有文档都会被处理,但是子请求的处理可能比子请求处理更多。期望更大的片具有更均匀的分布。
  • slices的请求上的requests_per_secondsize等参数按比例分布到每个子请求。结合上面关于分布不均匀的观点,您应该得出结论:使用带slicessize可能不会导致确切大小的文档被删除。
  • 每个子请求都获得源索引的稍微不同的快照,尽管这些快照几乎是同时获取的。
选择切分的数量

如果自动切片,将slices设置为auto将为大多数索引选择一个合理的数字。如果您正在手动切片或以其他方式调优自动切片,请使用以下指南。

slices数等于索引中的分片数时,查询性能最有效。 如果该数字很大(例如,500),请选择较小的数字,因为太多的slices会损害性能。 设置高于分片数量的slices通常不会提高效率并增加开销。

删除性能随着slices的数量在可用资源之间线性扩展。

查询或删除性能是否在运行时占主导地位取决于重新索引的文档和集群资源。

Update API

update API允许基于提供的脚本更新文档。该操作从索引中获取文档(与切分并列),运行脚本(使用可选的脚本语言和参数),索引返回结果(还允许删除或忽略该操作)。它使用版本控制来确保在“get”和“reindex”期间没有发生更新。

注意,这个操作仍然意味着文档的完全重索引,它只是删除了一些网络往返,并减少了get和index之间版本冲突的机会。需要启用_source字段才能使此功能正常工作。

让我们索引一个简单的文档:

curl -X PUT "localhost:9200/test/_doc/1" -H 'Content-Type: application/json' -d'
{
    "counter" : 1,
    "tags" : ["red"]
}
'
复制代码
脚本更新

现在我们可以执行一个脚本来增加计数:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : {
        "source": "ctx._source.counter += params.count",
        "lang": "painless",
        "params" : {
            "count" : 4
        }
    }
}
'
复制代码

我们可以添加一个标签到标签列表:(如果标签存在,它仍会被添加,因为这是一个列表)

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : {
        "source": "ctx._source.tags.add(params.tag)",
        "lang": "painless",
        "params" : {
            "tag" : "blue"
        }
    }
}
'
复制代码

我们也可以移除一个标签从标签列表。注意,用于删除标记的Painless 函数将希望删除的元素的数组索引作为参数,因此需要更多的逻辑来定位它,避免运行时错误。注意,如果标签在列表中出现超过一次,则只会删除一次:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : {
        "source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
        "lang": "painless",
        "params" : {
            "tag" : "blue"
        }
    }
}
'
复制代码

除了_source之外,通过ctx map可以获得以下变量:_index_type_id_version_routing_now(当前时间戳)。

也可以添加一个新字段到文档:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : "ctx._source.new_field = \u0027value_of_new_field\u0027"
}
'
复制代码

或者从文档中移除一个字段:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : "ctx._source.remove(\u0027new_field\u0027)"
}
'
复制代码

甚至我们可以改变已执行的操作。这个例子如果tags字段包含green,则会删除文档,否则不会有任何操作。(noop)

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : {
        "source": "if (ctx._source.tags.contains(params.tag)) { ctx.op = \u0027delete\u0027 } else { ctx.op = \u0027none\u0027 }",
        "lang": "painless",
        "params" : {
            "tag" : "green"
        }
    }
}
'
复制代码
使用部分文档更新

update API也支持传入部分文档,它将被合并到现有的文档中(简单的递归合并、对象的内部合并、替换核心“key/value”和数组)。要完全替换现有文档,应该使用IndexAPI。以下部分更新向现有文档添加了一个新字段:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "doc" : {
        "name" : "new_name"
    }
}
'
复制代码

如果docscript都被指定了,doc将会被忽略。最好是将部分文档的字段对放在脚本中。

检测等待更新

如果doc被指定,并且值将与现有的_source合并。默认情况下,不改变任何东西的更新检测它们不改变任何东西,并返回“result”:“noop”,如下所示:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "doc" : {
        "name" : "new_name"
    }
}
'
复制代码

如果在发送请求之前namenew_name,则忽略整个更新请求。 如果忽略请求,响应中的result元素将返回noop。

{
   "_shards": {
        "total": 0,
        "successful": 0,
        "failed": 0
   },
   "_index": "test",
   "_type": "_doc",
   "_id": "1",
   "_version": 7,
   "result": "noop"
}
复制代码

你可以通过"detect_noop": false来禁用这个行为:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "doc" : {
        "name" : "new_name"
    },
    "detect_noop": false
}
'
复制代码
Upserts

如果文档不存在,upsert元素的内容将作为新文档插入。如果文档确实存在,则执行脚本:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "script" : {
        "source": "ctx._source.counter += params.count",
        "lang": "painless",
        "params" : {
            "count" : 4
        }
    },
    "upsert" : {
        "counter" : 1
    }
}
'
复制代码

scripted_upsert

如果您希望脚本运行,无论文档是否存在 - 即脚本处理初始化文档而不是upsert元素 - 将scripted_upsert设置为true

curl -X POST "localhost:9200/sessions/_update/dh3sgudg8gsrgl" -H 'Content-Type: application/json' -d'
{
    "scripted_upsert":true,
    "script" : {
        "id": "my_web_session_summariser",
        "params" : {
            "pageViewEvent" : {
                "url":"foo.com/bar",
                "response":404,
                "time":"2014-01-01 12:32"
            }
        }
    },
    "upsert" : {}
}
'
复制代码

doc_as_upsert

doc_as_upsert设置为true将使用doc的内容作为upsert值,而不是发送部分doc加上upsert文档:

curl -X POST "localhost:9200/test/_update/1" -H 'Content-Type: application/json' -d'
{
    "doc" : {
        "name" : "new_name"
    },
    "doc_as_upsert" : true
}
'
复制代码
参数

update操作支持下面几个参数:

retry_on_conflict

​ 在更新的get和indexing 阶段之间,另一个进程可能已经更新了相同的文档。默认情况下,更新将失败,出现版本冲突异常。retry_on_conflict参数控制在最终抛出异常之前重试更新的次数。

routing

​ 路由用于将更新请求路由到正确的分片,并在更新的文档不存在时为upsert请求设置路由。 不能用于更新现有文档的路由。

timeout

​ 分片变成可用状态的超时等待时间。

wait_for_active_shards

​ 在继续更新操作之前需要处于活动状态的分片副本数。

refresh

​ 控制何时此请求所做的更改对搜索可见。

_source

​ 允许控制是否以及如何在响应中返回更新的源。 默认情况下,不会返回更新的源。

version

​ 更新API在内部使用Elasticsearch版本控制支持,以确保在更新期间文档不会更改。 您可以使用version参数指定仅在文档版本与指定版本匹配时才更新文档。

更新API不支持内部版本以外的版本控制

更新API不支持外部(版本类型external和external_gte)或强制(版本类型强制)版本控制,因为它会导致Elasticsearch版本号与外部系统不同步。 请改用index API。

if_seq_no if_primary_term

更新操作可以是有条件的,只有在为文档的最后一次修改分配了if_seq_noif_primary_term参数指定的值时才能执行。 如果检测到不匹配,则操作将导致VersionConflictException和状态代码409。

通过Query API来更新

_update_by_query的最简单用法只是对索引中的每个文档执行更新而不更改源。 这对于获取新属性或其他一些在线映射更改很有用。 下面是API:

curl -X POST "localhost:9200/twitter/_update_by_query?conflicts=proceed"
复制代码

返回的结果为:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}
复制代码

_update_by_query在索引启动时获取索引的快照,并使用内部版本控制索引它的内容。这意味着如果文档在拍摄快照的时间和处理索引请求之间发生更改,则会出现版本冲突。如果版本匹配,文档会被更新,并且版本号会增加。

因为internal 版本控制不支持0作为有效的版本号,文档的版本号等于0不能使用_update_by_query来更新,这样的请求会失败。

由于_update_by_query终止而导致的查询和更新失败,会在响应体里返回一个failures 字段。已经执行的更新仍然有效。换句话说,这个过程没有回滚,只能中止。当第一个故障导致中止时,失败的批量请求返回的所有故障都将在failure元素中返回,因此,可能存在相当多的失败实体。

如果只想简单的计算版本冲突,并且不是由于_update_by_query引起的中止,你可以设置在url中设置conflicts=proceed,或者在请求体中"conflicts": "proceed"。第一个示例之所以这样做,是因为它只是试图获取一个在线映射更改,而版本冲突仅仅意味着在_update_by_query的开始时间和它试图更新文档的时间之间更新了冲突文档。这是有用的,因为更新将获得在线映射更新。

再回到API,这将从twitter索引更新到tweets:

curl -X POST "localhost:9200/twitter/_update_by_query?conflicts=proceed"
复制代码

你还可以使用Query DSL来对_update_by_query进行限制。下面的例子将会为用户kimchy更新twitter中的所有文档:

curl -X POST "localhost:9200/twitter/_update_by_query?conflicts=proceed" -H 'Content-Type: application/json' -d'
{
#查询必须传入一个query的key,或者也可以传入一个q,就像SearchApi一样
  "query": { 
    "term": {
      "user": "kimchy"
    }
  }
}
'
复制代码

到目前为止,我们只更新了文档,而没有更改它们的source。这确实对添加新属性很有用。_update_by_query支持更新文档的脚本。这将增加kimchy所有tweet上的like字段:

curl -X POST "localhost:9200/twitter/_update_by_query" -H 'Content-Type: application/json' -d'
{
  "script": {
    "source": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}
'
复制代码

就像在Update API中一样,您可以设置ctx.op更改执行的操作:

noop 如果你的脚本不作任何更改,可以设置ctx.op="noop"。这将导致_update_by_query从其更新中省略该文档。noop操作将在响应体中的noop计数器中展示。
delete 如果你的脚本必须删除,可以设置ctx.op="delete"。删除操作将在响应体中的deleted计数器中展示。

将ctx.op设置为其他任何内容都是错误的。 在ctx中设置任何其他字段是错误的。

请注意,我们停止指定conflict = proceed。 在这种情况下,我们希望版本冲突中止该过程,以便我们可以处理失败。

此API不允许您改变它关联的文档,只需修改它们的source。 这是故意的! 我们没有规定从原始位置删除文档。

也可以在多个索引上同时完成这一切,就像搜索API:

curl -X POST "localhost:9200/twitter,blog/_update_by_query"
复制代码

如果提供routing,则将路由复制到滚动查询,将过程限制为匹配该路由值的分片:

curl -X POST "localhost:9200/twitter/_update_by_query?routing=1"
复制代码

默认情况下,_update_by_query使用滚动批次为1000。您可以使用scroll_size URL参数更改批大小:

curl -X POST "localhost:9200/twitter/_update_by_query?scroll_size=100"
复制代码

_update_by_query也可以通过指定管道来使用Ingest节点功能:

curl -X PUT "localhost:9200/_ingest/pipeline/set-foo" -H 'Content-Type: application/json' -d'
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
'
curl -X POST "localhost:9200/twitter/_update_by_query?pipeline=set-foo"
复制代码
URL参数

除了像pretty这样的标准参数之外,查询API的删除还支持refreshwait_for_completionwait_for_active_shardstimeoutscroll

发送refresh将在请求完成后刷新查询删除中涉及的所有分片。 这与delete API的refresh参数不同,后者只会刷新收到删除请求的分片。 与delete API不同,它不支持wait_for

如果请求包含wait_for_completion = false,则Elasticsearch将执行一些预检查,启动请求,然后返回可与Tasks API一起使用的任务,以取消或获取任务的状态。 Elasticsearch还将在.tasks / task / $ {taskId}中创建此任务的记录作为文档。 这是你的保留或删除你认为合适的。 完成后将会删除它,以便Elasticsearch可以回收它使用的空间。

wait_for_active_shards控制在继续请求之前必须激活分片的副本数。 timeout指示每个写入请求等待不可用分片可用的时间。 两者都完全适用于Bulk API中的工作方式。 由于_delete_by_query使用滚动搜索,您还可以指定scroll参数来控制search context保持活动的时间,例如?scroll=10m。 默认情况下,它是5分钟。

requests_per_second可以设置为任何正十进制数(1.4,6,1000等),并通过在等待时间内填充每个批次来限制查询删除发出批量删除操作的速率。 可以通过将requests_per_second设置为-1来禁用限制。

节流是通过在批之间等待来完成的,这样就可以给_delete_by_query内部使用的滚动设置一个考虑填充的超时。填充时间是批大小除以requests_per_second和写入时间之间的差额。默认情况下批处理大小为1000,所以如果requests_per_second被设置为500:

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
复制代码

由于批处理是作为单个_bulk请求发出的,因此较大的批处理将导致Elasticsearch创建许多请求,然后等待一段时间再启动下一个请求集。这是突发的而不是平滑的。默认值是-1。

响应体

响应结果就像下面的那样:

{
  "took" : 147,
  "timed_out": false,
  "total": 5,
  "updated": 5,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures" : [ ]
}
复制代码

took

​ 整个操作从开始到结束的毫秒数。

time_out

​ 如果在update by query执行期间执行的任何请求超时,则将此标志设置为true。

total

​ 成功执行的文档总数

updated

​ 成功更新的文档总数

deleted

​ 成功删除的文档总数

batches

​ 通过查询更新回滚的滚动响应数。

version_conflicts

​ 按查询更新所命中的版本冲突数。

noops

​ 由于用于update by query的脚本返回了ctx.op的noop值而被忽略的文档数量。

retries

​ 按查询更新所尝试的重试次数。bulk是重试的批量操作的数量,search是重试的搜索操作的数量。

throttled_millis

​ 符合requests_per_second的请求毫秒数

requests_per_second

​ 在update by query期间每秒有效执行的请求数。

throttled_until_millis

​ 在_update_by_query响应中,此字段应始终等于零。 它只在使用Task API时有意义,它表示下一次(自epoch以来的毫秒数),为了符合requests_per_second,将再次执行受限制的请求。

failures

​ 如果流程中存在任何不可恢复的错误,则会出现一系列故障。如果这不是空的,那么请求就会因为这些失败而中止。使用batch实现update by query,任何失败都会导致整个进程中止,但是当前batch中的所有失败都会收集到数组中。您可以使用conflicts选项来防止reindex在版本冲突上中止。

使用Task API

你可以使用Task API来获取运行delete by query请求的状态:

curl -X GET "localhost:9200/_tasks?detailed=true&actions=*/delete/byquery"
复制代码

响应结果:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/delete/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 0,
            "created" : 0,
            "deleted" : 3500,
            "batches" : 36,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": 0,
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}
复制代码

该对象包含实际状态。 它就像响应JSON一样,增加了total 字段。 totalreindex期望执行的操作总数。 您可以通过添加updatedcreateddeleted的字段来估计进度。 当它们的总和等于total字段时,请求将结束。

通过Task id可以直接查看Task:

curl -X GET "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619"
复制代码

此API的优点是它与wait_for_completion = false集成,透明地返回已完成任务的状态。 如果任务完成并且在其上设置了wait_for_completion = false,那么它将返回resulterror 字段。 此功能的代价是wait_for_completion = false.tasks / task / $ {taskId}创建的文档。 您可以删除该文档。

使用Cancel Tash API

所有的update by query能通过task cancel API被关闭:

curl -X POST "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel"
复制代码

Task ID可以通过task API来找到。

取消操作应该快速的发生,但是有时可能会几秒。上面的任务状态API将继续按查询任务列出delete,直到该任务检查它是否已被取消并自行终止。

Rethrottling

可以使用_rethrottle API通过查询在正在运行的删除时更改requests_per_second的值:

curl -X POST "localhost:9200/_update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1"
复制代码

通过Task id可以直接查看Task.

就像在查询API的删除中设置它一样,requests_per_second可以是-1来禁用限制,也可以是1.7或12之类的任何十进制数来限制到该级别。 加速查询的Rethrottling会立即生效,但重新启动会减慢查询速度,这将在完成当前批处理后生效。 这可以防止滚动超时。

切分请求

按查询更新支持分割滚动以并行化更新过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

手动切分

通过对每个请求提供一个切分id和切分的总数来手动的切分update by query

curl -X POST "localhost:9200/twitter/_update_by_query" -H 'Content-Type: application/json' -d'
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  }
}
'
curl -X POST "localhost:9200/twitter/_update_by_query" -H 'Content-Type: application/json' -d'
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  }
}
'

复制代码

你可以验证任务:

curl -X GET "localhost:9200/_refresh"
curl -X POST "localhost:9200/twitter/_search?size=0&q=extra:test&filter_path=hits.total"
复制代码

结果是这样一个合理的total数:

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}
复制代码

自动切分

您还可以让update-by-query使用切片滚动自动并行化以在_id上切片。 使用切片指定要使用的切片数:

curl -X POST "localhost:9200/twitter/_update_by_query?refresh&slices=5" -H 'Content-Type: application/json' -d'
{
  "script": {
    "source": "ctx._source[\u0027extra\u0027] = \u0027test\u0027"
  }
}
'

复制代码

你可以验证任务:

curl -X POST "localhost:9200/twitter/_search?size=0&q=extra:test&filter_path=hits.total"
复制代码

结果是这样一个合理的total数:

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}
复制代码

slices设置为auto将允许Elasticsearch选择要使用的切分数。 此设置将使用每个分片一个切片,达到一定限制。 如果有多个源索引,它将根据具有最小分片数的索引选择切片数。

slices 添加到_delete_by_query只会自动执行上一节中使用的手动过程,创建子请求,这意味着它有一点奇怪:

  • 你可以在Task API中查看这些请求。这些子请求切分请求的任务的“子”任务。
  • 为切分的请求获取任务状态只包含已完成片的状态。
  • 这些子请求可以单独寻址,用于取消和重新节流等操作。
  • slices重新节流请求将按比例重新节流未完成的子请求。
  • slices取消请求将取消每个子请求。
  • 由于slices的性质,每个子请求不会得到均匀的处理分布。所有文档都会被处理,但是子请求的处理可能比子请求处理更多。期望更大的片具有更均匀的分布。
  • slices的请求上的requests_per_secondsize等参数按比例分布到每个子请求。结合上面关于分布不均匀的观点,您应该得出结论:使用带slicessize可能不会导致确切大小的文档被删除。
  • 每个子请求都获得源索引的稍微不同的快照,尽管这些快照几乎是同时获取的。
选择切分的数量

如果自动切片,将slices设置为auto将为大多数索引选择一个合理的数字。如果您正在手动切片或以其他方式调优自动切片,请使用以下指南。

slices数等于索引中的分片数时,查询性能最有效。 如果该数字很大(例如,500),请选择较小的数字,因为太多的slices会损害性能。 设置高于分片数量的slices通常不会提高效率并增加开销。

更新性能随着slices的数量在可用资源之间线性扩展。

查询或更新性能是否在运行时占主导地位取决于重新索引的文档和集群资源。

获取新属性

假设您创建了一个没有动态映射的索引,用数据填充它,然后添加一个映射值来从数据中获取更多字段:

curl -X PUT "localhost:9200/test" -H 'Content-Type: application/json' -d'
{
  "mappings": {
  #意味着新字段不会被索引,只会存储在_source中
    "dynamic": false,   
    "properties": {
      "text": {"type": "text"}
    }
  }
}
'
curl -X POST "localhost:9200/test/_doc?refresh" -H 'Content-Type: application/json' -d'
{
  "text": "words words",
  "flag": "bar"
}
'
curl -X POST "localhost:9200/test/_doc?refresh" -H 'Content-Type: application/json' -d'
{
  "text": "words words",
  "flag": "foo"
}
'
# 这个更新映射到新添加的flag字段,要获取新字段,您必须使用它重新索引所有文档。
curl -X PUT "localhost:9200/test/_mapping" -H 'Content-Type: application/json' -d'
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}
'
复制代码

使用flag字段搜索不会找到内容:

curl -X POST "localhost:9200/test/_search?filter_path=hits.total" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
'
复制代码

响应结果:

{
  "hits" : {
    "total": {
        "value": 0,
        "relation": "eq"
    }
  }
}
复制代码

但是你可以发出一个_update_by_query请求来获取新映射:

curl -X POST "localhost:9200/test/_update_by_query?refresh&conflicts=proceed"
curl -X POST "localhost:9200/test/_search?filter_path=hits.total" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
'
复制代码
{
  "hits" : {
    "total": {
        "value": 1,
        "relation": "eq"
    }
  }
}
复制代码

在向多字段添加字段时,您可以执行完全相同的操作。

Multi GET API

Multi get API基于索引,类型,(可选)和id(或者路由)返回多个文档。响应包括一个docs数组,其中包含所有获取的文档,按照multi-get请求对应的顺序排列(如果某个特定get出现失败,则在响应中包含一个包含此错误的对象)。成功get的结构在结构上类似于get API提供的文档。

下面是例子:

curl -X GET "localhost:9200/_mget" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "1"
        },
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "2"
        }
    ]
}
'
复制代码

mget端点也可以用于索引(在这种情况下,在请求体中不需要它):

curl -X GET "localhost:9200/test/_mget" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_type" : "_doc",
            "_id" : "1"
        },
        {
            "_type" : "_doc",
            "_id" : "2"
        }
    ]
}
'
复制代码

用于类型:

curl -X GET "localhost:9200/test/_doc/_mget" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_id" : "1"
        },
        {
            "_id" : "2"
        }
    ]
}
'
复制代码

在这个例子中,ids元素可以直接用来简化请求:

curl -X GET "localhost:9200/test/_doc/_mget" -H 'Content-Type: application/json' -d'
{
    "ids" : ["1", "2"]
}
'
复制代码
过滤Source

默认的,_source会被每个文档返回(如果存储),类似get API,可以使用_source参数只检索_source的一部分(或者根本不检索)。你还可以使用url参数,_source,_source_includes,_source_excludes,来指定默认值。当没有针对每个文档的指令时,会使用默认值。

例如:

curl -X GET "localhost:9200/_mget" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "1",
            "_source" : false
        },
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "2",
            "_source" : ["field3", "field4"]
        },
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "3",
            "_source" : {
                "include": ["user"],
                "exclude": ["user.location"]
            }
        }
    ]
}
'
复制代码
字段

可以为每个要获取的文档指定要检索的特定存储字段,类似于get API的stored_fields参数。例如:

curl -X GET "localhost:9200/_mget" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "1",
            "stored_fields" : ["field1", "field2"]
        },
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "2",
            "stored_fields" : ["field3", "field4"]
        }
    ]
}
'
复制代码

或者,您可以在查询字符串中指定stored_fields参数作为应用于所有文档的默认值。

curl -X GET "localhost:9200/test/_doc/_mget?stored_fields=field1,field2" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
        # 返回field1和field2
            "_id" : "1" 
        },
        {
        # 返回field3和field4
            "_id" : "2",
            "stored_fields" : ["field3", "field4"] 
        }
    ]
}
'
复制代码
路由

你还可以指定一个路由值来作为参数:

curl -X GET "localhost:9200/_mget?routing=key1" -H 'Content-Type: application/json' -d'
{
    "docs" : [
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "1",
            "routing" : "key2"
        },
        {
            "_index" : "test",
            "_type" : "_doc",
            "_id" : "2"
        }
    ]
}
'
复制代码

在此示例中,将从对应于路由密钥key1的分片中提取文档test / _doc / 2,但是将从对应于路由密钥key2的分片中提取文档test / _doc / 1。

在本例中,文档test / _doc / 2将从与路由键key1对应的切分中获取,但是文档`test / _doc / 1将从与路由键key2对应的切分中获取。

部分响应

为确保快速响应,如果一个或多个分片失败,多重获取API将响应部分结果。 有关更多信息,请参阅Shard故障

Bulk API

批量API可以在一个API调用中执行许多索引/删除操作,这可以大大提高索引速度。

/_bulk是其路径,并且使用换行符来分割JSON结构。

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
复制代码

注意:

最后一行数据必须以换行符\ n结尾。 每个换行符前面都有一个回车符\ r \ n。 向此路径发送请求时,Content-Type标头应设置为application / x-ndjson。

总共有4个动作,分别是index,create,delete,updateindexcreate期望下一行的source,并且与标准index API的op_type参数具有相同的语义(即,如果已存在具有相同索引的文档,则create将失败,而index将根据需要添加或替换文档)。 delete不期望下一行中的source,并且具有与标准删除API相同的语义。 update期望在下一行指定部分doc,upsert和script及其选项。

如果你提供了text文件输入到curl,你必须使用--data-binary标识而不是普通的-d,后面不允许保留新行,例如:

$ cat requests
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
复制代码

因为这个格式使用的是字面意义的\n‘s作为分隔符,请确保JSON操作和源代码没有被很好地打印出来。下面是一个正确的批量命令序列的例子:

curl -X POST "localhost:9200/_bulk" -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
'
复制代码

这个批量操作的结果是:

{
   "took": 30,
   "errors": false,
   "items": [
      {
         "index": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 0,
            "_primary_term": 1
         }
      },
      {
         "delete": {
            "_index": "test",
            "_type": "_doc",
            "_id": "2",
            "_version": 1,
            "result": "not_found",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 404,
            "_seq_no" : 1,
            "_primary_term" : 2
         }
      },
      {
         "create": {
            "_index": "test",
            "_type": "_doc",
            "_id": "3",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 2,
            "_primary_term" : 3
         }
      },
      {
         "update": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 2,
            "result": "updated",
            "_shards": {
                "total": 2,
                "successful": 1,
                "failed": 0
            },
            "status": 200,
            "_seq_no" : 3,
            "_primary_term" : 4
         }
      }
   ]
}
复制代码

路径是/_bulk/{index}/_bulk。提供索引时,默认情况下,索引将用于未显式提供索引的Bulk。

格式说明,这里的想法是让处理过程尽可能快。由于一些操作将被重定向到其他节点上的其他切分,因此只有action_meta_data在接收节点端被解析。

使用此协议的客户端库应该尝试在客户端尝试类似的操作,并尽可能减少缓冲。

批量操作的结果是一个大的JSON结果数据,每个操作结果的顺序和请求中的顺序是一致的。 单个操作的失败不会影响剩余的操作。

在单个批量调用中没有要执行的操作的“correct”数量。您应该尝试不同的设置,以找到适合您特定工作负载的最佳大小。

如果使用HTTP API,请确保客户机不发送HTTP chunks,因为这会降低速度。

乐观并发控制

批量API调用中的每个indexdelete操作可以在其各自的操作和元数据行中添加if_seq_noif_primary_term参数。 if_seq_noif_primary_term参数根据对现有文档的最后修改来控制操作的执行方式。 有关更多详细信息,请参阅乐观并发控制

版本控制

每个bulk项都可以使用version字段引入版本值。 它会根据_version映射自动跟踪index/delete操作的行为。 它还支持version_type

路由

每个bulk项都可以使用routing字段来引入路由值,它自动遵循基于_routing映射的index/delete操作的行为。

等待活动分区

在进行批量调用时,可以将wait_for_active_shards参数设置为在开始处理批量请求之前需要激活最少数量的碎片副本。

刷新

控制何时此请求所做的更改对搜索可见。

只有收到bulk请求的分片才会受到刷新的影响。 想象一下_bulk?refresh = wait_for请求,其中包含三个文档,这些文档恰好被路由到具有五个分片的索引中的不同分片。 请求只会等待这三个分片刷新。 构成索引的其他两个分片根本不参与_bulk请求。

更新

使用update操作时,retry_on_conflict可用作操作本身的字段(不在额外的有效payload line中),以指定在版本冲突的情况下应重试更新的次数。

update操作有效内容支持以下选项:doc(部分文档),upsert,doc_as_upsert,script,params(用于脚本),lang(用于脚本)和_source。 更新操作的示例:

curl -X POST "localhost:9200/_bulk" -H 'Content-Type: application/json' -d'
{ "update" : {"_id" : "1", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}
'
复制代码
部分响应

为确保快速响应,如果一个或多个分片失败,多重获取API将响应部分结果。 有关更多信息,请参阅Shard故障

Reindex API

Reindex要求为source index中的所有文档启用_source

Reindex不会尝试设置目标索引,并且不会拷贝source index 中的设置。您应该在运行_reindex操作之前设置目标索引,包括设置映射,分片数量,副本等

Reindex的最基本形式只是将文档从一个索引复制到另一个索引。下面的例子将拷贝文档twitter索引到new_twitter索引中:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

返回结果如下:

{
  "took" : 147,
  "timed_out": false,
  "created": 120,
  "updated": 0,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}
复制代码

就像_update_by_queryreindex获取一个source index 的快照,但是目标必须是一个不同的索引,所以不会出现版本冲突。可以像索引API一样配置dest元素来控制乐观并发控制。仅仅需要省略version_type(如上所属)或者设置它为internal 。都会导致Elasticsearch将文档盲目的转储到目标中,覆盖任何碰巧具有相同类型和id的文档:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "internal"
  }
}
'
复制代码

设置version_typeexternal 将导致Elasticsearch保存源文件的版本,创建缺失的文档,并更新目标索引中比源索引中版本更旧的文档:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  }
}
'
复制代码

设置op_typecreate将导致_reindex在目标索引中只创建丢失的文档,所有已存在的文档将会导致版本冲突。

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}
'
复制代码

默认的,版本冲突会中止_reindex。当版本冲突时,conflicts请求体参数可以指导_reindex来处理下一个文档。需要重要注意的时,通过conflicts参数处理其他错误类型是不受影响的,当在请求体中设置 "conflicts": "proceed"_reindex将继续处理版本冲突,并返回所遇到的版本冲突计数:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}
'
复制代码

您可以通过向source添加查询来限制文档。这将只复制由kimchy发出的tweetnew_twitter:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter",
    "query": {
      "term": {
        "user": "kimchy"
      }
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

indexsource是一个列表,在一个请求中,允许你从大量sources中拷贝。下面的例子将从twitterblog文档中拷贝文档:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": ["twitter", "blog"]
  },
  "dest": {
    "index": "all_together"
  }
}
'
复制代码

Reindex API不会处理ID冲突,因此最后编写的文档将“win”,但顺序通常不可预测,因此依赖此行为并不是一个好主意。 而是使用脚本确保ID是唯一的。

通过设置size来限制处理文档的数量这,将会从twitter拷贝一个单独的文档到new _twitter:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "size": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

如果你想要twitter索引中的特定文档集,你需要使用sort。排序使滚动效率降低,但在某些情况下,它是值得的。 如果可能,请选择更具选择性的查询来进行大小和排序。 这会将10000个文件从twitter复制到new_twitter:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "size": 10000,
  "source": {
    "index": "twitter",
    "sort": { "date": "desc" }
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

source部分支持搜索请求中支持的所有元素。 例如,可以使用source过滤重新索引原始文档中的一部分字段,如下所示:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter",
    "_source": ["user", "_doc"]
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

_update_by_query, _reindex支持脚本修改文档,和_update_by_query不同,它允许修改文档的metadata,这个例子颠覆了源文档的版本:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == \u0027bar\u0027) {ctx._version++; ctx._source.remove(\u0027foo\u0027)}",
    "lang": "painless"
  }
}
'
复制代码

就像在_update_by_query中一样,您可以设置ctx.op来更改在目标索引上执行的操作:

noop

​ 如果脚本决定不需要在目标索引中索引文档,则设置ctx.op = "noop"。此no操作将在响应体中的noop计数器中报告。

delete

​ 如果脚本决定必须从目标索引中删除文档,则设置ctx.op = "delete"。删除操作将在响应主体的已删除计数器中报告。

设置ctx.op为其他,或者在CTX中设置任何其他字段,将会返回错误。

你可以改变:

  • _id
  • _index
  • _version
  • _routing

_version设置为null或将它从ctx映射中清除,就像没有在索引请求中发送版本一样;它会导致在目标索引中覆盖文档,而不管目标上的版本或在_reindex请求中使用的版本类型。

默认情况下,如果_reindex看到带有路由的文档,则除非脚本更改了路由,否则将保留路由。 您可以在dest请求上设置路由以更改此设置:

keep

​ 将针对每个匹配发送的批量请求的路由设置为匹配上的路由。 这是默认值。

discard

​ 将为每个匹配发送的批量请求上的路由设置为null。

=<some text>

​ 将为每个匹配发送的批量请求上的路由设置为=之后的所有文本。

例如,您可以使用以下请求将所有文档从具有公司名称cat的源索引复制到路由设置为cat的dest索引中。

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "source",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}
'
复制代码

默认情况下,reindex使用的滚动批数为1000。你可以在source元素中设置size字段来改变批数大小。

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "source",
    "size": 100
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}
'
复制代码

Reindex还可以使用通过指定pipeline来使用Ingest node特点,像下面这样:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}
'
复制代码
从远程Reindex

Reindex支持从远程Elasticsearch集群reindexing

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}
'
复制代码

host参数必须包含协议,主机,端口(例如:http:/otherHost:9200)和可选路径(例如:http:/otherHost:9200)。usernamepassword是可选的,如果填写了username和password,那么_reindex将会使用他们作为连接到远程Elasticsearch集群的基础验证。使用基本身份验证时务必使用https,否则密码将以纯文本格式发送。有一系列设置可用来配置https连接的行为。

远程主机必须在elasticsearch.yml中使用reindex.remote.whitelist属性指定白名单。可以将其设置为逗号分隔的允许远程主机和端口组合列表(例如: otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*),协议可以忽略,仅主机和端口是必须的例如:

reindex.remote.whitelist: "otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*"
复制代码

必须在节点上配置白名单来配合reindex。

要启用发送到旧版本Elasticsearch的查询,无需验证或修改即可将查询参数直接发送到远程主机。

从远程集群节点Reindexing不支持手动或者自动切分。

从远程服务器Reindex使用堆上缓冲区,默认最大大小为100mb。如果远程索引包含非常大的文档,则你需要更小的批数size,下面的例子设置批数大小为10这是非常非常小的。

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}
'
复制代码

还可以使用socket_timeout字段设置远程连接上的套接字读取超时,使用connect_timeout字段设置连接超时。两者默认都是30秒。下面这个例子,设置套接字读取超时为1分钟,连接超时设置为10秒。

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}
'
复制代码
配置SSL参数

从远程Reindex支持SSL设置,这些设置必须在elasticsearch.yml中配置,除了安全设置,安全设置添加在Elasticsearch KeyStore中。配置SSL不会在_reindex请求体中:

支持下面的设置:

reindex.ssl.certificate_authorities

​ 应受信任的PEM编码证书文件的路径列表。你不能同时指定reindex.ssl.certificate_authoritiesreindex.ssl.truststore.path.

reindex.ssl.truststore.path

​ Java keyStore文件的路径,包含受信任的证书。这个信任库可以是JKS或者PKCS#12格式。你不能同时指定reindex.ssl.certificate_authoritiesreindex.ssl.truststore.path.

reindex.ssl.truststore.password

​ 信任库的密码(reindex.ssl.truststore.path),这个设置不能用于reindex.ssl.truststore.secure_password

reindex.ssl.truststore.secure_password (Source)

​ 信任库的密码(reindex.ssl.truststore.path),这个设置不能用于reindex.ssl.truststore.secure_password

reindex.ssl.truststore.type

​ 信任库的类型 (reindex.ssl.truststore.path),必须是jks 或者PKCS12其中的一个。如果信任库路径以".p12", ".pfx" or "pkcs12"结尾,这个设置默认是PKCS12,否则默认为JKS

reindex.ssl.verification_mode

​ 指示用于防止中间人攻击和伪造证书的验证类型。其中一个是full验证主机名和证书路径,certificate验证正路路径,但是不验证主机名,null不进行验证,在生产环境中建议使用null,默认为full

reindex.ssl.certificate

​ 指定用于HTTP客户端身份验证(如果远程集群需要)的PEM编码证书(或证书链)的路径,此设置需要reindex.ssl.key也被设置。您不能同时指定reindex.ssl.certificatereindex.ssl.keystore.path

reindex.ssl.key

​ 指定证书用于客户端验证PEM解码私钥关联的路径 (reindex.ssl.certificate)。你不能同时指定reindex.ssl.keyreindex.ssl.keystore.path

reindex.ssl.key_passphrase

​ 如果PEM解码私钥是加密的,指定解密PEM解码私钥的密码。不能用于reindex.ssl.secure_key_passphrase

reindex.ssl.secure_key_passphrase(安全)

​ 如果PEM解码私钥是加密的,指定解密PEM解码私钥的密码。不能用于reindex.ssl.key_passphrase

reindex.ssl.keystore.path

​ 指定用于HTTP客户端验证的keyStore包含私钥和证书的路径(如果远程集群需要)。keyStore可以使用JKSPKCS#12格式。你不能同时指定reindex.ssl.keyreindex.ssl.keystore.path

reindex.ssl.keystore.type

​ keyStore的类型 (reindex.ssl.keystore.path)。必须是jks 或者PKCS12其中的一个。如果信任库路径以".p12", ".pfx" or "pkcs12"结尾,这个设置默认是PKCS12,否则默认为JKS

reindex.ssl.keystore.password

​ keystore的密码(reindex.ssl.keystore.path),这个设置不能用于reindex.ssl.keystore.secure_password

reindex.ssl.keystore.secure_password (安全)

​ keystore的密码(reindex.ssl.keystore.path),这个设置不能用于reindex.ssl.keystore.password

reindex.ssl.keystore.key_password

​ 在keystore中用于key的密码(reindex.ssl.keystore.path)默认为keystore密码。这个设置不能用于reindex.ssl.keystore.secure_key_password

reindex.ssl.keystore.secure_key_password(安全)

​ 在keystore中用于key的密码(reindex.ssl.keystore.path)默认为keystore密码。这个设置不能用于reindex.ssl.keystore.key_password

URL参数

除了像pretty这样的标准参数,ReindexAPI还支持refresh,wait_for_completion, wait_for_active_shards, timeout, scroll, 和 requests_per_second

发送refreshurl参数将导致请求写入所有已刷新的索引。这个不同于IndexAPI的refresh参数,后者只会刷新接收新数据的碎片。也不想IndexAPI,它不支持wait_for

在请求中包含wait_for_completion=false,Elasticsearch将会执行一些预检查,开始请求并且能用TasksAPI来关闭或者得到一个Task的状态并将其返回。。Elasticsearch还将创建此任务的记录,作为.tasks/task/${taskId}的文档。保留和删除由你抉择。当您完成它或者删除它时,Elasticsearch可以回收它使用的空间。

wait_for_active_shards 在进行重新索引之前,一个碎片必须有多少个副本处于活动状态。点击此处查看更多。timeout控制每个写请求等待不可用碎片编程可用的时间。这两种方法在BulkAPI中的工作方式完全相同。由于重新索引使用滚动搜索,您还可以指定滚动参数来控制“search context”保持活动的时间(例如?scroll=10m)。默认值为5m。

requests_per_second可以被设置为十进制的正数(1.4, 6, 1000, 等.) ,限制_reindex发出批量索引操作的速率。服务节流可以将 requests_per_second 设置为 -1.

节流是通过在批之间等待来完成的,这样就可以给_reindex内部使用的滚动设置一个考虑填充的超时。填充时间是batchs size除以requests_per_second和写入时间之间的差额。默认情况下batch size为1000,所以如果requests_per_second被设置为500:

target_time = 1000 / 500 per second = 2 seconds
`padding time` = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
复制代码

因为batch是作为一个单独的_bulk请求发出的,大的batch size将导致Elasticsearch创建许多请求并且等一段时间才会开始下一组。这是间歇性的的而不是平滑的,这个默认值为-1

响应体

JSON响应看起来像下面这样:

{
  "took": 639,
  "timed_out": false,
  "total": 5,
  "updated": 0,
  "created": 5,
  "deleted": 0,
  "batches": 1,
  "noops": 0,
  "version_conflicts": 2,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": 1,
  "throttled_until_millis": 0,
  "failures": [ ]
}
复制代码

took

​ 整个操作花费的总时长。

time_out

​ 如果在重新索引期间执行的任何请求超时,则将此标志设置为true。

total

​ 成功处理的文档总数。

updated

​ 成功更新的文档总数。

created

​ 成功创建的文档总数。

deleted

​ 成功删除的文档总数。

batches

​ 通过reindex滚动请求拉回的数量

noops

​ 被忽略的文档数。因为用于重新索引的脚本为ctx.op返回了一个noop值。

version_conflicts

​ reindex命中的版本冲突数。

retries

​ reindex重试也没有成功的数量。bulk是bulk动作重试的数量,search是search动作重试的数量。

throttled_millis

​ 请求休眠以符合requests_per_second的毫秒数。

requests_per_second

​ reindex期间请求每秒有效执行的次数。

throttled_until_millis

​ 在_reindex这个字段应该一直等于0,它仅仅意味着,当使用TaskAPI,它指示下一次(从epoch开始以毫秒为单位)将再次执行节流请求,以符合requests_per_second

failures

​ 如果流程中存在任何不可恢复的错误,则会出现一系列故障。如果这不是空的,那么请求就会因为这些失败而中止。Reindex是使用batch实现的,任何失败都会导致整个进程中止,但是当前batch中的所有失败都会收集到数组中。您可以使用conflicts选项来防止reindex在版本冲突上中止。

使用TaskAPI

你可以使用TaskAPI来获取所有正在运行中的reindex请求的状态。

响应结果如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
            #这个对象包含了真正的状态,它与响应JSON相同,只是添加了重要的total字段。total是_reindex希望执行的操作总数。您可以通过添加updated、created的和deleted的字段来估计进度。当它们的和等于整个字段时,请求将结束。
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0,
            "requests_per_second": -1,
            "throttled_until_millis": 0
          },
          "description" : "",
          "start_time_in_millis": 1535149899665,
          "running_time_in_nanos": 5926916792,
          "cancellable": true,
          "headers": {}
        }
      }
    }
  }
}
复制代码

可以通过Task id来直接查看Task,下面的例子展示了取回taskid为r1A2WoRbTwKZ516z6NEs5A:36619:的信息:

curl -X GET "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619"
复制代码

这个API的优势是集成了wait_for_completion=false并且返回了完成任务的状态,如果task已经完成,并且 wait_for_completion=false,它将返回一个results或者error字段。这个特性的代价是wait_for_completion=false.tasks/task/${taskId}处创建的文档。你可以自己决定是否删除。

使用Cancel Task API

任何Reindex都能被Task Calcel API取消。例如:

curl -X POST "localhost:9200/_tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel"
复制代码

TaskId可以使用TaskAPI来查到。

取消应该是在瞬间完成的,但是有可能会需要短暂的几秒。Task API将跳过列出任务,直到它醒来自动取消。

Rethrottling(重新设置节流时长)

在运行reindex期间可以使用_rethrottleAPI来改变requests_per_second 的值。

curl -X POST "localhost:9200/_reindex/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1"
复制代码

TaskId可以使用TaskAPI来查到。

就像在Reindex API上一样设置,requests_per_second 值能被设置成-1来禁用节流或者设置成任何正数来节流。Rethrottling 加快查询立即生效的速度,但是它减慢查询在完成当前批处理后生效的速度。这将防止滚动超时。

Reindex来改变字段名

_reindex可用于构建具有重命名字段的索引的副本。假设您创建了一个包含如下文档的索引:

curl -X POST "localhost:9200/test/_doc/1?refresh" -H 'Content-Type: application/json' -d'
{
  "text": "words words",
  "flag": "foo"
}
'
复制代码

但是你不喜欢flag这个名字,想把flag替换为tag。_reindex能帮你创建另一个索引:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test2"
  },
  "script": {
    "source": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
}
'
复制代码

现在重新获取文档:

curl -X GET "localhost:9200/test2/_doc/1"
复制代码

返回结果:

{
  "found": true,
  "_id": "1",
  "_index": "test2",
  "_type": "_doc",
  "_version": 1,
  "_seq_no": 44,
  "_primary_term": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
}
复制代码
切分

Reindex支持切片滚动,以并行化重新索引过程。这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

手动切分

对每个请求通过提供一个slice id和切分总数来手动切分Reindex请求:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

如果你想验证结果:

curl -X GET "localhost:9200/_refresh"
curl -X POST "localhost:9200/new_twitter/_search?size=0&filter_path=hits.total"
复制代码

结果是这样一个合理的total:

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}
复制代码

自动切分

您还可以让_reindex使用切片滚动自动并行化_uid上的切片。使用slices指定要使用的片数:

curl -X POST "localhost:9200/_reindex?slices=5&refresh" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}
'
复制代码

你可以验证其结果:

curl -X POST "localhost:9200/new_twitter/_search?size=0&filter_path=hits.total"
复制代码

结果是这样一个合理的total:

{
  "hits": {
    "total" : {
        "value": 120,
        "relation": "eq"
    }
  }
}
复制代码

设置slicesauto,Elasticsearch将自动的选择一个合理的切分数来使用,此设置将在一定限度内使用每个切块一个切分。如果有多个源索引,它将根据分片数量最少的索引选择切片的数量。

slices添加到_reindex只是自动执行上一节中使用的手动过程,创建子请求,这意味着它有一些奇怪的地方:

  • 你可以在TaskAPI中看到这些请求。这些子请求是带有slices请求的Task的childTask

  • 为带有slices的请求获取任务状态,只包含已完成slices的状态。

  • 这些子请求可以单独寻址,用于cancelation(取消) 和rethrottling(改变节流的值)等操作。

  • 带有slices的Rethrottling请求将按比例rethrottle未完成的子请求。

  • 关闭带有slices的请求将关闭每个子请求。

  • 由于切片的性质,每个子请求不会完全得到文档部分。所有文档都将被处理,但是有些部分可能比其他部分更大。期望更大的slices具有更均匀的分布。

  • 带有slices的请求上的requests_per_secondsize等参数按比例分布到每个子请求。将这一点与上面关于分布不均匀的观点结合起来,我们得出结论:使用带slicessize可能不会导致reindex精确大小的文档

  • 每个子请求都获得稍微不同的源索引的快照,尽管这些快照几乎是同时获取的。

带有slices的数量

如果是自动切分请求,设置slicesauto将选择一个合理的数值来给大多数索引。如果手动切片或以其他方式调优自动切片,请继续阅读下方内容:

slices数等于分片数这时候查询性能是最好的。如果slices值很大,选择一个更小的值会有损性能。将slices的数量设置为大于分片片的数量通常不会提高效率,而且会增加开销。

索引性能随着分片的数量在可用资源之间线性扩展。

查询或索引性能是否主导运行时取决于reindex的文档和集群资源。

同时Reindex索引

如果你有多个索引要Reindex,通常更好的做法是一次Reindex一个,而不是使用全局模式同时获取多个。这样,如果有任何错误,您可以删除部分完成的索引并从该索引重新开始,从而恢复该过程。它还使并行化过程变得相当简单:将索引列表拆分为reindex并并行运行每个列表。

一次性bash脚本似乎很好地解决了这个问题:

for index in i1 i2 i3 i4 i5; do
  curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{
    "source": {
      "index": "'$index'"
    },
    "dest": {
      "index": "'$index'-reindexed"
    }
  }'
done
复制代码
Reindex daily索引

尽管有上面的建议,你可以使用_reindex结合Painless来reindex daily 索引将新模板应用于现有文档。

假设你有一下文件组成的索引:

curl -X PUT "localhost:9200/metricbeat-2016.05.30/_doc/1?refresh" -H 'Content-Type: application/json' -d'
{"system.cpu.idle.pct": 0.908}
'
curl -X PUT "localhost:9200/metricbeat-2016.05.31/_doc/1?refresh" -H 'Content-Type: application/json' -d'
{"system.cpu.idle.pct": 0.105}
'
复制代码

metricbeat-*索引的新模板已经加载到Elasticsearch中,但它只适用于新创建的索引。Painless可用于reindex现有文档并应用新模板。

下面的脚本从索引名称中提取日期,并创建一个附加了-1的新索引。 来自metricbeat-2016.05.31的所有数据将reindex到metricbeat-2016.05.31-1

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._index = \u0027metricbeat-\u0027 + (ctx._index.substring(\u0027metricbeat-\u0027.length(), ctx._index.length())) + \u0027-1\u0027"
  }
}
'
复制代码

从metricbeat 索引之前的所有文档现在都可以在*-1中找到:

curl -X GET "localhost:9200/metricbeat-2016.05.30-1/_doc/1"
curl -X GET "localhost:9200/metricbeat-2016.05.31-1/_doc/1"
复制代码

前一种方法还可以与更改字段名称结合使用,以便仅将现有数据加载到新索引中,并在需要时重命名任何字段。

获取索引的随机子集

_reindex可用于提取索引的随机子集进行测试:

curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
{
  "size": 10,
  "source": {
    "index": "twitter",
    "query": {
      "function_score" : {
        "query" : { "match_all": {} },
        "random_score" : {}
      }
    },
    #_reindex默认按_doc排序,因此random_score不会有任何效果,除非您覆盖sort to _score。
    "sort": "_score"    
  },
  "dest": {
    "index": "random_twitter"
  }
}
'
复制代码

Term Vectors

返回特定文档字段中术语的信息和统计信息。文档可以存储在索引中,也可以由用户人工提供。默认情况下,术语向量是实时的,而不是接近实时的。这可以通过将realtime参数设置为false来更改。

curl -X GET "localhost:9200/twitter/_termvectors/1"
复制代码

您可以选择使用url中的参数指定检索信息的字段:

curl -X GET "localhost:9200/twitter/_termvectors/1?fields=message"
复制代码

或者在请求体中添加请求字段,字段也可以使用通配符指定,方法类似于multi match query

返回值

可请求的三种类型的值:term information, term statistics 和 field statistics,默认的,返回term information, 和 field statistics的所有字段信息,但是不返回term statistics 信息。

Term information

  • term frequency in the field (总是返回)
  • term positions (positions : true)
  • start and end offsets (offsets : true)
  • term payloads (payloads : true), as base64 encoded bytes

如果在索引中请求信息没有存储,如果可能的话,它将被实时计算。另外,可以计算甚至不存在于索引中的文档,而是由用户提供的 term vectors。

开始和结束偏移量假设使用的是UTF-16编码。如果您想使用这些偏移量来获得生成此令牌的原始文本,则应该确保您正在获取的子字符串也使用UTF-16编码。

Term statistics

设置 term_statisticstrue (默认为false) 将会返回相关信息

  • total term frequency (一个term在所有文档中出现的频率是多少)
  • document frequency (包含当前term的文档数量)

默认情况下,那些值是不会返回的,因为term statistics对性能有严重影响

Field statistics

设置 term_statisticsfalse(默认为true) 将会不会被返回

  • document count (包含当前字段有多少文档)
  • sum of document frequencies (此字段中所有term的文档频率总和)
  • sum of total term frequencies (该字段中每个项的总term频率之和)

Term过滤

使用参数filter,还可以根据tf-idf分数对返回的term进行筛选。

支持下列子参数:

max_num_terms 每个字段必须返回的最大term数. 默认为2525.
min_term_freq 忽略源文档中小于此频率的单词. 默认为1.
max_term_freq 忽略源文档中小于此频率的单词. 默认为无限制。
min_doc_freq 忽略至少在这么多文档中没有出现的术语. 默认为 1.
max_doc_freq 忽略出现在很多文档中的单词.默认为无限制。
min_word_length 将被忽略的单词的最小单词长度.默认为 0.
max_word_length 超过该长度的单词将被忽略. 默认为无限制 (0).
表现

Term和字段统计是不准确的。删除的文档不考虑在内,仅为所请求的文档所在的切分检索信息。因此,term和字段统计仅作为相对的度量有用,而绝对值在这方面没有意义。默认情况下,在请求人工文档的term vectors时,随机选择一个分片来获取统计信息。使用路由只会命中特定的分片。

返回存储的term vectors

首先我们存储创建一个索引来存储term vectors ,请求体如下:

curl -X PUT "localhost:9200/twitter" -H 'Content-Type: application/json' -d'
{ "mappings": {
    "properties": {
      "text": {
        "type": "text",
        "term_vector": "with_positions_offsets_payloads",
        "store" : true,
        "analyzer" : "fulltext_analyzer"
       },
       "fullname": {
        "type": "text",
        "term_vector": "with_positions_offsets_payloads",
        "analyzer" : "fulltext_analyzer"
      }
    }
  },
  "settings" : {
    "index" : {
      "number_of_shards" : 1,
      "number_of_replicas" : 0
    },
    "analysis": {
      "analyzer": {
        "fulltext_analyzer": {
          "type": "custom",
          "tokenizer": "whitespace",
          "filter": [
            "lowercase",
            "type_as_payload"
          ]
        }
      }
    }
  }
}
'
复制代码

第二步,我们添加一些文档:

curl -X PUT "localhost:9200/twitter/_doc/1" -H 'Content-Type: application/json' -d'
{
  "fullname" : "John Doe",
  "text" : "twitter test test test "
}
'
curl -X PUT "localhost:9200/twitter/_doc/2" -H 'Content-Type: application/json' -d'
{
  "fullname" : "Jane Doe",
  "text" : "Another twitter test ..."
}
'
复制代码

下面的请求返回关于text字段在文档1中的所有信息和统计:

curl -X GET "localhost:9200/twitter/_termvectors/1" -H 'Content-Type: application/json' -d'
{
  "fields" : ["text"],
  "offsets" : true,
  "payloads" : true,
  "positions" : true,
  "term_statistics" : true,
  "field_statistics" : true
}
'
复制代码

响应:

{
    "_id": "1",
    "_index": "twitter",
    "_type": "_doc",
    "_version": 1,
    "found": true,
    "took": 6,
    "term_vectors": {
        "text": {
            "field_statistics": {
                "doc_count": 2,
                "sum_doc_freq": 6,
                "sum_ttf": 8
            },
            "terms": {
                "test": {
                    "doc_freq": 2,
                    "term_freq": 3,
                    "tokens": [
                        {
                            "end_offset": 12,
                            "payload": "d29yZA==",
                            "position": 1,
                            "start_offset": 8
                        },
                        {
                            "end_offset": 17,
                            "payload": "d29yZA==",
                            "position": 2,
                            "start_offset": 13
                        },
                        {
                            "end_offset": 22,
                            "payload": "d29yZA==",
                            "position": 3,
                            "start_offset": 18
                        }
                    ],
                    "ttf": 4
                },
                "twitter": {
                    "doc_freq": 2,
                    "term_freq": 1,
                    "tokens": [
                        {
                            "end_offset": 7,
                            "payload": "d29yZA==",
                            "position": 0,
                            "start_offset": 0
                        }
                    ],
                    "ttf": 2
                }
            }
        }
    }
}
复制代码

动态生成term vectors

没有显式存储在索引中的Term vectors将自动动态计算。下面的请求返回文档1中字段的所有信息和统计信息,即使这些term没有显式地存储在索引中。注意,对于字段text,不重新生成term。

curl -X GET "localhost:9200/twitter/_termvectors/1" -H 'Content-Type: application/json' -d'
{
  "fields" : ["text", "some_field_without_term_vectors"],
  "offsets" : true,
  "positions" : true,
  "term_statistics" : true,
  "field_statistics" : true
}
'
复制代码

人工文档

还可以为人工文档生成term vectors,即索引中不存在的文档。例如,下面的请求将返回与示例1相同的结果。使用的映射由索引决定。

如果打开动态映射(默认),将动态创建原始映射中没有的文档字段。

curl -X GET "localhost:9200/twitter/_termvectors" -H 'Content-Type: application/json' -d'
{
  "doc" : {
    "fullname" : "John Doe",
    "text" : "twitter test test test"
  }
}
'
复制代码

Per-feild 分析器

此外,可以使用per_field_analyzer参数提供不同于字段的分析器。这对于以任何方式生成term vectors都很有用,特别是在使用人工文档时。当为已经存储term vectors的字段提供分析器时,将重新生成term vectors。

curl -X GET "localhost:9200/twitter/_termvectors" -H 'Content-Type: application/json' -d'
{
  "doc" : {
    "fullname" : "John Doe",
    "text" : "twitter test test test"
  },
  "fields": ["fullname"],
  "per_field_analyzer" : {
    "fullname": "keyword"
  }
}
'
复制代码
{
  "_index": "twitter",
  "_type": "_doc",
  "_version": 0,
  "found": true,
  "took": 6,
  "term_vectors": {
    "fullname": {
       "field_statistics": {
          "sum_doc_freq": 2,
          "doc_count": 4,
          "sum_ttf": 4
       },
       "terms": {
          "John Doe": {
             "term_freq": 1,
             "tokens": [
                {
                   "position": 0,
                   "start_offset": 0,
                   "end_offset": 8
                }
             ]
          }
       }
    }
  }
}
复制代码

Term 过滤

最后,返回的term可以根据tf-idf分数进行筛选。在下面的示例中,我们从具有给定“plot”字段值的人工文档中获得三个最“interesting”的关键字。注意,关键字“Tony”或任何停止词都不是响应的一部分,因为它们的tf-idf必须太低。

curl -X GET "localhost:9200/imdb/_termvectors" -H 'Content-Type: application/json' -d'
{
    "doc": {
      "plot": "When wealthy industrialist Tony Stark is forced to build an armored suit after a life-threatening incident, he ultimately decides to use its technology to fight against evil."
    },
    "term_statistics" : true,
    "field_statistics" : true,
    "positions": false,
    "offsets": false,
    "filter" : {
      "max_num_terms" : 3,
      "min_term_freq" : 1,
      "min_doc_freq" : 1
    }
}
'
复制代码
{
   "_index": "imdb",
   "_type": "_doc",
   "_version": 0,
   "found": true,
   "term_vectors": {
      "plot": {
         "field_statistics": {
            "sum_doc_freq": 3384269,
            "doc_count": 176214,
            "sum_ttf": 3753460
         },
         "terms": {
            "armored": {
               "doc_freq": 27,
               "ttf": 27,
               "term_freq": 1,
               "score": 9.74725
            },
            "industrialist": {
               "doc_freq": 88,
               "ttf": 88,
               "term_freq": 1,
               "score": 8.590818
            },
            "stark": {
               "doc_freq": 44,
               "ttf": 47,
               "term_freq": 1,
               "score": 9.272792
            }
         }
      }
   }
}	
复制代码

Multi termvectors API

Multi termvector API允许同时获得多个termvector。检索termvectors的文档由索引和id指定。但是也可以在请求本身中人为地提供这些文档。

响应包括一个文档数组,其中包含所有获取的termvector,每个元素都具有termvectors API提供的结构。举个例子:

curl -X POST "localhost:9200/_mtermvectors" -H 'Content-Type: application/json' -d'
{
   "docs": [
      {
         "_index": "twitter",
         "_id": "2",
         "term_statistics": true
      },
      {
         "_index": "twitter",
         "_id": "1",
         "fields": [
            "message"
         ]
      }
   ]
}
'
复制代码

有关可能的参数的描述,请参见[termvectors API](#Term Vectors)。

_mtermvector路径也可以用于索引(在这种情况下,在body中不需要它):

curl -X POST "localhost:9200/twitter/_mtermvectors" -H 'Content-Type: application/json' -d'
{
   "docs": [
      {
         "_id": "2",
         "fields": [
            "message"
         ],
         "term_statistics": true
      },
      {
         "_id": "1"
      }
   ]
}
'
复制代码

如果所有的文档在相同的索引,并且参数也相同,请求可以被简化:

curl -X POST "localhost:9200/twitter/_mtermvectors" -H 'Content-Type: application/json' -d'
{
    "ids" : ["1", "2"],
    "parameters": {
    	"fields": [
         	"message"
      	],
      	"term_statistics": true
    }
}
'
复制代码

另外,就和termvectors API一样,可以为用户提供的文档生成termvectors。使用的映射由_index确定。

curl -X POST "localhost:9200/_mtermvectors" -H 'Content-Type: application/json' -d'
{
   "docs": [
      {
         "_index": "twitter",
         "doc" : {
            "user" : "John Doe",
            "message" : "twitter test test test"
         }
      },
      {
         "_index": "twitter",
         "doc" : {
           "user" : "Jane Doe",
           "message" : "Another twitter test ..."
         }
      }
   ]
}
'
复制代码

?refresh

当此请求所做的更改对搜索可见时,Index,Update,Delete,Bulk API支持设置refrash来控制。这些是允许的值:

空字符串或者true

​ 在操作发生后立即刷新相关的主分片和副本分片(而不是整个索引),以便更新后的文档立即出现在搜索结果中。只有在确保它不会导致性能低下(无论是从索引还是搜索的角度)之后,才应该这样做。

wait_for

​ 在响应之前,等待请求所做的更改被刷新为可见。这并不强制立即刷新,而是等待刷新发生。Elasticsearch自动刷新已更改每个索引的分片。refresh_interval,默认值为1秒。该设置是动态的。在任何支持刷新的API上调用Refresh API或将Refresh设置为true也会导致刷新,从而导致已经运行的带有Refresh =wait_for的请求返回。

false(默认值)

​ 不采取刷新相关操作。此请求所做的更改将在请求返回后的某个时刻变得可见。

选择要使用的设置

除非你有更好的理由使用等待改变为可见,否则使用refresh=false。或者,因为这是缺省值,所以将refresh参数保留在URL之外。这是最简单和最快的选择。

如果您绝对必须使请求所做的更改与请求同步可见,那么您必须在向Elasticsearch添加更多负载(true)和等待响应更长的时间(wait_for)之间进行选择。以下几点应有助于作出这一决定:

  • 与true相比,对索引进行的更改越多,wait_for保存的工作就越多。在每个索引只更改一次索引的情况下。refresh_interval则不保存任何工作。
  • true创建效率较低的索引构造(小段),这些构造稍后必须合并为更高效的索引构造(更大的段)。这意味着true的代价是在index时创建小段,在search时搜索小段,在merge时生成更大的段。
  • 永远不要在一行中启动多个refresh=wait_for请求。相反,使用refresh=wait_for将它们批处理为单个批量请求,而Elasticsearch将并行地启动它们,只有当它们全部完成时才返回。
  • 如果刷新间隔设置为-1,禁用自动刷新,那么refresh=wait_for的请求将无限期等待,直到某个操作导致刷新。相反,设置索引。refresh_interval小于默认值(如200ms)将使refresh=wait_for更快地返回,但它仍然会生成效率低下的segments。
  • refresh=wait_for只影响正在运行的请求,但是,通过强制立即刷新,refresh=true将影响其他正在进行的请求。通常,如果您有一个正在运行的系统,您不希望干扰它,那么refresh=wait_for是一个较小的修改。
refrash=wait_for可以强制刷新

当已经有索引时,如果出现refresh=wait_for请求。max_refresh_listener(默认值为1000)请求等待对该分片的刷新,然后该请求的行为将与将refresh设置为true一样:它将强制刷新。换言之:当refresh=wait_for请求返回时,它的更改对于搜索是可见的,同时防止对阻塞的请求使用未检查的资源。如果一个请求因为耗尽了侦听器插槽而强制刷新,那么它的响应将包含“forced_refresh”:true。

无论修改切分多少次,批量请求在每个切分上只占用一个插槽。

例子

这些将创建一个文档,并立即刷新索引,使其可见:

curl -X PUT "localhost:9200/test/_doc/1?refresh" -H 'Content-Type: application/json' -d'
{"test": "test"}
'
curl -X PUT "localhost:9200/test/_doc/2?refresh=true" -H 'Content-Type: application/json' -d'
{"test": "test"}
'
复制代码

这将创建一个文档,而不做任何事情,使其搜索可见:

curl -X PUT "localhost:9200/test/_doc/3" -H 'Content-Type: application/json' -d'
{"test": "test"}
'
curl -X PUT "localhost:9200/test/_doc/4?refresh=false" -H 'Content-Type: application/json' -d'
{"test": "test"}
'
复制代码

这将创建一个文档,并等待它成为可见的搜索:

curl -X PUT "localhost:9200/test/_doc/4?refresh=wait_for" -H 'Content-Type: application/json' -d'
{"test": "test"}
'
复制代码

乐观并发控制

Elasticsearch是分布式的,当文档被创建,删除,更新,更改,一个新的文档版本就必须被复制到其他节点。Elasticsearch是异步的,并发的,这意味着那些复制请求是并发的被发送出去的,可能不按顺序到达目的地,elasticsearch需要一种方法开确保老的文档版本号永远不会覆盖新的文档版本号。

为了确保老的版本号永远不会覆盖新的版本号,对文档执行的每个操作都会通过主分片分配一个序列号,改序列号将会协调更改。每个操作的序列号是自增的,并且较新的操作保证具有比旧操作更高的序列号。

例如,下面的索引命令将创建一个文档,并为其分配一个初始序列号和主term:

curl -X PUT "localhost:9200/products/_doc/1567" -H 'Content-Type: application/json' -d'
{
    "product" : "r2d2",
    "details" : "A resourceful astromech droid"
}
'
复制代码

您可以在响应的_seq_no_primary_term字段中看到分配的序列号和主term:

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 1
    },
    "_index" : "products",
    "_type" : "_doc",
    "_id" : "1567",
    "_version" : 1,
    "_seq_no" : 362,
    "_primary_term" : 2,
    "result" : "created"
}
复制代码

Elasticsearch跟踪要更改其存储的每个文档的最后一个操作的序号和主term。在GET API的响应中,在_seq_no_primary_term字段中返回序列号和主项:

curl -X GET "localhost:9200/products/_doc/1567"
复制代码

返回的结果:

{
    "_index" : "products",
    "_type" : "_doc",
    "_id" : "1567",
    "_version" : 1,
    "_seq_no" : 362,
    "_primary_term" : 2,
    "found": true,
    "_source" : {
        "product" : "r2d2",
        "details" : "A resourceful astromech droid"
    }
}
复制代码

注意:通过设置seq_no_primary_term参数,搜索API可以为每次搜索命中返回_seq_no_primary_term

序列号和主term惟一地标识更改。通过记录返回的序列号和主term,您可以确保只在检索后没有对文档进行其他更改的情况下更改文档。这是通过设置索引API或删除API的if_seq_noif_primary_term参数来实现的。

例如,下面的索引调用将确保向文档添加一个标签,而不会丢失对描述的任何潜在更改,或由另一个API添加另一个标签:

curl -X PUT "localhost:9200/products/_doc/1567?if_seq_no=362&if_primary_term=2" -H 'Content-Type: application/json' -d'
{
    "product" : "r2d2",
    "details" : "A resourceful astromech droid",
    "tags": ["droid"]
}
'
复制代码
关注下面的标签,发现更多相似文章
评论