DataX writer batch commit


Her face has gone I know not where; the peach blossoms still smile in the spring breeze.

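DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It implements efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.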

Optimization

The optimization is as follows.

The default HbaseAbstractTask.startWriter method writes one record per request:

public void startWriter(RecordReceiver lineReceiver,TaskPluginCollector taskPluginCollector){
        Record record;
        try {
            while ((record = lineReceiver.getFromReader()) != null) {
                Put put;
                try {
                    put = convertRecordToPut(record);
                } catch (Exception e) {
                    taskPluginCollector.collectDirtyRecord(record, e);
                    continue;
                }
                try {
                    this.htable.put(put);
                } catch (IllegalArgumentException e) {
                    if(e.getMessage().equals("No columns to insert") && nullMode.equals(NullModeType.Skip)){
                        LOG.info(String.format("record is empty, 您配置nullMode为[skip],将会忽略这条记录,record[%s]", record.toString()));
                        continue;
                    }else {
                        taskPluginCollector.collectDirtyRecord(record, e);
                        continue;
                    }
                }
            }
        }catch (IOException e){
            throw DataXException.asDataXException(Hbase094xWriterErrorCode.PUT_HBASE_ERROR,e);
        }finally {
            Hbase094xHelper.closeTable(this.htable);
        }
    }

The HBase HTable API has a put method that accepts a List<Put>, so the code above can be modified as follows:

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
    Record record;
    // Buffer of pending Puts that are sent to HBase in one batch.
    List<Put> putList = new ArrayList<>(2000);
    long begin = System.currentTimeMillis();
    try {
        while ((record = lineReceiver.getFromReader()) != null) {
            Put put;
            try {
                put = convertRecordToPut(record);
            } catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e);
                continue;
            }
            putList.add(put);
            try {
                // Flush when the batch is full or more than 200 ms have passed since the last flush.
                if (putList.size() >= 2000 || System.currentTimeMillis() - begin > 200) {
                    this.asyncTable.put(putList);
                    putList.clear();
                    begin = System.currentTimeMillis();
                }
            } catch (IllegalArgumentException e) {
                // Null-safe comparison: getMessage() may return null.
                if ("No columns to insert".equals(e.getMessage()) && nullMode.equals(NullModeType.Skip)) {
                    LOG.info(String.format("record is empty, 您配置nullMode为[skip],将会忽略这条记录,record[%s]", record.toString()));
                    continue;
                } else {
                    taskPluginCollector.collectDirtyRecord(record, e);
                    continue;
                }
            }
        }
        // Flush the records left over once the reader is exhausted; otherwise they would be lost.
        if (!putList.isEmpty()) {
            this.asyncTable.put(putList);
        }
    } finally {
        Hbase20xHelper.closeConn(future);
    }
}

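With this change, records are committed once every 2000 records, or once more than 200 ms have passed since the last commit, whichever comes first. This reduces the number of requests sent to HBase.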
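The size-or-time flush rule can also be pulled out into a small reusable helper. The following is a minimal sketch and is not part of DataX or HBase: the class name BatchBuffer, its constructor parameters, and the injectable clock are all hypothetical, introduced here only to illustrate the idea. The sink callback stands in for whatever batch call the target store offers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper: buffers items and flushes by size or by elapsed time. */
class BatchBuffer<T> {
    private final int maxSize;              // flush when this many items are buffered
    private final long maxWaitMillis;       // or when this much time has passed since the last flush
    private final Consumer<List<T>> sink;   // receives each batch, e.g. a batch put
    private final LongSupplier clock;       // injectable so the time rule can be tested
    private final List<T> buffer;
    private long lastFlush;

    BatchBuffer(int maxSize, long maxWaitMillis, Consumer<List<T>> sink, LongSupplier clock) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sink = sink;
        this.clock = clock;
        this.buffer = new ArrayList<>(maxSize);
        this.lastFlush = clock.getAsLong();
    }

    /** Adds one item and flushes when either threshold is reached. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxSize || clock.getAsLong() - lastFlush > maxWaitMillis) {
            flush();
        }
    }

    /** Sends whatever is buffered; call once more after the read loop ends. */
    void flush() {
        if (!buffer.isEmpty()) {
            // Hand over a copy so the sink can keep the list while the buffer is reused.
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlush = clock.getAsLong();
    }
}
```

In a writer, one would construct it with something like `new BatchBuffer<>(2000, 200, batch -> table.put(batch), System::currentTimeMillis)`, call `add` inside the read loop, and call `flush` once after the loop so the final partial batch is not dropped. Here `table` is a placeholder for the actual client object.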

Summary

If the writer you use supports batch commits, you can modify it in the same way.