RWMutex is a read-write lock that supports parallel read serial writes. RWMutex has write priority. When a write operation occurs, only the ongoing read operation is allowed to execute and subsequent read operations are blocked.

Usage scenarios

RWMutex is used for a large number of concurrent reads and a small number of concurrent writes. Such as microservice configuration update, transaction route caching and other scenarios. RWMutex read/write locks have better read performance than Mutex Mutex locks.

The following uses the scenario where multiple coroutines concurrently read STR variables and one coroutine periodically updates STR variables every 100 milliseconds as an example to compare the performance of RWMutex read/write locks and Mutex Mutex Mutex locks.

// Implementation based on RWMutex
var rwLock sync.RWMutex
var str1 = "hello"

func readWithRWLock(a) string {
    rwLock.RLock()
    defer rwLock.RUnlock()
    return str1
}

func writeWithRWLock(a) {
    rwLock.Lock()
    str1 = time.Now().Format("20060102150405")
    rwLock.Unlock()
}

// Multiple coroutines read string variables in parallel and update string variables every 100ms
func BenchmarkRWMutex(b *testing.B) {
    ticker := time.NewTicker(100 * time.Millisecond)
    go func(a) {
        for range ticker.C {
            writeWithRWLock()
        }
    }()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            readWithRWLock()
        }
    })
}
// Based on Mutex implementation
var lock sync.Mutex
var str2 = "hello"

func readWithMutex(a) string {
    lock.Lock()
    defer lock.Unlock()
    return str2
}

func writeWithMutex(a) {
    lock.Lock()
    str2 = time.Now().Format("20060102150405")
    lock.Unlock()
}

// Multiple coroutines read string variables in parallel and update string variables every 100ms
func BenchmarkMutex(b *testing.B) {
    ticker := time.NewTicker(100 * time.Millisecond)
    go func(a) {
        for range ticker.C {
            writeWithMutex()
        }
    }()
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            readWithMutex()
        }
    })
}
Copy the code

The RWMutex read/write lock and Mutex Mutex lock are compared. The results are as follows:

# go testThe results ofGo test-bench. -benchtime=10s benchmarkrwmutex-8 227611413 49.5 ns/op benchmarkmutex-8 135363408 87.8 ns/op PASS OK Demo of 37.800 sCopy the code

The source code parsing

RWMutex is a write-first read/write lock, as shown below:

  1. When write operation C occurs, read operation A and read operation B are executing, so write operation C is suspended.
  2. When read operation D occurs, read operation D is suspended because write operation C is waiting for the lock.
  3. Read operations A and B are complete. Because no read operation or write operation is being performed, write operation C is woken up.
  4. When read operation E occurs, read operation E is suspended because write operation C is executing.
  5. After write operation C is complete, read operation D and read operation E are woken up.

RWMutex structure

RWMutex consists of the following variables:

  1. rwmutexMaxReadersSaid:RWMutexThe maximum number of read coroutines that can be accepted exceedsrwmutexMaxReadersThen panic occurs;
  2. w:MutexMutex, used to implement mutual exclusion between write operations
  3. writerSem: write operation operation semaphore; When a read operation exists, the write operation is suspended. After all read operations are complete, the system passeswriterSemA semaphore wakes up a write operation;
  4. readerSem: Read operation semaphore; When a write operation exists, the read operation is suspended. After the write operation is complete, passreaderSemSemaphore wake up read operation;
  5. readerCount: Number of read operations being performed. Counting from 0 if there is no write operation, is positive; Counting from negative rwmutexMaxReaders when there is a write operation, is negative;
  6. readerWait: Write Operation Number of read operations to be performed. When you performLock()If there are read operations, the number of read operations is recorded in thereaderWaitAnd suspend the write operation. After the read operation is complete, it will be updatedreaderWaitwhenreaderWaitIs 0, wake up the write operation.
const rwmutexMaxReaders = 1 << 30

type RWMutex struct {
    w           Mutex  // Mutex Mutex, used to implement the mutual exclusion of write operations

    writerSem   uint32 // Write operation semaphore, used to wake up write operation
    readerSem   uint32 // Read operation semaphore, used for write operation to wake up read operation

    readerCount int32  // The number of read operations, counting from 0 if no write operation exists, counting from -rwmutexMaxReaders if write operation exists
    readerWait  int32  // Write Operation Number of waiting read operations
}
Copy the code

The Lock () method

The Lock method is used to obtain the Lock from a write operation as follows:

  1. To obtainwMutex to ensure that only one write operation can be performed at a time.
  2. willreaderCountUpdate to negative, blocking subsequent read operations;
  3. If there are active read operationsr ! = 0, the write operation is blockedruntime_SemacquireMutex;
func (rw *RWMutex) Lock(a) {
    // Write operations are mutually exclusive using the w mutex
    rw.w.Lock()
    // 1. Update readerCount to a negative value, indicating that there is a write operation. When readerCount is negative, new reads are suspended
    // 2.r indicates the number of read operations being performed
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // r ! = 0 indicates that read operations are being performed. Write operations can be performed only after all read operations are complete.
    ifr ! =0&& atomic.AddInt32(&rw.readerWait, r) ! =0 {
        // Suspend the write operation
        runtime_SemacquireMutex(&rw.writerSem, false.0)}}Copy the code

Unlock () method

The Unlock method is used to write a lock. It does the following:

  1. willreaderCountUpdated to a positive number, indicating that no active write operation exists.
    1. If the updatedreaderCountIf the value is greater than 0, the current write operation is blockedreaderCountTo wake up all blocked read operations.
  2. willwThe mutex is released to allow other write operations to execute.
func (rw *RWMutex) Unlock(a) {
    // Update readerCount to positive, counting from 0
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")}// Wake up all read operations waiting for write operations
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false.0)}// Release the w mutex to allow other writes to enter
    rw.w.Unlock()
}
Copy the code

RLock () method

The RLock method is used to obtain the lock from a read operation.

  1. Atomic updatesreaderCount+1;
  2. If a write operation existsatomic.AddInt32(&rw.readerCount, 1) < 0, the read operation is blocked.
func (rw *RWMutex) RLock(a) {
    // Atom update readerCount+1
    // 1. If the value of readerCount+1 is negative, it indicates that there is a write operation. A read operation can be performed only after the write operation is complete
    // 2. If the value of readerCount+1 is not negative, the current write operation does not exist and the read operation can be performed
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // Suspend the read operation
        runtime_SemacquireMutex(&rw.readerSem, false.0)}}Copy the code

RUnlock () method

The RUnlock method is used to release the lock on a read operation.

  1. Atomic updatesreaderCount-1;
  2. If the current read operation blocks the write operationatomic.AddInt32(&rw.readerCount, -1)<0Atomic renewalreaderWait-1;
    1. whenreaderWaitIf the value is 0, it indicates that all read operations blocking write operations are completed, and the write operation is awakened.
func (rw *RWMutex) RUnlock(a) {
    // Atom update readercount-1
    // If readerCount-1 is negative, the current read operation is blocking the write operation and readerWait needs to be updated
    if r := atomic.AddInt32(&rw.readerCount, - 1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1= =0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")}// Atomic readerwait-1
    // When readerwait-1 is 0, it indicates that all read operations that caused the write operation to block have been completed and the write operation is awakened
    if atomic.AddInt32(&rw.readerWait, - 1) = =0 {
        // Wake up the read operation
        runtime_Semrelease(&rw.writerSem, false.1)}}Copy the code