First of all, I would like to say that Go's channel is not badly designed. As the main communication mechanism between goroutines, the channel is quite good, but I think there are still some areas that are not perfect.

     

package main

import (
	"fmt"
	"time"
)

// dgucoproducer writes max integers into the channel.
func dgucoproducer(c chan int, max int) {
	for i := 0; i < max; i++ {
		c <- i
	}
}

// dgucoconsumer reads from the channel until it is closed.
func dgucoconsumer(c chan int) {
	ok := true
	for ok {
		var value int
		if value, ok = <-c; ok {
			fmt.Println(value)
		}
	}
}

func main() {
	c := make(chan int)
	defer close(c)
	go dgucoproducer(c, 10)
	go dgucoconsumer(c)
	time.Sleep(time.Millisecond * 10)
}


dgucocustomer.go

This is probably the most common usage: n goroutines write to a channel and n goroutines read from it; a goroutine that tries to write when the channel is full, or tries to read when the channel is empty, is suspended. To put it bluntly, it is the classic producer-consumer problem. How does the channel deal with this problem?

   

The channel's data structure holds some basic information: the element type, the current number of elements, the channel capacity, a pointer to the buffer array, the read and write indexes, the queues of waiting reader/writer goroutines, and a lock.

(Diagram of the channel's internal data structure, borrowed from another article.)

  

When a goroutine sends a value to a channel, the value is copied into the buffer and sendx is incremented by one; the buffer is readable whenever recvx is behind sendx. When a goroutine receives a value, recvx is likewise incremented by one. This is essentially a queue operation, with recvx and sendx playing the role of the familiar front and rear pointers. The only difference is that the queue must be locked before every read or write to keep it thread-safe: lock -> recv -> unlock, lock -> send -> unlock. That is just the basic flow, of course; the real operation has many details, such as updating the read/write indexes, copying the data, clearing the buffer slot, and so on.
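The locked ring-buffer scheme described above can be sketched as follows. This is a deliberately simplified, hypothetical version (the type name lockedQueue is made up, and it fails instead of suspending goroutines); the runtime's real hchan code does much more, but the lock -> operate -> unlock shape is the same:

```go
package main

import (
	"fmt"
	"sync"
)

// lockedQueue: a fixed-size ring buffer guarded by a single mutex, with
// sendx and recvx as the rear and front indexes described in the text.
type lockedQueue struct {
	mu     sync.Mutex
	buf    []int
	qcount int // current number of elements
	sendx  int // next write index
	recvx  int // next read index
}

// send follows the lock -> send -> unlock pattern; it reports failure
// when full instead of blocking, to keep the sketch short.
func (q *lockedQueue) send(v int) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.qcount == len(q.buf) {
		return false // full; the runtime would park the goroutine here
	}
	q.buf[q.sendx] = v
	q.sendx = (q.sendx + 1) % len(q.buf)
	q.qcount++
	return true
}

// recv follows the lock -> recv -> unlock pattern.
func (q *lockedQueue) recv() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.qcount == 0 {
		return 0, false // empty; the runtime would park the goroutine here
	}
	v := q.buf[q.recvx]
	q.recvx = (q.recvx + 1) % len(q.buf)
	q.qcount--
	return v, true
}

func main() {
	q := &lockedQueue{buf: make([]int, 4)}
	q.send(1)
	q.send(2)
	v, _ := q.recv()
	fmt.Println(v) // 1: first in, first out
}
```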

That is today's topic. First of all, locking is a huge waste of performance, and it gets slower as the granularity of the lock increases: the more work each lock/unlock cycle has to cover, the slower the channel's reads and writes become.

First, let's look at the disadvantages of a queue. If more than one producer wants to put something on the queue, the tail pointer becomes a point of contention, because multiple threads want to update it. If there are multiple consumers, the head pointer is contended in the same way: after an element is consumed, the pointer has to be updated, so there are writes as well as reads.

Another problem is that when the head and tail pointers overlap, we have to keep a separate size variable in order to tell whether the queue is empty or full. Otherwise we must judge from the contents of the queue elements themselves, in which case a write is needed after an entry is consumed, to clear it or to mark the node as consumed. However it is implemented, there is always heavy contention on the head, tail, and size variables; and if the consume operation clears the element, the element itself is contended too.

On top of that, these three variables often sit in the same cache line, which leads to false sharing. So not only do producers and consumers fight over the size variable (or the element), but because the head and tail pointers live next to each other, updating the tail pointer invalidates the cache line holding the head pointer, causing a cache miss when the head pointer is next updated. All this contention is why channels have to be locked even in the single-producer, single-consumer case.

Is there an implementation that does not lock, or that even needs no synchronization at all in the single-producer, single-consumer case?

There is: use a ring queue instead, as the Disruptor does. Its read and write strategy is very different.

  

First of all, don't mind my sketchy drawing style. Suppose we allocate a ring buffer of size 10 for some element type. We need a volatile 64-bit global cursor to mark the highest index the producer has published. This variable is read and written frequently by multiple threads; volatile guarantees visibility between them. The whole writing process is as follows:

First, claim a writable position. With a single producer no synchronization is needed at all: just add one to the cursor. With multiple producers another atomic variable, claimSequence, is needed (i.e. a CAS operation): a producer first claims a position with, say, CAS(&claimSequence, 8, 9), writes its data into slot 9, and then publishes it by advancing the cursor with CAS(&cursor, 8, 9), so consumers only ever see fully written entries. Each published value moves the cursor forward by one, and the concrete position in the ring queue is obtained by a modulo operation for the consumer to read.

In addition, to prevent a producer that produces too fast from overwriting unconsumed data in the ring, the producer has to track how far the consumers have gotten, that is, read the current consume position of each consumer. For example, if the ring has size 10 and the slower of two consumers has consumed up to element 4, the producer cannot produce past element 13; beyond that, the producer goroutine is suspended. This requires each consumer to record its own consume position, which brings two advantages.

1. Suppose a piece of data has to pass through consumers A -> B -> C, a serial consumption pipeline. Producer P1 writes an event into the ring buffer; consumer A judges whether there are consumable events from the cursor at the tail of the queue; consumer B judges from A's position; and likewise C judges from B's. The producer tracks C's position to avoid overwriting unconsumed data. With ordinary channels we would instead have to create a channel for producer -> A, one for A -> B, and one for B -> C: every extra stage costs an extra channel. Admittedly this also lengthens the production-to-consumption latency, because the whole path is longer; but setting that case aside, this design is much faster than the implementation above, at least an order of magnitude faster for a single producer and consumer.

2. With current channels, if one producer has multiple consumers that must all see every piece of data, we have to set up a channel between the producer and each consumer and send the data into all of them. If instead each consumer records its own consume position, all consumers can read from a single ring buffer based on their own information, while the producer only needs to track the slowest consumer to avoid overwriting unconsumed data. Of course, this also has its own drawbacks. And if each piece of data should be received by only one of the consumers, we simply add a claimSequence, just as with multiple producers, to mark the position currently being read, and consume through it each time.
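The producer-side gate described above reduces to tracking the minimum over all consumer sequences. A small sketch under the text's assumptions (ring of size 10, two consumers, the slowest at element 4; the helper names minSequence and canClaim are made up):

```go
package main

import "fmt"

const ringSize = 10

// minSequence returns the position of the slowest consumer.
func minSequence(consumers []int64) int64 {
	min := consumers[0]
	for _, s := range consumers[1:] {
		if s < min {
			min = s
		}
	}
	return min
}

// canClaim reports whether producing slot seq would overwrite a slot
// that the slowest consumer has not read yet. With the slowest consumer
// at position 4 and a ring of 10, slots up to 13 are claimable.
func canClaim(seq int64, consumers []int64) bool {
	return seq-minSequence(consumers) < ringSize
}

func main() {
	consumers := []int64{7, 4} // two consumers; the slowest is at 4
	fmt.Println(canClaim(13, consumers)) // true
	fmt.Println(canClaim(14, consumers)) // false: would lap the slowest
}
```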

Finally, how does a consumer know whether there is data to read? Very simple: suspend the consumer goroutine when its position equals the cursor, and wake it up when the cursor moves past its position.
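Putting the pieces together for the simplest case, here is a hypothetical single-producer single-consumer ring built on these ideas: a published cursor, a consumer sequence, and no locks at all. To keep the sketch short it reports failure where a real implementation would suspend the goroutine; the type name spscRing is made up:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// spscRing: single-producer single-consumer ring buffer; the only
// synchronization is atomic loads/stores of the two sequence counters.
type spscRing struct {
	buf      []int64
	cursor   int64 // highest index published by the producer
	consumed int64 // highest index read by the consumer
}

func newSPSCRing(size int) *spscRing {
	return &spscRing{buf: make([]int64, size), cursor: -1, consumed: -1}
}

// publish writes v into the next slot and advances the cursor; it fails
// (instead of suspending) if it would lap the consumer.
func (r *spscRing) publish(v int64) bool {
	next := r.cursor + 1 // only the producer touches cursor, so plain read
	if next-atomic.LoadInt64(&r.consumed) > int64(len(r.buf)) {
		return false // would overwrite unconsumed data; suspend here
	}
	r.buf[next%int64(len(r.buf))] = v
	atomic.StoreInt64(&r.cursor, next) // publish only after the write
	return true
}

// consume reads the next value; it fails when the consumer has caught
// up with the cursor (where a real implementation would suspend until
// the cursor advances).
func (r *spscRing) consume() (int64, bool) {
	next := r.consumed + 1
	if next > atomic.LoadInt64(&r.cursor) {
		return 0, false // no data to read yet
	}
	v := r.buf[next%int64(len(r.buf))]
	atomic.StoreInt64(&r.consumed, next)
	return v, true
}

func main() {
	r := newSPSCRing(4)
	for i := int64(0); i < 3; i++ {
		r.publish(i * 10)
	}
	v, _ := r.consume()
	fmt.Println(v) // 0: values come out in publish order
}
```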

Of course, an implementation like this is only worth adopting when channel read/write speed is extremely demanding, and it has plenty of rough edges of its own. This is just my own thought experiment; the official runtime does not use this implementation.