In this second article, we build on the previous one by adding a mechanism for persisting messages to disk.

The problem

A machine's memory is always limited. To increase the capacity of Mini-NSQ, once memory usage reaches the configured upper limit, we need to save subsequent messages to a file.

Code design

In NSQ's design, each topic and each channel starts its own goroutine that is responsible for reading and writing files, and each stores its data in its own files. To keep any single file from growing too large, a new file is created once the current file reaches a preset size. After all the contents of a file have been read, the file is deleted.

Code change analysis

diskQueue

All file operations are performed through the diskQueue structure.

type diskQueue struct {
	name            string
	dataPath        string
	maxBytesPerFile int64 // The maximum storage capacity of a file. If it exceeds the maximum storage capacity, a new file must be created

	readChan  chan []byte // exposed via ReadChan()
	writeChan chan []byte
	writeResponseChan chan error

	readPos      int64
	writePos     int64
	readFileNum  int64
	writeFileNum int64 


	// Reading the file and actually sending the read to the receiver are two steps,
	// The following two variables are used to record the intermediate state between the two steps
	nextReadPos     int64
	nextReadFileNum int64

	readFile  *os.File
	writeFile *os.File
	reader    *bufio.Reader
	writeBuf  bytes.Buffer
}

func NewDiskQueue(name string) *diskQueue {
	d := diskQueue{
		name:            name,
		maxBytesPerFile: 24,
		readChan:        make(chan []byte),
		writeChan:       make(chan []byte),
		writeResponseChan: make(chan error),
	}
	d.dataPath, _ = os.Getwd()
	go d.ioLoop()
	return &d
}

// ReadChan returns the receive-only []byte channel for reading data
func (d *diskQueue) ReadChan() <-chan []byte {
	return d.readChan
}

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
	d.writeChan <- data
	return <-d.writeResponseChan
}

diskQueue's readFileNum and readPos record which file is being read and the position within it, while writeFileNum and writePos record which file is being written and the position within it. NewDiskQueue takes a name that uniquely identifies the queue and becomes part of the file name. Here we set the maximum file size, maxBytesPerFile, to 24 bytes to make later testing easier. In the real NSQ, this value defaults to 100 MB and is configurable.

We provide two APIs, Put and ReadChan, for external callers.

The Put method stores a message in a file. As you can see, it simply hands the data to the writeChan channel, which makes concurrent use easy. For example, several clients may publish to the same topic at the same time. With a channel we do not need to worry about concurrent writes, as long as one worker goroutine keeps receiving messages from the channel and writing them to the file. Note that in NewDiskQueue writeChan is created with a capacity of 0, so a send blocks until the worker receives it. If you follow the call chain back up, this eventually blocks the protocol's IOLoop method, the one that receives network messages from the client. This relieves pressure on the server in a way similar to TCP's sliding window flow control.
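The pattern described above can be sketched in isolation. This is only a minimal illustration, not the article's diskQueue: serialWriter, storeConcurrently, and the in-memory stored slice are hypothetical names standing in for the real file-backed queue. It shows many goroutines calling Put while a single goroutine owns the data.

```go
package main

import (
	"fmt"
	"sync"
)

// serialWriter is a hypothetical stand-in for diskQueue: one goroutine owns
// the stored data, and callers reach it only through channels.
type serialWriter struct {
	writeChan    chan []byte
	responseChan chan error
	done         chan struct{}
	stored       [][]byte // touched only by loop()
}

func newSerialWriter() *serialWriter {
	w := &serialWriter{
		writeChan:    make(chan []byte), // unbuffered: Put blocks until loop() receives
		responseChan: make(chan error),
		done:         make(chan struct{}),
	}
	go w.loop()
	return w
}

// loop is the single worker; it handles one write at a time.
func (w *serialWriter) loop() {
	for data := range w.writeChan {
		w.stored = append(w.stored, data)
		w.responseChan <- nil // report the result back to the caller of Put
	}
	close(w.done)
}

// Put hands data to the worker and waits for its result.
func (w *serialWriter) Put(data []byte) error {
	w.writeChan <- data
	return <-w.responseChan
}

// Close stops the worker and returns how many items were stored.
func (w *serialWriter) Close() int {
	close(w.writeChan)
	<-w.done
	return len(w.stored)
}

// storeConcurrently publishes n messages from n goroutines at once.
func storeConcurrently(n int) int {
	w := newSerialWriter()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = w.Put([]byte(fmt.Sprintf("msg-%d", i)))
		}(i)
	}
	wg.Wait()
	return w.Close()
}

func main() {
	fmt.Println(storeConcurrently(50)) // 50
}
```

Because the worker only receives the next item after it has delivered the previous result, at most one caller is ever waiting on responseChan, so results cannot be handed to the wrong caller.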

The ReadChan method exposes readChan to the caller. Any data read from the file is sent to readChan, so the caller only needs to receive from it to get the data stored in the file.

Notice that NewDiskQueue starts a new goroutine running the ioLoop method, which does the actual reading and writing of files. Let's look at that method.

func (d *diskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var r chan []byte

	for {
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// If nextReadPos differs from readPos, a message was already read in an earlier round
			// but the select below did not take the r <- dataRead branch, so we do not need to read again this round
			if d.nextReadPos == d.readPos && d.nextReadFileNum == d.readFileNum {
				dataRead, err = d.readOne()
				if err != nil {
					log.Printf("DISKQUEUE(%s) reading at %d of %s - %s",
						d.name, d.readPos, d.fileName(d.readFileNum), err)
					d.handleReadError()
					continue
				}
			}
			r = d.readChan
		} else {
			r = nil
		}

		select {
		case r <- dataRead:
			d.moveForward()
		case dataWrite := <-d.writeChan:
			d.writeResponseChan <- d.writeOne(dataWrite) // Pass the error directly to the caller
		}
	}
}

Inside the infinite for loop, we first read one message if any remain to be read. Then, in the select below, taking the r <- dataRead branch means the message we read has been received by a consumer, so moveForward is called to update the read position and delete the old file if it has been fully read. Taking the dataWrite := <-d.writeChan branch means we have received a message that a caller of Put wants to store, so writeOne writes it to the file and the result is sent to writeResponseChan, from where it is returned to the caller. When there is nothing to read, r is set to nil; a send on a nil channel blocks forever, so that branch is effectively disabled.
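The nil-channel behaviour that ioLoop relies on can be seen on its own in a small sketch. The trySend helper is hypothetical, and it adds a default branch (which ioLoop does not have) only so the example returns instead of blocking.

```go
package main

import "fmt"

// trySend attempts a non-blocking send of v on out. When enabled is false the
// local channel variable stays nil, so the send case can never be chosen.
func trySend(out chan string, enabled bool, v string) bool {
	var r chan string // nil by default
	if enabled {
		r = out
	}
	select {
	case r <- v:
		return true
	default:
		return false
	}
}

func main() {
	out := make(chan string, 1)
	fmt.Println(trySend(out, false, "a")) // false: r is nil, the case is disabled
	fmt.Println(trySend(out, true, "a"))  // true: r points at out, which has room
}
```

The same idea lets a single select serve both reads and writes: the loop switches the read branch on and off just by assigning the channel variable.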

You can see that reads and writes never happen at the same time, because both run in this single goroutine. As we will see later, when a read error occurs the read path may change the next write position and remove the current file (in fact, rename it). Since nothing runs concurrently, we do not have to worry about the concurrency issues this could otherwise cause.

Let’s look at the actual write method

func (d *diskQueue) writeOne(data []byte) error {
	var err error

	if d.writeFile == nil {
		curFileName := d.fileName(d.writeFileNum)
		d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			return err
		}

		log.Printf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

		if d.writePos > 0 {
			_, err = d.writeFile.Seek(d.writePos, 0)
			if err != nil {
				d.writeFile.Close()
				d.writeFile = nil
				return err
			}
		}
	}

	dataLen := int32(len(data))

	d.writeBuf.Reset()
	err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
	if err != nil {
		return err
	}

	_, err = d.writeBuf.Write(data)
	if err != nil {
		return err
	}

	// only write to the file once
	_, err = d.writeFile.Write(d.writeBuf.Bytes())
	if err != nil {
		d.writeFile.Close()
		d.writeFile = nil
		return err
	}

	totalBytes := int64(4 + dataLen)
	d.writePos += totalBytes

	if d.writePos >= d.maxBytesPerFile {
		d.writeFileNum++
		d.writePos = 0

		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
	}

	return nil
}

First, writeOne checks whether the file to be written is already open; if not, it opens the file and seeks to the write position. Each message is stored as two parts, a four-byte length followed by the message itself, and both are assembled in a buffer so the file is written only once. Finally, the write position is updated, and if it has reached maxBytesPerFile the queue moves on to the next file.
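To make the on-disk record layout concrete, here is a minimal sketch of the same length-prefix framing applied to an in-memory buffer. The encodeFrame helper is a hypothetical name and is not part of Mini-NSQ.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeFrame returns a 4-byte big-endian length followed by the payload,
// the same layout writeOne assembles before writing to the file.
func encodeFrame(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.BigEndian, int32(len(payload))); err != nil {
		return nil, err
	}
	if _, err := buf.Write(payload); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	frame, err := encodeFrame([]byte("abc"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("% x\n", frame) // 00 00 00 03 61 62 63
}
```

A three-byte payload therefore occupies seven bytes, which is why writeOne advances writePos by 4 + dataLen.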

Let’s look at the actual reading method

func (d *diskQueue) readOne() ([]byte, error) {
	var err error
	var msgSize int32

	if d.readFile == nil {
		curFileName := d.fileName(d.readFileNum)
		d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
		if err != nil {
			return nil, err
		}

		log.Printf("DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

		if d.readPos > 0 {
			_, err = d.readFile.Seek(d.readPos, 0)
			if err != nil {
				d.readFile.Close()
				d.readFile = nil
				return nil, err
			}
		}

		d.reader = bufio.NewReader(d.readFile)
	}

	err = binary.Read(d.reader, binary.BigEndian, &msgSize)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	readBuf := make([]byte, msgSize)
	_, err = io.ReadFull(d.reader, readBuf)
	if err != nil {
		d.readFile.Close()
		d.readFile = nil
		return nil, err
	}

	totalBytes := int64(4 + msgSize)

	// we only advance next* because we have not yet sent this to consumers
	d.nextReadPos = d.readPos + totalBytes
	d.nextReadFileNum = d.readFileNum
	if d.nextReadPos >= d.maxBytesPerFile {
		if d.readFile != nil {
			d.readFile.Close()
			d.readFile = nil
		}

		d.nextReadFileNum++
		d.nextReadPos = 0
	}

	return readBuf, nil
}

The read method mirrors the write method, except that it does not update the read position directly at the end. Instead, it stores the next read position in nextReadFileNum and nextReadPos. This is because reading a message actually consists of two steps: (1) read it from the file, and (2) send it to the receiver. The read position is only updated by moveForward, which is called after the message has been delivered, so let's look at that method.
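The read side of the framing can be sketched the same way. In this illustrative example, readFrame and readAllFrames are hypothetical helpers that work on an in-memory byte slice rather than a file. readFrame returns the number of bytes consumed, which is the amount a caller would add to its read position once the record has actually been delivered.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrame reads one length-prefixed record and reports how many bytes
// it consumed (4 bytes of length plus the payload).
func readFrame(r io.Reader) ([]byte, int64, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return nil, 0, err
	}
	if size < 0 {
		return nil, 0, fmt.Errorf("invalid frame size %d", size)
	}
	body := make([]byte, size)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, 0, err
	}
	return body, int64(4 + size), nil
}

// readAllFrames decodes every record in data until a clean end of input.
func readAllFrames(data []byte) ([]string, error) {
	r := bufio.NewReader(bytes.NewReader(data))
	var out []string
	for {
		body, _, err := readFrame(r)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(body))
	}
}

func main() {
	data := []byte{0, 0, 0, 2, 'h', 'i', 0, 0, 0, 3, 'f', 'o', 'o'}
	msgs, err := readAllFrames(data)
	fmt.Println(msgs, err) // [hi foo] <nil>
}
```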

func (d *diskQueue) moveForward() {
	oldReadFileNum := d.readFileNum
	d.readFileNum = d.nextReadFileNum
	d.readPos = d.nextReadPos
	// see if we need to clean up the old file
	if oldReadFileNum != d.nextReadFileNum {
		fn := d.fileName(oldReadFileNum)
		err := os.Remove(fn)
		if err != nil {
			log.Printf("DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
		}
	}
}

The method is simple. To keep files from accumulating indefinitely, once the read position moves on to the next file, the file that has been fully read is deleted.
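Both the write path and the read path apply the same rollover rule: advance the position by 4 bytes plus the payload size, and switch to the next file number once the position reaches maxBytesPerFile. The following sketch simulates only that bookkeeping, with no file I/O. placeRecords is a hypothetical helper and the payload sizes are made-up values.

```go
package main

import "fmt"

// placeRecords returns, for each payload size, the number of the file the
// record would be stored in under the rollover rule used by writeOne.
func placeRecords(maxBytesPerFile int64, payloadSizes []int) []int64 {
	var fileNum, pos int64
	files := make([]int64, 0, len(payloadSizes))
	for _, n := range payloadSizes {
		files = append(files, fileNum)
		pos += int64(4 + n) // 4-byte length prefix plus payload
		if pos >= maxBytesPerFile {
			fileNum++ // roll over to the next file
			pos = 0
		}
	}
	return files
}

func main() {
	fmt.Println(placeRecords(24, []int{10, 10, 30, 5})) // [0 0 1 2]
}
```

Note that a record is never split across files: the check happens after the write, so a file can exceed maxBytesPerFile by up to one record.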

Other methods

func (d *diskQueue) handleReadError() {
	if d.readFileNum == d.writeFileNum {
		// if you can't properly read from the current write file it's safe to
		// assume that something is fucked and we should skip the current file too
		if d.writeFile != nil {
			d.writeFile.Close()
			d.writeFile = nil
		}
		d.writeFileNum++
		d.writePos = 0
	}

	badFn := d.fileName(d.readFileNum)
	badRenameFn := badFn + ".bad"

	log.Printf("DISKQUEUE(%s) jump to next file and saving bad file as %s",
		d.name, badRenameFn)

	err := os.Rename(badFn, badRenameFn)
	if err != nil {
		log.Printf(
			"DISKQUEUE(%s) failed to rename bad diskqueue file %s to %s",
			d.name, badFn, badRenameFn)
	}

	d.readFileNum++
	d.readPos = 0
	d.nextReadFileNum = d.readFileNum
	d.nextReadPos = 0
}

func (d *diskQueue) fileName(fileNum int64) string {
	return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum)
}

When reading a file fails, handleReadError is called. It gives up on the current file and moves the read position straight to the next file. It also renames the damaged file with a ".bad" suffix so it can be inspected manually later.
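The rename step can be tried out safely in a temporary directory. This is only a sketch: quarantine and demoQuarantine are hypothetical helpers, and the file name merely imitates the pattern used by fileName.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// quarantine renames a damaged file by appending ".bad" and returns the new path.
func quarantine(badFn string) (string, error) {
	badRenameFn := badFn + ".bad"
	if err := os.Rename(badFn, badRenameFn); err != nil {
		return "", err
	}
	return badRenameFn, nil
}

// demoQuarantine creates a throwaway file, quarantines it, and reports
// whether only the ".bad" copy remains afterwards.
func demoQuarantine() (bool, error) {
	dir, err := os.MkdirTemp("", "diskqueue-demo")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	fn := filepath.Join(dir, "mytopic.diskqueue.000000.dat")
	if err := os.WriteFile(fn, []byte("corrupt"), 0600); err != nil {
		return false, err
	}
	renamed, err := quarantine(fn)
	if err != nil {
		return false, err
	}
	_, oldErr := os.Stat(fn)
	_, newErr := os.Stat(renamed)
	return os.IsNotExist(oldErr) && newErr == nil, nil
}

func main() {
	ok, err := demoQuarantine()
	fmt.Println(ok, err)
}
```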

Topic

Let’s look at how diskQueue is used in Topic. Note that the following code is not complete and the same parts as in the previous section may be omitted.

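type Topic struct {
	// ... fields from the previous article omitted
	memoryMsgChan chan *Message
	backend       *diskQueue
}

func NewTopic(topicName string) *Topic {
	t := &Topic{
		// ... other fields omitted
		memoryMsgChan: make(chan *Message, 1),
	}
	t.backend = NewDiskQueue(topicName)
	// ... rest of the constructor omitted
	return t
}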

A new field, backend, of type *diskQueue is added to Topic to handle file operations. The name passed to NewDiskQueue is the topic's own name. In addition, we set the capacity of memoryMsgChan to one message here to make later testing easier.

func (t *Topic) PutMessage(m *Message) error {
	log.Printf("The message into the topic")
	select {
	case t.memoryMsgChan <- m:  // If there is room for it, put it in memory first
	default:
		err := writeMessageToBackend(m, t.backend) // If not, record to disk
		if err != nil {
			log.Printf(
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

The PutMessage method first tries to put it into memory, and if it fails, writes it to a file.
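The memory-first, disk-second behaviour comes entirely from select with a default branch. Here is a minimal sketch of it, where overflowQueue is a hypothetical type, a buffered channel of capacity 1 plays the role of memoryMsgChan, and a plain slice stands in for the disk backend.

```go
package main

import "fmt"

// overflowQueue is a hypothetical stand-in: mem plays the role of
// memoryMsgChan and disk plays the role of the diskQueue backend.
type overflowQueue struct {
	mem  chan string
	disk []string
}

// put stores msg in memory if there is room, otherwise spills it to disk.
func (q *overflowQueue) put(msg string) {
	select {
	case q.mem <- msg: // there is room in memory
	default:
		q.disk = append(q.disk, msg) // memory is full: spill over
	}
}

func main() {
	q := &overflowQueue{mem: make(chan string, 1)}
	for _, m := range []string{"one", "two", "three", "four"} {
		q.put(m)
	}
	fmt.Println(len(q.mem), q.disk) // 1 [two three four]
}
```

With a capacity of one and no consumer, the first message stays in memory and the remaining three spill over.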

func (t *Topic) messagePump() {
	var err error
	var msg *Message
	var buf []byte
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte

	t.Lock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.Unlock()

	if len(chans) > 0 {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				log.Printf("failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			log.Println("Topic update channel")
			chans = chans[:0]
			t.Lock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.Unlock()
			if len(chans) == 0 {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		}

		// The number of chans must be greater than 0, otherwise the message will be lost,
		// So we will set memoryMsgChan to nil when chans is 0
		for _, channel := range chans {
			err := channel.PutMessage(msg)
			if err != nil {
				log.Printf(
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
}

In the select, the follow-up processing is the same whether the message came from memory or from a file, so we keep it outside the select to avoid duplicating code. Also, since the file stores binary data, we decode it into a Message after reading it.

Channel

type Channel struct {
	backend       *diskQueue
}

func NewChannel(topicName string, channelName string) *Channel {
	return &Channel{
		memoryMsgChan: make(chan *Message, 1),
		backend:       NewDiskQueue(getBackendName(topicName,channelName)),
	}
}

func getBackendName(topicName, channelName string) string {
	// backend names, for uniqueness, automatically include the topic... <topic>:<channel>
	backendName := topicName + "_____" + channelName
	return backendName
}

func (c *Channel) PutMessage(m *Message) error {
	log.Printf("The message into the channel, the body: %s", m.Body)
	select {
	case c.memoryMsgChan <- m:  // If there is room for it, put it in memory first
	default:
		err := writeMessageToBackend(m, c.backend) // If not, record to disk
		if err != nil {
			log.Printf(
				"CHANNEL(%s) ERROR: failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

The changes to Channel are essentially the same as those to Topic. Note that the name passed to NewDiskQueue is the topic name followed by the channel name, so that channels with the same name under different topics do not collide. We join the two with "_____". NSQ itself joins them with ":" and forbids ":" in topic and channel names from the outset, which rules out collisions. However, Windows does not allow ":" in file names, so to make the data files easy to inspect during testing we do not use ":" here. (NSQ uses ":" on Linux and ";" on Windows.)
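Putting the naming rules together, the sketch below shows what a channel's data file name looks like. The helper names backendName and dataFileName and the /tmp data path are assumptions made for this example; the format string is the one used by fileName.

```go
package main

import (
	"fmt"
	"path"
)

// backendName joins topic and channel names the way getBackendName does.
func backendName(topicName, channelName string) string {
	return topicName + "_____" + channelName
}

// dataFileName builds the path of the data file with the given number.
func dataFileName(dataPath, name string, fileNum int64) string {
	return path.Join(dataPath, fmt.Sprintf("%s.diskqueue.%06d.dat", name, fileNum))
}

func main() {
	name := backendName("mytopic", "mychannel")
	fmt.Println(dataFileName("/tmp", name, 0))
	// /tmp/mytopic_____mychannel.diskqueue.000000.dat
}
```

This sketch formats the file name first and joins the directory afterwards, so a "%" character in the data path is never interpreted as a format verb.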

protocol

Since messages stored in a channel are ultimately retrieved and used in protocol.go, we also need to look at the protocol file.

func (p *protocol) messagePump(client *client) {
	var err error
	var msg *Message
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	// subEventChan is newly created so that it can be set to nil below so that a client can subscribe only once
	subEventChan := client.SubEventChan

	for {
		select {
		case subChannel = <-subEventChan:  // indicates that a subscription event occurred, where the subChannel is the channel to which the consumer is actually bound
			log.Printf("Topic :%s channel:%s subscription event",subChannel.topicName,subChannel.name)
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			// you can't SUB anymore
			subEventChan = nil
			continue
		case msg = <-memoryMsgChan: // If the memory channel corresponding to the channel has a message
		case buf := <-backendMsgChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				log.Printf("failed to decode message - %s", err)
				continue
			}
		}

		time.Sleep(time.Second*3)
		err = p.SendMessage(client, msg)
		if err != nil {
			go func() {
				_=subChannel.PutMessage(msg)
			}()
			log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client.RemoteAddr(), err)
			goto exit
		}
	}

exit:
	log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client.RemoteAddr())
}

This is similar to Topic, so we won't go into detail. Just note that each iteration of the for loop sleeps for 3 seconds. This deliberately slows down delivery to the consumer so that changes to the data files are easier to observe; it is for testing only.

Message

Message also gains two helper functions, neither of which is complex.

func decodeMessage(b []byte) (*Message, error) {
	var msg Message
	copy(msg.ID[:], b[:MsgIDLength])
	msg.Body = b[MsgIDLength:]
	return &msg, nil
}

func writeMessageToBackend(msg *Message, bq *diskQueue) error {
	msgByte, err := msg.Bytes()
	if err != nil {
		return err
	}
	return bq.Put(msgByte)
}
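For reference, here is a self-contained sketch of the encode/decode round trip. It rests on assumptions that this section does not confirm: that the ID is 16 bytes long, and that Bytes() simply concatenates the ID and the body. The lowercase names (message, msgIDLength, decode) are hypothetical. The sketch also adds a length check so that a truncated buffer yields an error instead of a panic.

```go
package main

import (
	"errors"
	"fmt"
)

const msgIDLength = 16 // assumed ID size, not confirmed by this section

// message is a hypothetical simplified version of Message.
type message struct {
	ID   [msgIDLength]byte
	Body []byte
}

// bytes serializes the message as ID followed by body (assumed layout).
func (m *message) bytes() []byte {
	buf := make([]byte, 0, msgIDLength+len(m.Body))
	buf = append(buf, m.ID[:]...)
	return append(buf, m.Body...)
}

// decode is the inverse of bytes; it rejects buffers shorter than an ID.
func decode(b []byte) (*message, error) {
	if len(b) < msgIDLength {
		return nil, errors.New("invalid message buffer size")
	}
	var m message
	copy(m.ID[:], b[:msgIDLength])
	m.Body = b[msgIDLength:]
	return &m, nil
}

func main() {
	m := &message{Body: []byte("one one")}
	copy(m.ID[:], "0123456789abcdef")

	got, err := decode(m.bytes())
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s\n", got.ID[:], got.Body) // 0123456789abcdef one one
}
```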

Test

client.go

func main() {
	log.SetFlags(log.Lshortfile | log.Ltime)
	nsqdAddr := "127.0.0.1:4150"
	conn, err := net.Dial("tcp", nsqdAddr)
	if err != nil {
		log.Fatal(err)
	}
	// start reading only once we know the connection is valid
	go readFully(conn)

	var cmd *Command
	pubOnce(conn)
	time.Sleep(time.Second * 3)

	cmd = Subscribe("mytopic", "mychannel")
	cmd.WriteTo(conn)

	select {}
}

func pubOnce(conn net.Conn) {
	var cmd *Command
	cmd = Publish("mytopic", []byte("one one "))
	cmd.WriteTo(conn)

	cmd = Publish("mytopic", []byte("two two"))
	cmd.WriteTo(conn)

	cmd = Publish("mytopic", []byte("three three"))
	cmd.WriteTo(conn)

	cmd = Publish("mytopic", []byte("four four"))
	cmd.WriteTo(conn)
}

The client first sends four messages and then sleeps for 3 seconds. During this time no consumer is receiving these messages from Mini-NSQ, and since we set the topic's in-memory capacity to one message, three messages are stored in files, as shown below.

After 3 seconds, the client subscribes to the topic. Mini-NSQ then reads the data from the topic's data files and passes it to the specified channel. Because the channel's in-memory capacity is also only one message, and because we deliberately slowed down delivery to the consumer, we can also briefly see the data files generated by the channel, as shown below.

A few seconds later, once all the messages have been sent to the consumer, no data files remain.

Readers can follow the individual disk writes and reads in the console output.

Code repository

git clone https://github.com/xianxueniao150/mini-nsq.git
git checkout day02