Preface

…one is a temp.xltd file, and the other is a config.cfg file. My rough guess is that Thunder downloads with multiple threads: the data from each thread is saved into the temp.xltd file, and config.cfg marks the position and byte count of each data block. To verify this guess, I will implement a large-file upload that works the same way.

Resumable (breakpoint) upload

The front-end implementation

/**
 * Entry point: splits `file` into fixed-size chunks and runs them through a
 * concurrency-limited queue.
 *
 * The returned controller has five methods:
 * - `start`   – opens the valve and fills the in-flight slots; also resumes after `stop`.
 * - `stop`    – closes the valve: in-flight tasks keep running, but finishing
 *               tasks no longer start new ones.
 * - `next`    – must be called by the `execute` callback when a task finishes;
 *               frees one slot and, if the valve is open, starts more tasks.
 * - `execute` – registers the business-logic callback invoked with each chunk's FormData.
 * - `done`    – registers the callback invoked once, when no chunk is waiting
 *               and none is in flight.
 *
 * @param file - the file to split and upload.
 * @param options - `chunkSize` in bytes (defaults to 2 MiB), `concurrent` = max
 *   tasks in flight (defaults to 1 when `options` is given without it) and
 *   `fieldname` (defaults to "file").
 */
export function queueUpload(
  file: File,
  options: UploadOptions = {
    chunkSize: 2 * 1024 * 1024,
    concurrent: 4,
    fieldname: "file",
  }
) {
  // chunkSize previously had no fallback, so a partial `options` passed `undefined` to blobSlice.
  const { chunkSize = 2 * 1024 * 1024, concurrent = 1, fieldname = "file" } = options;
  // Divide the file into fixed-size chunks and build one FormData payload per chunk.
  const chunks = blobSlice(file, chunkSize);
  const formDatas = generateFormData(chunks, file.name, fieldname);
  const queue: Item[] = formDatas.map((data) => ({ data }));

  const concurrentQueue: Item[] = []; // tasks currently in flight
  let valve = false; // when false, finishing tasks do not start new ones
  let isDone = false;

  // User-registered callbacks; no-ops until `execute` / `done` register real ones.
  let execute = (_item: FormData) => {};
  let done = () => {};

  const active = {
    stop: function stop() {
      valve = false;
    },
    start: function start() {
      valve = true;
      // Finished only when nothing is waiting AND nothing is still in flight. Checking the
      // waiting queue alone fired `done` while the last chunks were still uploading.
      if (queue.length === 0 && concurrentQueue.length === 0) {
        if (!isDone) {
          // Flag first so a re-entrant start() from inside the callback cannot fire it twice.
          isDone = true;
          done();
        }
        return;
      }
      while (concurrentQueue.length < concurrent) {
        const item = queue.shift();
        if (!item) return;
        concurrentQueue.push(item);
        execute(item.data);
      }
    },
    // Called when a task finishes: free its slot and move on to the next one.
    next: function next() {
      concurrentQueue.shift();
      if (!valve) return;
      active.start();
    },
    execute: function (fn: (item?: FormData) => void) {
      execute = fn;
    },
    done: function (fn: () => void) {
      done = fn;
    },
  };
  return {
    ...active,
  };
}
Copy the code

Server-side implementation

The back-end service exposes two endpoints:

  • Resume endpoint `/renewal`: checks whether the file has already been uploaded. If it has, the file's address is returned directly; if not, the IDs of the chunks that have already been uploaded are returned.
  • File upload endpoint `/upload`: implemented as two middlewares. The first parses the request's FormData fields and file stream, and the second handles the upload logic.
// Middleware that parses multipart FormData: plain fields are appended to req.body, and each
// file part is streamed to a uuid-prefixed file in the OS temp directory, whose metadata
// (finalPath, fieldname, originalname, encoding, mimetype) is appended to req.body.
// Calls next() only after every temp file has been fully written and closed; errors go to next(err).
export function makeField() {
  return function Field(req: Request, _: Response, next: NextFunction) {
    // appendField writes into req.body, so make sure it exists even without a body parser.
    if (!req.body) req.body = {};
    const busboy = new Busboy({
      headers: req.headers,
    });
    // One promise per file part; resolves once that part's temp file is written and closed.
    const pendingWrites: Promise<void>[] = [];
    let settled = false;
    // Hand control to the next middleware exactly once, with or without an error.
    const settle = (err?: unknown) => {
      if (settled) return;
      settled = true;
      req.unpipe(busboy);
      if (err === undefined) next();
      else next(err);
    };

    busboy.on("file", (fieldname, fileStream, filename, encoding, mimetype) => {
      if (!filename) return fileStream.resume();
      // basename() drops any directory components supplied by the client, so the part
      // cannot be written outside the temp directory (e.g. filename "../../etc/x").
      const finalPath = path.join(os.tmpdir(), `${uuid.v4()}-${path.basename(filename)}`);

      appendField(req.body, fieldname, {
        finalPath,
        fieldname,
        originalname: filename,
        encoding,
        mimetype,
      });
      // Temporarily save the data to the system temp folder, sacrificing speed for space.
      // Another idea is to write data to a Duplex stream that is passed to the next middleware.
      const outStream = fs.createWriteStream(finalPath);
      pendingWrites.push(
        new Promise<void>((resolve, reject) => {
          outStream.on("close", () => resolve());
          outStream.on("error", reject);
          fileStream.on("error", (err: Error) => {
            // pipe() does not tear down the destination when the source fails.
            outStream.destroy();
            reject(err);
          });
        })
      );
      // pipe() ends outStream when fileStream ends, which flushes and closes the file.
      fileStream.pipe(outStream);
    });
    busboy.on("field", (fieldname, value) => {
      appendField(req.body, fieldname, value);
    });
    busboy.on("error", settle);
    busboy.on("finish", () => {
      // "finish" only means busboy consumed the request body; wait until every temp file is
      // on disk so the next middleware never reads a partially written file.
      Promise.all(pendingWrites).then(() => settle(), settle);
    });
    req.pipe(busboy);
  };
}
Copy the code
// Creates (if needed) the temporary target file and the chunk-status file for an upload,
// then hands off to readCacheFile to write this request's chunk into the target.
// Rejects requests whose md5 / chunkCount / file field are unusable by calling next(err).
export function makeUpload() {
  return function Upload(req: Request, res: Response, next: NextFunction) {
    const { chunkCount, md5, fieldname } = req.body;
    const fileField = req.body[fieldname];
    if (!fileField) {
      next(new Error(`missing file field: ${fieldname}`));
      return;
    }

    // md5 comes straight from the client and is interpolated into file paths: only allow
    // word characters and '-' so it cannot contain separators or "..".
    const id = String(md5 ?? "");
    if (!/^[\w-]+$/.test(id)) {
      next(new Error(`invalid md5: ${md5}`));
      return;
    }
    // chunkCount sizes the status buffer below; NaN or negatives would silently create an
    // empty status file and the upload could never complete.
    const count = Number(chunkCount);
    if (!Number.isInteger(count) || count <= 0) {
      next(new Error(`invalid chunkCount: ${chunkCount}`));
      return;
    }

    // Resolve the temp directory once so every path below uses the same folder.
    const tempDir = getNowTempPath();
    if (!fs.existsSync(tempDir)) {
      mkdirsSync(tempDir);
    }
    // Target file that every chunk is written into at its own byte offset.
    const pathTmp = `${tempDir}/${id}.${nameSuffix(fileField.originalname)}`;
    if (!fs.existsSync(pathTmp)) {
      fs.writeFileSync(pathTmp, new Uint8Array());
    }

    // Chunk-status file: `count` records of 12 bytes each, initialised through uintCodeCfg
    // with uint32 = the record's index and uint8 = 0 (not complete).
    const pathCfgTmp = `${pathTmp}.cfg`;
    if (!fs.existsSync(pathCfgTmp)) {
      const arr = new ArrayBuffer(count * 12);
      uintCodeCfg(arr, (uint32, uint8, index) => {
        uint32[0] = index;
        uint8[0] = 0;
      });
      fs.writeFileSync(pathCfgTmp, new Uint8Array(arr));
    }
    // readCacheFile is async: forward any rejection to Express instead of leaving it unhandled.
    readCacheFile(req, next, pathTmp, pathCfgTmp).catch(next);
  };
}

// Copies this request's cached chunk (req.body[fieldname].finalPath) into the temporary target
// file starting at byte offset `position`, then marks the chunk complete in the status file.
// Once every chunk is complete, it deletes the status file, moves the target into the uploads
// folder and records the new location as req.body[fieldname].path.
// Always finishes by calling `next`, with the error if any step failed.
async function readCacheFile(
  req: Request,
  next: NextFunction,
  pathTmp: string,
  pathCfgTmp: string
) {
  const { position, chunkIndex, chunkCount, md5, fieldname } = req.body;
  const { originalname, finalPath } = req.body[fieldname];

  try {
    try {
      const start = Number(position);
      if (!Number.isSafeInteger(start) || start < 0) {
        throw new Error(`invalid chunk position: ${position}`);
      }
      const fileHandle = await fs.promises.open(pathTmp, "r+");
      try {
        let offset = start;
        // Await each write so all data is on disk before the handle is closed and the chunk
        // is marked complete; fire-and-forget fs.write could still be pending at close time,
        // losing data or renaming an incomplete file.
        for await (const chunk of fs.createReadStream(finalPath)) {
          if (!(chunk instanceof Uint8Array)) continue;
          await fileHandle.write(chunk, 0, chunk.length, offset);
          offset += chunk.length;
        }
      } finally {
        await fileHandle.close();
      }
    } finally {
      // Best-effort cleanup: the cached chunk is no longer needed whether or not the copy
      // succeeded, and a failed unlink must not mask the real outcome.
      await fs.promises.unlink(finalPath).catch(() => undefined);
    }

    const fi = await chunkFinish(pathCfgTmp, +chunkIndex);
    // Every chunk done: drop the status file and move the file to the official folder.
    if (fi.filter((f) => f.complete === 1).length === +chunkCount) {
      await fs.promises.unlink(pathCfgTmp);
      const uploadedPath = `${getUploadsPath()}/${md5}.${nameSuffix(originalname)}`;
      await fs.promises.rename(pathTmp, uploadedPath);
      appendField(req.body[fieldname], "path", uploadedPath);
    }
  } catch (err) {
    next(err);
    return;
  }
  // Outside the try so an exception thrown by later middleware is not reported twice.
  next();
}

// Marks record `chunkIndex` of the chunk-status file as complete (uint8 = 1) and returns every
// record as { index, complete }, where index/complete are the stored uint32/uint8 values.
// The file is read and written with synchronous calls so no other request's update can run
// between this read and write (which would let an old value overwrite a new one).
async function chunkFinish(pathCfgTmp: string, chunkIndex: number) {
  const buf = fs.readFileSync(pathCfgTmp);
  const arr = pickTypedArrayBuffer(buf);
  const fi: { index: number; complete: number }[] = [];
  // Walk the records (decoded by uintCodeCfg), flag the finished one and collect the states.
  uintCodeCfg(arr, (uint32, uint8, index) => {
    if (index === chunkIndex) {
      uint8[0] = 1;
    }
    fi.push({
      index: uint32[0],
      complete: uint8[0],
    });
  });
  // Write the modified state back.
  fs.writeFileSync(pathCfgTmp, new Uint8Array(arr));
  return fi;
}
Copy the code

Test code

// Demo driver: hashes the file, asks /file/renewal whether it already exists (logging its path
// if so) or which chunks the server already has, then uploads the remaining chunks through
// queueUpload (2 MiB chunks, 6 in flight). Chunk failures are logged and counted rather than
// stalling the queue; calling uploadFile again resumes via /file/renewal.
function uploadFile(file: File) {
  const { execute, start, next, done } = queueUpload(file, {
    chunkSize: 2 * 1024 * 1024,
    concurrent: 6,
    fieldname: "file",
  });
  let failedChunks = 0;

  const task = async () => {
    // To save time, fileMd5 is expected to hash only the head and tail chunks.
    const md5 = await fileMd5(file);
    done(() => {
      console.info("finish");
      if (failedChunks > 0) {
        console.warn(`${failedChunks} chunk(s) failed; call uploadFile again to resume`);
      }
    });
    // NOTE(review): `resule` is kept exactly as written; confirm it is the HTTP client's method.
    const res = await HTTP.post(baseUrl + "/file/renewal", {
      md5,
      filename: file.name,
    }).resule<Result<{ path: string; list: number[] }>>();
    if (res.code !== 200) {
      console.error("renewal request failed", res);
      return;
    }
    if (res.data.path) {
      console.info(res.data.path);
      return;
    }
    const renewalArr = res.data.list ?? [];
    execute((item) => {
      if (!item) return;
      item.set("md5", md5);
      const chunkIndex = item.get("chunkIndex");
      // Skip chunks the server reports as already uploaded.
      if (chunkIndex && renewalArr.includes(+chunkIndex)) {
        next();
        return;
      }
      const upload = async () => {
        try {
          const uploadRes = await HTTP.post(baseUrl + "/file/upload", item).resule<any>();
          console.info(uploadRes);
        } catch (err) {
          // Previously a rejection skipped next(), permanently occupying a slot so the
          // queue (and `done`) never completed.
          failedChunks++;
          console.error("chunk upload failed", chunkIndex, err);
        } finally {
          next();
        }
      };
      void upload();
    });
    start();
    console.info("start");
  };
  task().catch((err) => console.error("upload failed", err));
}
Copy the code

Because of the limitations of `fetch`, upload progress cannot be reported. For downloads, progress can be tracked by reading the body through `Response.body.getReader()`.

Summary

This service takes a different approach to chunking: it avoids merging the chunks at the end and moving chunk data around multiple times. The cost is that the temporary file is opened and closed repeatedly, and the chunk-status file is read and written many times (this state could be kept in memory instead, removing the synchronous reads for better performance). There is still plenty of room for improvement in the code, such as boundary handling and performance. Comments and discussion in the comments section are very welcome!