大文件上传

2,987 阅读7分钟

背景

最近由于业务需求, 需要上传大文件, 为了让用户上传起来更快速, 不禁想起最近比较火的大文件上传, 于是不如实践一下, 写出符合业务需求的大文件上传, 不仅样式可以自定义, 各种功能细节点都在自己掌握之中, 本文使用 Vue  框架来搭建, 在 Element  框架基础之上来写样式, 后端采用 Egg.js  来提供接口. 感兴趣的可以继续看下去.

效果

先看下最终效果, 感兴趣的话可以继续看下去

大文件上传.mp4 (13.52MB)

思路

主要功能点

本文主要是实现了: 前端文件切片上传、断点续传、秒传; 后端文件合并.

前端实现主要思路

  • 切片逻辑: 前端给文件 blob  进行 slice  切割成切片

  • 计算 hash : 前端用插件 spark-md5  计算出该文件 hash  值 (根据文件内容生成的 hash , 文件内容不变,则 hash 变化)

  • 并发请求 : 分别对这几个片 promise.all  发起上传请求,  待所有片上传成功之后, 在发起一个请求告知后端可以对分片进行合并.

  • 断点续传: 为了使用户刷新页面时, 之前老文件上传记录回保留, 故在每次分片上传之前, 会请求后端接口查看当前文件已经上传过的记录, 已经上传的不用在上传, 上传过部分的会返回已上传的大小, 前端根据返回大小再次切割分片.

后端实现主要思路

  • 接收并保存: 接受前端传递的文件分片, 按照文件名将分片文件放到一起.
  • 合并分片: 当前端调接口表明要合并某个文件, 后端接收文件名, 将该文件下的分片排序后合并.
  • 老文件记录: 提供已经上传过的分片记录

可能想问的问题

  • 为什么计算 hash , 可以不计算么?
    • 计算后文件名根据内容是唯一的, 若不计算, 后期文件内容发生改变,文件名不变, 再次上传时候, 后端会以为要断点续传, 从而导致旧上传的资源得不到替换.
  • 为什么要分片 ?
    • 分片之后,每个请求资源会变小, 通过 primise.all  可以同时 http 并发进行请求, 提高了上传速度
  • 秒传如何实现?
    • 后端可以获取上传过的文件记录信息, 发现前端请求文件已经上传过之后, 会直接返回上传成功.
  • 我们需要知道的
    • 大文件上传, 是在前端进行文件切片, 在后端将文件进行的合并.

前端实现

切片逻辑

  • 核心是对文件资源的 slice  切割, 并存放到数组中
createChunks(file) {
  let current = 0;
  const partList = [];
  while (current < file.size) {
    // 核心是对文件的 slice 
    const chunk = file.raw.slice(current, current + this.partSize); // el-upload 原始文件会放到: file.raw 对象中
    partList.push({ chunk, size: chunk.size });
    current += this.partSize; // this.partSize 定位每片大小
  }
  return partList;
},

计算 hash

通过 new Worker 打开子进程,  将最终计算的 hash 返回

calculateHash(partList, fileIndex) {
  this.fileList[fileIndex].state = PART;
  return new Promise((resolve) => {
    const worker = new Worker('/hash.js');
    worker.postMessage({ partList });
    worker.onmessage = (event) => { // 监听子进程 传过来的数据
      const { percent, hash } = event.data;
      this.setHashPercent(percent, fileIndex); // 修改滚动条
      if (hash) {
        resolve(hash);
      }
    };
  });
},
self.importScripts('https://cdn.bootcss.com/spark-md5/3.0.0/spark-md5.js');
self.onmessage = async (event) => {
  var { partList } = event.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  var percent = 0;
  var size = 100 / partList.length;
  var buffers = await Promise.all(partList.map(({ chunk }) => new Promise((resolve) => {
      const reader = new FileReader();
      reader.readAsArrayBuffer(chunk);
      reader.onload = (event) => {
          percent += size;
	        // 修改滚动条
          self.postMessage({ percent: Number(percent.toFixed(2)) });
          resolve(event.target.result);
      }
  })));
  buffers.forEach(buffer => spark.append(buffer));
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
}

并发请求

并发、进度条

createRequests(partList) {
  return partList.map((part) => {
    const formData = new FormData();
    // part.chunk 每个切片的真正内容
    formData.append(part.filename, part.chunk.slice(part.loaded), part.filename);
    return request({
      url: `${this.action.partUpload}/${part.filename}/${part.chunk_name}/${part.loaded}`,
      method: 'POST',
      data: formData, // 这里FormData 格式
      setXHR: (xhr) => {
        part.xhr = xhr;
      },
      // 进度条 核心 event.loaded 返回已经上传过的文件大小
      onProgress: (event) => {
        const percent = parseInt(Number((Number(part.loaded + event.loaded) / part.chunk.size * 100)), 10);
        part.percent = percent > 100 ? 100 : percent;
      },
    });
  });
},
async uploadParts(partList, filename, fileIndex) {
  if (!filename) return;
  const requests = this.createRequests(partList);// 获取切片请求
  await Promise.all(requests);// 同时请求
},

断点续传、秒传

每次上传分片前先获取已经上传文件的列表, 将列表与本地列表进行筛选, 已经上传的移除, 上传一部分的 通过下面 22 行代码 将资源截取.

createRequests(partList, uploadedList) {
  const partLists = partList.filter((part) => {
    const uploadedFile = uploadedList.find((item) => item.filename === part.chunk_name);
    if (!uploadedFile) { // 未上传过的
      part.loaded = 0;
      part.percent = 0;
      return true;
    }
    if (uploadedFile.size < part.chunk.size) { // 上传过一部分的
      part.loaded = uploadedFile.size; // 后端返回已上传过文件的大小
      part.percent = parseInt(Number((Number(part.loaded) / part.chunk.size * 100)), 10);
      return true;
    }
    if (uploadedFile.size >= part.chunk.size) {
      part.percent = 100;
      return false;
    }
    return false;
  });
  return partLists.map((part) => {
    const formData = new FormData();
    formData.append(part.filename, part.chunk.slice(part.loaded), part.filename);
    return request({
      url: `${this.action.partUpload}/${part.filename}/${part.chunk_name}/${part.loaded}`,
      method: 'POST',
      data: formData,
      setXHR: (xhr) => {
        part.xhr = xhr;
      },
      onProgress: (event) => {
        const percent = parseInt(Number((Number(part.loaded + event.loaded) / part.chunk.size * 100)), 10);
        part.percent = percent > 100 ? 100 : percent;
      },
    });
  });
},
async uploadParts(partList, filename, fileIndex) {
  if (!filename) return;
  // 先向服务器获取已上传过的文件列表
  const { needUpload = true, uploadedList = [] } = await this.getUploadedList(filename);
  if (!needUpload) { // 表示该文件已经上传完整, 不用在上传.
    partList.forEach((item) => {
      item.percent = 100;
    });
    this.handleSuccess(fileIndex, true);
    return;
  }
  const requests = this.createRequests(partList, uploadedList);
  await Promise.all(requests);
  const res = await request({
    url: this.action.upload,
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    data: JSON.stringify({ filename }),
  });
  if (res.success) {
    this.handleSuccess(fileIndex);
    return;
  }
  this.$message.error('发生未知异常错误,请重新上传!');
},

暂停、继续

到这里也该交代一下, fileList partList  两个重要变量的含义, 下图是具体信息, 在 el-upload  的 fileList  扩展了一些参数.

image.png

// 继续
async resumeItem(index) {
  const { partList = [], hashName } = this.fileList[index];
  this.fileList[index].state = UPLOADING;
  this.uploadParts(partList || [], hashName, index);
},
// 暂停 xhr.abort 核心 将每个请求 xhr 保存到 partList 中
pauseItem(index) {
  const { partList = [] } = this.fileList[index];
  this.fileList[index].state = PAUSE;
  partList.forEach((part) => part.xhr && part.xhr.abort());
},
演示一下暂停与继续

秒传.mp4 (16.81MB)

后端实现

Egg,js 配置起来方便, 需要提供给前端三个接口

接收并保存

import { Controller, Context } from 'egg'
import { resolve } from 'path'
import { createWriteStream, createReadStream } from 'fs'
import { pathExists } from 'fs-extra'
// tslint:disable-next-line: no-var-requires
const fs = require('fs').promises
import { write } from 'await-stream-ready'
import sendToWormhole from 'stream-wormhole'

const TEMP_DIR = 'temp'
const PUBLIC_DIR = 'file'
export default class FileController extends Controller {
	public async partUpload(ctx: Context) {
    const { filename = '', chunkName = '', start = 0 } = ctx.params
    const filePath = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename)
    const existFile = await pathExists(filePath)
    if (!existFile) {
      try {
        await fs.mkdir(filePath)
      } catch (err) {
        console.log(err)
      }
    }
    const stream = await ctx.getFileStream()
    const target = resolve(filePath, chunkName)
    // start 表示从何处开始写, 断点续传后端核心
    const writeStream = createWriteStream(target, { start: ~~start, flags: 'a' })
    try {
        await write(stream.pipe(writeStream))
    } catch (err) {
        await sendToWormhole(stream)
        throw err
    }
    ctx.body = {
      code: 200,
      success: true,
    }
    ctx.status = 200
  }
	//  .....
}

合并分片

public async upload(ctx: Context) {
  const { filename } = ctx.request.body
  await mergeChunks(filename, ctx)
  ctx.body = {
    code: 200,
    success: true,
  }
  ctx.status = 200
}

const pipeStream = (filePath: string, writeStream) => new Promise(resolve => {
  const readStream = createReadStream(filePath)
  readStream.on('end', async () => {
      await fs.unlink(filePath) // 结束删掉 切片
      resolve()
  })
  readStream.pipe(writeStream) // 写入
})
const SIZE = 1024 * 1024 * 10
// 合并逻辑
const mergeChunks = async (filename: string, ctx) => {
  const filePath = resolve(ctx.app.config.static.dir as string, PUBLIC_DIR, filename)
  const chunksDir = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename) // 切片资源路径
  const chunkFiles = await fs.readdir(chunksDir) // 读切片资源文件
  chunkFiles.sort((a, b) => Number(a.split('-')[1]) - Number(b.split('-')[1])) // 排序
  await Promise.all(
    chunkFiles.map((chunkFile, index) => pipeStream( // 写入流
        resolve(chunksDir, chunkFile),
        createWriteStream(filePath, {
            start: index * SIZE,
        }),
    )),
  )
  await fs.rmdir(chunksDir)
}

老文件记录

public async getUploadedList(ctx: Context) {
  const { filename } = ctx.request.body
  const filePath = resolve(ctx.app.config.static.dir as string, PUBLIC_DIR, filename) // 查看资源文件是否存在当前文件
  const existFile = await pathExists(filePath)
  if (existFile) {
    ctx.body = {
      code: 200,
      needUpload: false, // 存在,则不需要再次上传, 秒传的配合
    }
    return
  }
  // 查看该文件切片存在哪些
  const tempFilePath = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename) 
  let uploadedList: any[] = []
  const existTemporaryFile = await pathExists(tempFilePath)
  if (existTemporaryFile) {
    uploadedList = await fs.readdir(tempFilePath)
    uploadedList = await Promise.all(uploadedList.map(async (filename: string) => {
      const stat = await fs.stat(resolve(tempFilePath, filename))
      return {
          filename,
          size: stat.size, // 切片大小需要返回前端
      }
	  }))
  }
  ctx.body = {
    code: 200,
    needUpload: true,
    uploadedList,
  }
 }

优化

可以发现最上面视频, 1G多点文件, 虽然用了 new Worker 还是有些卡顿的, 还需要进一步优化, 如最后参考文中所描述几个点: 时间切片计算文件 hash, 抽样hash 命中率的同时,提升效率, 分片上传失败重试 等.

参考