Preface
A while ago, I came across a downloader implemented in Go in the GitHub community. Having done no prior research in this area, I pulled the open source code out of curiosity to understand its internal implementation, and debugged and analyzed it as a reference template for personal learning. Thanks to monkeyWie's open source project, gopeed-core.
Approach
After a preliminary review of the relevant code, the downloader's main flow can be summarized as follows:
- Send an initial request to obtain the file name, file size, and status of the download link, such as whether resumable downloads are supported.
- Does the server support resumable (Range) downloads?
  - If yes, each goroutine is independently assigned a byte range based on the configured concurrency.
  - If not, download with a single goroutine instead.
- Start the concurrent download: request each byte range via the HTTP Range header, then divide and conquer.
Related technical points
Below is a list of the basics the project relies on. After getting familiar with their usage, we'll look at how each is applied in the project.
ErrGroup
Go's native sync package provides a common structure, sync.WaitGroup, which is often used in concurrent programming: wg.Wait() blocks the caller until all expected tasks have called wg.Done(). Building on this blocking-wait pattern, the golang.org/x/sync/errgroup package additionally returns the error from task execution at Wait():
- Wait for all subtasks to complete
- Catch a subtask's err and decide what to do next
This design pattern works well in divide-and-conquer scenarios where a task is broken up into chunks, similar to MapReduce or "scatter and gather": segmented downloads, paginated search, and so on. When an error occurs, the group's context is cancelled, so a select on that context fits this "scatter and gather" style nicely.
Usage: an errGroup usually comes with an errCtx context, which notifies the goroutines in the same group so their processing logic can react in code.
```go
// Create an errGroup and an errCtx context
eg, errCtx := errgroup.WithContext(context.Background())
```
Start a task with eg.Go(), passing a function with the signature func() error as an argument:
```go
eg.Go(func() error {
	// Any business error
	return errors.New("egError")
})
```
Internal analysis: reading the source shows that g.cancel() is called when an error occurs, which triggers errCtx.Done() on the context returned above.
```go
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		// If the task returns err, explicitly call the cancel() bound to g
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				// err is recorded via errOnce, so only the first error is reported
				g.err = err
				if g.cancel != nil {
					// Trigger errCtx.Done()
					g.cancel()
				}
			})
		}
	}()
}
```
A sample demo:
```go
const (
	CON = 10
)

func main() {
	eg, errCtx := errgroup.WithContext(context.Background())
	for i := 0; i < CON; i++ {
		i := i
		eg.Go(func() error {
			if i == 3 {
				return fmt.Errorf("Mock err: %d", i)
			}
			select {
			// Wait 1s so the Done branch is hit after the mock error
			case <-time.After(time.Duration(1) * time.Second):
			case <-errCtx.Done():
				fmt.Println("meet err in job:", i)
				return errCtx.Err()
			}
			fmt.Println(i)
			return nil
		})
	}
	// Wait until done or an error occurs
	if err := eg.Wait(); err == nil {
		log.Print("all looks good.")
	} else {
		log.Printf("errors occurs: %v", err)
	}
}
```
Output: as you can see, after the goroutine with i equal to 3 returns an error, the errCtx.Done() branches of the other goroutines in the group are all hit, so they return immediately, and Wait() returns the first error caught by the errGroup.
```
2021/06/24 07:24:55 meet err in job: 9
2021/06/24 07:24:55 meet err in job: 1
2021/06/24 07:24:55 meet err in job: 4
2021/06/24 07:24:55 meet err in job: 2
2021/06/24 07:24:55 meet err in job: 0
2021/06/24 07:24:55 meet err in job: 7
2021/06/24 07:24:55 meet err in job: 8
2021/06/24 07:24:55 meet err in job: 5
2021/06/24 07:24:55 meet err in job: 6
2021/06/24 07:24:55 errors occurs: Mock err: 3
```
HTTP header parameters
HTTP/1.1
Compared with HTTP/1.0, HTTP/1.1 provides persistent connections. Another upgrade is that HTTP/1.1 enables resumable downloads: a client can request only a portion of a resource via the Range header.
Header fields

| Field | Meaning |
|---|---|
| Range | Request header, e.g. `Range: bytes=0-1023`, requesting the first 1024 bytes |
| Content-Range | Response header, e.g. `Content-Range: bytes 0-1023/2047`, indicating the range served and the total file size |
| Status code 206 | After a Range request, the server typically returns 206 (Partial Content), indicating resumable downloads are supported |
| Content-Disposition | Content description; used here to obtain the file name (which can also be parsed from the URL) |
A sample demo:
- Start the local file server
```go
// Start the local file server; open 127.0.0.1:8899 to view the list of shared files
http.Handle("/", http.FileServer(http.Dir("D:\\ Test download \\serverFile")))
if err := http.ListenAndServe(":8899", nil); err != nil {
	fmt.Println("err:", err)
}
```
- Related constants
```go
// basic/downloader/mdl/constansts.go
const (
	HttpCodeOK                   = 200
	HttpCodePartialContent       = 206
	HttpHeaderRange              = "Range"
	HttpHeaderContentLength      = "Content-Length"
	HttpHeaderContentRange       = "Content-Range"
	HttpHeaderContentDisposition = "Content-Disposition"
	HttpHeaderRangeFormat        = "bytes=%d-%d"
)
```
- Parse the target file's information (file size / whether resumable downloads are supported)
```go
const (
	DOWNLOAD_URL = "http://127.0.0.1:8899/demo.zip"
	SAVE_PATH    = "D:\\ test download \\download"
	CON          = 8
	RETRY_COUNT  = 5
)

// ...
// Construct the request and set the header
req, err := buildReq(ctx, reqURL)
if err != nil {
	return nil, err
}
// Request only one byte to test whether the resource supports Range requests
req.Header.Set(mdl.HttpHeaderRange, fmt.Sprintf(mdl.HttpHeaderRangeFormat, 0, 0))
resp, err := http.DefaultClient.Do(req)
// Determine whether resumable download is supported, and the file size
if resp.StatusCode == mdl.HttpCodePartialContent {
	// Resumable download supported
	res.Range = true
	// Obtain the size from "Content-Range: bytes 0-0/137783533"
	totalSize := path.Base(resp.Header.Get(mdl.HttpHeaderContentRange))
}
```
File reading and writing
This part covers the download process itself, which is mainly file operations.
- File creation: the initial request already yields the file size, so the file can be pre-created with its total number of bytes allocated.

```go
func touch(fileName string, size int64) (file *os.File, err error) {
	// Create the file
	file, err = os.Create(fileName)
	if err != nil {
		return nil, err
	}
	if size > 0 {
		// Allocate the full size
		if err := os.Truncate(fileName, size); err != nil {
			return nil, err
		}
	}
	return
}
```
- Concurrent writing: when the concurrency is greater than 1 and the server supports resumable downloads, multiple goroutines download concurrently, which raises the core question: with multiple goroutines writing the same file concurrently, how can locking be avoided to improve efficiency?
When I first pulled the project code, the author had already optimized it to the final solution, so I skipped straight past the problem; only later, reading the commit history and issue list, did I find the author had originally used a mutex.
In hindsight, concurrent access to the file doesn't matter: since the file is created and its total size allocated up front, each subgoroutine only writes within its own byte-offset range. The result is independent of write order, so no lock is needed. (I later asked the author, who confirmed this conjecture.)
The author's final implementation is as follows. It defines the chunks into which the file is split; the offsets are used to request the corresponding byte ranges from the server:
```go
type Chunk struct {
	Status     Status // Download status of this chunk
	Begin      int64  // Start offset
	End        int64  // End offset
	Downloaded int64  // Bytes downloaded so far, used for appending and resuming after errors
}
```
Define the slice of file chunks and assign each its range:
```go
var (
	// File chunks after splitting
	chunks []*mdl.Chunk
	// Number of chunks
	ckTol int
)
// Shard only if the server supports Range requests
if res.Range {
	// Download with 8 goroutines
	ckTol = 8
	chunks = make([]*mdl.Chunk, ckTol)
	partSize := res.TotalSize / int64(ckTol)
	for i := 0; i < ckTol; i++ {
		var (
			begin = partSize * int64(i)
			end   int64
		)
		if i == (ckTol - 1) {
			end = res.TotalSize - 1
		} else {
			end = begin + partSize - 1
		}
		ck := mdl.NewChunk(begin, end)
		chunks[i] = ck
	}
} else {
	// Single-connection download
	ckTol = 1
	chunks = make([]*mdl.Chunk, ckTol)
	chunks[0] = mdl.NewChunk(0, 0)
}
```
Download details:
```go
func fetch(ctx context.Context, res *mdl.Resource, file *os.File, chks []*mdl.Chunk, c int, doneCh chan error) {
	// Use errGroup to catch errors and notify the external doneCh channel
	eg, _ := errgroup.WithContext(ctx)
	for i := 0; i < c; i++ {
		// Mind the closure capture
		i := i
		eg.Go(func() error {
			// Concurrent download
			return fetchChunk(ctx, res, file, i, chks)
		})
	}
	go func() {
		// First error from the errgroup
		err := eg.Wait()
		// Close the file
		file.Close()
		// Forward fetchChunk()'s internal state
		doneCh <- err
	}()
	return
}

func fetchChunk(ctx context.Context, res *mdl.Resource, file *os.File, index int, chks []*mdl.Chunk) (err error) {
	ck := chks[index]
	req, err := buildReq(ctx, DOWNLOAD_URL)
	if err != nil {
		return err
	}
	var (
		client = http.DefaultClient
		buf    = make([]byte, 8192)
	)
	/************** Start of retry loop **************/
	// Set the header depending on whether chunked download is enabled
	// - Set the file range header based on the current chunk
	// - Check the response status
	//   - Failure: retry while attempts are below five
	//   - Success: write buf to the file at the right offset
	//     - Success: return and notify the caller
	//     - Failure: retry while attempts are below five
	for i := 0; i < RETRY_COUNT; i++ {
		var resp *http.Response
		if res.Range {
			req.Header.Set(mdl.HttpHeaderRange,
				fmt.Sprintf(mdl.HttpHeaderRangeFormat, chks[index].Begin+ck.Downloaded, chks[index].End))
		} else {
			// Single-connection retry without resumable support starts over
			ck.Downloaded = 0
		}
		// Request the byte range
		if err := func() error {
			resp, err = client.Do(req)
			if err != nil {
				return err
			}
			if resp.StatusCode != mdl.HttpCodeOK && resp.StatusCode != mdl.HttpCodePartialContent {
				return fmt.Errorf("%d,%s", resp.StatusCode, resp.Status)
			}
			return nil
		}(); err != nil {
			continue
		}
		// Read from body into buf and write to the file, resetting the retry counter
		i = 0
		retry := false
		retry, err = func() (bool, error) {
			defer resp.Body.Close()
			for {
				n, err := resp.Body.Read(buf)
				if n > 0 {
					// Each write starts at the downloaded offset, preserving byte order
					_, err := file.WriteAt(buf[:n], ck.Begin+ck.Downloaded)
					if err != nil {
						// File errors are not retried
						return false, err
					}
					// Record progress for resuming
					ck.Downloaded += int64(n)
				}
				if err != nil {
					// err from Read
					if err != io.EOF {
						return true, err
					}
					break
				}
			}
			// Successful exit
			return false, nil
		}()
		if !retry {
			break
		}
	}
	/************** End of retry loop **************/
	// Notify the caller
	return
}
```
The code logic itself is fairly simple; the added error retries make it a bit harder to read. The key point is that the ck.Downloaded field is maintained and kept up to date.
Code analysis
I adjusted the original code and extracted some core functions in order to quickly understand its download approach. The author uses the common factory and adapter patterns, which may be analyzed in a future post. Thanks again to the author, monkeyWie, for providing the wheel.
References
- Coordinating goroutines – errGroup: levelup.gitconnected.com/coordinatin…
- monkeyWie/gopeed-core: github.com/monkeyWie/g…