Large file chunked upload, breakpoint continuation, and related extensions

Core methods of chunked large-file upload

  • In JavaScript, the File object is a subclass of the Blob object, which provides the important slice method we can use to split a binary file into chunks (a minimal sketch follows this list)
  • Upload each chunk in FormData format
  • The server interface receives the data and parses it with the multiparty library
  • Separate the files from the fields, and call fse.move to move each uploaded chunk to the target path
  • On the client, Promise.all detects when all slices have finished uploading, then calls the merge interface to tell the server to merge the slices
  • The server merges by streaming each slice into a writable stream created with a start offset
  • Promise.all likewise determines when all slices have been written
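
To make the list concrete, here is a minimal sketch of the slicing and upload steps, assuming hypothetical /upload and /merge endpoints (the full client and server code later in this article flesh these out):

// Split the file into fixed-size chunks; File inherits slice() from Blob
function createChunks(file, size = 128 * 1024) {
  const chunks = [];
  let cur = 0;
  while (cur < file.size) {
    chunks.push(file.slice(cur, cur + size));
    cur += size;
  }
  return chunks;
}

// Upload every chunk, then ask the server to merge (assumed /upload and /merge endpoints)
async function uploadFile(file) {
  const requests = createChunks(file).map((chunk, index) => {
    const formData = new FormData();
    formData.append("chunk", chunk);
    formData.append("hash", `${file.name}-${index}`);
    formData.append("filename", file.name);
    return fetch("/upload", { method: "POST", body: formData });
  });
  await Promise.all(requests); // resolves once all slices are uploaded
  await fetch("/merge", {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ filename: file.name }),
  });
}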

Progress bar

  • Listen for progress with the upload.onprogress event of the browser's native XMLHttpRequest
// passed into request as an option
const xhr = new XMLHttpRequest();
xhr.upload.onprogress = onProgress;

// the callback passed in
onProgress: this.createProgressHandler(this.data[index])

// the handler reads progress from e.loaded and e.total
createProgressHandler(item) {
  return (e) => {
    item.percentage = parseInt(String((e.loaded / e.total) * 100));
  };
},

Core methods of breakpoint continuation

Pausing uses the XHR abort method to cancel every in-flight request:

this.requestList.forEach((xhr) => xhr?.abort());
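
Resuming is the mirror image: ask the server which slices it already holds, then upload only the missing ones. A minimal sketch, mirroring the handleResume method in the full client code below and assuming its verifyUpload and uploadChunks helpers:

async function handleResume() {
  // the /verify endpoint replies with the slice names the server already has
  const { uploadedList } = await verifyUpload(file.name, fileHash);
  // re-upload only the slices missing from uploadedList
  await uploadChunks(uploadedList);
}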

Extra: breakpoint continuation on the server (HTTP Range) in practice

  • Suppose a user is listening to a song when the network drops, and wants to continue where they left off. If the file server does not support ranges, the client has to download the whole file again. If Range is supported, the client records the range of the file it has already received and, after the network is restored, requests only the remaining range. The server then sends back just the content the client asked for instead of the whole file, saving network bandwidth.

  • For the client to initiate a request with Range, the server must first advertise that it supports ranges. As the Tang Monk line goes, "if you don't tell me, how am I supposed to know?" The server advertises support with: response.setHeader('Accept-Ranges', 'bytes');

  • The server inspects the Range: bytes=start-end request header to determine whether a range request was made. If the header exists and is valid, it sends back only the requested part with status code 206 (Partial Content) and sets the Content-Range response header. If the range is invalid, it responds with 416 (Range Not Satisfiable) (www.w3.org/Protocols/r…). If no Range header is present, it responds the normal way.

getStream(req, res, filepath, fileStat) {
    res.setHeader('Accept-Ranges', 'bytes'); // tell the client that the server supports Range
    let range = req.headers['range'];
    let start = 0;
    let end = fileStat.size - 1; // createReadStream's end is inclusive
    if (range) {
        let reg = /bytes=(\d*)-(\d*)/;
        let result = range.match(reg);
        if (result) {
            // an empty capture means "from the start" / "to the end"
            start = result[1] ? parseInt(result[1]) : start;
            end = result[2] ? parseInt(result[2]) : end;
        }
    }
    debug(`start=${start},end=${end}`);
    return fs.createReadStream(filepath, {
        start,
        end
    });
}
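
getStream only builds the read stream; the status code and headers described above still have to be set by the caller. A sketch of that wiring, assuming only Node's built-in http and fs modules:

const fs = require("fs");

// A sketch of answering a Range request with 206 Partial Content,
// assuming `fileStat` comes from fs.stat on `filepath`
function sendRange(req, res, filepath, fileStat) {
  res.setHeader("Accept-Ranges", "bytes");
  const match = (req.headers["range"] || "").match(/bytes=(\d*)-(\d*)/);
  if (match) {
    const start = match[1] ? parseInt(match[1], 10) : 0;
    const end = match[2] ? parseInt(match[2], 10) : fileStat.size - 1;
    if (start >= fileStat.size) {
      res.statusCode = 416; // Range Not Satisfiable
      res.setHeader("Content-Range", `bytes */${fileStat.size}`);
      return res.end();
    }
    res.statusCode = 206; // Partial Content
    res.setHeader("Content-Range", `bytes ${start}-${end}/${fileStat.size}`);
    fs.createReadStream(filepath, { start, end }).pipe(res);
  } else {
    res.statusCode = 200; // no Range header: send the whole file
    fs.createReadStream(filepath).pipe(res);
  }
}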

Further improvements

  1. Time-sliced file hash computation: borrow the ideas behind web workers and React's Fiber architecture to spread the hash computation out; requestIdleCallback lets us use the browser's idle time without blocking the main thread
  2. Sampled hash: the file hash only has to tell us whether the file already exists (enabling instant upload), so, borrowing the idea behind Bloom filters, we can trade a little accuracy for time by hashing a sample of the file
  3. Generate the hash from file name + file modification time + size
  4. Network request concurrency control: a large file produces many slices, and too many simultaneous HTTP connections can crash the browser. We solve this by capping the number of concurrent async requests (also a well-known ByteDance interview question)
  5. Slow start strategy: since file sizes vary, a fixed slice size is a bit awkward. Borrowing from TCP slow start, we set an initial size and dynamically adjust the next slice's size based on how long the previous upload took, so the slice size matches the current network speed
  6. Concurrent retry + error handling: how to retry when a slice fails, e.g. allow each slice two retries and give up on the third failure
  7. File fragment cleanup

1. Time-sliced file hash computation

This is the time-slicing concept at the heart of React's Fiber architecture: use the browser's idle time to compute the large diff process, and let any higher-priority task, such as animation or input, interrupt the diff work midway. The total amount of computation is not reduced, but the user's interaction experience improves greatly.

requestIdleCallback

requestIdleCallback(myNonEssentialWork);

function myNonEssentialWork(deadline) {
  // deadline.timeRemaining() returns the time left in the current frame
  // keep working while the frame has time and the task queue is not empty
  while (deadline.timeRemaining() > 0 && tasks.length > 0) {
    doWorkIfNeeded();
  }
  if (tasks.length > 0) {
    requestIdleCallback(myNonEssentialWork);
  }
}
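
For reference, a minimal sketch of what the /hash.js worker used by the client code later in this article might look like, assuming spark-md5.min.js is served next to it. The worker hashes chunk by chunk and posts progress back, keeping the main thread free the whole time:

// hash.js — runs inside a web worker; SparkMD5 becomes a global via importScripts
self.importScripts("spark-md5.min.js");

self.onmessage = (e) => {
  const { fileChunkList } = e.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let count = 0;
  const loadNext = (index) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(fileChunkList[index].file);
    reader.onload = (event) => {
      count++;
      spark.append(event.target.result); // feed this chunk into the incremental MD5
      if (count === fileChunkList.length) {
        self.postMessage({ percentage: 100, hash: spark.end() });
        self.close();
      } else {
        self.postMessage({ percentage: (count / fileChunkList.length) * 100 });
        loadNext(count); // hash the next chunk
      }
    };
  };
  loadNext(0);
};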

2. Sampled hash

The only purpose of computing the file's MD5 is to determine whether the file already exists, so we can design a sampled hash that sacrifices some accuracy for speed. The idea is as follows (a sketch follows the list):

  1. Cut the file into xxx MB slices
  2. Take the first and last slices in full; from every other slice take 2 bytes each from its beginning, middle, and end
  3. Merge the sampled content and compute its MD5; call this the sampled hash
  4. A match means the file exists, with a small chance of a false positive, but a miss is 100% accurate, analogous to a Bloom filter. The two hashes can be used together
  5. I tried a 1.5 GB file on my machine: the full hash took about 20 seconds, the sampled hash about 1 second, which is still a good way to rule out files that don't exist
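
A sketch of the sampling, assuming spark-md5 and a File taken from an input element; the offsets and 2-byte sample widths are the ones from the list above:

import SparkMD5 from "spark-md5";

function sampleHash(file, offset = 2 * 1024 * 1024) {
  return new Promise((resolve) => {
    const size = file.size;
    const chunks = [file.slice(0, offset)]; // first slice in full
    let cur = offset;
    while (cur < size) {
      if (cur + offset >= size) {
        chunks.push(file.slice(cur, size)); // last slice in full
      } else {
        // 2 bytes each from the beginning, middle, and end of the slice
        const mid = cur + offset / 2;
        const end = cur + offset;
        chunks.push(file.slice(cur, cur + 2));
        chunks.push(file.slice(mid, mid + 2));
        chunks.push(file.slice(end - 2, end));
      }
      cur += offset;
    }
    const reader = new FileReader();
    reader.readAsArrayBuffer(new Blob(chunks)); // merge the samples
    reader.onload = (e) => {
      const spark = new SparkMD5.ArrayBuffer();
      spark.append(e.target.result);
      resolve(spark.end()); // the sampled hash
    };
  });
}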

3. Generate hash based on the file name, file modification time, and size

You can generate the hash from a File's lastModified, name, and size, avoiding feeding the whole large file to spark-md5, which saves a great deal of time

lastModified: 1633436262311
lastModifiedDate: Tue Oct 05 2021 20:17:42 GMT+0800
name: "2021.docx"
size: 1696681
type: "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
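
A minimal sketch, assuming spark-md5's string hashing (SparkMD5.hash) to turn the metadata into a fixed-length fingerprint:

import SparkMD5 from "spark-md5";

// Hash the metadata string instead of the file content.
// Trade-off: renaming or re-saving the file changes the hash even if the bytes are identical.
function metadataHash(file) {
  return SparkMD5.hash(`${file.name}-${file.lastModified}-${file.size}`);
}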

4. Network request concurrency control

After hashing, a large file fires hundreds of HTTP requests at once, and setting up that many TCP connections can freeze the browser

The idea is to put the async requests in a queue. For example, with a concurrency limit of 3 we fire 3 requests at once, and each time a request finishes the next one is issued

We track the free channels with max: issuing a request does max-- and a finished request does max++

async sendRequest(forms, max = 4) {
  return new Promise((resolve) => {
    const len = forms.length;
    let idx = 0;
    let counter = 0;
    const start = async () => {
      // while there are pending requests and free channels
      while (idx < len && max > 0) {
        max--; // occupy a channel
        console.log(idx, "start");
        const form = forms[idx].form;
        const index = forms[idx].index;
        idx++;
        request({
          url: "/upload",
          data: form,
          onProgress: this.createProgressHandler(this.chunks[index]),
          requestList: this.requestList,
        }).then(() => {
          max++; // release the channel
          counter++;
          if (counter === len) {
            resolve();
          } else {
            start();
          }
        });
      }
    };
    start();
  });
}
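
Hypothetical usage, shaping each chunk's FormData the way sendRequest expects (the form plus its index back into the chunk array):

// chunksData is assumed to be the { formData, index } array built in uploadChunks
const forms = chunksData.map(({ formData, index }) => ({ form: formData, index }));
await this.sendRequest(forms, 4); // at most 4 uploads in flight at any moment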

5. Slow start strategy implementation

  1. Pass the slice size into createFileChunk as a parameter instead of hard-coding it, so each slice can have its own size (a sketch follows this list)
  2. Suppose we want each slice delivered in about 30 seconds
  3. Start with an initial size of 1 MB; if that upload takes 10 seconds, the next slice becomes 3 MB
  4. If the upload takes 60 seconds instead, the next slice becomes 500 KB, and so on
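
A minimal sketch of this sizing rule, assuming a hypothetical uploadChunk(file, start, size) helper that uploads one slice and resolves when it completes:

// scale each slice so the next upload should take roughly `target` ms
async function uploadWithSlowStart(file, uploadChunk, target = 30 * 1000) {
  let size = 1 * 1024 * 1024; // initial slice size: 1 MB
  let start = 0;
  while (start < file.size) {
    const begin = Date.now();
    await uploadChunk(file, start, size); // uploadChunk is a hypothetical helper
    start += size;
    const cost = Date.now() - begin;
    // 1 MB in 10 s -> next slice 3 MB; 1 MB in 60 s -> next slice 512 KB
    size = Math.max(64 * 1024, Math.round(size * (target / cost)));
  }
}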

6. Concurrent retry + error handling

  1. When a request fails, catch the error and put the task back in the queue
  2. On error, set that slice's progress to -1 and render its progress bar in red
  3. An array records the retry count per slice hash; e.g. [1, 0, 2] means slice 0 failed once and slice 2 failed twice
  4. Reject outright after the third failure (a sketch follows this list)
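
A minimal sketch of the bookkeeping, assuming a request helper that rejects when a slice upload fails:

const MAX_FAILURES = 3;
const retryCount = {}; // slice hash -> number of failures so far

function uploadWithRetry(chunkData) {
  return request({ url: "/upload", data: chunkData.formData }).catch((err) => {
    retryCount[chunkData.hash] = (retryCount[chunkData.hash] || 0) + 1;
    if (retryCount[chunkData.hash] >= MAX_FAILURES) {
      chunkData.percentage = -1; // render this slice's progress bar in red
      return Promise.reject(err); // give up after the third failure
    }
    return uploadWithRetry(chunkData); // requeue: try this slice again
  });
}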

7. Clean up fragment files on the server

If many users abandon an upload halfway, the leftover slices are useless, so consider cleaning them up on a schedule

We can use node-schedule to manage the scheduled task. For example, scan the upload directory once a day and delete any file last modified more than a month ago

// For easy testing, this demo scans every 5 seconds and treats fragments older than 1 minute as expired
const fse = require('fs-extra')
const path = require('path')
const schedule = require('node-schedule')

// Delete a fragment file once it has expired
function remove(file, stats) {
    const now = new Date().getTime()
    const offset = now - stats.ctimeMs
    if (offset > 1000 * 60) {
        // fragment is older than 60 seconds
        console.log(file, 'expired, wasting space, deleting')
        fse.unlinkSync(file)
    }
}

async function scan(dir, callback) {
    const files = fse.readdirSync(dir)
    files.forEach(filename => {
        const fileDir = path.resolve(dir, filename)
        const stats = fse.statSync(fileDir)
        if (stats.isDirectory()) {
            return scan(fileDir, remove)
        }
        if (callback) {
            callback(fileDir, stats)
        }
    })
}

// *    *    *    *    *    *
// ┬    ┬    ┬    ┬    ┬    ┬
// │    │    │    │    │    └── day of week (0 - 7) (0 or 7 is Sun)
// │    │    │    │    └────── month (1 - 12)
// │    │    │    └─────────── day of month (1 - 31)
// │    │    └──────────────── hour (0 - 23)
// │    └───────────────────── minute (0 - 59)
// └────────────────────────── second (0 - 59, OPTIONAL)
let start = function (UPLOAD_DIR) {
    // run every 5 seconds
    schedule.scheduleJob("*/5 * * * * *", function () {
        console.log('start scanning')
        scan(UPLOAD_DIR)
    })
}
exports.start = start
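
Hypothetical wiring in the server entry, assuming the snippet above is saved as schedule.js alongside it:

const path = require('path')
const { start } = require('./schedule')
start(path.resolve(__dirname, '..', 'target')) // the same UPLOAD_DIR the controller writes to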

Client core code

<template>
  <div id="app">
    <div>
      <input
        type="file"
        :disabled="status !== Status.wait"
        @change="handleFileChange"
      />
      <el-button @click="handleUpload" :disabled="uploadDisabled"
        >Upload</el-button
      >
      <el-button @click="handleResume" v-if="status === Status.pause"
        >Resume</el-button
      >
      <el-button
        v-else
        :disabled="status !== Status.uploading || !container.hash"
        @click="handlePause"
        >Pause</el-button
      >
    </div>
    <div>
      <div>Compute file hash</div>
      <el-progress :percentage="hashPercentage"></el-progress>
      <div>Total progress</div>
      <el-progress :percentage="fakeUploadPercentage"></el-progress>
    </div>
    <el-table :data="data">
      <el-table-column
        prop="hash"
        label="Slice hash"
        align="center"
      ></el-table-column>
      <el-table-column label="Size (KB)" align="center" width="120">
        <template v-slot="{ row }">
          {{ row.size | transformByte }}
        </template>
      </el-table-column>
      <el-table-column label="Progress" align="center">
        <template v-slot="{ row }">
          <el-progress
            :percentage="row.percentage"
            color="#909399"
          ></el-progress>
        </template>
      </el-table-column>
    </el-table>
  </div>
</template>

<script>
const SIZE = 128 * 1024; // slice size

const Status = {
  wait: "wait",
  pause: "pause",
  uploading: "uploading",
};

export default {
  name: "app",
  filters: {
    transformByte(val) {
      return Number((val / 1024).toFixed(0));
    },
  },
  data: () => ({
    Status,
    container: {
      file: null,
      hash: "",
      worker: null,
    },
    hashPercentage: 0,
    data: [],
    requestList: [],
    status: Status.wait,
    // Cancelling XHRs while pausing makes the progress bar fall back
    // To avoid this, define a fake progress bar that never decreases
    fakeUploadPercentage: 0,
  }),
  computed: {
    uploadDisabled() {
      return (
        !this.container.file ||
        [Status.pause, Status.uploading].includes(this.status)
      );
    },
    uploadPercentage() {
      if (!this.container.file || !this.data.length) return 0;
      const loaded = this.data
        .map((item) => item.size * item.percentage)
        .reduce((acc, cur) => acc + cur);
      return parseInt((loaded / this.container.file.size).toFixed(2));
    },
  },
  watch: {
    uploadPercentage(now) {
      if (now > this.fakeUploadPercentage) {
        this.fakeUploadPercentage = now;
      }
    },
  },
  methods: {
    handlePause() {
      this.status = Status.pause;
      this.resetData();
    },
    resetData() {
      this.requestList.forEach((xhr) => xhr?.abort());
      this.requestList = [];
      if (this.container.worker) {
        this.container.worker.onmessage = null;
      }
    },
    async handleResume() {
      this.status = Status.uploading;
      const { uploadedList } = await this.verifyUpload(
        this.container.file.name,
        this.container.hash
      );
      await this.uploadChunks(uploadedList);
    },
    // xhr wrapper
    request({
      url,
      method = "post",
      data,
      headers = {},
      onProgress = (e) => e,
      requestList,
    }) {
      return new Promise((resolve) => {
        const xhr = new XMLHttpRequest();
        xhr.upload.onprogress = onProgress;
        xhr.open(method, url);
        Object.keys(headers).forEach((key) =>
          xhr.setRequestHeader(key, headers[key])
        );
        xhr.send(data);
        xhr.onload = (e) => {
          // remove the finished xhr from the list
          if (requestList) {
            const xhrIndex = requestList.findIndex((item) => item === xhr);
            requestList.splice(xhrIndex, 1);
          }
          resolve({
            data: e.target.response,
          });
        };
        // expose the in-flight xhr so it can be aborted from outside
        requestList?.push(xhr);
      });
    },
    // generate file slices
    createFileChunk(file, size = SIZE) {
      const fileChunkList = [];
      let cur = 0;
      while (cur < file.size) {
        fileChunkList.push({ file: file.slice(cur, cur + size) });
        cur += size;
      }
      return fileChunkList;
    },
    // generate the file hash (web worker)
    calculateHash(fileChunkList) {
      return new Promise((resolve) => {
        this.container.worker = new Worker("/hash.js");
        this.container.worker.postMessage({ fileChunkList });
        this.container.worker.onmessage = (e) => {
          const { percentage, hash } = e.data;
          this.hashPercentage = percentage;
          if (hash) {
            resolve(hash);
          }
        };
      });
    },
    handleFileChange(e) {
      const [file] = e.target.files;
      if (!file) return;
      console.log(file);
      this.resetData();
      Object.assign(this.$data, this.$options.data());
      this.container.file = file;
    },
    async handleUpload() {
      if (!this.container.file) return;
      this.status = Status.uploading;
      const fileChunkList = this.createFileChunk(this.container.file);
      this.container.hash = await this.calculateHash(fileChunkList);
      const { shouldUpload, uploadedList } = await this.verifyUpload(
        this.container.file.name,
        this.container.hash
      );
      if (!shouldUpload) {
        this.$message.success("Instant upload: upload successful");
        this.status = Status.wait;
        return;
      }
      this.data = fileChunkList.map(({ file }, index) => ({
        fileHash: this.container.hash,
        index,
        hash: this.container.hash + "-" + index,
        chunk: file,
        size: file.size,
        percentage: uploadedList.includes(this.container.hash + "-" + index)
          ? 100
          : 0,
      }));
      await this.uploadChunks(uploadedList);
    },
    // upload slices, filtering out the ones already uploaded
    async uploadChunks(uploadedList = []) {
      const requestList = this.data
        .filter(({ hash }) => !uploadedList.includes(hash))
        .map(({ chunk, hash, index }) => {
          const formData = new FormData();
          formData.append("chunk", chunk);
          formData.append("hash", hash);
          formData.append("filename", this.container.file.name);
          formData.append("fileHash", this.container.hash);
          return { formData, index };
        })
        .map(async ({ formData, index }) =>
          this.request({
            url: "http://localhost:3000",
            data: formData,
            onProgress: this.createProgressHandler(this.data[index]),
            requestList: this.requestList,
          })
        );
      await Promise.all(requestList);
      // previously uploaded slices + just uploaded slices = all slices
      // merge the slices
      if (uploadedList.length + requestList.length === this.data.length) {
        await this.mergeRequest();
      }
    },
    // notify the server to merge the slices
    async mergeRequest() {
      await this.request({
        url: "http://localhost:3000/merge",
        headers: {
          "content-type": "application/json",
        },
        data: JSON.stringify({
          size: SIZE,
          fileHash: this.container.hash,
          filename: this.container.file.name,
        }),
      });
      this.$message.success("Upload successful");
      this.status = Status.wait;
    },
    // ask the server whether the file has been uploaded, based on its hash
    // upload only if it has not
    async verifyUpload(filename, fileHash) {
      const { data } = await this.request({
        url: "http://localhost:3000/verify",
        headers: {
          "content-type": "application/json",
        },
        data: JSON.stringify({
          filename,
          fileHash,
        }),
      });
      return JSON.parse(data);
    },
    // use a closure to hold each chunk's progress data
    createProgressHandler(item) {
      return (e) => {
        console.log(item.hash, parseInt(String((e.loaded / e.total) * 100)));
        item.percentage = parseInt(String((e.loaded / e.total) * 100));
      };
    },
  },
};
</script>

Server core code

index.js

const Controller = require("./controller");
const http = require("http");
const server = http.createServer();

const controller = new Controller();

server.on("request", async (req, res) => {
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Headers", "*");
  if (req.method === "OPTIONS") {
    res.statusCode = 200;
    res.end();
    return;
  }
  if (req.url === "/verify") {
    await controller.handleVerifyUpload(req, res);
    return;
  }

  if (req.url === "/merge") {
    await controller.handleMerge(req, res);
    return;
  }

  if (req.url === "/") {
    await controller.handleFormData(req, res);
  }
});

server.listen(3000, () => console.log("Listening on port 3000"));

controller.js

const multiparty = require("multiparty");
const path = require("path");
const fse = require("fs-extra");

// extract the file extension
const extractExt = (filename) =>
  filename.slice(filename.lastIndexOf("."), filename.length);
const UPLOAD_DIR = path.resolve(__dirname, "..", "target"); // large file storage directory

const pipeStream = (path, writeStream) =>
  new Promise((resolve) => {
    const readStream = fse.createReadStream(path);
    readStream.on("end", () => {
      fse.unlinkSync(path);
      resolve();
    });
    readStream.pipe(writeStream);
  });

// merge slices
const mergeFileChunk = async (filePath, fileHash, size) => {
  const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
  const chunkPaths = await fse.readdir(chunkDir);
  // sort by slice index,
  // otherwise reading the directory directly may yield the wrong order
  chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);
  await Promise.all(
    chunkPaths.map((chunkPath, index) =>
      pipeStream(
        path.resolve(chunkDir, chunkPath),
        // create a writable stream at the slice's offset
        fse.createWriteStream(filePath, {
          start: index * size,
          end: (index + 1) * size,
        })
      )
    )
  );
  fse.rmdirSync(chunkDir); // delete the slice directory after merging
};

const resolvePost = (req) =>
  new Promise((resolve) => {
    let chunk = "";
    req.on("data", (data) => {
      chunk += data;
    });
    req.on("end", () => {
      resolve(JSON.parse(chunk));
    });
  });

// return the names of slices that have already been uploaded
const createUploadedList = async (fileHash) =>
  fse.existsSync(path.resolve(UPLOAD_DIR, fileHash))
    ? await fse.readdir(path.resolve(UPLOAD_DIR, fileHash))
    : [];

module.exports = class {
  // merge slices
  async handleMerge(req, res) {
    const data = await resolvePost(req);
    const { fileHash, filename, size } = data;
    const ext = extractExt(filename);
    const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
    await mergeFileChunk(filePath, fileHash, size);
    res.end(
      JSON.stringify({
        code: 0,
        message: "file merged success",
      })
    );
  }
  // handle slices
  async handleFormData(req, res) {
    const multipart = new multiparty.Form();

    multipart.parse(req, async (err, fields, files) => {
      if (err) {
        console.error(err);
        res.statusCode = 500;
        res.end("process file chunk failed");
        return;
      }
      const [chunk] = files.chunk;
      const [hash] = fields.hash;
      const [fileHash] = fields.fileHash;
      const [filename] = fields.filename;
      const filePath = path.resolve(
        UPLOAD_DIR,
        `${fileHash}${extractExt(filename)}`
      );
      const chunkDir = path.resolve(UPLOAD_DIR, fileHash);

      // if the merged file already exists, return directly
      if (fse.existsSync(filePath)) {
        res.end("file exist");
        return;
      }

      // create the slice directory if it does not exist
      if (!fse.existsSync(chunkDir)) {
        await fse.mkdirs(chunkDir);
      }
      // fse.move is an fs-extra method, similar to fs.rename but cross-platform
      // (fs.rename has permission issues on Windows)
      // https://github.com/meteor/meteor/issues/7852#issuecomment-255767835
      await fse.move(chunk.path, path.resolve(chunkDir, hash));
      res.end("received file chunk");
    });
  }
  // check whether the file / which slice indexes have been uploaded
  async handleVerifyUpload(req, res) {
    const data = await resolvePost(req);
    const { fileHash, filename } = data;
    const ext = extractExt(filename);
    const filePath = path.resolve(UPLOAD_DIR, `${fileHash}${ext}`);
    if (fse.existsSync(filePath)) {
      res.end(
        JSON.stringify({
          shouldUpload: false,
        })
      );
    } else {
      res.end(
        JSON.stringify({
          shouldUpload: true,
          uploadedList: await createUploadedList(fileHash),
        })
      );
    }
  }
};

The complete code

Github.com/miracle90/b…

Reference links

  • ByteDance interviewer: please implement large file upload and resumable upload
  • ByteDance interviewer, I also implemented large file upload and resumable upload
  • How the front end handles uploading large files