HDFS写过程分析

1,512 阅读13分钟

简单之美 | HDFS 写文件过程分析

HDFS 是一个分布式文件系统,在 HDFS 上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在 HDFS 文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示:

AsDDFU.png

具体过程描述如下:

  1. Client 调用 DistributedFileSystem 对象的 create 方法,创建一个文件输出流(FSDataOutputStream)对象
  2. 通过 DistributedFileSystem 对象与 Hadoop 集群的 NameNode 进行一次 RPC 远程调用,在 HDFS 的 Namespace 中创建一个文件条目(Entry),该条目没有任何的 Block
  3. 通过 FSDataOutputStream 对象,向 DataNode 写入数据,数据首先被写入 FSDataOutputStream 对象内部的 Buffer 中,然后数据被分割成一个个 Packet 数据包
  4. 以 Packet 最小单位,基于 Socket 连接发送到按特定算法选择的 HDFS 集群中一组 DataNode(正常是 3 个,可能大于等于 1)中的一个节点上,在这组 DataNode 组成的 Pipeline 上依次传输 Packet
  5. 这组 DataNode 组成的 Pipeline 反方向上,发送 ack,最终由 Pipeline 中第一个 DataNode 节点将 Pipeline ack 发送给 Client
  6. 完成向文件写入数据,Client 在文件输出流(FSDataOutputStream)对象上调用 close 方法,关闭流
  7. 调用 DistributedFileSystem 对象的 complete 方法,通知 NameNode 文件写入成功

下面代码使用 Hadoop 的 API 来实现向 HDFS 的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:

     static String[] contents = new String[] {
          "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
          "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
          "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
          "dddddddddddddddddddddddddddddddd",
          "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
     };
    
     public static void main(String[] args) {
          String file = "hdfs://h1:8020/data/test/test.log";
        Path path = new Path(file);
        Configuration conf = new Configuration();
        FileSystem fs = null;
        FSDataOutputStream output = null;
        try {
               fs = path.getFileSystem(conf);
               output = fs.create(path); // 创建文件
               for(String line : contents) { // 写入数据
                    output.write(line.getBytes("UTF-8"));
                    output.flush();
               }
          } catch (IOException e) {
               e.printStackTrace();
          } finally {
               try {
                    output.close();
               } catch (IOException e) {
                    e.printStackTrace();
               }
          }
     }

结合上面的示例代码,我们先从 fs.create(path); 开始,可以看到 FileSystem 的实现 DistributedFileSystem 中给出了最终返回 FSDataOutputStream 对象的抽象逻辑,代码如下所示:

  public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite,
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    statistics.incrementWriteOps(1);
    return new FSDataOutputStream
       (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
  }

上面,DFSClient dfs 的 create 方法中创建了一个 OutputStream 对象,在 DFSClient 的 create 方法:

  public OutputStream create(String src,
                             FsPermission permission,
                             boolean overwrite,
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
   ... ...
}

创建了一个 DFSOutputStream 对象,如下所示:

    final DFSOutputStream result = new DFSOutputStream(src, masked,
        overwrite, createParent, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));

下面,我们从 DFSOutputStream 类开始,说明其内部实现原理。

DFSOutputStream 内部原理

打开一个 DFSOutputStream 流,Client 会写数据到流内部的一个缓冲区中,然后数据被分解成多个 Packet,每个 Packet 大小为 64k 字节,每个 Packet 又由一组 chunk 和这组 chunk 对应的 checksum 数据组成,默认 chunk 大小为 512 字节,每个 checksum 是对 512 字节数据计算的校验和数据。 当 Client 写入的字节流数据达到一个 Packet 的长度,这个 Packet 会被构建出来,然后会被放到队列 dataQueue 中,接着 DataStreamer 线程会不断地从 dataQueue 队列中取出 Packet,发送到复制 Pipeline 中的第一个 DataNode 上,并将该 Packet 从 dataQueue 队列中移到 ackQueue 队列中。ResponseProcessor 线程接收从 Datanode 发送过来的 ack,如果是一个成功的 ack,表示复制 Pipeline 中的所有 Datanode 都已经接收到这个 Packet,ResponseProcessor 线程将 packet 从队列 ackQueue 中删除。 在发送过程中,如果发生错误,所有未完成的 Packet 都会从 ackQueue 队列中移除掉,然后重新创建一个新的 Pipeline,排除掉出错的那些 DataNode 节点,接着 DataStreamer 线程继续从 dataQueue 队列中发送 Packet。 下面是 DFSOutputStream 的结构及其原理,如图所示:

AsD2O1.png

我们从下面 3 个方面来描述内部流程:

  • 创建 Packet

Client 写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个 Chunk 大小(512B)时,便会创建一个 Packet 对象,然后向该 Packet 对象中写 Chunk Checksum 校验和数据,以及实际数据块 Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个 Chunk 大小时,都会向 Packet 中写上述数据内容,直到达到一个 Packet 对象大小(64K),就会将该 Packet 对象放入到 dataQueue 队列中,等待 DataStreamer 线程取出并发送到 DataNode 节点。

  • 发送 Packet

DataStreamer 线程从 dataQueue 队列中取出 Packet 对象,放到 ackQueue 队列中,然后向 DataNode 节点发送这个 Packet 对象所对应的数据。

  • 接收 ack

发送一个 Packet 数据包以后,会有一个用来接收 ack 的 ResponseProcessor 线程,如果收到成功的 ack,则表示一个 Packet 发送成功。如果成功,则 ResponseProcessor 线程会将 ackQueue 队列中对应的 Packet 删除。

DFSOutputStream 初始化

首先看一下,DFSOutputStream 的初始化过程,构造方法如下所示:

    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        boolean createParent, short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
      this(src, blockSize, progress, bytesPerChecksum, replication);

      computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默认 writePacketSize=64*1024(即64K),bytesPerChecksum=512(没512个字节计算一个校验和),

      try {
        if (createParent) { // createParent为true表示,如果待创建的文件的父级目录不存在,则自动创建
          namenode.create(src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
        }
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.class,
                                       FileNotFoundException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      streamer.start(); // 启动一个DataStreamer线程,用来将写入的字节流打包成packet,然后发送到对应的Datanode节点上
    }
上面computePacketChunkSize方法计算了一个packet的相关参数,我们结合代码来查看,如下所示:
      int chunkSize = csize + checksum.getChecksumSize();
      int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
      chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
      packetSize = n + chunkSize*chunksPerPacket;

我们用默认的参数值替换上面的参数,得到:

      int chunkSize = 512 + 4;
      int n = 21 + 4;
      chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1);  // 127
      packetSize = 25 + 516*127;

上面对应的参数,说明如下表所示:

参数名称 参数值 参数含义
chunkSize 512+4=516 每个 chunk 的字节数(数据 + 校验和)
csize 512 每个 chunk 数据的字节数
psize 64*1024 每个 packet 的最大字节数(不包含 header)
DataNode.PKT_HEADER_LEN 21 每个 packet 的 header 的字节数
chunksPerPacket 127 组成每个 packet 的 chunk 的个数
packetSize 25+516*127=65557 每个 packet 的字节数(一个 header + 一组 chunk)

在计算好一个 packet 相关的参数以后,调用 create 方法与 Namenode 进行 RPC 请求,请求创建文件:

        if (createParent) { // createParent为true表示,如果待创建的文件的父级目录不存在,则自动创建
          namenode.create(src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
        }

远程调用上面方法,会在 FSNamesystem 中创建对应的文件路径,并初始化与该创建的文件相关的一些信息,如租约(向 Datanode 节点写数据的凭据)。文件在 FSNamesystem 中创建成功,就要初始化并启动一个 DataStreamer 线程,用来向 Datanode 写数据,后面我们详细说明具体处理逻辑。

Packet 结构与定义

Client 向 HDFS 写数据,数据会被组装成 Packet,然后发送到 Datanode 节点。Packet 分为两类,一类是实际数据包,另一类是 heatbeat 包。一个 Packet 数据包的组成结构,如图所示:

AsDIYD.png

上图中,一个 Packet 是由 Header 和 Data 两部分组成,其中 Header 部分包含了一个 Packet 的概要属性信息,如下表所示:

字段名称 字段类型 字段长度 字段含义
pktLen int 4 4 + dataLen + checksumLen
offsetInBlock long 8 Packet 在 Block 中偏移量
seqNo long 8 Packet 序列号,在同一个 Block 唯一
lastPacketInBlock boolean 1 是否是一个 Block 的最后一个 Packet
dataLen int 4 dataPos – dataStart,不包含 Header 和 Checksum 的长度

Data 部分是一个 Packet 的实际数据部分,主要包括一个 4 字节校验和(Checksum)与一个 Chunk 部分,Chunk 部分最大为 512 字节。 在构建一个 Packet 的过程中,首先将字节流数据写入一个 buffer 缓冲区中,也就是从偏移量为 25 的位置(checksumStart)开始写 Packet 数据的 Chunk Checksum 部分,从偏移量为 533 的位置(dataStart)开始写 Packet 数据的 Chunk Data 部分,直到一个 Packet 创建完成为止。如果一个 Packet 的大小未能达到最大长度,也就是上图对应的缓冲区中,Chunk Checksum 与 Chunk Data 之间还保留了一段未被写过的缓冲区位置,这种情况说明,已经在写一个文件的最后一个 Block 的最后一个 Packet。在发送这个 Packet 之前,会检查 Chunksum 与 Chunk Data 之间的缓冲区是否为空白缓冲区(gap),如果有则将 Chunk Data 部分向前移动,使得 Chunk Data 1 与 Chunk Checksum N 相邻,然后才会被发送到 DataNode 节点。 我们看一下 Packet 对应的 Packet 类定义,定义了如下一些字段:

      ByteBuffer buffer;           // only one of buf and buffer is non-null
      byte[]  buf;
      long    seqno;               // sequencenumber of buffer in block
      long    offsetInBlock;       // 该packet在block中的偏移量
      boolean lastPacketInBlock;   // is this the last packet in block?
      int     numChunks;           // number of chunks currently in packet
      int     maxChunks;           // 一个packet中包含的chunk的个数
      int     dataStart;
      int     dataPos;
      int     checksumStart;
      int     checksumPos; 

Packet 类有一个默认的没有参数的构造方法,它是用来做 heatbeat 的,如下所示:

      Packet() {
        this.lastPacketInBlock = false;
        this.numChunks = 0;
        this.offsetInBlock = 0;
        this.seqno = HEART_BEAT_SEQNO; // 值为-1

        buffer = null;
        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
        buf = new byte[packetSize];

        checksumStart = dataStart = packetSize;
        checksumPos = checksumStart;
        dataPos = dataStart;
        maxChunks = 0;
      }

通过代码可以看到,一个 heatbeat 的内容,实际上只有一个长度为 25 字节的 header 数据。通过 this.seqno = HEART_BEAT_SEQNO; 的值可以判断一个 packet 是否是 heatbeat 包,如果 seqno 为 - 1 表示这是一个 heatbeat 包。

Client 发送 Packet 数据

可以 DFSClient 类中看到,发送一个 Packet 之前,首先需要向选定的 DataNode 发送一个 Header 数据包,表明要向 DataNode 写数据,该 Header 的数据结构,如图所示:

AsDqOI.png

上图显示的是 Client 发送 Packet 到第一个 DataNode 节点的 Header 数据结构,主要包括待发送的 Packet 所在的 Block(先向 NameNode 分配 Block ID 等信息)的相关信息、Pipeline 中另外 2 个 DataNode 的信息、访问令牌(Access Token)和校验和信息,Header 中各个字段及其类型,详见下表:

字段名称 字段类型 字段长度 字段含义
Transfer Version short 2 Client 与 DataNode 之间数据传输版本号,由常量 DataTransferProtocol.DATA_TRANSFER_VERSION 定义,值为 17
OP int 4 操作类型,由常量 DataTransferProtocol.OP_WRITE_BLOCK 定义,值为 80
blkId long 8 Block 的 ID 值,由 NameNode 分配
GS long 8 时间戳(Generation Stamp),NameNode 分配 blkId 的时候生成的时间戳
DNCnt int 4 DataNode 复制 Pipeline 中 DataNode 节点的数量
Recovery Flag boolean 1 Recover 标志
Client Text Client 主机的名称,在使用 Text 进行序列化的时候,实际包含长度 len 与主机名称字符串 ClientHost
srcNode boolean 1 是否发送 src node 的信息,默认值为 false,不发送 src node 的信息
nonSrcDNCnt int 4 由 Client 写的该 Header 数据,该数不包含 Pipeline 中第一个节点(即为 DNCnt-1)
DN2 DatanodeInfo DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
DN3 DatanodeInfo DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
Access Token Token 访问令牌信息,包括 IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service
CheckSum Header DataChecksum 1+4 校验和 Header 信息,包括 type、bytesPerChecksum

Header 数据包发送成功,Client 会收到一个成功响应码(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着将 Packet 数据发送到 Pipeline 中第一个 DataNode 上,如下所示:

           Packet one = null;
          one = dataQueue.getFirst(); // regular data packet
          ByteBuffer buf = one.getBuffer();
          // write out data to remote datanode
          blockStream.write(buf.array(), buf.position(), buf.remaining());

          if (one.lastPacketInBlock) { // 如果是Block中的最后一个Packet,还要写入一个0标识该Block已经写入完成
              blockStream.writeInt(0); // indicate end-of-block
          }

否则,如果失败,则会与 NameNode 进行 RPC 调用,删除该 Block,并把该 Pipeline 中第一个 DataNode 加入到 excludedNodes 列表中,代码如下所示:

        if (!success) {
          LOG.info("Abandoning " + block);
          namenode.abandonBlock(block, src, clientName);

          if (errorIndex < nodes.length) {
            LOG.info("Excluding datanode " + nodes[errorIndex]);
            excludedNodes.add(nodes[errorIndex]);
          }

          // Connection failed.  Let's wait a little bit and retry
          retry = true;
        }

DataNode 端服务组件

数据最终会发送到 DataNode 节点上,在一个 DataNode 上,数据在各个组件之间流动,流程如下图所示:

Asr9pQ.png

DataNode 服务中创建一个后台线程 DataXceiverServer,它是一个 SocketServer,用来接收来自 Client(或者 DataNode Pipeline 中的非最后一个 DataNode 节点)的写数据请求,然后在 DataXceiverServer 中将连接过来的 Socket 直接派发给一个独立的后台线程 DataXceiver 进行处理。所以,Client 写数据时连接一个 DataNode Pipeline 的结构,实际流程如图所示:

Asrkmq.png

每个 DataNode 服务中的 DataXceiver 后台线程接收到来自前一个节点(Client/DataNode)的 Socket 连接,首先读取 Header 数据:

    Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
    LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
    boolean isRecovery = in.readBoolean(); // is this part of recovery?
    String client = Text.readString(in); // working on behalf of this client
    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
    if (hasSrcDataNode) {
      srcDataNode = new DatanodeInfo();
      srcDataNode.readFields(in);
    }
    int numTargets = in.readInt();
    if (numTargets < 0) {
      throw new IOException("Mislabelled incoming datastream.");
    }
    DatanodeInfo targets[] = new DatanodeInfo[numTargets];
    for (int i = 0; i < targets.length; i++) {
      DatanodeInfo tmp = new DatanodeInfo();
      tmp.readFields(in);
      targets[i] = tmp;
    }
    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
    accessToken.readFields(in);

上面代码中,读取 Header 的数据,与前一个 Client/DataNode 写入 Header 字段的顺序相对应,不再累述。在完成读取 Header 数据后,当前 DataNode 会首先将 Header 数据再发送到 Pipeline 中下一个 DataNode 结点,当然该 DataNode 肯定不是 Pipeline 中最后一个 DataNode 节点。接着,该 DataNode 会接收来自前一个 Client/DataNode 节点发送的 Packet 数据,接收 Packet 数据的逻辑实际上在 BlockReceiver 中完成,包括将来自前一个 Client/DataNode 节点发送的 Packet 数据写入本地磁盘。在 BlockReceiver 中,首先会将接收到的 Packet 数据发送写入到 Pipeline 中下一个 DataNode 节点,然后再将接收到的数据写入到本地磁盘的 Block 文件中。

DataNode 持久化 Packet 数据

在 DataNode 节点的 BlockReceiver 中进行 Packet 数据的持久化,一个 Packet 是一个 Block 中一个数据分组,我们首先看一下,一个 Block 在持久化到磁盘上的物理存储结构,如下图所示:

AsrmhF.png

每个 Block 文件(如上图中 blk_1084013198 文件)都对应一个 meta 文件(如上图中 blk_1084013198_10273532.meta 文件),Block 文件是一个一个 Chunk 的二进制数据(每个 Chunk 的大小是 512 字节),而 meta 文件是与每一个 Chunk 对应的 Checksum 数据,是序列化形式存储。

写文件过程中 Client/DataNode 与 NameNode 进行 RPC 调用

Client 在 HDFS 文件系统中写文件过程中,会发生多次与 NameNode 节点进行 RPC 调用来完成写数据相关操作,主要是在如下时机进行 RPC 调用:

  • 写文件开始时创建文件:Client 调用 create 在 NameNode 节点的 Namespace 中创建一个标识该文件的条目
  • 在 Client 连接 Pipeline 中第一个 DataNode 节点之前,Client 调用 addBlock 分配一个 Block(blkId+DataNode 列表 + 租约)
  • 如果与 Pipeline 中第一个 DataNode 节点连接失败,Client 调用 abandonBlock 放弃一个已经分配的 Block
  • 一个 Block 已经写入到 DataNode 节点磁盘,Client 调用 fsync 让 NameNode 持久化 Block 的位置信息数据
  • 文件写完以后,Client 调用 complete 方法通知 NameNode 写入文件成功
  • DataNode 节点接收到并成功持久化一个 Block 的数据后,DataNode 调用 blockReceived 方法通知 NameNode 已经接收到 Block

具体 RPC 调用的详细过程,可以参考源码。