Go has an official semaphore library

Preface

Hello, everyone, I’m Asong. In my last article, "Don’t Abuse Goroutines", I mentioned that the Go extension packages provide semaphore, a weighted semaphore library, with which we can implement a "worker pool" that limits the number of goroutines running concurrently. Curious about how it works, I read through the library’s source code over the weekend, and this post records what I found.

What is a semaphore

If I want to know what something is, I like to look it up on Baidu Baike. Type in "semaphore" and the answer comes right up.

Baidu Baike’s explanation:

A semaphore, sometimes called a semaphore light, is a facility used in multithreaded environments to ensure that two or more critical sections of code are not executed concurrently. Before entering a critical section, a thread must acquire the semaphore; once it has finished with the critical section, it releases the semaphore, and other threads that want to enter the critical section must wait until it does. To do this, you create a semaphore, then place an acquire operation at the beginning and a release operation at the end of each critical section, making sure every operation references the semaphore originally created.

A semaphore is a variable or abstract data type used to control access to a shared resource by multiple processes in a concurrent system. Semaphores fall into two main categories:

  • Binary semaphore: as the name implies, it has only two values, 0 and 1, and is equivalent to a mutex. When the value is 1 the resource is available; when the value is 0 the resource is locked and the process blocks until it becomes available.
  • Counting semaphore: the semaphore is an arbitrary non-negative integer counter. If the counter is 0, the semaphore is unavailable; if the counter is greater than 0, the semaphore is available, and the total number of acquisitions allowed equals the value of the counter.

How semaphores work

Semaphores are maintained by the operating system. A semaphore supports only two operations, waiting and signalling; in short, the core operations are the PV operations:

  • P primitive: P comes from the Dutch word proberen (to test). It is a blocking primitive, responsible for switching the current process from the running state to the blocked state until another process wakes it up. The operation: request a free resource (decrement the semaphore by 1); if that succeeds, return; if it fails, the process blocks.
  • V primitive: V comes from the Dutch word verhogen (to increase). It is a wakeup primitive, responsible for waking up a blocked process; its parameter list carries information about the process waiting to be woken. The operation: release an occupied resource (increment the semaphore by 1); if a blocked process is found, pick one and wake it up.

PV operations on a semaphore are atomic, and no interrupts are allowed while a PV primitive is executing (a minimal channel-based sketch of the idea follows the list below).

PV primitives are applied to semaphores in three typical ways:

  • The semaphore represents the remaining count of a class of shared resources, controlling access to that class of resources
  • The semaphore is used for synchronization between processes
  • The semaphore acts as a lock flag, controlling access to a shared variable
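To make the PV idea concrete, here is a minimal sketch of a counting semaphore built on a buffered channel. This is only an illustration of the concept, not how the operating system or the Go runtime implements it, and the type name sem and its helpers are made up for this example:

package main

import "fmt"

// sem is a counting semaphore backed by a buffered channel:
// P (wait) occupies a slot, V (signal) frees one.
type sem chan struct{}

func newSem(n int) sem { return make(sem, n) }

// P blocks when all n slots are occupied, i.e. the counter has reached 0.
func (s sem) P() { s <- struct{}{} }

// V frees a slot, unblocking one goroutine waiting in P, if any.
func (s sem) V() { <-s }

func main() {
	s := newSem(2) // a counting semaphore with 2 resources
	s.P()
	s.P()
	fmt.Println("both resources are in use; a third P() would block")
	s.V()
	s.V()
}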

Rather than analyzing specific scenarios any further, let’s focus on the Go extension package semaphore and see how it is implemented.

The official extension package Semaphore

We often come across these functions when reading the Go source code:

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

These functions are the runtime’s PV operations on semaphores, but they are for Go’s internal use only. If you want to use a semaphore in your own code, you can use the official extension package semaphore, which is a weighted semaphore. This is the library we will focus on.

Installation: go get -u golang.org/x/sync

The data structure

type Weighted struct {
	size    int64      // the maximum combined weight, set at creation
	cur     int64      // the weight currently in use
	mu      sync.Mutex // protects the critical section
	waiters list.List  // list of blocked callers waiting for weight
}

Weighted is the core structure of the semaphore library. It has four fields:

  • size: the maximum weight, specified when the Weighted object is created
  • cur: a counter recording the weight currently in use
  • mu: a mutex providing critical-section protection against concurrent access
  • waiters: the list of blocked callers, implemented as a linked list to guarantee first-in, first-out order; each element stores a waiter object, whose structure is as follows:
type waiter struct {
	n     int64           // the weight the waiting caller asked for
	ready chan<- struct{} // closing this channel means "wake up"
}

There are only two fields:

  • n: the weight requested by the waiting caller
  • ready: a channel; the wakeup is implemented with the channel’s close mechanism (see the sketch below)
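To illustrate the close-based wakeup, here is a minimal sketch of my own (not library code): a receive on a channel blocks until the channel is closed, and closing it releases every pending receiver at once. In the library each waiter gets its own ready channel, so closing it wakes exactly that waiter:

package main

import (
	"fmt"
	"time"
)

func main() {
	// ready plays the role of waiter.ready: the receiver blocks until it is closed.
	ready := make(chan struct{})

	go func() {
		<-ready // blocks here until close(ready)
		fmt.Println("woken up")
	}()

	time.Sleep(100 * time.Millisecond)
	close(ready) // closing the channel is the wakeup signal
	time.Sleep(100 * time.Millisecond)
}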

semaphore also provides a constructor for creating a Weighted object, initialized with a maximum weight:

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
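As a minimal usage sketch (assuming golang.org/x/sync has been installed as above): create a Weighted with a maximum weight of 3, take part of that weight, then give it back.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	s := semaphore.NewWeighted(3) // maximum combined weight of 3

	if err := s.Acquire(context.Background(), 2); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	fmt.Println("holding weight 2 of 3") // weight 1 is still available
	s.Release(2)
}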

Acquiring weight with blocking – Acquire

Let’s get straight to the code:

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock() // lock the critical section
	// There is enough weight available and no goroutine is waiting
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n    // take the weight
		s.mu.Unlock() // release the lock
		return nil
	}
	// The requested weight is greater than the maximum weight
	if n > s.size {
		// Release the lock first so that other goroutines calling Acquire are not blocked
		s.mu.Unlock()
		// Block until the context is done
		<-ctx.Done()
		return ctx.Err()
	}
	// No weight is available right now
	// Create a channel used to signal the wakeup
	ready := make(chan struct{})
	// Create the waiter object
	w := waiter{n: n, ready: ready}
	// Append the waiter to the queue, preserving FIFO order
	elem := s.waiters.PushBack(w)
	// Release the lock while waiting so that other goroutines are not blocked
	s.mu.Unlock()

	// Block until woken up or cancelled
	select {
	// The context has been cancelled or has timed out
	case <-ctx.Done():
		err := ctx.Err() // record the context error first
		s.mu.Lock()
		select {
		case <-ready:
			// Woken up after the context was cancelled: rather than fixing up
			// the queue, pretend we didn't notice the cancellation
			err = nil
		default:
			// Check whether we are the first element in the queue
			isFront := s.waiters.Front() == elem
			// Remove ourselves from the queue
			s.waiters.Remove(elem)
			// If we were at the front and weight is available, notify the other waiters
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	// Woken up normally
	case <-ready:
		return nil
	}
}

With the comments added to the code, the method boils down to three main flows:

  • Process 1: when there is enough weight available and no goroutine is waiting, take the weight and return straight away;

  • Process 2: when the requested weight is larger than the maximum weight, the goroutine can never obtain the semaphore, so it simply blocks until the context is cancelled;

  • Process 3: otherwise there is not enough weight available right now, so the caller blocks and waits to be woken up. Two pieces of special logic apply while waiting:

    • Special logic 1: if the waiter is woken up after its context has already been cancelled, the cancellation is ignored and the acquired weight is kept.

    • Special logic 2: when the context is cancelled, whether the remaining waiters are notified depends on whether weight is available. The point is that when different contexts control different goroutines, a goroutine whose context has not been cancelled should not stay blocked; it should still get to run. Look at the example below (because goroutine scheduling is preemptive, its output is not deterministic):
func main() {
	s := semaphore.NewWeighted(3)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	for i := 0; i < 3; i++ {
		if i != 0 {
			go func(num int) {
				if err := s.Acquire(ctx, 3); err != nil {
					fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
					return
				}
				time.Sleep(2 * time.Second)
				fmt.Printf("goroutine: %d run over\n", num)
				s.Release(3)
			}(i)
		} else {
			go func(num int) {
				ct, cancel := context.WithTimeout(context.Background(), time.Second*3)
				defer cancel()
				if err := s.Acquire(ct, 3); err != nil {
					fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
					return
				}
				time.Sleep(3 * time.Second)
				fmt.Printf("goroutine: %d run over\n", num)
				s.Release(3)
			}(i)
		}
	}
	time.Sleep(10 * time.Second)
}

In the example above, goroutine 0 is controlled by the ct context with a 3s timeout, while goroutine 1 and goroutine 2 are controlled by the ctx context with a 2s timeout. Each of the three goroutines asks for the maximum weight, so only one of them can hold the semaphore at a time and the other two block. Since scheduling is preemptive we cannot know which goroutine runs first; assume goroutine 2 acquires the semaphore first, so the waiting queue is goroutine 1 -> goroutine 0. goroutine 2 sleeps for 2s, so ctx fires its Done signal while goroutine 2 still holds the semaphore. goroutine 1 is also controlled by ctx, so it is cancelled and removed from the queue; and because goroutine 1 was at the front of the queue and goroutine 2 then releases its weight, goroutine 0 is woken up and continues to run.

In this way, a goroutine whose context has not been cancelled is never left blocked forever.

Acquiring weight without blocking – TryAcquire

func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock() // lock
	// Weight is available and no goroutine is waiting for it
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

This method is much simpler: it tries to acquire a weight of n without blocking, returning true on success; on failure it returns false and leaves the semaphore unchanged.
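A small sketch of the non-blocking pattern (my own example): with a maximum weight of 2, the third attempt fails immediately instead of blocking.

package main

import (
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	s := semaphore.NewWeighted(2)

	for i := 0; i < 3; i++ {
		if s.TryAcquire(1) {
			fmt.Printf("request %d: got a slot\n", i)
		} else {
			// No weight left: give up right away instead of waiting.
			fmt.Printf("request %d: busy, try again later\n", i)
		}
	}

	s.Release(2) // give back the two slots that were acquired
}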

Releasing weight – Release

func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	// Give the weight back
	s.cur -= n
	// Releasing more weight than is held triggers a panic
	if s.cur < 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	// Notify the other waiting callers
	s.notifyWaiters()
	s.mu.Unlock()
}

This is a straightforward operation: it releases the weight and, as a safety check, panics if more weight is released than is currently held.
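A quick sketch of that safety check (my own example): releasing more weight than was ever acquired drives cur below zero and triggers the panic, which we can observe with recover.

package main

import (
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	s := semaphore.NewWeighted(1)

	defer func() {
		if r := recover(); r != nil {
			fmt.Println("recovered:", r) // semaphore: released more than held
		}
	}()

	s.Release(2) // nothing was acquired, so the held weight would go negative
}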

Waking up waiters – notifyWaiters

notifyWaiters is called in both Acquire and Release, so let’s take a look at it:

func (s *Weighted) notifyWaiters() {
	for {
		// Take the waiter at the front of the queue
		next := s.waiters.Front()
		// There are no more waiters to notify
		if next == nil {
			break // No more waiters blocked.
		}

		// Assert the element back into a waiter
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough weight for the next waiter: keep it blocked and stop here,
			// following the first-in, first-out principle so that a waiter asking for
			// a large amount of weight does not starve.
			//
			// Consider a scenario that uses the semaphore as a read-write lock, with
			// N tokens, N readers and one writer. Each reader acquires a read lock
			// via Acquire(1) and the writer acquires a write lock via Acquire(N).
			// If we allowed readers to jump the queue, the writer would starve:
			// there would always be a token available for the next reader.
			break
		}

		// Hand the waiter its weight
		s.cur += w.n
		// Remove it from the waiter list
		s.waiters.Remove(next)
		// Wake the waiter by closing its channel
		close(w.ready)
	}
}

There is only one point to note here: waiters are woken up in first-in, first-out (FIFO) order, so that a waiter that needs a large amount of weight does not starve.
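The read-write-lock scenario from the comment above can be sketched as follows (my own example, with made-up constants): readers each acquire weight 1, the writer acquires the full weight, and because waiters are served in FIFO order, readers that arrive after the writer cannot jump ahead of it.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

const maxWeight = 4 // up to 4 concurrent readers

func main() {
	sem := semaphore.NewWeighted(maxWeight)
	ctx := context.Background()
	var wg sync.WaitGroup

	// Readers each take weight 1 (a "read lock").
	for i := 0; i < 6; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := sem.Acquire(ctx, 1); err != nil {
				return
			}
			defer sem.Release(1)
			fmt.Printf("reader %d reading\n", id)
			time.Sleep(50 * time.Millisecond)
		}(i)
	}

	// The writer takes the full weight, i.e. exclusive access (a "write lock").
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := sem.Acquire(ctx, maxWeight); err != nil {
			return
		}
		defer sem.Release(maxWeight)
		fmt.Println("writer has exclusive access")
	}()

	wg.Wait()
}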

When to use Semaphore

So far we’ve walked through Semaphore’s source code. It is only a few dozen lines, but cleverly put together. So when should we choose to use it?

A typical use is limiting the number of goroutines with semaphore while using errgroup for concurrency control. Example:

const (
	limit = 2
)

func main() {
	serviceName := []string{
		"cart", "order", "account", "item", "menu",
	}
	eg, ctx := errgroup.WithContext(context.Background())
	s := semaphore.NewWeighted(limit)
	for index := range serviceName {
		name := serviceName[index]
		if err := s.Acquire(ctx, 1); err != nil {
			fmt.Printf("Acquire failed and err is %s\n", err.Error())
			break
		}
		eg.Go(func() error {
			defer s.Release(1)
			return callService(name)
		})
	}

	if err := eg.Wait(); err != nil {
		fmt.Printf("err is %s\n", err.Error())
		return
	}
	fmt.Printf("run success\n")
}

func callService(name string) error {
	fmt.Println("call ", name)
	time.Sleep(1 * time.Second)
	return nil
}

The results are as follows:

call  order
call  cart
call  account
call  item
call  menu
run success

Conclusion

In this article we walked through the implementation of the Go official extension library semaphore. Its design is simple: a complete, well-thought-out package in just a few dozen lines of code, and well worth learning from. In real business scenarios, however, we only need semaphore in a handful of cases; a channel is enough for most of them, but semaphore is the better fit for some. For example, in my previous article I used channel+sync to control the number of goroutines, which is not a great implementation: all the goroutines are started up front, and only the number of working goroutines is limited. Switching to semaphore lets you genuinely control the number of goroutines.

The code has been uploaded to github: github.com/asong2020/G…

Welcome to our official account: [Golang Dream Factory]

Recommended previous articles:

  • Learning channel design: From getting started to giving up
  • Detail memory alignment
  • Which of the Go languages do you use to allocate memory, new or make?
  • Source analysis panic and recover, do not understand you hit me!
  • Interviewer: Matsuko is here to talk about memory escape
  • Interviewer: Can you talk about the conversion between string and []byte?
  • Interviewer: What is the result of two nil comparisons?
  • Errgroup for concurrent programming packages