1. Tidwall/WAL module introduction

Before we look at the source code of tidwall/raft-wal, let’s take a look at the tidwall/wal module it is built on.

A detailed analysis of Tidwall/Wal can be found in this previous article

2. tidwall/raft-wal persistent data storage structure

3. Source code analysis

Persistence is nothing more than a few interfaces:

  • initialize
  • write
  • read
  • delete

Let’s analyze them one by one

3.1 Introduction to core data structure

// LogStore is a write-ahead Raft log backed by a wal.Log.
type LogStore struct {
	// mu is locked by every method of LogStore, serializing access to the
	// fields below.
	mu    sync.Mutex
	// log is the underlying WAL log object that holds the encoded entries.
	log   *wal.Log
	// buf is a scratch buffer that StoreLog and StoreLogs reset and reuse
	// when encoding entries.
	buf   []byte
	// batch is cleared and refilled on each StoreLogs call.
	batch wal.Batch
}
Copy the code
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
//
// This is the entry type that the log store in this article encodes to and
// decodes from the WAL.
type Log struct {
	// Index holds the index of the log entry. The store uses it as the
	// WAL index when writing the entry.
	Index uint64

	// Term holds the election term of the log entry.
	Term uint64

	// Type holds the type of the log entry. The store encodes it as a
	// single byte.
	Type LogType

	// Data holds the log entry's type-specific data.
	Data []byte

	// Extensions holds an opaque byte slice of information for middleware. It
	// is up to the client of the library to properly modify this as it adds
	// layers and remove those layers when appropriate. This value is a part of
	// the log, so very large values could cause timing issues.
	//
	// N.B. It is _up to the client_ to handle upgrade paths. For instance if
	// using this with go-raftchunking, the client should ensure that all Raft
	// peers are using a version that can handle that extension before ever
	// actually triggering chunking behavior. It is sometimes sufficient to
	// ensure that non-leaders are upgraded first, then the current leader is
	// upgraded, but a leader changeover during this process could lead to
	// trouble, so gating extension behavior via some flag in the client
	// program is also a good idea.
	Extensions []byte
}
Copy the code

3.2 Read, Write, DeleteRange source code analysis

3.2.1 Initializing and basic Interfaces

// Compile-time check that *LogStore implements raft.LogStore.
var _ raft.LogStore = (*LogStore)(nil)

// Options configures Open. A nil *Options passed to Open leaves the WAL
// defaults untouched.
type Options struct {
	// NoSync disables fsync after writes. This is less durable and puts the
	// log at risk of data loss when there's a server crash. Default false.
	// Open copies this value into the NoSync field of the WAL options.
	NoSync bool
}

// Open the Raft log
func Open(path string, opts *Options) (*LogStore, error) {
	s := new(LogStore)
	// Default option
	wopts := *wal.DefaultOptions
	ifopts ! =nil {
		wopts.NoSync = opts.NoSync
	}
	// opts.LogFormat = wal.JSON
	var err error
	s.log, err = wal.Open(path, &wopts)
	iferr ! =nil {
		return nil, err
	}
	return s, nil
}

// Close closes the underlying WAL while holding the store's lock and
// returns the error reported by the WAL, if any.
func (s *LogStore) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.log.Close()
}

// FirstIndex returns the first known index from the Raft log, as reported
// by the underlying WAL.
func (s *LogStore) FirstIndex() (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.log.FirstIndex()
}

// LastIndex returns the last known index from the Raft log, as reported by
// the underlying WAL.
func (s *LogStore) LastIndex() (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.log.LastIndex()
}
Copy the code

3.2.2 Read Interface for reading logs

// GetLog is used to retrieve a log from FastLogDB at a given index.
// Read logs according to index
func (s *LogStore) GetLog(index uint64, log *raft.Log) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	data, err := s.log.Read(index)
	iferr ! =nil {
		if err == wal.ErrNotFound {
			return raft.ErrLogNotFound
		}
		return err
	}
	/ / set the index
	log.Index = index
	if len(data) == 0 {
		return wal.ErrCorrupt
	}
    Type+Term+len(Data)+Data+len(Ext)+Ext
	// Read 1 byte of type
	log.Type = raft.LogType(data[0])
	/ / offset data
	data = data[1:]
	var n int
	// Read varint term
	log.Term, n = binary.Uvarint(data)
	if n <= 0 {
		return wal.ErrCorrupt
	}
	/ / offset data
	data = data[n:]
	// Read the length of the data
	size, n := binary.Uvarint(data)
	if n <= 0 {
		return wal.ErrCorrupt
	}

	// Offset data
	data = data[n:]
	if uint64(len(data)) < size {
		return wal.ErrCorrupt
	}
	// Read data
	log.Data = data[:size]
	// Offset data
	data = data[size:]
	// Read the extended data length
	size, n = binary.Uvarint(data)
	if n <= 0 {
		return wal.ErrCorrupt
	}
	// Offset data
	data = data[n:]
	if uint64(len(data)) < size {
		return wal.ErrCorrupt
	}
	// Read the offset data content
	log.Extensions = data[:size]
	// Offset data
	data = data[size:]
	if len(data) > 0 {
		return wal.ErrCorrupt
	}
	return nil
}

// appendUvarint appends the uvarint encoding of x to dst and returns the
// extended slice.
func appendUvarint(dst []byte, x uint64) []byte {
	// binary.MaxVarintLen64 bytes are enough for any uint64 value.
	var scratch [binary.MaxVarintLen64]byte
	written := binary.PutUvarint(scratch[:], x)
	return append(dst, scratch[:written]...)
}
Copy the code

3.2.3 Write Interface for writing logs


// StoreLog is used to store a single raft log
func (s *LogStore) StoreLog(log *raft.Log) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Encode into the reusable scratch buffer, truncated to length zero first.
	s.buf = appendLog(s.buf[:0], log)
	return s.log.Write(log.Index, s.buf)
}

// StoreLogs is used to store a set of raft logs
func (s *LogStore) StoreLogs(logs []*raft.Log) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Start from an empty batch; the batch object is reused across calls.
	s.batch.Clear()
	for i := range logs {
		entry := logs[i]
		// The scratch buffer is overwritten on every iteration.
		// NOTE(review): this presumes batch.Write copies the bytes — confirm.
		s.buf = appendLog(s.buf[:0], entry)
		s.batch.Write(entry.Index, s.buf)
	}
	return s.log.WriteBatch(&s.batch)
}
// Encoded layout: Type + Term + len(Data) + Data + len(Ext) + Ext
// appendLog appends the encoded form of log to dst and returns the extended
// slice: one type byte, the term as a uvarint, then Data and Extensions,
// each preceded by its length as a uvarint.
func appendLog(dst []byte, log *raft.Log) []byte {
	dst = append(dst, byte(log.Type))
	dst = appendUvarint(dst, log.Term)
	// Data and Extensions share the same length-prefixed encoding.
	for _, field := range [...][]byte{log.Data, log.Extensions} {
		dst = appendUvarint(dst, uint64(len(field)))
		dst = append(dst, field...)
	}
	return dst
}
Copy the code

3.2.4 DeleteRange: the interface for deleting logs


// DeleteRange is used to delete logs within a given range inclusively.
func (s *LogStore) DeleteRange(min, max uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	first, err := s.log.FirstIndex()
	iferr ! =nil {
		return err
	}
	last, err := s.log.LastIndex()
	iferr ! =nil {
		return err
	}
	if min == first {
        // Delete previous data including Max
		if err := s.log.TruncateFront(max + 1); err ! =nil {
			return err
		}
	} else if max == last {
        // Delete data after min
		if err := s.log.TruncateBack(min - 1); err ! =nil {
			return err
		}
	} else {
		return wal.ErrOutOfRange
	}
	return nil
}

// Sync performs an fsync on the log while holding the store's lock.
//
// NOTE(review): presumably this is only needed when the store was opened
// with Options.NoSync set, since otherwise writes are followed by an fsync —
// confirm against the WAL implementation.
func (s *LogStore) Sync() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// NOTE(review): any result of the underlying Sync call is discarded
	// because this method has no return value — confirm that is intended.
	s.log.Sync()
}

Copy the code

4. Relevant information

  • tidwall/wal library GitHub address
  • tidwall/raft-wal library GitHub address