Go's official semaphore library
Preface
Hello everyone, I'm Asong. In my last article, "Don't Abuse Goroutines", I mentioned that the Go extension repository provides semaphore, a weighted semaphore library, with which we can implement a "worker pool" that limits the number of concurrently running goroutines. Curious about how it works, I read through the library's source code over the weekend and recorded my notes here.
What is a semaphore
When I want to know what something is, I like to look it up on Baidu Baike: type in "semaphore" and the answer appears.
Baidu Baike's explanation:
A semaphore, sometimes called a semaphore flag, is a facility used in multithreaded environments to ensure that two or more critical sections of code are not executed concurrently. Before entering a critical section, a thread must acquire the semaphore; once it has finished with the critical section, it releases the semaphore, and only then can other threads waiting to enter that critical section proceed. To do this, you create a semaphore, place an acquire operation at the beginning of each critical section and a release operation at the end, and make sure they all reference the semaphore originally created.
A semaphore is a variable or abstract data type used to control access to a common resource by multiple processes in a concurrent system. Semaphores fall into two main categories:
- Binary semaphore: as the name implies, it has only two values, 0 and 1, and is equivalent to a mutex. When the value is 1 the resource is available; when the value is 0 the resource is locked and the requesting process blocks.
- Counting semaphore: the semaphore's value can be any non-negative integer. If the counter is 0, the semaphore is unavailable; if the counter is greater than 0, the semaphore is available, and the total number of acquisitions it can satisfy equals the counter's value.
How semaphores work
Semaphores are maintained by the operating system and support only two operations: wait and signal. In other words, the core is the P and V operations:
- P operation: P comes from the Dutch proberen (to test). It is a blocking primitive that moves the calling process from the running state to the blocked state until another process wakes it up. Concretely: request one free resource (decrement the semaphore by 1); if that succeeds, continue; if it fails, block.
- V operation: V comes from the Dutch verhogen (to increase). It is a wake-up primitive responsible for waking a blocked process; its parameter carries information about the process waiting to be woken. Concretely: release one occupied resource (increment the semaphore by 1); if a blocked process is found, wake one of them.
P and V operations on a semaphore are atomic, and no interrupts are allowed while a P or V primitive is executing.
P and V primitives are used with semaphores in three typical ways:
- the semaphore represents the remaining count of some class of shared resources, controlling access to that class of resources
- the semaphore is used for synchronization between processes
- the semaphore acts as a lock flag, serializing access to a shared variable
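To make the P and V operations concrete before we look at the extension package, here is a minimal sketch of my own (not code from any library): a counting semaphore with three resources built on a buffered channel, where sending a token plays the role of P and receiving it back plays the role of V.

package main

import (
	"fmt"
	"sync"
)

// sem is a counting semaphore backed by a buffered channel: sending a token
// is the P operation (acquire), receiving it back is the V operation (release).
type sem chan struct{}

func (s sem) P() { s <- struct{}{} } // blocks once all slots are taken
func (s sem) V() { <-s }             // frees a slot, unblocking one waiter

func main() {
	s := make(sem, 3) // three resources
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			s.P()
			fmt.Println("worker", id, "is using a resource")
			s.V()
		}(i)
	}
	wg.Wait()
}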
Without dwelling further on those specific scenarios, let's focus on semaphore, the Go extension package, and see how it is implemented.
The official extension package semaphore
When reading the Go runtime source code, we often come across these functions:
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
These functions implement the P and V operations on semaphores, but they are for Go's internal use only. If you want to use a semaphore yourself, you can use the official extension package semaphore, which provides a weighted semaphore. That is the library we focus on here.
Installation: go get -u golang.org/x/sync
The data structure
type Weighted struct {
	size    int64      // the maximum total weight
	cur     int64      // the weight currently in use
	mu      sync.Mutex // mutex protecting the critical section
	waiters list.List  // list of blocked callers waiting for weight
}
Weighted is the core structure of the semaphore library. It consists of four fields:
- size: the maximum total weight, specified when the Weighted object is created
- cur: a counter recording the weight currently in use
- mu: a mutex providing critical-section protection for concurrent access
- waiters: the list of blocked callers; a linked list is used so that waiters are served in first-in, first-out order, and each element stores a waiter object
The waiter data structure is as follows:
type waiter struct {
	n     int64           // the weight requested by the waiting caller
	ready chan<- struct{} // closing this channel wakes the waiter
}
It has only two fields:
- n: the weight requested by the waiting caller
- ready: a channel; the channel's close mechanism is used to implement the wake-up
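The wake-up relies on a basic channel property: a receive from a closed channel returns immediately. The following is a minimal sketch of my own showing that mechanism; in the library every waiter gets its own ready channel, so closing it wakes exactly that caller.

package main

import "fmt"

func main() {
	ready := make(chan struct{})
	done := make(chan struct{})

	go func() {
		<-ready // blocks here, just like a queued waiter inside Acquire
		fmt.Println("waiter woken up")
		close(done)
	}()

	close(ready) // the receive above returns immediately once ready is closed
	<-done
}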
semaphore also provides a constructor for creating a Weighted object, initialized with the maximum weight:
NewWeighted creates a new weighted semaphore for concurrent access with the given maximum combined weight.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
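As a quick orientation before we read the methods, here is a minimal usage sketch of my own (the only assumptions are the import path golang.org/x/sync/semaphore and the hypothetical helper useResource): create the semaphore once, acquire weight before touching the protected resource, and release it afterwards.

package demo

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// sem allows at most 10 units of weight to be held at any one time.
var sem = semaphore.NewWeighted(10)

func useResource(ctx context.Context) error {
	if err := sem.Acquire(ctx, 3); err != nil {
		return err // ctx was cancelled before 3 units became available
	}
	defer sem.Release(3) // give the 3 units back when done
	// ... work with the protected resource ...
	return nil
}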
The blocking way to acquire weight – Acquire
Let’s get straight to the code:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock() // lock the critical section
	// Fast path: weight is available and no goroutine is waiting
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n    // take the weight
		s.mu.Unlock() // release the lock
		return nil
	}
	// The requested weight is larger than the maximum weight
	if n > s.size {
		// Release the lock first so other Acquire calls are not blocked by this doomed one
		s.mu.Unlock()
		// Block until the context is cancelled
		<-ctx.Done()
		return ctx.Err()
	}
	// Not enough weight is available right now
	// Create a channel used to signal the wake-up
	ready := make(chan struct{})
	// Create the waiter object
	w := waiter{n: n, ready: ready}
	// Queue the waiter in FIFO order
	elem := s.waiters.PushBack(w)
	// Release the lock and wait to be woken, without blocking other goroutines
	s.mu.Unlock()
	// Block waiting to be woken up
	select {
	// The context was cancelled
	case <-ctx.Done():
		err := ctx.Err() // get the context error first
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after cancellation: rather than fixing up
			// the queue, pretend we didn't notice the cancellation
			err = nil
		default:
			// Check whether this waiter is the front element
			isFront := s.waiters.Front() == elem
			// Remove this waiter from the queue
			s.waiters.Remove(elem)
			// If it was the front element and weight is available, notify the other waiters
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	// Woken up normally
	case <-ready:
		return nil
	}
}
Comments have been added to the code, so let me summarize the three main processes in this method:
- Process 1: when weight is available and no goroutine is waiting, take the weight and return immediately;
- Process 2: when the requested weight n is larger than the maximum weight size, the goroutine can never obtain the semaphore, so it simply blocks until the context is cancelled;
- Process 3: otherwise there is not enough weight available right now, so the caller is queued and blocks waiting to be woken up. Two pieces of special logic apply while waiting:
- Special logic 1: if the waiter is woken up after its context has already been cancelled, the cancellation is ignored rather than trying to fix up the queue;
- Special logic 2: if the context is cancelled while waiting, the waiter removes itself from the queue, and if it was the front element and weight is still available, it notifies the remaining waiters. This matters when different goroutines are controlled by different contexts: a goroutine whose context is still open should not stay blocked while weight is actually free. Look at the following example (because goroutine scheduling is preemptive, the output may vary from run to run):
func main() {
	s := semaphore.NewWeighted(3)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	for i := 0; i < 3; i++ {
		if i != 0 {
			go func(num int) {
				if err := s.Acquire(ctx, 3); err != nil {
					fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
					return
				}
				time.Sleep(2 * time.Second)
				fmt.Printf("goroutine: %d run over\n", num)
				s.Release(3)
			}(i)
		} else {
			go func(num int) {
				ct, cancel := context.WithTimeout(context.Background(), time.Second*3)
				defer cancel()
				if err := s.Acquire(ct, 3); err != nil {
					fmt.Printf("goroutine: %d, err is %s\n", num, err.Error())
					return
				}
				time.Sleep(3 * time.Second)
				fmt.Printf("goroutine: %d run over\n", num)
				s.Release(3)
			}(i)
		}
	}
	time.Sleep(10 * time.Second)
}
In the example above, goroutine 0 is controlled by the ct context with a 3s timeout, while goroutine 1 and goroutine 2 are controlled by the ctx context with a 2s timeout. Each of the three goroutines acquires the full weight (3), so only one of them can hold the semaphore at a time and the other two block. Because goroutine scheduling is preemptive, we cannot know which goroutine runs first; suppose goroutine 2 acquires the semaphore first, so the waiting queue is goroutine 1 -> goroutine 0. goroutine 2 holds the semaphore for 2s, during which ctx times out and fires its Done signal; since goroutine 1 is also controlled by ctx, it is cancelled and removed from the queue. But goroutine 1 was the front element of the queue, and goroutine 2 has released its weight, so goroutine 0 is woken up and continues executing.
This is how the library prevents a waiting goroutine from blocking forever.
The non-blocking way to acquire weight – TryAcquire
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock() // lock
	// Succeeds only if weight is available and no goroutine is waiting
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}
This method is much simpler: it tries to acquire weight n without blocking, returning true on success; on failure it returns false and leaves the semaphore unchanged.
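A small self-contained sketch of my own showing the typical TryAcquire pattern: try for the weight, and fall back immediately instead of waiting if it is not available.

package main

import (
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(1)

	if sem.TryAcquire(1) { // succeeds: one unit of weight is free
		fmt.Println("got the semaphore without blocking")
	}
	if !sem.TryAcquire(1) { // fails immediately: all weight is already in use
		fmt.Println("semaphore busy, fall back instead of waiting")
	}
	sem.Release(1)
}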
Releasing the weight – Release
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	// Give back the weight
	s.cur -= n
	// If more weight is released than is held, panic
	if s.cur < 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	// Notify the other waiting callers
	s.notifyWaiters()
	s.mu.Unlock()
}
This is a routine operation: it releases the weight and performs a safety check at the same time; if more weight is released than is held, it panics.
Waking up waiters – notifyWaiters
notifyWaiters is called from both Acquire and Release, so let's take a look at it:
func (s *Weighted) notifyWaiters() {
	for {
		// Get the waiter at the front of the queue
		next := s.waiters.Front()
		// No more callers to notify
		if next == nil {
			break // No more waiters blocked.
		}
		// Assert the element back to a waiter
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough weight for the next waiter: keep it blocked and stop here,
			// preserving first-in, first-out order so that a waiter needing a large
			// amount of weight does not starve.
			//
			// Consider a scenario that uses the semaphore as a read-write lock, with
			// N tokens, N readers, and one writer: each reader acquires a read lock
			// with Acquire(1), and the writer acquires a write lock with Acquire(N).
			// If we allowed readers to jump ahead in the queue, the writer would
			// starve - there is always one token available for every reader.
			break
		}
		// Take the weight for this waiter
		s.cur += w.n
		// Remove it from the waiting list
		s.waiters.Remove(next)
		// Wake the waiter by closing its ready channel
		close(w.ready)
	}
}
There is only one point to note here: waiters are woken in first-in, first-out order, which prevents a waiter that needs a large amount of weight from starving.
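To make the starvation argument in the source comments concrete, here is a small sketch of my own under the same assumptions as that comment: one weighted semaphore used as a read-write lock, where each reader acquires weight 1 and the writer acquires the full weight.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

const total = 4 // up to 4 concurrent readers, or one exclusive writer

func main() {
	rw := semaphore.NewWeighted(total)
	ctx := context.Background()
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ { // readers each hold 1 unit of weight
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_ = rw.Acquire(ctx, 1)
			defer rw.Release(1)
			fmt.Println("reader", id, "reading")
		}(i)
	}

	wg.Add(1)
	go func() { // the writer needs all 4 units, i.e. exclusive access
		defer wg.Done()
		_ = rw.Acquire(ctx, total)
		defer rw.Release(total)
		fmt.Println("writer writing exclusively")
	}()

	wg.Wait()
}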
When to use semaphore
So far we’ve looked at Semaphore’s source code. It’s a few lines of code and cleverly packaged, so when should we choose to use it?
Semaphore is used to limit the number of goroutines and errgroup is used for concurrency control. Example:
const (
	limit = 2
)

func main() {
	serviceName := []string{
		"cart", "order", "account", "item", "menu",
	}
	eg, ctx := errgroup.WithContext(context.Background())
	s := semaphore.NewWeighted(limit)
	for index := range serviceName {
		name := serviceName[index]
		if err := s.Acquire(ctx, 1); err != nil {
			fmt.Printf("Acquire failed and err is %s\n", err.Error())
			break
		}
		eg.Go(func() error {
			defer s.Release(1)
			return callService(name)
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Printf("err is %s\n", err.Error())
		return
	}
	fmt.Printf("run success\n")
}

func callService(name string) error {
	fmt.Println("call ", name)
	time.Sleep(1 * time.Second)
	return nil
}
The results are as follows:
call order
call cart
call account
call item
call menu
run success
Conclusion
In this article we looked at the implementation of the official Go extension library semaphore. Its design is simple: a complete, well-thought-out package in only a few dozen lines of code, well worth learning from. In real business scenarios, however, semaphore is needed in only a few cases; a channel is enough for most of them, but semaphore fits some scenarios better. For example, in my previous article I used channel+sync to limit the number of goroutines. That is not really a good implementation, because all the goroutines have already been started and we only limit how many of them work at the same time. With semaphore we can genuinely limit the number of goroutines that exist.
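Here is a minimal sketch of my own of that idea: acquire the weight before starting each goroutine, so that no more than limit worker goroutines ever exist at the same time.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	const limit = 3
	sem := semaphore.NewWeighted(limit)
	ctx := context.Background()

	for task := 0; task < 10; task++ {
		// Acquire before starting the goroutine, so at most `limit` workers exist at once.
		if err := sem.Acquire(ctx, 1); err != nil {
			break
		}
		go func(id int) {
			defer sem.Release(1)
			fmt.Println("working on task", id)
		}(task)
	}

	// Wait for all workers to finish by reclaiming the full weight.
	_ = sem.Acquire(ctx, limit)
}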
The code has been uploaded to github: github.com/asong2020/G…
Welcome to our official account: [Golang Dream Factory]
Recommended previous articles:
- Learning channel design: From getting started to giving up
- Detail memory alignment
- Which of the Go languages do you use to allocate memory, new or make?
- Source analysis panic and recover, do not understand you hit me!
- Interviewer: Matsuko is here to talk about memory escape
- Interviewer: Can you talk about the conversion between string and []byte?
- Interviewer: What is the result of two nil comparisons?
- Errgroup for concurrent programming packages