Preface
With some spare time on my hands, I decided to hack together a mini producer-consumer message queue. I'm a Java developer, but for this implementation I deliberately switched to Go. That's right: zero prior Go experience, so I get to learn Go along the way.
Prerequisites:
- Basic Go syntax
- Message queue concepts; there are only three: producer, consumer, queue
Purpose
I didn't set out to build anything complicated; time is limited, so it's mini. Mini to what extent:

- Use a doubly linked list as the queue data structure
- Support multiple topics, with producers producing messages and consumers consuming them
- Support concurrent writes by producers
- Support consumer reads; once acknowledged (ack), the message is removed from the queue
- Messages are not lost (persistence)
- High performance (an aspiration for now)
Design
The overall architecture
Protocol
The underlying communication uses TCP. The MQ speaks a custom protocol on top of TCP, defined as follows:
```go
type Msg struct {
	Id       int64
	TopicLen int64
	Topic    string
	// 1-consumer 2-producer 3-consumer-ack 4-error
	MsgType int64 // message type
	Len     int64 // message length
	Payload []byte // message body
}
```
Payload is a byte array because, whatever the data is, it is carried as raw bytes. Msg carries producer messages, consumer reads, acks, and errors; the first two carry a payload, while the last two leave the payload and length empty.
Encoding and decoding the protocol is just byte handling, so there are two functions: bytes to Msg and Msg to bytes.
```go
func BytesToMsg(reader io.Reader) Msg {
	m := Msg{}
	var buf [128]byte
	n, err := reader.Read(buf[:])
	if err != nil {
		fmt.Println("read failed, err:", err)
	}
	fmt.Println("read bytes:", n)
	// id
	buff := bytes.NewBuffer(buf[0:8])
	binary.Read(buff, binary.LittleEndian, &m.Id)
	// topiclen
	buff = bytes.NewBuffer(buf[8:16])
	binary.Read(buff, binary.LittleEndian, &m.TopicLen)
	// topic
	msgLastIndex := 16 + m.TopicLen
	m.Topic = string(buf[16:msgLastIndex])
	// msgtype
	buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex+8])
	binary.Read(buff, binary.LittleEndian, &m.MsgType)
	// len (starts 8 bytes after msgtype)
	buff = bytes.NewBuffer(buf[msgLastIndex+8 : msgLastIndex+16])
	binary.Read(buff, binary.LittleEndian, &m.Len)
	if m.Len <= 0 {
		return m
	}
	// payload: take exactly Len bytes, dropping any trailing buffer slack
	m.Payload = buf[msgLastIndex+16 : msgLastIndex+16+m.Len]
	return m
}

func MsgToBytes(msg Msg) []byte {
	msg.TopicLen = int64(len([]byte(msg.Topic)))
	msg.Len = int64(len(msg.Payload))
	var data []byte
	buf := bytes.NewBuffer([]byte{})
	binary.Write(buf, binary.LittleEndian, msg.Id)
	data = append(data, buf.Bytes()...)
	buf = bytes.NewBuffer([]byte{})
	binary.Write(buf, binary.LittleEndian, msg.TopicLen)
	data = append(data, buf.Bytes()...)
	data = append(data, []byte(msg.Topic)...)
	buf = bytes.NewBuffer([]byte{})
	binary.Write(buf, binary.LittleEndian, msg.MsgType)
	data = append(data, buf.Bytes()...)
	buf = bytes.NewBuffer([]byte{})
	binary.Write(buf, binary.LittleEndian, msg.Len)
	data = append(data, buf.Bytes()...)
	data = append(data, msg.Payload...)
	return data
}
```
The queue
container/list gives us first-in, first-out: producers write at the tail of the queue and consumers read from the head.
```go
package broker

import (
	"container/list"
	"sync"
)

type Queue struct {
	len  int
	data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
	queue.data.PushBack(msg)
	queue.len = queue.data.Len()
}

func (queue *Queue) poll() Msg {
	if queue.len == 0 {
		return Msg{}
	}
	msg := queue.data.Front()
	return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
	lock.Lock()
	for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
		if msg.Value.(Msg).Id == id {
			queue.data.Remove(msg)
			queue.len = queue.data.Len()
			break
		}
	}
	lock.Unlock()
}
```
offer appends a message to the tail of the queue, poll reads the element at the head, and delete removes a message from the queue by its ID. Wrapping list.List in a Queue struct is deliberate: the list is the underlying data structure, and we want to hide its internals and expose only the basic operations to callers.
delete runs after a consumer has successfully consumed a message and sent an ack. Since more than one consumer may be consuming at the same time, the critical section is protected with a lock.
Broker
The broker is the server side: it accepts connections, then receives and responds to requests.
```go
package broker

import (
	"bufio"
	"net"
	"os"
	"sync"
	"time"
)

var topics = sync.Map{}

func handleErr(conn net.Conn) {
	if err := recover(); err != nil {
		println(err.(string))
		conn.Write(MsgToBytes(Msg{MsgType: 4}))
	}
}

func Process(conn net.Conn) {
	defer handleErr(conn)
	reader := bufio.NewReader(conn)
	msg := BytesToMsg(reader)
	queue, ok := topics.Load(msg.Topic)
	var res Msg
	if msg.MsgType == 1 {
		// consumer
		if queue == nil || queue.(*Queue).len == 0 {
			return
		}
		msg = queue.(*Queue).poll()
		msg.MsgType = 1
		res = msg
	} else if msg.MsgType == 2 {
		// producer
		if !ok {
			queue = &Queue{}
			queue.(*Queue).data.Init()
			topics.Store(msg.Topic, queue)
		}
		queue.(*Queue).offer(msg)
		res = Msg{Id: msg.Id, MsgType: 2}
	} else if msg.MsgType == 3 {
		// consumer ack
		if queue == nil {
			return
		}
		queue.(*Queue).delete(msg.Id)
	}
	conn.Write(MsgToBytes(res))
}
```
When MsgType is 1, a consumer is reading, and the message is polled straight from the queue. When MsgType is 2, a producer is producing; if the topic has no queue yet, a new one is created and stored under that topic. When MsgType is 3, the consumer has acked successfully and the message can be deleted.
As for messages not being lost: that isn't fully implemented here, only persistence (and not complete persistence either). The idea is to serialize the messages in each topic's queue in the protocol format, and recover them from the file when the broker starts.
Persistence also has to consider whether it is incremental or full, and how long data needs to be kept, both of which affect implementation difficulty and performance (think of persistence in Kafka and Redis).
```go
func Save() {
	// snapshot every 60 seconds (time.NewTicker takes a Duration, not a bare number)
	ticker := time.NewTicker(60 * time.Second)
	for {
		select {
		case <-ticker.C:
			topics.Range(func(key, value interface{}) bool {
				if value == nil {
					return false
				}
				// os.Create truncates (or creates) the topic's snapshot file,
				// so each pass writes a full snapshot
				file, err := os.Create(key.(string))
				if err != nil {
					return false
				}
				for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
					file.Write(MsgToBytes(msg.Value.(Msg)))
				}
				file.Close()
				// return true so Range continues on to the next topic
				return true
			})
		default:
			time.Sleep(time.Second)
		}
	}
}
```
One question: when delete runs above, should the corresponding message also be removed from the file? Yes. If it isn't, you have to wait for the next full persistence pass to overwrite it, and dirty data sits there in the meantime.
Here is the startup logic:
```go
package main

import (
	"awesomeProject/broker"
	"fmt"
	"net"
)

func main() {
	listen, err := net.Listen("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Print("listen failed, err:", err)
		return
	}
	go broker.Save()
	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Print("accept failed, err:", err)
			continue
		}
		go broker.Process(conn)
	}
}
```
Producer
```go
package main

import (
	"awesomeProject/broker"
	"fmt"
	"net"
)

func produce() {
	conn, err := net.Dial("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Print("connect failed, err:", err)
	}
	defer conn.Close()
	msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("我")}
	n, err := conn.Write(broker.MsgToBytes(msg))
	if err != nil {
		fmt.Print("write failed, err:", err)
	}
	fmt.Print(n)
}
```
Consumer
```go
package main

import (
	"awesomeProject/broker"
	"bytes"
	"fmt"
	"net"
)

func consume() {
	conn, err := net.Dial("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Print("connect failed, err:", err)
	}
	defer conn.Close()
	msg := broker.Msg{Topic: "topic-test", MsgType: 1}
	n, err := conn.Write(broker.MsgToBytes(msg))
	if err != nil {
		fmt.Println("write failed, err:", err)
	}
	fmt.Println("n", n)
	var res [128]byte
	conn.Read(res[:])
	buf := bytes.NewBuffer(res[:])
	receMsg := broker.BytesToMsg(buf)
	fmt.Print(receMsg)
	// ack on a fresh connection
	conn, _ = net.Dial("tcp", "127.0.0.1:12345")
	l, e := conn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
	if e != nil {
		fmt.Println("write failed, err:", e)
	}
	fmt.Println("l:", l)
}
```
The consumer opens a new connection for the ack; otherwise the server would have to keep reading from the same conn without knowing when the request ends. Compare RabbitMQ, which has both automatic and manual acks: with a manual ack the client cannot know in advance when the ack will be sent, so a new connection is needed, whereas with automatic acks the same connection can be reused.
Startup
Start the broker, then the producer, then the consumer. OK, it runs: messages can be sent to the queue and consumed from it.
Conclusion
Simple as the whole thing is, it was built with Go after all. As the meme goes, the moves look as fierce as a tiger, but underneath I was as panicked as a dog. At first, Go's GOPATH and go mod gave me trouble; later it was the syntax, things like pointers versus values and references. The biggest headache was type conversion: as a Java developer, doing type conversions in Go was genuine torture.
In the end, though, Go turned out to be really pleasant to use, and I love it...