preface

This week my sister joined the new company, the boss wanted to check out his bottom, took a look at his resume, whoops, kafka, this girl has two down, in this case, you write a message queue. Because want to use go language to write, this can give sister worry bad. Hurry to help me, I am so unyielding one person, in the sister’s soft and hard or promised him, so the next I will teach my sister how to write a message queue. Let’s take a look at what I wrote ~ ~ ~.

This code has been uploaded to my Github:

If you need it, you can download it by yourself and give it a little star

What is a message queue

Sister is really my worry bad, write their own master kafka, unexpectedly do not know what is a message queue, so, always good temper I began to tell my sister what is a message queue.

The Message Queue, commonly known as MQ (Message Queue), is a combination of two words. Queues are a first-in, first-out data structure, and the use of queues is fairly common, but there are queues, so why do you need MQ?

I: ask you, elder sister, know? Why do YOU need MQ?

Sister: Come on, you want a beating?

Me: Poof… It’s my big mouth

Owe owe I began the next patient explanation……

Take a simple example, suppose we want to make a system now, the login system needs to login successfully after the user, send an email to the user’s mailbox to remind, the demand is still very simple, let’s take a look at the firstMQHow do we do that? Let’s draw a sequence diagram:

If you look at this diagram, the email is sent when the login request is made, and when the password is successfully verified, the email is sent and the login is successfully returned. That’s fine, but it’s flawed. This makes our login operation complicated, each login request needs to be sent by email, if there is an error here, the whole login request also has an error, resulting in the login failed; There is a problem, we need login request call interface only 100 ms, to do an email from the middle of waiting, then call a login interface time will increase, that’s the problem, an E-mail he is not a high priority, the user doesn’t need real-time receive this email, so at this time, This shows the importance of message queues, so let’s use message queues to improve.

Here we will send email requests to Mq so that we can improve the throughput of the user experience, which is very important, the customer is king, after all, no one likes to use a very slow app.

This is just one of the many applications of MQ, namely asynchronous applications. MQ also has important applications in system coupling and peak clipping/limiting. I won’t go into the details of these two applications, but the principle is the same, think about it, you will understand.

channel

Well, sister finally know what is the message queue, but now still can not carry on the message queue development, because there is still a knowledge point, that is, go language channel. This is important, and we need it to develop our message queues.

Due to limited space, channel is not introduced in detail here, only the basic usage method is introduced.

What is thechannel

Goroutine and Channel are the two cornerstones of concurrent programming in Go. Goroutines are used to perform concurrent tasks, and channels are used for synchronization and communication between Goroutines. Go advocates the use of communication instead of shared memory. When a Goroutine needs to be shared with other Goroutine resources, a Channel Bridges them and provides a mechanism to ensure secure synchronization. A channel is essentially a queue and follows the FIFO principle. The specific rules are as follows:

  • A Goroutine that reads from a Channel first receives data first;
  • A Goroutine that sends data to a Channel first gets the right to send data first;

Create channels

To create a channel, use the keyword make in the following format:

Channel instance :=make(chanData type)Copy the code
  • Data type: The type of the element transferred within the channel.
  • Channel instance: Channel handle created by make.

Use of unbuffered channels

An unbuffered channel in Go is one that does not have the ability to hold any value until it is received. This type of channel requires both the send and receive goroutine to be ready at the same time to complete the send and receive operations.

An unbuffered channel is defined as follows:

= make(chan)Copy the code
  • Channel type: The same as the unbuffered channel, affecting the data type sent and received by the channel.
  • Buffer size: 0
  • Channel instance: Created channel instance.

Write an example to help you understand:

package main

import (
    "sync"
    "time"
)

func main(a) {
    c := make(chan string)

    var wg sync.WaitGroup
    wg.Add(2)

    go func(a) {
        defer wg.Done()
        c <- Golang Dream Factory} ()go func(a) {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`Message: `+ <-c)
    }()

    wg.Wait()
}

Copy the code

Use of buffered channels

A buffered channel in Go is a channel that can store one or more values before they are received. This type of channel does not enforce that both send and receive must be completed between Goroutines. The channel will block and the conditions for sending and receiving actions will be different. The receive action blocks only if there are no values to receive in the channel. The send action blocks only if the channel has no available buffer to hold the value being sent.

Buffered channels are defined as follows:

= make(chan, buffer size)Copy the code
  • Channel type: The same as the unbuffered channel, affecting the data type sent and received by the channel.
  • Buffer size: Determines the maximum number of elements a channel can hold.
  • Channel instance: Created channel instance.

Let’s write an example:

package main

import (
    "sync"
    "time"
)

func main(a) {
    c := make(chan string.2)

    var wg sync.WaitGroup
    wg.Add(2)

    go func(a) {
        defer wg.Done()

        c <- Golang Dream Factory
        c <- `asong`} ()go func(a) {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println('Public number:'+ <-c)
        println(Author:+ <-c)
    }()

    wg.Wait()
}
Copy the code

Ok, the concept of channel is introduced here, if necessary, the next article I will give a detailed explanation of channel.

Message queue encoding implementation

Ready to post

Finally began to enter the theme, sister are listening to fall asleep, I rumble a voice, immediately spirit, but, Asong is also got a small electric cannon, painful price ah, woo woo…………

Before starting to code directly, I need to think about our entire code architecture, which is the right way to code. Let’s define an interface first, list the methods we need to implement, and implement each code later. Therefore, the following methods can be listed:

type Broker interface {
	publish(topic string, msg interface{}) error
	subscribe(topic string) (< -chan interface{}, error)
	unsubscribe(topic string, sub <-chan interface{}) error
	close()
	broadcast(msg interface{}, subscribers []chan interface{})
	setConditions(capacity int)}Copy the code
  • publish: To push a message, there are two parameterstopic,msg, respectively, the topic to subscribe to, and the message to deliver
  • subscribe: message subscription, pass in the topic of the subscription, you can complete the subscription, and return the correspondingchannelChannels are used to receive data
  • unsubscribe: Unsubscribe, passing in the subscribed topic and the corresponding channel
  • closeThis is used to close the message queue
  • broadCast: This is an internal method that broadcasts pushed messages to ensure that each subscriber can receive them
  • setConditionsThis is used to set the condition, which is the size of the message queue, so that we can control the size of the message queue

If you notice any problems, all of this code is defined as internal methods, that is, not available outside the package. Why do we do that? Because this is what the proxy does, we need to encapsulate a layer of methods that the client can call directly, so that it fits the software architecture. So you can write code like this:

package mq


type Client struct {
	bro *BrokerImpl
}

func NewClient(a) *Client {
	return &Client{
		bro: NewBroker(),
	}
}

func (c *Client)SetConditions(capacity int)  {
	c.bro.setConditions(capacity)
}

func (c *Client)Publish(topic string, msg interface{}) error{
	return c.bro.publish(topic,msg)
}

func (c *Client)Subscribe(topic string) (< -chan interface{}, error){
	return c.bro.subscribe(topic)
}

func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
	return c.bro.unsubscribe(topic,sub)
}

func (c *Client)Close(a)  {
	 c.bro.close()}func (c *Client)GetPayLoad(sub <-chan interface{})  interface{} {for val:= range sub{
		ifval ! =nil{
			return val
		}
	}
	return nil
}
Copy the code

The above code structure is fine, but the message queue implementation structure we have not designed, let’s do it now.

type BrokerImpl struct {
	exit chan bool
	capacity int

	topics map[string] []chan interface{} Key: topic value: queue
	sync.RWMutex / / synchronization locks
}
Copy the code
  • exit: is also a channel, which is used to close message queues
  • capacity: is used to set the capacity of message queues
  • topics: a map structure is used heretopic, its value is a slice,chanType. The reason for this is that we can have multiple subscribers to a topic, so one subscriber corresponds to one channel
  • sync.RWMutex: Read/write lock. This lock is used to prevent errors in data push in the case of concurrency

Ok, now that we are well prepared, let’s start our method filling journey

Publishandbroadcast

The two reasons for this combination are that Braodcast belongs to Publish. The idea here is very simple, we just need to broadcast the incoming data, let’s look at the code implementation:

func (b *BrokerImpl) publish(topic string, pub interface{}) error {
	select {
	case <-b.exit:
		return errors.New("broker closed")
	default:
	}

	b.RLock()
	subscribers, ok := b.topics[topic]
	b.RUnlock()
	if! ok {return nil
	}

	b.broadcast(pub, subscribers)
	return nil
}


func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
	count := len(subscribers)
	concurrency := 1

	switch {
	case count > 1000:
		concurrency = 3
	case count > 100:
		concurrency = 2
	default:
		concurrency = 1
	}
	pub := func(start int) {
		for j := start; j < count; j += concurrency {
			select {
			case subscribers[j] <- msg:
			case <-time.After(time.Millisecond * 5) :case <-b.exit:
				return}}}for i := 0; i < concurrency; i++ {
		go pub(i)
	}
}
Copy the code

The publish method has nothing to speak of, said here is mainly about the realization of the broadcast:

We’re basically broadcasting the data, so we just push the data out, we don’t have to wait for it to succeed, so we use Goroutine. When pushing, when the push fails, we can’t wait forever, so we have added a timeout mechanism to stop pushing after 5 milliseconds, and then proceed with the following push.

You may be wondering why there is a switch option on it, what is it for? Considering such a problem, when there are a large number of subscribers, such as 10,000, a for cycle to do message push will take a lot of time, and there will be delays between different consumers, so using this method to decompose can reduce a certain amount of time.

subscribeunsubScribe

Let’s start with the code:

func (b *BrokerImpl) subscribe(topic string) (< -chan interface{}, error) {
	select {
	case <-b.exit:
		return nil, errors.New("broker closed")
	default:
	}

	ch := make(chan interface{}, b.capacity)
	b.Lock()
	b.topics[topic] = append(b.topics[topic], ch)
	b.Unlock()
	return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
	select {
	case <-b.exit:
		return errors.New("broker closed")
	default:
	}

	b.RLock()
	subscribers, ok := b.topics[topic]
	b.RUnlock()

	if! ok {return nil
	}
	// delete subscriber
	var newSubs []chan interface{}
	for _, subscriber := range subscribers {
		if subscriber == sub {
			continue
		}
		newSubs = append(newSubs, subscriber)
	}

	b.Lock()
	b.topics[topic] = newSubs
	b.Unlock()
	return nil
}
Copy the code

Here it is quite simple:

  • subscribeThe implementation here is to create one for the subscribed topicchannelAnd then add the subscribers to the correspondingtopic, and returns a receivechannel.
  • unsubScribeThe idea here is to implement what we just addedchannelDelete it.

close

func (b *BrokerImpl) close(a)  {
	select {
	case <-b.exit:
		return
	default:
		close(b.exit)
		b.Lock()
		b.topics = make(map[string] []chan interface{})
		b.Unlock()
	}
	return
}
Copy the code

Make (map[string][]chan interface{}) make(map[string][]chan interface{})

setConditions GetPayLoad

The last two methods are to set our message queue capacity and encapsulate a method to get the messages we subscribe to:

func (b *BrokerImpl)setConditions(capacity int)  {
	b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{} {for val:= range sub{
		ifval ! =nil{
			return val
		}
	}
	return nil
}
Copy the code

test

Well, the code has been written so quickly, let’s test it out.

Unit testing

Before the formal test, we still need to do a unit test first, to form a good habit, only the first self-test, to have the confidence to say that my code is ok, or directly run the program, there will be a lot of bugs.

Here’s how we tested it: We sent different messages to different topics, and when the subscriber received the message, they unsubscribed.

func TestClient(t *testing.T) {
	b := NewClient()
	b.SetConditions(100)
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		topic := fmt.Sprintf("Golang DreamWorks % D", i)
		payload := fmt.Sprintf("asong%d", i)

		ch, err := b.Subscribe(topic)
		iferr ! =nil {
			t.Fatal(err)
		}

		wg.Add(1)
		go func(a) {
			e := b.GetPayLoad(ch)
			ife ! = payload { t.Fatalf("%s expected %s but get %s", topic, payload, e)
			}
			iferr := b.Unsubscribe(topic, ch); err ! =nil {
				t.Fatal(err)
			}
			wg.Done()
		}()

		iferr := b.Publish(topic, payload); err ! =nil {
			t.Fatal(err)
		}
	}

	wg.Wait()
}
Copy the code

Test passed, no problem, so let’s write a few methods to test it

test

There are two ways to test this

Test 1: Use a timer to push messages to a topic.

// a topic test
func OnceTopic(a)  {
	m := mq.NewClient()
	m.SetConditions(10)
	ch,err :=m.Subscribe(topic)
	iferr ! =nil{
		fmt.Println("subscribe failed")
		return
	}
	go OncePub(m)
	OnceSub(ch,m)
	defer m.Close()
}

// Timed push
func OncePub(c *mq.Client)  {
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	for  {
		select {
		case <- t.C:
			err := c.Publish(topic,"Asong really handsome")
			iferr ! =nil{
				fmt.Println("pub message failed")}default:}}}// Accept subscription messages
func OnceSub(m <-chan interface{},c *mq.Client)  {
	for  {
		val := c.GetPayLoad(m)
		fmt.Printf("get message is %s\n",val)
	}
}
Copy the code

Test 2: Use a timer to send messages to multiple topics on a regular basis:

// Multiple topic tests
func ManyTopic(a)  {
	m := mq.NewClient()
	defer m.Close()
	m.SetConditions(10)
	top := ""
	for i:=0; i<10; i++{ top = fmt.Sprintf("Golang DreamWorks _% 02D",i)
		go Sub(m,top)
	}
	ManyPub(m)
}

func ManyPub(c *mq.Client)  {
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()
	for  {
		select {
		case <- t.C:
			for i:= 0; i<10; i++{// Multiple topics push different messages
				top := fmt.Sprintf("Golang DreamWorks _% 02D",i)
				payload := fmt.Sprintf("Asong nice _ % 02 d",i)
				err := c.Publish(top,payload)
				iferr ! =nil{
					fmt.Println("pub message failed")}}default:}}}func Sub(c *mq.Client,top string)  {
	ch,err := c.Subscribe(top)
	iferr ! =nil{
		fmt.Printf("sub top:%s failed\n",top)
	}
	for  {
		val := c.GetPayLoad(ch)
		ifval ! =nil{
			fmt.Printf("%s get message is %s\n",top,val)
		}
	}
}
Copy the code

conclusion

Finally help my sister to solve this problem, my sister is happy to die, give me a kiss, ah no, is a kua, kua people are embarrassed.

Have you learned this one? Didn’t learn it doesn’t matter, hurry up to download the source code, read through it, very well understood.

In fact, this article is for the next kafka learning foundation, learn this article, the next learning kafka will be a lot easier ~ ~ ~

Github address: github.com/asong2020/G…

It would be nice to have a little star

At the end, I will send you a small welfare. Recently, I was reading the book [micro-service architecture design mode], which is very good. I also collected a PDF, which can be downloaded by myself if you need it. Access: Follow the public account: [Golang Dreamworks], background reply: [micro service], can be obtained.

I have translated a GIN Chinese document, which will be maintained regularly. If you need it, you can download it by replying to [GIN] in the background.

I am Asong, an ordinary program ape, let me gradually become stronger together. I built my owngolangCommunication group, you need to add mevxI’ll pull you into the group. We welcome your attention, and we’ll see you next time

Recommended previous articles:

  • Context package, read this article enough!!

  • Go -ElasticSearch: How to get started

  • Interviewer: Have you used for-range in go? Can you explain the reasons for these problems

  • Learn wire dependency injection, Cron timing task is actually so easy!

  • I heard you don’t know how to JWT or swagger. – I’m skipping meals and I’m here with my practice program

  • Master these Go language features and you will improve your level by N levels (ii)

  • Go multiplayer chat room, here you can talk about anything!!

  • GRPC Practice – Learning GRPC is that simple

  • Go Standard library RPC practices

  • Asong picked up English and translated it with his heart

  • Several hot loading methods based on GIN

  • Boss: The guy doesn’t know how to use the Validator library yet