Introduction

Consider a common scenario: a client sends messages continuously, and the server needs to process them asynchronously.

Processing them one at a time wastes resources; it is better to process them in batches.

When message volumes are very high, a message queue such as Kafka is the natural choice, but more often than not we want to solve this problem with something much lighter.

Let’s look at the technical requirements in detail. The solution needs to achieve the following (a configuration sketch follows the list):

  • Aggregate messages and process them in batches (up to a maximum of BatchSize)
  • Delayed processing (flush a partial batch after LingerTime)
  • Custom error handling
  • Concurrent processing
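To make these requirements concrete, here is a rough sketch of the configuration knobs they imply. This is a hypothetical illustration only; the field names are mine, not the final library’s API.

// Hypothetical sketch of the knobs implied by the requirements above;
// the names are illustrative, not the actual API.
type Option struct {
	BatchSize  int                                  // flush once this many messages have accumulated
	LingerTime time.Duration                        // flush a partial batch after this much delay
	Workers    int                                  // number of goroutines processing batches concurrently
	ErrHandler func(err error, batch []interface{}) // invoked when batch processing fails
}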

Implementation

Based on these requirements, I quickly implemented the first step: aggregating messages and processing them in batches.

var (
	eventQueue     = make(chan interface{}, 4)
	batchSize      = 8
	workers        = 2
	batchProcessor = func(messages []interface{}) {
		fmt.Printf("%+v\n", messages)
	}
)

// Each worker drains the queue and flushes whenever a full batch accumulates.
for i := 0; i < workers; i++ {
	go func() {
		var batch []interface{}
		for {
			msg := <-eventQueue
			batch = append(batch, msg)
			if len(batch) == batchSize {
				batchProcessor(batch)
				batch = make([]interface{}, 0)
			}
		}
	}()
}

for i := 0; i < 100; i++ {
	eventQueue <- i
}

The code is simple, but the core is already there.

  • A buffered channel acts as a FIFO queue (see the sketch after this list)
  • Multiple resident goroutines improve concurrency
  • Goroutines run in parallel with each other, but each goroutine processes its own batch serially, so no locking is needed on the batch.
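To illustrate the first point, here is a minimal standalone sketch showing that a buffered channel hands values out in exactly the order they were sent:

package main

import "fmt"

func main() {
	// A buffered channel preserves send order: first in, first out.
	q := make(chan int, 3)
	q <- 1
	q <- 2
	q <- 3
	fmt.Println(<-q, <-q, <-q) // Output: 1 2 3
}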

The next step is to add delay handling and error handling.

var (
	eventQueue     = make(chan interface{}, 4)
	batchSize      = 8
	workers        = 2
	lingerTime     = 14 * time.Millisecond
	batchProcessor = func(batch []interface{}) error {
		fmt.Printf("%+v\n", batch)
		return nil
	}
	errHandler = func(err error, batch []interface{}) {
		fmt.Println("some error happens")
	}
)

for i := 0; i < workers; i++ {
	go func() {
		var batch []interface{}
		// Create the timer stopped and drained, so it only fires after an
		// explicit Reset.
		lingerTimer := time.NewTimer(0)
		if !lingerTimer.Stop() {
			<-lingerTimer.C
		}
		defer lingerTimer.Stop()

		for {
			select {
			case msg := <-eventQueue:
				batch = append(batch, msg)
				if len(batch) != batchSize {
					// Start the linger countdown when a new batch begins.
					if len(batch) == 1 {
						lingerTimer.Reset(lingerTime)
					}
					break
				}

				if err := batchProcessor(batch); err != nil {
					errHandler(err, batch)
				}

				// Stop the timer, draining its channel if it already fired.
				if !lingerTimer.Stop() {
					<-lingerTimer.C
				}
				batch = make([]interface{}, 0)
			case <-lingerTimer.C:
				// LingerTime elapsed before the batch filled up; flush what we have.
				if err := batchProcessor(batch); err != nil {
					errHandler(err, batch)
				}

				batch = make([]interface{}, 0)
			}
		}
	}()
}

for i := 0; i < 100; i++ {
	eventQueue <- i
	time.Sleep(1 * time.Millisecond)
}

With just two features added, the code is already significantly more complex, and this is exactly how many libraries grow.

When you focus on core problems, the code is clear, but as the functionality expands, the lines of code grow rapidly.

At this point, it’s easy to get lost in the code if you don’t grasp the core. You have probably had the same feeling when joining a new project or reading the source code of a mature one. (That’s why I’ve shown the code at each stage of its evolution.)

The reason for using time.Timer instead of time.After is that time.After leaks memory when used inside a for-select loop. For details, see the articles "Golang time.After memory leak analysis" and "Golang time.After release problem".
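For illustration, here is a sketch of the two patterns, reusing eventQueue and lingerTime from the example above and hypothetical handle and flush helpers. In Go versions before 1.23, every call to time.After allocates a new timer that is not reclaimed until it fires, so a busy loop keeps piling them up:

// Leaky pattern (pre-Go 1.23): time.After allocates a fresh timer on every
// iteration, and each one lives until it fires, even when the select usually
// takes the message branch first.
for {
	select {
	case msg := <-eventQueue:
		handle(msg) // hypothetical helper
	case <-time.After(lingerTime):
		flush() // hypothetical helper
	}
}

// Reusing a single time.Timer avoids the per-iteration allocation.
lingerTimer := time.NewTimer(lingerTime)
defer lingerTimer.Stop()
for {
	select {
	case msg := <-eventQueue:
		handle(msg)
	case <-lingerTimer.C:
		flush()
		lingerTimer.Reset(lingerTime) // safe: the channel was just drained
	}
}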

So yes, the more code you write, the more bugs you introduce, but if the functionality is incomplete, the code still has to be written.

At this point it is more than enough as a prototype, but there is still a lot to do before it can serve as a general-purpose library, such as supporting custom configuration.
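As one possible direction, a general-purpose Go library often exposes custom configuration through the functional-options pattern, building on the knobs sketched earlier. This is a hedged sketch; the names are mine, not necessarily what aggregator.go actually uses.

// Hypothetical functional-options sketch; not the actual ChannelX API.
type Aggregator struct {
	opts options
}

type options struct {
	batchSize  int
	workers    int
	lingerTime time.Duration
}

type Option func(*options)

func WithBatchSize(n int) Option { return func(o *options) { o.batchSize = n } }
func WithWorkers(n int) Option   { return func(o *options) { o.workers = n } }
func WithLingerTime(d time.Duration) Option {
	return func(o *options) { o.lingerTime = d }
}

func NewAggregator(opts ...Option) *Aggregator {
	// Sensible defaults, overridden by whatever options the caller passes.
	o := options{batchSize: 8, workers: 2, lingerTime: 14 * time.Millisecond}
	for _, opt := range opts {
		opt(&o)
	}
	return &Aggregator{opts: o}
}

A caller would then write something like NewAggregator(WithBatchSize(16), WithLingerTime(10*time.Millisecond)), leaving unspecified knobs at their defaults.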

The final version of the code is exactly 200 lines, no more, no less, so I won’t post it here. If you are interested, see aggregator.go.

Finally, the aggregator is included in my open-source ChannelX repository, which aims to implement various useful lightweight tools with channels. If you find a tool you like, please give it a thumbs up or a star 🙂