Nodejs 大文件分片上传 实现

5,077 阅读2分钟

步骤:

  1. 前端:大文件进行切片文件名-下标-token
  2. 前端:控制并发的分片数量
  3. 前端:分片上传完毕,通知后端进行合并操作
  4. 后端:接收切片,将分片从缓存目录中取出,重命名为 文件名-下标-token
  5. 后端:合并操作,根据下标获取对应切片 ,通过stream 进行合并

前端

文件获取通过input 标签生成的File 对象

  • Blob 对象 表示原始数据,提供了slice方法

  • File 继承了Blob 的功能

HTML:只需要input file标签

<input type="file" name="f1" id="test">
<button id="upload_btn">确认上传</button>

JS:切割,控制并发,发送合并信息

// Upload endpoint.
const UPLOAD_URL = "http://localhost:8081/upload";

// Kick off the upload flow when the button is clicked.
const uploadBtn = document.querySelector("#upload_btn");
uploadBtn.addEventListener("click", () => {
  submitUpload(UPLOAD_URL, getElFile("input#test"));
});

// Orchestrates the whole upload: slice -> concurrent chunk upload -> merge.
async function submitUpload(url, file) {
  const CHUNKSIZE = 1 * 1024 * 1024; // 1 MB per slice
  const TOKEN = Date.now(); // shared token ties all chunks of one upload together

  // Cut the File (a Blob) into fixed-size pieces.
  const chunks = sliceFile(file, CHUNKSIZE);
  console.log(file.name);

  // Wrap every piece in an upload request.
  const promiseList = createChunkPromiseList(chunks, file.name, TOKEN);

  // Wait for all chunk uploads, tracking at most 2 at a time.
  await createLimitPromise(2, promiseList);

  // Every chunk is on the server — send the merge request.
  const mergeFormData = new FormData();
  const mergeFields = {
    type: "merge",
    token: TOKEN,
    chunkCount: chunks.length,
    fileName: file.name,
  };
  for (const [key, value] of Object.entries(mergeFields)) {
    mergeFormData.append(key, value);
  }
  const res = await axios.post(url, mergeFormData);
  console.log(res);
}

辅助函数

/**
 * Await a list of promises with at most `limitNum` tracked at once,
 * resolving with the fulfilled results (rejections are skipped).
 *
 * FIXES vs. original:
 *  - an empty input list now resolves immediately (the original never
 *    called resolve and hung forever);
 *  - dropped the `handle` wrapper, which was the explicit-Promise-
 *    construction anti-pattern (wrapping an existing promise);
 *  - removed leftover debug console.log calls.
 *
 * NOTE(review): the promises handed in are typically already running
 * (axios.post fires on creation), so this limits how many are *awaited*
 * concurrently, not how many requests are in flight — confirm whether
 * true request throttling is needed.
 *
 * @param {Number} limitNum max promises tracked concurrently
 * @param {Promise[]} promiseListRaw promises to drain
 * @returns {Promise<Array>} fulfilled values, in completion order
 */
function createLimitPromise(limitNum, promiseListRaw) {
  const resArr = [];
  const promiseList = [...promiseListRaw]; // work on a copy; don't mutate the caller's array
  const total = promiseList.length;
  let settledNum = 0;

  return new Promise((resolve) => {
    // Edge case: nothing to run.
    if (total === 0) {
      resolve(resArr);
      return;
    }

    function run() {
      if (promiseList.length === 0) return;
      promiseList
        .shift()
        .then((res) => {
          resArr.push(res);
        })
        .catch(() => {
          // Failed chunks are deliberately ignored (best-effort upload).
        })
        .finally(() => {
          settledNum += 1;
          if (settledNum === total) {
            resolve(resArr);
          } else {
            run(); // pull the next pending promise
          }
        });
    }

    // Start up to limitNum concurrent "lanes".
    for (let i = 0; i < Math.min(limitNum, total); i++) {
      run();
    }
  });
}

// Split a File/Blob into chunkSize-byte Blob slices (last slice may be smaller).
function sliceFile(file, chunkSize) {
  const chunkList = [];
  for (let offset = 0; offset < file.size; offset += chunkSize) {
    chunkList.push(file.slice(offset, offset + chunkSize));
  }
  return chunkList;
}

// Return the first selected File of the <input type="file"> matched by selector.
function getElFile(selector) {
  const { files } = document.querySelector(selector);
  return files[0];
}

/**
 * Wrap each chunk Blob in a FormData payload and fire the upload request.
 *
 * FIXES vs. original: removed leftover debug console.log calls and merged
 * the redundant two-pass map (formdata build + post) into one pass.
 *
 * NOTE(review): axios.post runs as soon as map() executes, so the returned
 * promises are already in flight — the downstream limiter only throttles
 * how many are awaited, not how many requests are sent.
 *
 * @param {Blob[]} chunkList slices produced by sliceFile
 * @param {String} name original file name
 * @param {Number} TOKEN upload token shared by all chunks
 * @returns {Promise[]} one axios request promise per chunk
 */
function createChunkPromiseList(chunkList, name, TOKEN) {
  return chunkList.map((chunk, index) => {
    const formdata = new FormData();
    formdata.append("type", "upload");
    formdata.append("name", name);
    formdata.append("token", TOKEN);
    formdata.append("chunk", chunk);
    formdata.append("index", index);
    return axios.post(UPLOAD_URL, formdata, axiosConfig);
  });
}

后端(基于koa)

注意:koa-bodyparser 不能处理formdata 数据,需要使用koa-body

// Directory where uploaded chunk files are cached until they are merged.
const uploadChunkPath = path.resolve(__dirname,'../data')
// koa-bodyparser cannot parse multipart/form-data, so koa-body is used.
app.use(KoaBody({
  multipart:true, // enable multipart file uploads
  formidable: {
    // Default save directory for uploaded files; if unset, formidable
    // stores them in the OS temp directory instead.
    uploadDir: uploadChunkPath
  },
}))

基于Koa

// Upload endpoint: dispatches on body.type — "upload" stores one chunk,
// "merge" stitches all chunks into the final file.
router.post('/upload',ctx=>{
    if(ctx.request.body.type === 'merge'){
            // All chunks are on disk — merge them in index order.
            const {token,chunkCount,fileName} = ctx.request.body
            mergeChunkFile(fileName,uploadChunkPath,chunkCount,token,'../data')
            ctx.body =  'ok'
    }else if(ctx.request.body.type === 'upload'){
            const {index,token,name} = ctx.request.body
            const chunkFile = ctx.request.files.chunk
            // FIX: path.basename instead of split('/') so the cached file
            // name is also extracted correctly on Windows ('\\' separators).
            const chunkName = path.basename(chunkFile.path)
            // Rename the formidable temp file to the name-index-token scheme
            // the merge step looks up.
            renameFile(uploadChunkPath,chunkName,`${name}-${index}-${token}`)
            ctx.body = 'upload chunk success'
    }else{
        // FIX: typo in response string ("unkown" -> "unknown").
        ctx.body = "unknown type"
    }
})

merge 函数 (stream pipe)

/**
 * Merge cached chunk files into the final file by streaming them, in
 * index order, into one write stream; each chunk is deleted after it
 * has been consumed.
 *
 * FIXES vs. original:
 *  - the write stream is now closed after the last chunk (the original
 *    leaked the file handle);
 *  - read-stream errors (e.g. a missing chunk) are surfaced instead of
 *    crashing the process via an unhandled 'error' event;
 *  - chunkCount is coerced with Number() (formdata fields arrive as strings);
 *  - JSDoc now documents chunkCount.
 *
 * @param {String} fileName name of the resulting file
 * @param {String} chunkPath directory holding the cached chunks
 * @param {Number|String} chunkCount total number of chunks
 * @param {String|Number} fileToken token shared by all chunks of this file
 * @param {String} [dataDir="./"] output directory, relative to __dirname
 * @returns {Promise<Boolean>|undefined} resolves true when the merge is done;
 *          undefined when chunkPath does not exist (original behavior kept)
 */
const mergeChunkFile = (fileName,chunkPath,chunkCount,fileToken,dataDir="./")=>{
    // Bail out early if the chunk cache directory does not exist.
    if(!fs.existsSync(chunkPath)) return
    const totalChunks = Number(chunkCount) // formdata delivers strings
    const dataPath = path.join(__dirname,dataDir,fileName);
    const writeStream = fs.createWriteStream(dataPath);
    let mergedChunkNum = 0
    // Closure keeps the per-merge state out of the recursive step.
    return new Promise((resolve, reject) => {
        mergeCore()
        function mergeCore(){
            // All chunks consumed — close the output and finish.
            if (mergedChunkNum >= totalChunks) {
                writeStream.end(() => resolve(true))
                return
            }
            const curChunk = path.resolve(chunkPath,`${fileName}-${mergedChunkNum}-${fileToken}`)
            const curChunkReadStream = fs.createReadStream(curChunk);
            // Surface read errors instead of emitting an unhandled 'error'.
            curChunkReadStream.on("error", reject)
            // end:false keeps writeStream open so the next chunk can append.
            curChunkReadStream.pipe(writeStream, { end: false });
            curChunkReadStream.on("end", () => {
                fs.unlinkSync(curChunk) // chunk merged — remove the cache file
                mergedChunkNum += 1
                mergeCore(); // recurse to stream the next chunk
            });
        }
    })
}

辅助函数

// Synchronously rename a file inside `dir` from oldName to newName.
function renameFile(dir,oldName,newName){
    fs.renameSync(path.resolve(dir,oldName), path.resolve(dir,newName))
}

补充

进度条

  • ProgressEvent:这个接口用于表示 HTTP 请求进度的事件

    // XHR: hook both the download and the upload progress events.
    xhr.onprogress=updateProgress;
    xhr.upload.onprogress = updateProgress;
      // Update the progress bar from a ProgressEvent.
      // NOTE(review): assumes a `progressSpan` element exists in the page — confirm.
      function updateProgress(event) {
        console.log(event);
        if (event.lengthComputable) {
          var completedPercent = (event.loaded / event.total * 100).toFixed(2);
          progressSpan.style.width= completedPercent+'%';
          progressSpan.innerHTML=completedPercent+'%';
          // toFixed returns a string; `>` coerces it back to a number here.
          if(completedPercent>90){// recolor the bar near completion
            progressSpan.classList.add('green');
          }
          console.log('已上传',completedPercent);
        }
    }
    //axios
    
    const config = {
      ...
      onUploadProgress: progressEvent => {
        console.log(progressEvent);
      }
      ...
    }
    
    axios.post(url,data,config)
    
  • 大文件切片进度条,存在限制,所以采用 已加载切片/ 总切片 * 100 的方式实现进度条 文件越大,切片越多,显示得越细致

    // Shared with the limit runner: chunks uploaded so far / total chunk count.
    let loadedLen = 0 
    let fileChunkLen = 0 
    // axios config: report coarse progress as loadedChunks / totalChunks.
    // NOTE(review): both counters are 0 here, so the percentage is NaN until
    // the limiter updates them — confirm they are incremented elsewhere.
    const axiosConfig = {
      onUploadProgress: progressEvent => {
        const curPercent =( loadedLen / fileChunkLen *100).toFixed(2)
        console.log(` percentage :${curPercent}%`)
      }
    };
    

断点续传

  • 依赖服务端
    1. 服务端设置接口:获取当前缓存目录下的内容
    2. 前端进行比较,如果已经存在,则跳过切片
  • 依赖客户端
    1. 客户端在发送切片前,将切片打上标记spark-md5,将已经上传的切片信息保存在本地
    2. 在重新上传的时候,对比本地的标记 ,如果相同就跳过

参考

完整代码

github.com/shancw96/my…