Preface
…XLTD file; the other is a config.cfg file. My rough guess is that Thunder downloads with multiple threads: it saves each thread's data into the temp.xltd file and then records the position and byte count of each data block in config.cfg. To verify this guess, I will implement large-file uploads along the same lines.
Resumable (breakpoint) upload
Front-end implementation
/**
 * Entry point: splits `file` into fixed-size chunks and processes them through a
 * concurrency-limited queue.
 *
 * Returns five methods:
 * - `start`   – start (or resume) processing the queue;
 * - `stop`    – pause: tasks already handed to `execute` keep running, but `next`
 *               no longer starts new ones;
 * - `done`    – register the callback run once, after every chunk has been handed
 *               to `execute` AND reported finished through `next`;
 * - `next`    – must be called by the `execute` callback when one chunk is finished;
 * - `execute` – register the callback holding the business logic for one chunk
 *               (it receives that chunk's FormData).
 *
 * @param file    File to split and upload.
 * @param options `chunkSize` (bytes per chunk), `concurrent` (max chunks in flight;
 *                defaults to 1 when an options object omits it) and `fieldname`
 *                (FormData field carrying the chunk; defaults to "file"). When no
 *                options object is passed at all: 2 MiB chunks, 4 concurrent, "file".
 */
export function queueUpload(
  file: File,
  options: UploadOptions = {
    chunkSize: 2 * 1024 * 1024,
    concurrent: 4,
    fieldname: "file",
  }
) {
  const { chunkSize, concurrent = 1, fieldname = "file" } = options;
  // At least one slot, otherwise start() could never dispatch anything.
  const limit = Math.max(1, concurrent);

  // Divide the file into fixed-size chunks and wrap each one in its own FormData.
  const chunks = blobSlice(file, chunkSize);
  const formDatas = generateFormData(chunks, file.name, fieldname);
  const queue: Item[] = [];
  for (let index = 0; index < formDatas.length; index++) {
    queue.push({ data: formDatas[index] });
  }

  const concurrentQueue: Item[] = []; // tasks handed to `execute`, not yet finished
  let valve = false; // open while started; stop() closes it
  let isDone = false; // ensures `done` runs at most once

  let execute = (_item: FormData) => {};
  let done = () => {};

  const active = {
    stop: function stop() {
      valve = false;
    },
    start: function start() {
      valve = true;
      if (queue.length === 0) {
        // Finish only when nothing is waiting AND nothing is still in flight;
        // otherwise `done` would fire while the last chunks are still uploading.
        if (concurrentQueue.length === 0 && !isDone) {
          isDone = true;
          done();
        }
        return;
      }
      while (concurrentQueue.length < limit) {
        const item = queue.shift();
        if (!item) return;
        concurrentQueue.push(item);
        execute(item.data);
      }
    },
    // Move on to the next task: frees one in-flight slot and refills the pool.
    next: function next() {
      concurrentQueue.shift();
      if (!valve) return;
      active.start();
    },
    execute: function (fn: (item?: FormData) => void) {
      execute = fn;
    },
    done: function (fn: () => void) {
      done = fn;
    },
  };
  return {
    ...active,
  };
}
Server-side implementation
The back-end service exposes two endpoints:
- Resume endpoint `/renewal`: checks whether the file has already been uploaded. If it has, the file's path is returned directly; otherwise, the indices of the chunks that have already been uploaded are returned.
- File upload endpoint `/upload`: receives one chunk per request.
The upload endpoint is built from two middlewares: the first parses the request's FormData fields and file stream, and the second handles the upload logic.
// Middleware that parses multipart/form-data with Busboy.
// Plain fields are appended to req.body unchanged; each file part is streamed to a
// uniquely named file in the OS temp directory, and its metadata
// (finalPath, fieldname, originalname, encoding, mimetype) is appended to req.body.
// Buffering to disk trades speed for lower memory use; another option would be to
// hand a Duplex stream to the next middleware instead.
export function makeField() {
  return function Field(req: Request, _: Response, next: NextFunction) {
    // Without a body parser, Express leaves req.body undefined.
    if (!req.body) req.body = {};
    const busboy = new Busboy({
      headers: req.headers,
    });

    // Busboy's "finish" only means the request has been parsed, not that the temp
    // files are flushed to disk, so next() waits for these writes to complete.
    const writes: Promise<void>[] = [];
    let settled = false;
    const fail = (err: unknown) => {
      if (settled) return;
      settled = true;
      req.unpipe(busboy);
      next(err);
    };

    busboy.on("file", (fieldname, fileStream, filename, encoding, mimetype) => {
      if (!filename) {
        fileStream.resume(); // drain and discard parts without a filename
        return;
      }
      // basename() drops directory components of the client-supplied name so it
      // cannot escape the temp directory (e.g. "../../etc/x").
      const finalPath = path.join(os.tmpdir(), `${uuid.v4()}-${path.basename(filename)}`);
      const file = {
        finalPath: finalPath,
        fieldname: fieldname,
        originalname: filename,
        encoding: encoding,
        mimetype: mimetype,
      };
      appendField(req.body, fieldname, file);

      const outStream = fs.createWriteStream(finalPath);
      writes.push(
        new Promise<void>((resolve, reject) => {
          outStream.on("finish", () => resolve());
          outStream.on("error", reject);
          fileStream.on("error", reject);
        })
      );
      // pipe() ends outStream automatically when fileStream ends.
      fileStream.pipe(outStream);
    });

    busboy.on("field", (fieldname, value) => {
      appendField(req.body, fieldname, value);
    });

    busboy.on("error", fail);

    busboy.on("finish", () => {
      req.unpipe(busboy);
      Promise.all(writes).then(() => {
        if (settled) return;
        settled = true;
        next();
      }, fail);
    });

    req.pipe(busboy);
  };
}
// Middleware that prepares per-upload bookkeeping and then writes the current chunk.
// It creates, if they do not exist yet:
//  - the temporary data file `${md5}.${suffix}` that every chunk is written into;
//  - the chunk-status file `${md5}.${suffix}.cfg`, with 12 bytes per chunk, where
//    each record is initialised through uintCodeCfg with its index and a 0 flag.
// It then delegates to readCacheFile; client input errors go to next() with status 400.
export function makeUpload() {
  return function Upload(req: Request, res: Response, next: NextFunction) {
    const badRequest = (message: string) =>
      next(Object.assign(new Error(message), { status: 400 }));

    const { chunkCount, md5, fieldname } = req.body;
    const fileField = req.body[fieldname];
    if (!fileField) return badRequest(`missing file field "${fieldname}"`);
    // md5 is client-supplied and becomes part of a file path: refuse anything that
    // could contain path separators or "..".
    if (typeof md5 !== "string" || !/^[0-9A-Za-z_-]+$/.test(md5)) {
      return badRequest("invalid md5");
    }
    const count = +chunkCount;
    // A negative or fractional count would make `new ArrayBuffer` throw.
    if (!Number.isInteger(count) || count <= 0) return badRequest("invalid chunkCount");

    const { originalname } = fileField;
    const tempDir = getNowTempPath();
    if (!fs.existsSync(tempDir)) {
      mkdirsSync(tempDir);
    }

    // Data file the chunks are written into at their own offsets.
    const pathTmp = `${tempDir}/${md5}.${nameSuffix(originalname)}`;
    if (!fs.existsSync(pathTmp)) {
      fs.writeFileSync(pathTmp, new Uint8Array());
    }

    // Chunk upload status file.
    const pathCfgTmp = `${tempDir}/${md5}.${nameSuffix(originalname)}.cfg`;
    if (!fs.existsSync(pathCfgTmp)) {
      const arr = new ArrayBuffer(count * 12);
      uintCodeCfg(arr, (uint32, uint8, index) => {
        uint32[0] = index;
        uint8[0] = 0;
      });
      fs.writeFileSync(pathCfgTmp, new Uint8Array(arr));
    }

    // Surface any rejection to Express instead of leaving it unhandled.
    readCacheFile(req, next, pathTmp, pathCfgTmp).catch(next);
  };
}
// Copy the chunk that makeField buffered at req.body[fieldname].finalPath into the
// shared data file `pathTmp`, starting at byte offset req.body.position. Then mark
// chunk req.body.chunkIndex complete in `pathCfgTmp`. When every chunk is complete,
// delete the status file, move the data file into the uploads folder, and append
// its new path to req.body[fieldname] as "path".
// Calls next() on success and next(err) on any failure.
async function readCacheFile(
  req: Request,
  next: NextFunction,
  pathTmp: string,
  pathCfgTmp: string
) {
  try {
    const { position, chunkIndex, chunkCount, md5, fieldname } = req.body;
    const { originalname, finalPath } = req.body[fieldname];

    const fileHandle = await fs.promises.open(pathTmp, "r+");
    try {
      let offset = +position;
      // Await every positional write, so the handle is never closed and the chunk
      // never marked complete while writes are still pending.
      for await (const chunk of fs.createReadStream(finalPath)) {
        if (!(chunk instanceof Uint8Array)) continue;
        await fileHandle.write(chunk, 0, chunk.length, offset);
        offset += chunk.length;
      }
    } finally {
      await fileHandle.close();
    }

    // Best-effort removal of the buffered chunk; a leftover temp file must not fail the upload.
    fs.promises.unlink(finalPath).catch(() => undefined);

    const fi = await chunkFinish(pathCfgTmp, +chunkIndex);
    // All chunks present: drop the status file and move the data file to its final home.
    if (fi.filter((f) => f.complete === 1).length === +chunkCount) {
      await fs.promises.unlink(pathCfgTmp);
      const uploadedPath = `${getUploadsPath()}/${md5}.${nameSuffix(originalname)}`;
      await fs.promises.rename(pathTmp, uploadedPath);
      appendField(req.body[fieldname], "path", uploadedPath);
    }
  } catch (err) {
    next(err);
    return;
  }
  next();
}
// Mark chunk `chunkIndex` as complete in the chunk-status file and return the
// {index, complete} state of every chunk record, as decoded by uintCodeCfg.
// The file is read and written with synchronous calls, so no other request handled
// by this process can run between the read and the write. That prevents an older
// state from overwriting a newer one.
async function chunkFinish(pathCfgTmp: string, chunkIndex: number) {
  const buf = fs.readFileSync(pathCfgTmp);
  const arr = pickTypedArrayBuffer(buf);
  const fi: { index: number; complete: number }[] = [];
  uintCodeCfg(arr, (uint32, uint8, index) => {
    if (index === +chunkIndex) {
      uint8[0] = 1;
    }
    fi.push({
      index: uint32[0],
      complete: uint8[0],
    });
  });
  // Persist the updated status.
  fs.writeFileSync(pathCfgTmp, new Uint8Array(arr));
  return fi;
}
Test code
// Browser-side driver: asks the server which chunks of `file` it already has, then
// uploads the remaining chunks through queueUpload (6 at a time, 2 MiB each).
// NOTE(review): `.resule` is kept as written; confirm it is really the HTTP client's
// method name (it may be a typo of `result`).
function uploadFile(file: File) {
  const { execute, start, next, done } = queueUpload(file, {
    chunkSize: 2 * 1024 * 1024,
    concurrent: 6,
    fieldname: "file",
  });

  const task = async () => {
    // Per the original note, fileMd5 presumably hashes only the head and tail chunks, for speed.
    const md5 = await fileMd5(file);
    done(() => {
      console.info("finish");
    });

    const res = await HTTP.post(baseUrl + "/file/renewal", {
      md5,
      filename: file.name,
    }).resule<Result<{ path: string; list: number[] }>>();
    if (res.code !== 200) return;
    // Already fully uploaded: the server returns the final path.
    if (res.data.path) {
      console.info(res.data.path);
      return;
    }

    // Indices of chunks the server already has; these are skipped.
    const renewalArr = res.data.list;
    execute((item) => {
      if (!item) return;
      item.set("md5", md5);
      const chunkIndex = item.get("chunkIndex");
      if (chunkIndex && renewalArr.includes(+chunkIndex)) {
        next();
        return;
      }
      const upload = async () => {
        try {
          const uploadRes = await HTTP.post(baseUrl + "/file/upload", item).resule<any>();
          console.info(uploadRes);
        } catch (err) {
          // Log and keep going so one failed chunk cannot stall the queue; presumably
          // a later /file/renewal call reports it as not yet uploaded.
          console.error(err);
        } finally {
          next();
        }
      };
      void upload();
    });
    start();
    console.info("start");
  };

  task().catch((err) => console.error(err));
}
Because of fetch's limitations, upload progress cannot be reported. For downloads, progress can be tracked through `response.body.getReader()`.
Summary
This service takes a different approach to chunked uploads: it eliminates the step of merging chunks and the repeated moving of chunk files. The cost is that the temporary file is opened and closed repeatedly, and the chunk-status file is read and written many times. Keeping that status in memory would remove the synchronous reads and improve performance. The code still has plenty of room for optimization, such as boundary handling and performance. Comments and discussion are welcome!