HDFS Decommission问题分析

2,192 阅读8分钟
本文通过更改配置及数据结构改造,快速解决HDFS Decommission缓慢问题。
上篇文章回顾:记一次Open-Falcon-Graph频繁OOM问题排查

背景

c3prc-xiami有大量raid单副本文件,decommission单个datanode速度很慢,观察监控指标,发现:

  • 网卡流量始终保持低速,60~80mb/s

  • 磁盘io util也是单个磁盘100%在某一个时刻,也即是同一时间只有一个磁盘在工作

这样下线速度就十分缓慢,一个datanode一天只能下4w个block。而一个datanode平均有20w个block,这个速度明显不符合要求。

最开始认为是配置问题,我们分析了几种配置,有一定的效果,提高到了6w个block每天,但还是慢。

最后我们从代码层面着手,改变相关数据结构,使下线速度明显提升,一个datanode下线平均1到2天就能下完。

分析配置

public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;

blockmanager.maxReplicationStreams这个变量有两个作用:

(1)在选择从哪里把块复制出去的时候chooseSourceDatanode(),如果某个dn上已经有>maxReplicationStreams的块在被复制则不会再选中它作为源了。

(2)HeartbeatManager每次会想dn发送DNA_TRANSFER命令,会从DatanodeDescriptor.replicateBlocks取一定数量的block进行传输,而每个DN能够启动的DataTransfer线程数最大不能超过maxReplicationStreams。

blockmanage.replicationStreamsHardLimit同上一个变量类似,只是在chooseSourceDatanode(),如果block的优先级最高,这个dn还能再多复制2个(默认值分别是2,4),但是不能>replicationStreamsHardLimit。

所以在同一时刻最多replicationStreamsHardLimit被选出,而且是在同一个dn中。但是单纯调整hardLimit并没有多少效果。

真正控制一个dn往外拷贝数据还是maxReplicationStreams,dn通过心跳向NN报告正在进行Transfer的线程数,而后NN向dn发送maxTransfer个DNA_transfer CMD:

//get datanode commandsfinal int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;xmitsInProgress=正在传输数量

对于每个dn:从待复制blocks队列取出maxTransfers

public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {  return replicateBlocks.poll(maxTransfers);}

结论1:

通过调大上述2个参数,从2到4,再调整到8,效果还是比较明显,dn中的日志也反映出同一个时刻传输线程数有所增加。

但当调整为12或者更大时,就没有多少效果了。整体网速也没有上来。

目前调整为12是比较合理的。和单个dn磁盘数量对应。

publicstatic final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY ="dfs.namenode.replication.max-streams";publicstatic final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;publicstatic final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY ="dfs.namenode.replication.max-streams-hard-limit";publicstatic final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;

namenode中的blockmanager. ReplicationMonitor每3秒会computeDatanodeWork并且取一批block,然后告诉dn去复制这些blocks取的数量:

final int blocksToProcess = numlive * this.blocksReplWorkMultiplier;

开始我认为调大能加快下线速度,但这个参数影响小,它的作用仅仅是把取出来的block放入DatanodeDescriptor的等待队列中(replicateBlocks)

同过观察NN日志发现如下问题:

  • ReplicationMonitor每次迭代会打印日志:

askdn_ip to replicate blk_xxx to another_dn.

并且数量为blocksToProcess。在同一时刻,这个dn_ip都是一样的。

  • 每次迭代(在一段时间内循环)多次要求同一个dn拷贝blocksToProcess(c3prc-xiaomi=800个)blocks,而dn每次最多拷贝maxReplicationStreams

  • 也就是说NN做了很多无效工作,取的blocks都是同一个dn,当取到轮到下一个dn时,又是同样的问题。并不是每个dn都在同时工作。观察监控发现dn间歇性往外拷贝数据。

ReplicationMonitor每次取blocksToProcess个blocks的时候,这些blocks可能是同一个dn上,甚至同一个dn的同一个磁盘上。

因此,要分析每次的取法。目的是能取出不连续的blocks,能让不同dn,不同磁盘同时工作。

分析数据结构

和下线主要有关的代码集中在blockmanager,以及UnderReplicatedBlocks

/** * Store set of Blocks that need to be replicated 1 or more times. * We also store pending replication-orders. */public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();

所有需要做replicate的block都会放在blockmanager.neededReplications中。

UnderReplicatedBlocks是一个复合结构,保存者5个(LEVEL=5)带有优先级的队列:

private final List<LightWeightHashSet<Block>> priorityQueues

对于只有一个副本的block,或者replica都在decommision节点上,它在优先级最高的队列中,raid副本下线就是这种情况。

对于每一个优先级的队列,实现是LightWeightLinkedSet,它是一个有序的hashset,元素收尾相连。

UnderReplicatedBlocks的实现是保证每个队列的元素都会被取到,同时,每个队列中的元素按顺序依次被取出,不会让某些block永远没机会被取出。

具体做法是为每一个队列保存一个偏移:

private finalList<LightWeightHashSet<Block>> priorityQueues

如果取到最后一个队列(LEVEL-1)末尾了,就重置所有队列的偏移=0,从头再取。

这5个队列都是先进先被选,队尾进,而且优先级Level=0的更容易被取到。具体取的算法是在UnderReplicatedBlocks.chooseUnderReplicatedBlocks()中。

在进行decommission操作的时候,可能整个dn的块都是要加入neededReplication队列(raid集群如此,如果有3副本,那一个block有3个source.单副本的source dn只有一个)。这时候,加入某一个优先级队列(LightWeightLinkedSet)的blocks是有序的,而且连续上w个blocks属于同一个dn,甚至连续在同一个磁盘上。由于从队列是从头到尾顺序取,所以会有问题,尤其是对单副本的情况。

因此,我们想要随机从优先级队列中取出block.但又要保证每个block被取到,所以还是要有序的。

数据结构改造-ShuffleAddSet

实际上,这么做是合理的,先进入队列可以优先被取到。但对于我们这种场景,并不要求取出的顺序性和放入顺序一致。如果能打乱顺序,再取出就能使一次迭代取出的blocks尽可能在不同dn或者不同磁盘上。

LightWeightLinkedSet:UnderReplicatedBlocks默认采用的优先级队列的实现。本身是一个hashset继承LightWeightHashSet,同时元素双向连接,带有Head tail

LightWeightHashSet:轻量级hashSet

ShuffleAddSet:Look like LightWeightLinkedSet

我们实现了一种ShuffleAddSet继承LightWeightHashSet,尽可能表现和LightWeightLinkedSet一致,这样对外UnderReplicatedBlocks不需要做过多修改。

ShuffleAddSet中有两个队列,而对外表现为一个优先级队列。

第一个队列,同时也是Set和LightWeightLinkedSet是一样的,双向有序,外部调用方法取元素也是从这个Set取。

第二个队列,缓存队列cachedAddList,一开始用ArrayList、LinkedList,由于性能问题不使用了。现在也是用LightWeightHashSet,HashSet具有天然无序性质。

每当有新的元素加入,首先会放入cachedAddList中,随后当第一个队列数据空或者取到末尾,立即将cachedAddList数据shuffle,并拷贝到第一个队列中,然后清空自己,继续接收新元素。由于HashSet本身无序,因此少一步shuffle操作,直接从cachedAddList拷贝至第一个队列即可。

需要注意的是取数据(调用迭代器取)和add操作必须是同步的,因为取的时候第一个队列到达末尾或着空,会触发shuffle and add操作,清空cachedlist。

综上,第一个队列只有为空或者取到末尾的时候,会从第二个队里加入数据,如果都为空说明整个优先级队列空。每从cachedlist加入一批,这一批就是随机顺序,虽然第一个队列不是整个队列都随机打乱,总体上,第一个队列还是是乱序的。

这样做的问题:

(1)外部调用这个优先级队列的add操作,先进入队列的不一定是先被调度,后加入cachedList的元素,也可能排在第一个队列的前面先调度。

(2)极端情况,如果每次加入1个,然后再取1个元素,少量的元素,或者取出数量和频率远大于add数量,(比如cachedlist加入一个元素,第一个队列立刻到达末尾了)实际上没有达到随机效果。

好在我们的场景不要求FIFO,而且每次Decommision初始加入UnderReplicatedBlocks的block数量很大,ReplicationMonitor每次取/处理的数量blockToProcess,相对而言(下线8台节点,UnderReplicatedBlocks会达到180w)较小。发生shuffle and add不是很频繁,也不是性能瓶颈。观测到最长时间是200ms。

同时,我们将这个功能ShuffleAddSet作为一种可配置项目,UnderReplicatedBlocks可以在初始化时候选择用ShuffleAddSet或者LightWeightLinkedSet

dfs.namenode.blockmanagement.queues.shuffle

如下:

/** the queues themselves */private final List<LightWeightHashSet<Block>> priorityQueues = new ArrayList<LightWeightHashSet<Block>>();public static final String NAMENODE_BLOCKMANAGEMENT_QUEUES_SHUFFLE =    "dfs.namenode.blockmanagement.queues.shuffle";/** Stores the replication index for each priority */private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);/** Create an object. */UnderReplicatedBlocks() {  Configuration conf = new HdfsConfiguration(); boolean useShuffle = conf.getBoolean(NAMENODE_BLOCKMANAGEMENT_QUEUES_SHUFFLE, false); for (int i = 0; i < LEVEL; i++) {    if (useShuffle) {      priorityQueues.add(new ShuffleAddSet<Block>()); } else {      priorityQueues.add(new LightWeightLinkedSet<Block>()); }    priorityToReplIdx.put(i, 0); }}

性能问题

我们发现使用ShuffleAddSet时候,开始下线8台dn时会卡住,主要是卡在DecommissionManager$Monitor,每次要检查此dn上全部的blocks是否 underReplicated,这样blocks很多拿写锁的时间会很长。

也会检查ShuffleAddSet.contains(blocks),由于有两个队列,所以contain开销会比之前大。

2019-04-12,12:09:35,876 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Long read lock is held at 1555042175235. And released after 641 milliseconds.Call stack is:java.lang.Thread.getStackTrace(Thread.java:1479)org.apache.hadoop.util.StringUtils.getStackTrace(StringUtils.java:914)org.apache.hadoop.hdfs.server.namenode.FSNamesystemLock.checkAndLogLongReadLockDuration(FSNamesystemLock.java:104)org.apache.hadoop.hdfs.server.namenode.FSNamesystem.writeUnlock(FSNamesystem.java:1492)org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.isReplicationInProgress(BlockManager.java:3322)org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.checkDecommissionState(DatanodeManager.java:751)org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager$Monitor.check(DecommissionManager.java:93)org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager$Monitor.run(DecommissionManager.java:70)java.lang.Thread.run(Thread.java:662)

通过修改isReplicationInProgress方法,类似处理blockreport,每隔一定数量放一次锁的方式,缓解写锁时间太长导致其他rpc请求没有响应。

++processed;// Release lock per 5w blocks processed and has too many underReplicatedBlocks.if (processed == numReportBlocksPerIteration &&    namesystem.hasWriteLock() && underReplicatedBlocks > numReportBlocksPerIteration) {  namesystem.writeUnlock(); processed = 0; namesystem.writeLock();}

结论

对于单副本较多的集群,可采用如下方式下线:

dfs.namenode.blockmanagement.queues.shuffle= truedfs.namenode.replication.max-streams= 12 默认是2,限制一个datanode复制数量dfs.namenode.replication.max-streams-hard-limit=12 默认是4dfs.namenode.replication.work.multiplier.per.iteration= 4 默认2 / namenode一次调度的数量=该值×datanodes数量

开启shuffle and add,并调整单个dn最大复制数为物理磁盘数量,对于小集群可以调大work.multiplier一次处理4倍LiveDatanode数量block.使下线速度最大化。

注意的问题:

每次操作下线2台节点(refreshNodes),隔10分钟再下2台,一直到8台。同时下线的dn最好不要超过8台。不然DecommissionManager的开销会很大,影响NN正常服务。

本文首发于公众号“小米云技术”,点击阅读原文