Coroutines, a more lightweight thread-based existence, are designed for concurrency.

I will not document the detailed concepts of coroutines in this article, which is more like an experimental comparison

Merge sort

If we want to show the efficiency of coroutines, we must find an application scenario, and can merge sort be implemented by coroutines? Of course you can.

Merge sort is one of the classic “divide and conquer” algorithms. First divide the array evenly until it cannot be divided, and then merge the small array in pairs. The pseudo-code of the algorithm can be written as:

int[] mergesort(int[] a) {
    if (a.length <= 1) return a
    
    mid = a.length / 2
    
    left = mergesort(a[:mid])
    right = mergesort(a[mid:])
    
    return merge(left, right)
}
Copy the code

The merge function merges two ordered arrays and is not expanded here. Looking at the code above, we can easily introduce concurrency advantages:

left = mergesort(a[:mid])
right = mergesort(a[mid:])
Copy the code

There is no need for synchronous computation at this point; the two functions can be executed asynchronously, as long as they are executed before the final merge.

Coroutine implementation of Go

With that in mind, let’s take a look at how golang does this. Go comes with Goroutine, which has a natural concurrency advantage and is easy to use. Just add the go keyword to the function you want to execute concurrently.

func run(a) {
    fmt.Println("goroutine is running")}func main {
    go run()
}
Copy the code

Of course, the above program probably has no output, because the main function’s goroutine runs too fast, the run method’s goroutine before the print character code, the program has finished exit.

And as I said before, you want to make sure that the recursive partition ends before the final merge, and simply adding the go keyword doesn’t work.

However, go also provides a solution. It is as simple as adding a lock and releasing the lock when the goroutine is finished. This way, the program does not exit until the goroutine is finished.

var wg = sync.WaitGroup{}

func run(a) {
    fmt.Println("goroutine is running")
    wg.Done()
}

func main {
    wg.Add(1)
    go run()
    wg.Wait()
}
Copy the code

Wg variable is a synchronous method, it can be used like this, when we request a goroutine, we increment it by 1, when the goroutine completes, we decrease it by 1, its Wait() will block until the variable value is 0.

Thus, we can write the first version mergesort with Go:

// MergeSortConcurrently merge sort with goroutine
func MergeSortConcurrently(array []int) []int {
   if len(array) <= 1 {
      return array
   }

   mid := len(array) / 2
   wg := sync.WaitGroup{}
   wg.Add(2)

   var left []int
   var right []int

   go func(a) {
      left = MergeSortConcurrently(array[:mid])
      wg.Done()
   }()
   go func(a) {
      right = MergeSortConcurrently(array[mid:])
      wg.Done()
   }()

   wg.Wait()
   return merge(left, right)
}
Copy the code

Will it be a problem? We could write a unit test to compare with Mergesort without Goroutine.

// MergeSort merge sort implement
func MergeSort(array []int) []int {
   if len(array) <= 1 {
      return array
   }

   mid := len(array) / 2
   var left []int
   var right []int

   left = MergeSort(array[:mid])
   right = MergeSort(array[mid:])

   return merge(left, right)
}
Copy the code

The unit tests are as follows:

var a []int

func init(a) {
   for i := 0; i < 1000000; i++ {
      a = append(a, rand.Int())
   }
}

func BenchmarkMergeSort(b *testing.B) {
   for i := 0; i < b.N; i++ {
      MergeSort(a)
   }
}

func BenchmarkMergeSortConcurrently(b *testing.B) {
   for i := 0; i < b.N; i++ {
      MergeSortConcurrently(a)
   }
}
Copy the code

Run unit test go test-test. bench=”.*”

goos: darwin
goarch: amd64
pkg: go-py-coroutine
cpu: Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
BenchmarkMergeSort
BenchmarkMergeSort-4               	       7	 153933946 ns/op
BenchmarkMergeSortConcurrently
BenchmarkMergeSortConcurrently-4   	       1	1348671941 ns/op
PASS
Copy the code

The result was not as we expected, but worse performance with the addition of Goroutine. What’s going on here? It seems that in our quest to make calculations concurrent, we frantically applied for Goroutine. For each step of the recursion, we apply two Goroutines. This ends up with millions of Goroutines queueing for the CPU, making the code slower.

So how do we get the performance advantage of concurrent code without having to set up lots of Goroutines? A good way to limit concurrency in GO is to use Buffered Channel Semaphore. Buffering channels in GO are a nice and simple way to block execution based on the number of concurrent operation units we want.

We set a Channel of capacity 100, and when we generate goroutines to perform asynchronous computations, if 100 goroutines are already busy computations, we revert to the synchronous version using MergeSort:

var sem = make(chan struct{}, 100)

// MergeSortConcurrentlyWithChannel merge sort with goroutine and channel
func MergeSortConcurrentlyWithChannel(array []int) []int {
   if len(array) <= 1 {
      return array
   }

   mid := len(array) / 2
   wg := sync.WaitGroup{}
   wg.Add(2)

   var left []int
   var right []int

   select {
   case sem <- struct{} {} :go func(a) {
         left = MergeSortConcurrentlyWithChannel(array[:mid])
         <-sem
         wg.Done()
      }()
   default:
      left = MergeSort(array[:mid])
      wg.Done()
   }

   select {
   case sem <- struct{} {} :go func(a) {
         right = MergeSortConcurrentlyWithChannel(array[mid:])
         <-sem
         wg.Done()
      }()
   default:
      right = MergeSort(array[mid:])
      wg.Done()
   }

   wg.Wait()
   return merge(left, right)
}
Copy the code

Unit test results:

goos: darwin
goarch: amd64
pkg: go-py-coroutine
cpu: Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
BenchmarkMergeSort
BenchmarkMergeSort-4                          	       7	 154325019 ns/op
BenchmarkMergeSortConcurrently
BenchmarkMergeSortConcurrently-4              	       1	1138546544 ns/op
BenchmarkMergeSortConcurrentlyWithChannel
BenchmarkMergeSortConcurrentlyWithChannel-4   	      12	 101670450 ns/op
PASS
Copy the code

As you can see, performance has improved.

Coroutine implementation of Python

Python coroutines are declared with async/await syntax and are the recommended way to write asyncio applications.

import asyncio

async def main() :
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())
Copy the code

The points to note here are:

  • async: to declare that a function is a coroutine
  • await: to wait for the end of a coroutine function

You can compare this with the go keyword and WaitGroup. This makes it easy to write the Python version of the coroutine merge sort:

async def merge_sort_concurrently(array: list) - >list:
    if len(array) <= 1:
        return array

    mid = int(len(array) / 2)

    left = await merge_sort_concurrently(array[:mid])
    right = await merge_sort_concurrently(array[mid:])

    return merge(left, right)
    
def merge(left: list, right: list) - >list:
    ret = [0 for _ in range(len(left) + len(right))]

    l_ptr, r_ptr, ptr = 0.0.0
    while l_ptr < len(left) or r_ptr < len(right):
        if l_ptr == len(left):
            ret[ptr:] = right[r_ptr:]
            break
        if r_ptr == len(right):
            ret[ptr:] = left[l_ptr:]
            break
        if left[l_ptr] < right[r_ptr]:
            ret[ptr] = left[l_ptr]
            l_ptr += 1
        else:
            ret[ptr] = right[r_ptr]
            r_ptr += 1
        ptr += 1
    return ret
Copy the code