Preface

A while ago I came across a downloader implemented in Go on GitHub. Since I had not studied this area before, I pulled the relevant open source code out of curiosity to understand how it works internally, and debugged and analyzed it along the way as a reference template for my own learning. Thanks to monkeyWie's open source project, gopeed-core.

Approach

After a preliminary review of the relevant code logic, the main process of the downloader is summarized as follows:

  1. Send a request to obtain the file name, the file size, and the capabilities of the download link, such as whether resumable (Range) downloads are supported.
  2. Check whether resumable downloads are supported:
    • If yes, split the file into byte ranges according to the configured concurrency and assign each goroutine its own range.
    • If not, download the whole file in a single goroutine.
  3. Start the download. Each goroutine requests its own byte range through the HTTP Range request header (divide and conquer).

Related technical points

The project relies on a few basics, listed here. Each one is introduced with its usage first, followed by how the project applies it.

ErrGroup

Go's standard sync package provides sync.WaitGroup, a structure commonly used in concurrent programming: the caller blocks in wg.Wait() until every expected task has called wg.Done(). Building on it, the golang.org/x/sync/errgroup package also returns, from Wait(), the error produced by a task. It lets you:

  • Wait for the subtasks to complete
  • Catch a subtask's error and decide what to do
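For contrast, here is a minimal sketch of what has to be written by hand when only sync.WaitGroup is available: the wait itself is provided, but recording the first error is up to the caller. The names runAll and demoTasks are hypothetical helpers made up for this illustration, not part of any library.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// runAll (hypothetical helper) runs every task in its own goroutine, waits for
// all of them with a sync.WaitGroup, and returns the first recorded error.
func runAll(tasks []func() error) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		task := task // copy the loop variable for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := task(); err != nil {
				// Only the first error is kept; later ones are dropped.
				once.Do(func() { firstErr = err })
			}
		}()
	}
	wg.Wait() // blocks until every task has called wg.Done()
	return firstErr
}

// demoTasks (hypothetical) returns three tasks, exactly one of which fails.
func demoTasks() []func() error {
	return []func() error{
		func() error { return nil },
		func() error { return errors.New("chunk 1 failed") },
		func() error { return nil },
	}
}

func main() {
	fmt.Println("first error:", runAll(demoTasks()))
}
```

Note that nothing here tells the other tasks to stop early; that is the part errgroup adds through its context.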

This design works well in divide-and-conquer scenarios where a task is broken up into chunks, similar to MapReduce or "scatter and gather", such as segmented downloads and paginated searches.

When an error occurs, the group's context is cancelled, so each task can watch for the cancellation in a select statement.

Usage: an errgroup usually comes with an errCtx context that notifies the goroutines in the same group when one of them fails; how they react is up to your code.

// Create an errGroup and an errCtx context
eg, errCtx := errgroup.WithContext(context.Background())

Start a task with eg.Go(), passing a function with the signature func() error as the argument:

eg.Go(func() error {
	// Any business error
	return errors.New("egError")
})

Internal analysis: the source code shows that g.cancel() is called when an error occurs, which closes the channel returned by errCtx.Done(), where errCtx is the context created above.

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()
		// If the task returns an error, call the cancel() function bound to g
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				// errOnce ensures the error is not overwritten: only the first one is recorded
				g.err = err
				if g.cancel != nil {
					// Triggers errCtx.Done()
					g.cancel()
				}
			})
		}
	}()
}
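To see the whole mechanism run without the external package, the following is a rough standard-library sketch of the same idea. The type miniGroup and the functions withContext and runDemo are hypothetical names invented here; this is a simplified stand-in for illustration, not the actual errgroup implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// miniGroup is a hypothetical, stripped-down stand-in for errgroup.Group.
type miniGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
	cancel  context.CancelFunc
}

// withContext returns a group plus a context that is cancelled on the first error.
func withContext(parent context.Context) (*miniGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &miniGroup{cancel: cancel}, ctx
}

// Go runs f in a new goroutine and records the first non-nil error.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				g.cancel() // wakes up everyone blocked on ctx.Done()
			})
		}
	}()
}

// Wait blocks until all tasks are done and returns the first error.
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources in the success case too
	return g.err
}

// runDemo starts `waiters` tasks that block until cancellation plus one task
// that fails. It returns how many tasks saw the cancellation and the group error.
func runDemo(waiters int) (int64, error) {
	g, ctx := withContext(context.Background())
	var cancelled int64
	for i := 0; i < waiters; i++ {
		g.Go(func() error {
			<-ctx.Done() // blocks until some task fails
			atomic.AddInt64(&cancelled, 1)
			return ctx.Err()
		})
	}
	g.Go(func() error { return errors.New("boom") })
	err := g.Wait()
	return cancelled, err
}

func main() {
	n, err := runDemo(3)
	fmt.Printf("%d tasks cancelled, group error: %v\n", n, err)
}
```

Because the error is stored before cancel() is called, the errors that the woken tasks return afterwards (context.Canceled) never replace the original one.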

Sample demo:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

const (
	CON = 10
)

func main() {
	eg, errCtx := errgroup.WithContext(context.Background())
	for i := 0; i < CON; i++ {
		i := i
		eg.Go(func() error {
			if i == 3 {
				return fmt.Errorf("Mock err: %d", i)
			}
			select {
			// Wait 1s so that the Done branch is hit after the mock error occurs
			case <-time.After(time.Duration(1) * time.Second):
			case <-errCtx.Done():
				fmt.Println("meet err in job:", i)
				return errCtx.Err()
			}
			fmt.Println(i)
			return nil
		})
	}
	// Wait until all tasks are done or an error occurs
	if err := eg.Wait(); err == nil {
		log.Print("all looks good.")
	} else {
		log.Printf("errors occurs: %v", err)
	}
}

Output: after the goroutine with i equal to 3 returns an error, the errCtx.Done() branch fires in every other goroutine of the group, and eg.Wait() returns the first error caught by the errgroup.

2021/06/24 07:24:55 meet err in job: 9
2021/06/24 07:24:55 meet err in job: 1
2021/06/24 07:24:55 meet err in job: 4
2021/06/24 07:24:55 meet err in job: 2
2021/06/24 07:24:55 meet err in job: 0
2021/06/24 07:24:55 meet err in job: 7
2021/06/24 07:24:55 meet err in job: 8
2021/06/24 07:24:55 meet err in job: 5
2021/06/24 07:24:55 meet err in job: 6
2021/06/24 07:24:55 errors occurs: Mock err: 3

HTTP header parameters

HTTP/1.1

Compared with HTTP/1.0, HTTP/1.1 supports persistent (long-lived) connections. Another improvement is support for resumable downloads: a client can request only part of a resource with the Range header.

Header parameters

Name                  Meaning
Range                 Request header. For example, Range: bytes=0-1023 requests the first 1024 bytes.
Content-Range         Response header. For example, Content-Range: bytes 0-1023/2047 gives the range returned and the total file size.
Status code 206       After a Range request, the server typically returns 206 (Partial Content), which indicates that resumable downloads are supported.
Content-Disposition   Describes how the content should be presented; used here to get the file name (which can also be parsed from the URL).
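The headers above can be observed with a small self-contained sketch. It assumes an in-memory test server built with net/http/httptest and http.ServeContent (which handles Range requests itself); the function name probeRange and the sample content are made up for this illustration.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// probeRange (hypothetical helper) serves content from an in-memory test
// server, requests bytes [from, to] with a Range header, and reports the
// status code, the Content-Range header, and the body of the response.
func probeRange(content string, from, to int) (status int, contentRange, body string, err error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// http.ServeContent answers Range requests with 206 and Content-Range.
		http.ServeContent(w, r, "demo.txt", time.Time{}, strings.NewReader(content))
	}))
	defer srv.Close()

	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
	if err != nil {
		return 0, "", "", err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", from, to))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, "", "", err
	}
	defer resp.Body.Close()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, "", "", err
	}
	return resp.StatusCode, resp.Header.Get("Content-Range"), string(data), nil
}

func main() {
	status, contentRange, body, err := probeRange("0123456789", 2, 5)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	fmt.Println(status, contentRange, body) // 206 bytes 2-5/10 2345
}
```

Both ends of the range are inclusive, which is why bytes=2-5 returns four bytes.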

Sample demo:

  • Start the local file server
// Start the local file server and open 127.0.0.1:8899 to view the list of shared files
http.Handle("/", http.FileServer(http.Dir("D:\\test download\\serverFile")))
if err := http.ListenAndServe(":8899", nil); err != nil {
	fmt.Println("err:", err)
}
  • Related constants
// basic/downloader/mdl/constansts.go
const (
	HttpCodeOK             = 200
	HttpCodePartialContent = 206

	HttpHeaderRange              = "Range"
	HttpHeaderContentLength      = "Content-Length"
	HttpHeaderContentRange       = "Content-Range"
	HttpHeaderContentDisposition = "Content-Disposition"

	HttpHeaderRangeFormat = "bytes=%d-%d"
)
  • Parse the target file information (file size and whether resumable downloads are supported)
const (
	DOWNLOAD_URL = "http://127.0.0.1:8899/demo.zip"
	SAVE_PATH    = "D:\\test download\\download"
	CON          = 8
	RETRY_COUNT  = 5
)

// ...

// Construct the request and set the header
req, err := buildReq(ctx, reqURL)
if err != nil {
	return nil, err
}

// Request only the first byte to test whether the resource supports Range requests
req.Header.Set(mdl.HttpHeaderRange, fmt.Sprintf(mdl.HttpHeaderRangeFormat, 0, 0))
resp, err := http.DefaultClient.Do(req)
if err != nil {
	return nil, err
}
defer resp.Body.Close()

// Determine whether resumable downloads are supported and get the file size
if resp.StatusCode == mdl.HttpCodePartialContent {
	// Resumable downloads are supported
	res.Range = true
	// Obtain the size from Content-Range: bytes 0-0/137783533.
	// path.Base returns the part after the last "/", here "137783533".
	totalSize := path.Base(resp.Header.Get(mdl.HttpHeaderContentRange))
	res.TotalSize, err = strconv.ParseInt(totalSize, 10, 64)
	if err != nil {
		return nil, err
	}
}
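The header parsing in this step can also be done with explicit string handling instead of path.Base. The sketch below is one possible way, under the assumption that the server sends a Content-Range of the form "bytes a-b/total"; totalFromContentRange and fileName are hypothetical helper names, not functions from the project.

```go
package main

import (
	"fmt"
	"mime"
	"net/url"
	"path"
	"strconv"
	"strings"
)

// totalFromContentRange (hypothetical helper) extracts the total size from a
// Content-Range value such as "bytes 0-0/137783533".
func totalFromContentRange(v string) (int64, error) {
	idx := strings.LastIndex(v, "/")
	if idx < 0 {
		return 0, fmt.Errorf("malformed Content-Range: %q", v)
	}
	total := strings.TrimSpace(v[idx+1:])
	if total == "*" {
		// The server may send "*" when it does not know the total size.
		return 0, fmt.Errorf("total size unknown in Content-Range: %q", v)
	}
	return strconv.ParseInt(total, 10, 64)
}

// fileName (hypothetical helper) prefers the filename parameter of
// Content-Disposition and falls back to the last path segment of the URL.
func fileName(contentDisposition, rawURL string) string {
	if contentDisposition != "" {
		if _, params, err := mime.ParseMediaType(contentDisposition); err == nil {
			if name := params["filename"]; name != "" {
				return name
			}
		}
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return path.Base(u.Path)
}

func main() {
	total, err := totalFromContentRange("bytes 0-0/137783533")
	fmt.Println(total, err) // 137783533 <nil>

	fmt.Println(fileName(`attachment; filename="demo.zip"`, ""))   // demo.zip
	fmt.Println(fileName("", "http://127.0.0.1:8899/demo.zip"))    // demo.zip
}
```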

File reading and writing

This part covers the download process itself, which is mainly file operations.

  • File creation: the file size is already known from the initial request to the download link, so the file can be created in advance with its full size allocated.

    func touch(fileName string, size int64) (file *os.File, err error) {
    	// Create the file
    	file, err = os.Create(fileName)
    	if err != nil {
    		return nil, err
    	}
    	if size > 0 {
    		// Allocate the full size up front
    		if err = os.Truncate(fileName, size); err != nil {
    			file.Close()
    			return nil, err
    		}
    	}
    	return file, nil
    }
  • Concurrent writes: when the concurrency is greater than 0 and the server supports resumable downloads, multiple goroutines download concurrently. This raises the core question: several goroutines write to the same file at the same time, so how can locking be avoided to keep the download efficient?

    When I first pulled the project code, the author had already optimized it and provided the final solution, so I overlooked the problem at first. Only later, when reading the commit history and the issue list, did I find that the author had originally used a mutex.

    Part of the code is as follows:

    On reflection, concurrent writes to the file are not a problem: the file is created with its total size allocated at the start, and each goroutine only writes within its own byte range. The result does not depend on the write order, so no lock is needed. (I later asked the author, who confirmed this conjecture.)
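The reasoning above can be checked with a small sketch: pre-allocate a file, let several goroutines write disjoint ranges with WriteAt and no mutex, then read the file back. The function name writeConcurrently and the temporary-file setup are assumptions made for this illustration only.

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

// writeConcurrently (hypothetical helper) pre-allocates a temp file of
// len(data) bytes, lets `parts` goroutines each write one disjoint slice of
// data with WriteAt, and returns the file content read back from disk.
// It assumes parts >= 1.
func writeConcurrently(data []byte, parts int) ([]byte, error) {
	f, err := os.CreateTemp("", "writeat-demo-*")
	if err != nil {
		return nil, err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// Allocate the full size first, as touch() does above.
	if err := f.Truncate(int64(len(data))); err != nil {
		return nil, err
	}

	var wg sync.WaitGroup
	errs := make([]error, parts) // one slot per goroutine, so no lock is needed
	partSize := len(data) / parts
	for i := 0; i < parts; i++ {
		begin := i * partSize
		end := begin + partSize
		if i == parts-1 {
			end = len(data) // the last part takes the remainder
		}
		wg.Add(1)
		go func(i, begin, end int) {
			defer wg.Done()
			// Each goroutine writes only inside its own byte range.
			_, errs[i] = f.WriteAt(data[begin:end], int64(begin))
		}(i, begin, end)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return os.ReadFile(f.Name())
}

func main() {
	data := []byte("abcdefghijklmnopqrstuvwxyz")
	got, err := writeConcurrently(data, 4)
	fmt.Println(string(got) == string(data), err)
}
```

WriteAt takes an explicit offset instead of relying on the shared file position, which is what makes the ranges independent of each other.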

    The author's final code is as follows. It defines a chunk for each file segment, whose offsets are used to request the corresponding byte range from the server:

    type Chunk struct {
    	Status     Status // Download status of this chunk
    	Begin      int64  // Start offset (inclusive)
    	End        int64  // End offset (inclusive)
    	Downloaded int64  // Bytes of this chunk downloaded so far; used to resume after an error
    }

    Define a slice of chunks and assign their ranges:

    var (
    	// Chunks the file is split into
    	chunks []*mdl.Chunk
    	// Number of chunks
    	ckTol int
    )
    // Range requests are supported, so split the file
    if res.Range {
    	// Download with 8 goroutines
    	ckTol = 8
    	chunks = make([]*mdl.Chunk, ckTol)
    	partSize := res.TotalSize / int64(ckTol)
    	for i := 0; i < ckTol; i++ {
    		var (
    			begin = partSize * int64(i)
    			end   int64
    		)
    		if i == (ckTol - 1) {
    			// The last chunk takes the remainder
    			end = res.TotalSize - 1
    		} else {
    			end = begin + partSize - 1
    		}
    		ck := mdl.NewChunk(begin, end)
    		chunks[i] = ck
    	}
    } else {
    	ckTol = 1
    	// Single-connection download
    	chunks = make([]*mdl.Chunk, ckTol)
    	chunks[0] = mdl.NewChunk(0, 0)
    }
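As a worked example of the splitting arithmetic above, the following standalone sketch applies the same formula to a tiny file so the ranges can be checked by hand. The names byteRange and splitRanges are hypothetical and exist only for this illustration; n is assumed to be at least 1.

```go
package main

import "fmt"

// byteRange (hypothetical type) is an inclusive byte interval.
type byteRange struct {
	Begin, End int64
}

// splitRanges divides total bytes into n inclusive ranges using the same
// formula as the snippet above: equal parts, with the last range taking
// whatever remains.
func splitRanges(total int64, n int) []byteRange {
	ranges := make([]byteRange, n)
	partSize := total / int64(n)
	for i := 0; i < n; i++ {
		begin := partSize * int64(i)
		end := begin + partSize - 1
		if i == n-1 {
			end = total - 1
		}
		ranges[i] = byteRange{Begin: begin, End: end}
	}
	return ranges
}

func main() {
	// 10 bytes in 3 chunks: partSize is 3, so the last chunk gets 4 bytes.
	for i, r := range splitRanges(10, 3) {
		fmt.Printf("chunk %d: bytes=%d-%d (%d bytes)\n", i, r.Begin, r.End, r.End-r.Begin+1)
	}
	// chunk 0: bytes=0-2 (3 bytes)
	// chunk 1: bytes=3-5 (3 bytes)
	// chunk 2: bytes=6-9 (4 bytes)
}
```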

    Download details:

    func fetch(ctx context.Context, res *mdl.Resource,
    	file *os.File, chks []*mdl.Chunk, c int, doneCh chan error) {
    	// Use errgroup to collect the first error and report it on the external doneCh channel.
    	// Note: the derived context is discarded here, so a failed chunk does not cancel the others.
    	eg, _ := errgroup.WithContext(ctx)
    	for i := 0; i < c; i++ {
    		// Copy the loop variable for the closure
    		i := i
    		eg.Go(func() error {
    			// Concurrent download
    			return fetchChunk(ctx, res, file, i, chks)
    		})
    	}

    	go func() {
    		// First error from the errgroup, if any
    		err := eg.Wait()
    		// Close the file
    		file.Close()
    		// Report the result of fetchChunk() to the caller
    		doneCh <- err
    	}()
    }

    func fetchChunk(ctx context.Context, res *mdl.Resource, file *os.File, index int, chks []*mdl.Chunk) (err error) {
    	ck := chks[index]
    	req, err := buildReq(ctx, DOWNLOAD_URL)
    	if err != nil {
    		return err
    	}
    	var (
    		client = http.DefaultClient
    		buf    = make([]byte, 8192)
    	)

    	/************** Start of retry loop **************/
    	// - Set the Range header from the current chunk (only when ranges are supported)
    	// - Check the response status
    	//   - Failed: retry, up to RETRY_COUNT attempts
    	//   - Succeeded: write buf to the file at the chunk's offset
    	//     - Succeeded: return and notify the caller
    	//     - Failed to read: retry, up to RETRY_COUNT attempts
    	for i := 0; i < RETRY_COUNT; i++ {
    		var resp *http.Response
    		if res.Range {
    			req.Header.Set(mdl.HttpHeaderRange,
    				fmt.Sprintf(mdl.HttpHeaderRangeFormat, chks[index].Begin+ck.Downloaded, chks[index].End))
    		} else {
    			// A single-connection retry cannot resume, so start over
    			ck.Downloaded = 0
    		}

    		// Request the byte range. The result is assigned to the named return value
    		// so that the last failure is returned if every retry fails.
    		if err = func() error {
    			resp, err = client.Do(req)
    			if err != nil {
    				return err
    			}
    			if resp.StatusCode != mdl.HttpCodeOK && resp.StatusCode != mdl.HttpCodePartialContent {
    				resp.Body.Close()
    				return fmt.Errorf("%d,%s", resp.StatusCode, resp.Status)
    			}
    			return nil
    		}(); err != nil {
    			continue
    		}

    		// Read the body into buf and write it to the file. The retry counter is
    		// reset because the connection succeeded.
    		i = 0
    		retry := false
    		retry, err = func() (bool, error) {
    			defer resp.Body.Close()
    			for {
    				n, err := resp.Body.Read(buf)
    				if n > 0 {
    					// Each write starts at the already-downloaded position, preserving byte order
    					_, werr := file.WriteAt(buf[:n], ck.Begin+ck.Downloaded)
    					if werr != nil {
    						// Do not retry on file errors
    						return false, werr
    					}
    					// Record the progress so that a retry can resume
    					ck.Downloaded += int64(n)
    				}
    				if err != nil {
    					// Error from Read
    					if err != io.EOF {
    						return true, err
    					}
    					break
    				}
    			}
    			// Success
    			return false, nil
    		}()
    		if !retry {
    			break
    		}
    	}
    	/************** End of retry loop **************/

    	// Notify the caller
    	return
    }

    The code logic is actually fairly simple, but the error retries make it somewhat harder to read. The key point is that the ck.Downloaded field is kept up to date, so a retry resumes from the bytes already written.
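The role of the downloaded counter can be isolated in a small sketch. It assumes an in-memory test server (net/http/httptest with http.ServeContent) and simulates a dropped connection by cutting the first response short; fetchRange and demo are hypothetical names, and this is a simplified illustration rather than the project's retry logic.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// fetchRange (hypothetical helper) downloads bytes [begin, end] of url. The
// first attempt is cut short after cutAfter bytes to simulate a dropped
// connection; the retry resumes at begin+downloaded. It returns the bytes
// received and the Range headers that were sent.
func fetchRange(url string, begin, end, cutAfter int64) ([]byte, []string, error) {
	var (
		out        = make([]byte, end-begin+1)
		downloaded int64
		sent       []string
	)
	for attempt := 0; attempt < 5 && begin+downloaded <= end; attempt++ {
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			return nil, sent, err
		}
		// Resume from the first byte that has not been received yet.
		rng := fmt.Sprintf("bytes=%d-%d", begin+downloaded, end)
		req.Header.Set("Range", rng)
		sent = append(sent, rng)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			continue
		}
		if resp.StatusCode != http.StatusPartialContent {
			resp.Body.Close()
			continue
		}
		var body io.Reader = resp.Body
		if attempt == 0 {
			body = io.LimitReader(resp.Body, cutAfter) // simulated interruption
		}
		// A short read is expected on the interrupted attempt, so only n is used.
		n, _ := io.ReadFull(body, out[downloaded:])
		resp.Body.Close()
		downloaded += int64(n)
	}
	if begin+downloaded <= end {
		return nil, sent, fmt.Errorf("incomplete download: got %d bytes", downloaded)
	}
	return out, sent, nil
}

// demo serves 20 bytes from a test server and fetches bytes 5-14 with one
// simulated interruption after 4 bytes.
func demo() (string, []string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.ServeContent(w, r, "demo.txt", time.Time{}, strings.NewReader("0123456789abcdefghij"))
	}))
	defer srv.Close()
	data, ranges, err := fetchRange(srv.URL, 5, 14, 4)
	return string(data), ranges, err
}

func main() {
	data, ranges, err := demo()
	fmt.Println(data, ranges, err) // 56789abcde [bytes=5-14 bytes=9-14] <nil>
}
```

The second Range header starts at byte 9 because four bytes (5 through 8) were already received before the simulated interruption.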

Code analysis

I adjusted the original code and extracted some core functions in order to understand the download approach quickly. The original project uses the common factory and adapter patterns, which I may analyze in the future if there is an opportunity. Thanks again to monkeyWie for providing the wheel.

References

Coordinating goroutines: errGroup: levelup.gitconnected.com/coordinatin…
monkeyWie/gopeed-core: github.com/monkeyWie/g…