背景
最近由于业务需求, 需要上传大文件, 为了让用户上传起来更快速, 不禁想起最近比较火的大文件上传, 于是不如实践一下, 写出符合业务需求的大文件上传, 不仅样式可以自定义, 各种功能细节点都在自己掌握之中, 本文使用 Vue
框架来搭建, 在 Element
框架基础之上来写样式, 后端采用 Egg.js
来提供接口. 感兴趣的可以继续看下去.
效果
思路
主要功能点
本文主要是实现了: 前端文件切片上传、断点续传、秒传; 后端文件合并.
前端实现主要思路
-
切片逻辑: 前端给文件
blob
进行slice
切割成切片 -
计算 hash : 前端用插件
spark-md5
计算出该文件hash
值 (根据文件内容生成的hash
, 文件内容不变,则hash
变化) -
并发请求 : 分别对这几个片
promise.all
发起上传请求, 待所有片上传成功之后, 在发起一个请求告知后端可以对分片进行合并. -
断点续传: 为了使用户刷新页面时, 之前老文件上传记录回保留, 故在每次分片上传之前, 会请求后端接口查看当前文件已经上传过的记录, 已经上传的不用在上传, 上传过部分的会返回已上传的大小, 前端根据返回大小再次切割分片.
后端实现主要思路
- 接收并保存: 接受前端传递的文件分片, 按照文件名将分片文件放到一起.
- 合并分片: 当前端调接口表明要合并某个文件, 后端接收文件名, 将该文件下的分片排序后合并.
- 老文件记录: 提供已经上传过的分片记录
可能想问的问题
- 为什么计算
hash
, 可以不计算么?- 计算后文件名根据内容是唯一的, 若不计算, 后期文件内容发生改变,文件名不变, 再次上传时候, 后端会以为要断点续传, 从而导致旧上传的资源得不到替换.
- 为什么要分片 ?
- 分片之后,每个请求资源会变小, 通过
primise.all
可以同时http
并发进行请求, 提高了上传速度
- 分片之后,每个请求资源会变小, 通过
- 秒传如何实现?
- 后端可以获取上传过的文件记录信息, 发现前端请求文件已经上传过之后, 会直接返回上传成功.
- 我们需要知道的
- 大文件上传, 是在前端进行文件切片, 在后端将文件进行的合并.
前端实现
切片逻辑
- 核心是对文件资源的
slice
切割, 并存放到数组中
createChunks(file) {
let current = 0;
const partList = [];
while (current < file.size) {
// 核心是对文件的 slice
const chunk = file.raw.slice(current, current + this.partSize); // el-upload 原始文件会放到: file.raw 对象中
partList.push({ chunk, size: chunk.size });
current += this.partSize; // this.partSize 定位每片大小
}
return partList;
},
计算 hash
通过 new Worker 打开子进程, 将最终计算的 hash 返回
calculateHash(partList, fileIndex) {
this.fileList[fileIndex].state = PART;
return new Promise((resolve) => {
const worker = new Worker('/hash.js');
worker.postMessage({ partList });
worker.onmessage = (event) => { // 监听子进程 传过来的数据
const { percent, hash } = event.data;
this.setHashPercent(percent, fileIndex); // 修改滚动条
if (hash) {
resolve(hash);
}
};
});
},
self.importScripts('https://cdn.bootcss.com/spark-md5/3.0.0/spark-md5.js');
self.onmessage = async (event) => {
var { partList } = event.data;
const spark = new self.SparkMD5.ArrayBuffer();
var percent = 0;
var size = 100 / partList.length;
var buffers = await Promise.all(partList.map(({ chunk }) => new Promise((resolve) => {
const reader = new FileReader();
reader.readAsArrayBuffer(chunk);
reader.onload = (event) => {
percent += size;
// 修改滚动条
self.postMessage({ percent: Number(percent.toFixed(2)) });
resolve(event.target.result);
}
})));
buffers.forEach(buffer => spark.append(buffer));
self.postMessage({ percent: 100, hash: spark.end() });
self.close();
}
并发请求
并发、进度条
createRequests(partList) {
return partList.map((part) => {
const formData = new FormData();
// part.chunk 每个切片的真正内容
formData.append(part.filename, part.chunk.slice(part.loaded), part.filename);
return request({
url: `${this.action.partUpload}/${part.filename}/${part.chunk_name}/${part.loaded}`,
method: 'POST',
data: formData, // 这里FormData 格式
setXHR: (xhr) => {
part.xhr = xhr;
},
// 进度条 核心 event.loaded 返回已经上传过的文件大小
onProgress: (event) => {
const percent = parseInt(Number((Number(part.loaded + event.loaded) / part.chunk.size * 100)), 10);
part.percent = percent > 100 ? 100 : percent;
},
});
});
},
async uploadParts(partList, filename, fileIndex) {
if (!filename) return;
const requests = this.createRequests(partList);// 获取切片请求
await Promise.all(requests);// 同时请求
},
断点续传、秒传
每次上传分片前先获取已经上传文件的列表, 将列表与本地列表进行筛选, 已经上传的移除, 上传一部分的 通过下面 22 行代码 将资源截取.
createRequests(partList, uploadedList) {
const partLists = partList.filter((part) => {
const uploadedFile = uploadedList.find((item) => item.filename === part.chunk_name);
if (!uploadedFile) { // 未上传过的
part.loaded = 0;
part.percent = 0;
return true;
}
if (uploadedFile.size < part.chunk.size) { // 上传过一部分的
part.loaded = uploadedFile.size; // 后端返回已上传过文件的大小
part.percent = parseInt(Number((Number(part.loaded) / part.chunk.size * 100)), 10);
return true;
}
if (uploadedFile.size >= part.chunk.size) {
part.percent = 100;
return false;
}
return false;
});
return partLists.map((part) => {
const formData = new FormData();
formData.append(part.filename, part.chunk.slice(part.loaded), part.filename);
return request({
url: `${this.action.partUpload}/${part.filename}/${part.chunk_name}/${part.loaded}`,
method: 'POST',
data: formData,
setXHR: (xhr) => {
part.xhr = xhr;
},
onProgress: (event) => {
const percent = parseInt(Number((Number(part.loaded + event.loaded) / part.chunk.size * 100)), 10);
part.percent = percent > 100 ? 100 : percent;
},
});
});
},
async uploadParts(partList, filename, fileIndex) {
if (!filename) return;
// 先向服务器获取已上传过的文件列表
const { needUpload = true, uploadedList = [] } = await this.getUploadedList(filename);
if (!needUpload) { // 表示该文件已经上传完整, 不用在上传.
partList.forEach((item) => {
item.percent = 100;
});
this.handleSuccess(fileIndex, true);
return;
}
const requests = this.createRequests(partList, uploadedList);
await Promise.all(requests);
const res = await request({
url: this.action.upload,
method: 'POST',
headers: { 'Content-Type': 'application/json' },
data: JSON.stringify({ filename }),
});
if (res.success) {
this.handleSuccess(fileIndex);
return;
}
this.$message.error('发生未知异常错误,请重新上传!');
},
暂停、继续
到这里也该交代一下, fileList partList
两个重要变量的含义, 下图是具体信息, 在 el-upload
的 fileList
扩展了一些参数.
// 继续
async resumeItem(index) {
const { partList = [], hashName } = this.fileList[index];
this.fileList[index].state = UPLOADING;
this.uploadParts(partList || [], hashName, index);
},
// 暂停 xhr.abort 核心 将每个请求 xhr 保存到 partList 中
pauseItem(index) {
const { partList = [] } = this.fileList[index];
this.fileList[index].state = PAUSE;
partList.forEach((part) => part.xhr && part.xhr.abort());
},
演示一下暂停与继续
后端实现
Egg,js 配置起来方便, 需要提供给前端三个接口
接收并保存
import { Controller, Context } from 'egg'
import { resolve } from 'path'
import { createWriteStream, createReadStream } from 'fs'
import { pathExists } from 'fs-extra'
// tslint:disable-next-line: no-var-requires
const fs = require('fs').promises
import { write } from 'await-stream-ready'
import sendToWormhole from 'stream-wormhole'
const TEMP_DIR = 'temp'
const PUBLIC_DIR = 'file'
export default class FileController extends Controller {
public async partUpload(ctx: Context) {
const { filename = '', chunkName = '', start = 0 } = ctx.params
const filePath = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename)
const existFile = await pathExists(filePath)
if (!existFile) {
try {
await fs.mkdir(filePath)
} catch (err) {
console.log(err)
}
}
const stream = await ctx.getFileStream()
const target = resolve(filePath, chunkName)
// start 表示从何处开始写, 断点续传后端核心
const writeStream = createWriteStream(target, { start: ~~start, flags: 'a' })
try {
await write(stream.pipe(writeStream))
} catch (err) {
await sendToWormhole(stream)
throw err
}
ctx.body = {
code: 200,
success: true,
}
ctx.status = 200
}
// .....
}
合并分片
public async upload(ctx: Context) {
const { filename } = ctx.request.body
await mergeChunks(filename, ctx)
ctx.body = {
code: 200,
success: true,
}
ctx.status = 200
}
const pipeStream = (filePath: string, writeStream) => new Promise(resolve => {
const readStream = createReadStream(filePath)
readStream.on('end', async () => {
await fs.unlink(filePath) // 结束删掉 切片
resolve()
})
readStream.pipe(writeStream) // 写入
})
const SIZE = 1024 * 1024 * 10
// 合并逻辑
const mergeChunks = async (filename: string, ctx) => {
const filePath = resolve(ctx.app.config.static.dir as string, PUBLIC_DIR, filename)
const chunksDir = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename) // 切片资源路径
const chunkFiles = await fs.readdir(chunksDir) // 读切片资源文件
chunkFiles.sort((a, b) => Number(a.split('-')[1]) - Number(b.split('-')[1])) // 排序
await Promise.all(
chunkFiles.map((chunkFile, index) => pipeStream( // 写入流
resolve(chunksDir, chunkFile),
createWriteStream(filePath, {
start: index * SIZE,
}),
)),
)
await fs.rmdir(chunksDir)
}
老文件记录
public async getUploadedList(ctx: Context) {
const { filename } = ctx.request.body
const filePath = resolve(ctx.app.config.static.dir as string, PUBLIC_DIR, filename) // 查看资源文件是否存在当前文件
const existFile = await pathExists(filePath)
if (existFile) {
ctx.body = {
code: 200,
needUpload: false, // 存在,则不需要再次上传, 秒传的配合
}
return
}
// 查看该文件切片存在哪些
const tempFilePath = resolve(ctx.app.config.static.dir as string, TEMP_DIR, filename)
let uploadedList: any[] = []
const existTemporaryFile = await pathExists(tempFilePath)
if (existTemporaryFile) {
uploadedList = await fs.readdir(tempFilePath)
uploadedList = await Promise.all(uploadedList.map(async (filename: string) => {
const stat = await fs.stat(resolve(tempFilePath, filename))
return {
filename,
size: stat.size, // 切片大小需要返回前端
}
}))
}
ctx.body = {
code: 200,
needUpload: true,
uploadedList,
}
}
优化
可以发现最上面视频, 1G多点文件, 虽然用了 new Worker 还是有些卡顿的, 还需要进一步优化, 如最后参考文中所描述几个点: 时间切片计算文件 hash, 抽样hash 命中率的同时,提升效率, 分片上传失败重试 等.