WebWorker 手动实现分片上传七牛云存储

395 阅读4分钟

分片上传是个很常见的功能,而七牛官方的qiniu-js很多功能用不到且有些冗余(有些包已经deprecated),所以自己实现一个精简的队列分片上传,借助WebWorker也能很好的避免页面卡顿等问题。

创建分片

首先是创建分片函数,如果有上传后校验数据的需求可以加MD5,并非必须。

  • createChunk.ts
import SparkMD5 from 'spark-md5';

export interface ChunkData {
  start: number;
  end: number;
  index: number;
  md5: string;
  data: Blob;
}

export async function createChunk(file: File, index: number, chunkSize: number): Promise<ChunkData> {
  return new Promise((resolve) => {
    const spark = new SparkMD5.ArrayBuffer();
    const start = index * chunkSize;
    const end = start + chunkSize;
    const fileReader = new FileReader();
    fileReader.onload = (e) => {
      const data = new Blob([e.target.result]);
      spark.append(e.target.result);
      const md5 = spark.end();
      resolve({
        start,
        end,
        index,
        md5,
        data,
      });
    };
    fileReader.readAsArrayBuffer(file.slice(start, end));
  });
}

实现 WebWorker

Worker实现比较简单, 解构发送过来的数据并调用createChunk即可,每个Worker都可能会分到 1 个以上的任务,直接Promise.all

  • worker.js
import { createChunk } from './createChunk';

onmessage = async (e) => {
  const { file, chunkSize, startIndex, endIndex } = e.data;
  const proms = [];
  for (let i = startIndex; i < endIndex; i++) {
    proms.push(createChunk(file, i, chunkSize));
  }
  const chunks = await Promise.all(proms);
  postMessage(chunks);
};

实现分片

最后是主要的分片函数,分配任务和回调,分片大小设置为5MB,Worker数量为当前电脑的线程数量。

注意:如果你打包的js文件是放在另一个域名下的,可以通过vite的inline方式引入,或者是借助Blob中importScripts来引入以避免跨域问题,这里以vite为例。

  • splitFile.ts
import { ChunkData } from '@/media/components/upload/createChunk';
import MyWorker from './worker?worker&inline';

const chunkSize = 1024 * 1024 * 5;
const threadCount = navigator.hardwareConcurrency || 4;

/**
 * 分片上传
 * 传递一个回调函数,每当worker完成时就上传
 * 全部分片完成后返回result数组,如果有部分上传失败则可按需检验并重新上传
 *
 * @param file
 * @param callback
 */
export async function splitFile(file: File, callback: (data: ChunkData) => void): Promise<ChunkData[]> {
  return new Promise((resolve) => {
    const result: ChunkData[] = [];
    // 共有几个分片/任务
    const chunkCount = Math.ceil(file.size / chunkSize);
    // 每个线程分到的任务数量
    const workerChunkCount = Math.ceil(chunkCount / threadCount);
    let finishCount = 0;
    for (let i = 0; i < threadCount; i++) {
      // 有跨域问题的通过 vite 的 inline 引入,否则可以直接 URL 引入
      // new Worker(new URL('./worker.js', import.meta.url), { type: 'module' })
      const worker = new MyWorker();
      // 每个线程执行的任务范围
      const startIndex = i * workerChunkCount;
      const endIndex = startIndex + workerChunkCount > chunkCount ? chunkCount : startIndex + workerChunkCount;
      worker.postMessage({
        file,
        chunkSize,
        startIndex,
        endIndex,
      });
      worker.onmessage = (e) => {
        for (let i = startIndex; i < endIndex; i++) {
          result[i] = e.data[i - startIndex];
          // 直接上传分片
          callback(e.data[i - startIndex]);
        }
        worker.terminate();
        finishCount++;
        // 全部完成
        if (finishCount >= threadCount) {
          resolve(result);
        }
      };
    }
  });
}

队列任务

实现一个简易队列,其中一个出错也不会影响下一个任务,只不过这是单通道的

const runTask = (() => {
  let pending = Promise.resolve();
  const run = async (res) => {
    await pending.catch((e) => e).finally(() => upload(res));
  }
  return (res) => (pending = run(res))
})();

// res为splitFile的回调参数
runTask(res).then();

或者使用一种并发队列,核心思路是通过一个函数增加任务到队列中,并尝试执行任务(注意任务执行的条件),如果符合条件就从队首取出任务并执行,等执行完了后再次调用执行函数形成递归。如果某个任务出错则会通过reject弹出,同时finally会确保该通道会继续执行下个任务,实现并发队列效果。

  • useQueue.ts
export function useQueue(maxCount = 4) {
  const queue = [];
  let count = 0;

  function addTask(task) {
    return new Promise((resolve, reject) => {
      queue.push({ task, resolve, reject });
      runTask();
    });
  }

  async function runTask() {
    if (count < maxCount && queue.length > 0) {
      const { task, resolve, reject } = queue.shift();
      count++;
      try {
        const result = await task();
        resolve(result);
      } catch (err) {
        reject(err);
      } finally {
        count--;
        runTask();
      }
    }
  }

  return {
    addTask,
  };
}

应用

核心功能已经基本完成了,接下来就是常规的业务逻辑,以上传视频文件为例,其中bucketNamefilekey替换为你自己的,上传token也是从自行服务端获取,调用方式各自为例。

import axios from 'axios';
import { splitFile } from './splitFile';
import { useQueue } from './useQueue';

const http = axios.create();

export default {
  data() {
    return {
      uploadDomain: 'https://upload.qiniup.com',
      percent: 0,
      videoDuration: 0,
      bucketName: 'asdf-1', // 七牛云对象存储名
      filekey: 'upload/2023-09-26/test.mp4', // 七牛云的路径
      fileChunks: [],
      uploadResults: [],
    }
  },
  methods: {
    async onUpload(file: File) {
      const queue = useQueue();
      const token = await this.getUploadToken();
      this.videoDuration = await this.getVideoDuration();
      const { uploadId } = await this.getUploadId(this.bucketName, this.filekey, token);
      // 单个上传任务
      const upload = async (res) => {
        const result = await this.doUpload(this.bucketName, this.filekey, uploadId, res.index + 1, token, res.data);
        this.uploadResults.push({
          ...result,
          partNumber: res.index + 1,
        });
        await this.checkComplete(bucketName, uploadId, token);
      }
      // 注意此处没有处理错误,可以自行扩展
      this.fileChunks = await splitFile(file, (res) => {
        // console.log('uploading', res);
        // 此处可校验md5等操作
        queue.addTask(() => upload(res));
      });
    },
    async checkComplete(bucketName: string, uploadId: string, token: string, file: File) {
      // 上传进度
      this.percent = Math.ceil(this.uploadResults.length / this.fileChunks.length * 100);
      // 上传完成后的操作,可校验上传是否正确的等
      if (this.fileChunks.length === this.uploadResults.length) {
        await this.completeUpload(bucketName, this.filekey, uploadId, token);
      }
    },
    getUploadToken() {
      return new Promise<string>((resolve) => {
        resolve('up token') // 返回你的上传Token
      });
    },
    getVideoDuration() {
      return new Promise<number>((resolve) => {
        const fileUrl = URL.createObjectURL(this.file);
        const video = document.createElement('video');
        video.setAttribute('preload', 'auto');
        video.src = fileUrl;
        video.onloadeddata = () => {
          resolve(Math.floor(video.duration));
        };
      });
    },
    async getUploadId(bucketName: string, filename: string, token: string) {
      const result = await http({
        method: 'POST',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads`,
        headers: {
          'Content-Type': 'application/json',
          Authorization: `UpToken ${token}`,
        },
      });
      console.log('getUploadId', result.data);
      return result.data; // {uploadId: '', expireAt: 1695894984}
    },
    async doUpload(bucketName: string, filename: string, uploadId: string, index: number, token: string, data: Blob) {
      const result = await http({
        method: 'PUT',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads/${uploadId}/${index}`,
        headers: {
          'Content-Type': 'application/octet-stream',
          Authorization: `UpToken ${token}`,
        },
        data: data,
      });
      console.log('doUpload', index, result.data);
      return result.data; // {etag: '', md5: ''}
    },
    async completeUpload(bucketName: string, filename: string, uploadId: string, token: string, mimeType: string) {
      // 七牛文档要求按照partNumber排序
      this.uploadResults.sort((a, b) => (a.partNumber > b.partNumber ? 1 : -1));
      const result = await http({
        method: 'POST',
        url: `${this.uploadDomain}/buckets/${bucketName}/objects/${window.btoa(filename)}/uploads/${uploadId}`,
        headers: {
          'Content-Type': 'application/json',
          Authorization: `UpToken ${token}`,
        },
        data: {
          fname: filename,
          mimeType: mimeType, // 'video/mp4'
          parts: this.uploadResults, // [{etag: '', partNumber: 1}]
        },
      });
      console.log('completeUpload', result.data);
      return result.data; // {hash: '', key: ''}
    },
  },
}

至此,简易的文件分片上传已完成,当然还有暂停和取消上传等操作可以在队列那里实现,不过现在已能满足大部分场景,很多时候文件上传出错用户更倾向于重头来,你写了错误处理也不一定用得上。

如有问题欢迎指出!