阅读 287

Spark写入es:EsHadoopRemoteException: version_conflict_engine_exception

背景介绍

  • 业务场景:spark批量写入es,基于es-hadoop组件实现
  • 批处理任务定时调度
  • cdh5.5.3集群,spark2.3,elasticsearch6.4.3
  • es中对应索引的_id由程序控制,保证全局唯一
  • 仅测试环境出现,且偶尔出现

问题描述

完整报错信息如下:

19/05/20 11:08:54 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24.0 failed 4 times, most recent failure: Lost task 2.3 in stage 24.0 (TID 849, p016d052n01, executor 6): org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [24/1000]. Error sample (first [5] error messages):
	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZVIK_2462056_2019-05-18]: version conflict, document already exists (current version [1])
	{"update":{"_id":"OZVIK_2462056_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZVIK_2462056_2019-05-18","product_no":"OZVIK","cust_id":"2462056","p106":32,"p107":61,"p108":55,"p109":"YGM6E","p110":1,"p111":46,"p112":11126,"p113":189,"p114":70,"p115":6,"p116":60,"p117":"male","p118":"gg","p119":19,"p120":2,"p121":1544025600000,"p122":69,"p123":"FL0SS","dt":"2019-05-18","absum01":71,"testday01":76,"testday02":11202,"testday03":"7611202","testday04":"70male","testday04_2":22404,"testday05":"761120270male761120222404","amount01":"YGM6E2462056","amount02":22252,"amount03":"OZVIK","aa":11197,"testb21":93,"fix_const_999_0222":999,"0304tf":"999 2462056 YGM6E","0305test_long":11173,"hello":87,"datetest":"2019-05-18","binarytest":32,"nestedtest":"YGM6E","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZVIK","floattest02":1,"__namelist_54":"0"}}

	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZWTC_148752_2019-05-18]: version conflict, document already exists (current version [1])
	{"update":{"_id":"OZWTC_148752_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZWTC_148752_2019-05-18","product_no":"OZWTC","cust_id":"148752","p106":88,"p107":20,"p108":13,"p109":"3BIW6","p110":1,"p111":79,"p112":15107,"p113":183,"p114":62,"p115":85,"p116":68,"p117":"female","p118":"nn","p119":51,"p120":80,"p121":1534867200000,"p122":87,"p123":"VOG2J","dt":"2019-05-18","absum01":63,"testday01":147,"testday02":15254,"testday03":"14715254","testday04":"62female","testday04_2":30508,"testday05":"1471525462female1471525430508","amount01":"3BIW6148752","amount02":30214,"amount03":"OZWTC","aa":15170,"testb21":108,"fix_const_999_0222":999,"0304tf":"999 148752 3BIW6","0305test_long":15187,"hello":101,"datetest":"2019-05-18","binarytest":88,"nestedtest":"3BIW6","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZWTC","floattest02":1,"__namelist_54":"0"}}

	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P08Y7_3310671_2019-05-18]: version conflict, document already exists (current version [1])
	{"update":{"_id":"P08Y7_3310671_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P08Y7_3310671_2019-05-18","product_no":"P08Y7","cust_id":"3310671","p106":27,"p107":62,"p108":40,"p109":"5JPCP","p110":0,"p111":93,"p112":17036,"p113":185,"p114":68,"p115":54,"p116":24,"p117":"female","p118":"aa","p119":43,"p120":88,"p121":1536508800000,"p122":43,"p123":"HI31Q","dt":"2019-05-18","absum01":68,"testday01":122,"testday02":17158,"testday03":"12217158","testday04":"68female","testday04_2":34316,"testday05":"1221715868female1221715834316","amount01":"5JPCP3310671","amount02":34072,"amount03":"P08Y7","aa":17104,"testb21":89,"fix_const_999_0222":999,"0304tf":"999 3310671 5JPCP","0305test_long":17129,"hello":67,"datetest":"2019-05-18","binarytest":27,"nestedtest":"5JPCP","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P08Y7","floattest02":0,"__namelist_54":"0"}}

	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P0TI9_8523_2019-05-18]: version conflict, document already exists (current version [1])
	{"update":{"_id":"P0TI9_8523_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P0TI9_8523_2019-05-18","product_no":"P0TI9","cust_id":"8523","p106":20,"p107":68,"p108":36,"p109":"YIP72","p110":0,"p111":24,"p112":13632,"p113":197,"p114":73,"p115":70,"p116":90,"p117":"male","p118":"aa","p119":75,"p120":11,"p121":1532361600000,"p122":82,"p123":"8KUUS","dt":"2019-05-18","absum01":73,"testday01":143,"testday02":13775,"testday03":"14313775","testday04":"73male","testday04_2":27550,"testday05":"1431377573male1431377527550","amount01":"YIP728523","amount02":27264,"amount03":"P0TI9","aa":13705,"testb21":88,"fix_const_999_0222":999,"0304tf":"999 8523 YIP72","0305test_long":13656,"hello":56,"datetest":"2019-05-18","binarytest":20,"nestedtest":"YIP72","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P0TI9","floattest02":0,"__namelist_54":"0"}}

	org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P1J8O_2619118_2019-05-18]: version conflict, document already exists (current version [1])
	{"update":{"_id":"P1J8O_2619118_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P1J8O_2619118_2019-05-18","product_no":"P1J8O","cust_id":"2619118","p106":99,"p107":57,"p108":53,"p109":"NR3QD","p110":1,"p111":83,"p112":17171,"p113":157,"p114":55,"p115":8,"p116":20,"p117":"male","p118":"oo","p119":42,"p120":4,"p121":1516636800000,"p122":62,"p123":"FO4IS","dt":"2019-05-18","absum01":56,"testday01":63,"testday02":17234,"testday03":"6317234","testday04":"55male","testday04_2":34468,"testday05":"631723455male631723434468","amount01":"NR3QD2619118","amount02":34342,"amount03":"P1J8O","aa":17227,"testb21":156,"fix_const_999_0222":999,"0304tf":"999 2619118 NR3QD","0305test_long":17255,"hello":152,"datetest":"2019-05-18","binarytest":99,"nestedtest":"NR3QD","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P1J8O","floattest02":1,"__namelist_54":"0"}}

Bailing out...
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
	at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

复制代码

es客户端在写入es时,发现数据在服务端的当前版本号与自己所持有的版本号不一致,即在本次读写之间已有别的client修改过该条数据,于是抛出版本冲突错误。

解决过程

1)首先思考:确保了_id全局唯一,正常情况下同一个_id的数据,仅会被一个spark task执行一次。而es基于乐观锁进行控制,只有其他client在当前client读写之间进行了数据的更改才会导致当前client报版本冲突错误。于是思考,是什么原因导致会有至少两个client去写同一条数据呢?

可能一:spark的动态资源分配

spark的动态资源分配,在CDH中确实会导致executor数量成倍增长,然后将task调度到新的executor执行,但这不会导致同一个task对应的数据(partition)多个task执行,故排除。

可能二:task的推测执行

推测执行机制为了防止某个task拖慢task set整体的执行进度,会为同一份数据启动多个task,哪个task最先执行完就以该task的结果为准,并杀掉其他task。该种情况确实会产生多个client写同一条数据产生版本冲突,但spark默认并未开启该机制,程序也没有手动设置,所以也要排除。

2)尝试debug源代码,但因为问题很难复现,也没有获得足够有用的信息。

3)这个时候突然发现ui界面除了有大量版本冲突的报错信息,在某个角落还有一种EsHadoopNoNodesLeftException: Connection error的错误信息,再结合spark的task重试机制,貌似已经有了答案。由于网络原因,es连接异常,但已经写入的数据却无法回滚,spark重新调度该任务,新任务以数据的版本号为0进行写入,但实际已经写入的数据版本已经被自增为1了,这时报版本冲突。

4)首先解决版本冲突问题。因为只要保证数据不丢失,所以版本冲突时只需忽略该条数据即可。

结合官网配置如下错误处理器

/**
 * Bulk-write error handler that silently ignores version conflicts (HTTP 409).
 *
 * <p>Used when document {@code _id}s are globally unique and a conflict can only
 * mean the document was already written by a previous (failed-then-retried) task
 * attempt, so dropping the duplicate write loses no data. Any other failure is
 * passed along the handler chain unchanged.
 */
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)
            throws Exception
    {
        // Anything other than a 409 Conflict is not ours to handle — defer it.
        if (entry.getResponseCode() != 409) {
            return collector.pass("Not a conflict response code.");
        }
        // 409: the document already exists with this _id; safe to skip.
        return HandlerResult.HANDLED;
    }
}

复制代码

经验证,确实不会再出现版本冲突的错误,ui界面只能看到EsHadoopNoNodesLeftException: Connection error

5)解决EsHadoopNoNodesLeftException: Connection error

由于集群使用docker虚拟机搭建,并且elasticsearch与cdh集群部署在一起,整体性能较差;并且集群中默认开启了spark的动态资源分配,导致写入并行度成倍增长。以上原因导致连接异常报错。 解决:使用--conf spark.dynamicAllocation.enabled=false 禁用动态资源分配,同时调整并行度,即控制同时写入es的client数量。

经验证,连接异常不再出现。

源码验证

由 dataframe.saveToEs(to, map) 开始,调用链如下:

SparkDataFrameFunctions#saveToEs
 EsSparkSQL#saveToEs
  SparkContext#runJob

忽略dag划分、task调度等细节,关注runJob方法 sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)

EsDataFrameWriter的write方法转换为函数作为参数传递到runJob中,在后续调用

// Per-partition write entry point (quoted from es-hadoop's EsRDDWriter).
// Invoked once per Spark task with that task's partition iterator; streams
// every record into Elasticsearch through a RestService-created writer.
def write(taskContext: TaskContext, data: Iterator[T]) {
    // One writer per task; partitionId is used for bucket/shard routing.
    val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

    // Ensure the underlying REST resources are released when the task ends,
    // whether it succeeds or fails.
    taskContext.addTaskCompletionListener((TaskContext) => writer.close())

    if (runtimeMetadata) {
      writer.repository.addRuntimeFieldExtractor(metaExtractor)
    }

    // Drain the partition: each record is converted (processData) and buffered
    // into the bulk processor, which flushes in batches to ES.
    while (data.hasNext) {
      writer.repository.writeToIndex(processData(data))
    }
  }

复制代码

调用链如下: RestRepository#writeToIndex
 RestRepository#doWriteToIndex
  BulkProcessor#add
   BulkProcessor#flush
    BulkProcessor#tryFlush
     RestClient#bulk
      NetworkClient#execute

核心方法:

// Quoted from es-hadoop's NetworkClient: executes one REST request, failing
// over to the next configured ES node on network errors. When every node has
// failed it throws EsHadoopNoNodesLeftException — the error observed in the
// Spark UI. Note: a bulk request may have been partially applied on the server
// before the connection error, which is what later causes the 409 conflicts
// when Spark retries the task.
public Response execute(Request request) {
        Response response = null;

        boolean newNode;
        do {
            // Re-route the request to whatever node is currently selected.
            SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());

            newNode = false;
            try {
                response = currentTransport.execute(routedRequest);
                ByteSequence body = routedRequest.body();
                if (body != null) {
                    stats.bytesSent += body.length();
                }
            } catch (Exception ex) {
                if (ex instanceof EsHadoopIllegalStateException) {
                    throw (EsHadoopException) ex;
                }
                // issues with the SSL handshake, bail out instead of retry, for security reasons
                if (ex instanceof javax.net.ssl.SSLException) {
                    throw new EsHadoopTransportException(ex);
                }
                // check for fatal, non-recoverable network exceptions
                if (ex instanceof BindException) {
                    throw new EsHadoopTransportException(ex);
                }

                if (log.isTraceEnabled()) {
                    log.trace(
                            String.format(
                                    "Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                                    currentNode, request.path()), ex);
                }

                // Record the failure and try the next node, if any remains.
                String failed = currentNode;

                failedNodes.put(failed, ex);

                newNode = selectNextNode();

                log.error(String.format("Node [%s] failed (%s); "
                        + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
                        failed, ex.getMessage()));

                // All nodes exhausted: surface the aggregate failure.
                if (!newNode) {
                    throw new EsHadoopNoNodesLeftException(failedNodes);
                }
            }
        } while (newNode); // retry the same request against the newly selected node

        return response;
    }

复制代码

在此抛出 EsHadoopNoNodesLeftException

总结建议

再次验证一个道理:读报错信息一定要有耐心,以免误入歧途。

关注下面的标签,发现更多相似文章
评论