Hands-on Implementation of a LocalCache – implementation article

Preface

Hello everyone, my name is asong. After the previous two articles we already have a basic understanding of how to design a local cache. This article is the last in the series: we will implement a local cache ourselves, so follow along!

The code has been uploaded to GitHub: Github.com/asong2020/g…

For now, this release can be considered v1.0; it will continue to be optimized and iterated on.

Step 1: Abstract the interface

The first step is very important. Following the principle of interface-oriented programming, we first abstract the methods that will be exposed to users and keep them simple and easy to understand. The interface I ended up with is as follows:

// ICache abstract interface
type ICache interface {
	// Set value use default expire time. default does not expire.
	Set(key string, value []byte) error
	// Get value if find it. if value already expire will delete.
	Get(key string) ([]byte, error)
	// SetWithTime set value with expire time
	SetWithTime(key string, value []byte, expired time.Duration) error
	// Delete manually removes the key
	Delete(key string) error
	// Len computes number of entries in cache
	Len() int
	// Capacity returns the number of bytes stored in the cache.
	Capacity() int
	// Close is used to signal a shutdown of the cache when you are done with it.
	// This allows the cleaning goroutines to exit and ensures references are not
	// kept to the cache preventing GC of the entire cache.
	Close() error
	// Stats returns cache's statistics
	Stats() Stats
	// GetKeyHit returns key hit
	GetKeyHit(key string) int64
}
  • Set(key string, value []byte): stores data with the default expiration time. If the asynchronous cleanup task is not enabled the data never expires; otherwise the default expiration time is 10 minutes.
  • Get(key string) ([]byte, error): gets the value for the given key; if the data has expired it is deleted on access.
  • SetWithTime(key string, value []byte, expired time.Duration): stores an object with a custom expiration time.
  • Delete(key string) error: deletes cached data by key.
  • Len() int: returns the number of cached objects.
  • Capacity() int: returns the current capacity of the cache.
  • Close() error: closes the cache.
  • Stats() Stats: returns cache monitoring data.
  • GetKeyHit(key string) int64: returns the hit count for the given key.
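
Before moving on, here is a minimal usage sketch of the interface. Imports of fmt, log and time are assumed, as is an import of the cache package from the repository (its module path is abbreviated in this article) so that NewCache, the constructor introduced in Step 2, is in scope:

// Minimal usage sketch; NewCache comes from the cache package described in Step 2.
func main() {
	c, err := NewCache()
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// Store with the default expiration, then with a custom one.
	if err := c.Set("asong", []byte("Golang Dream Factory")); err != nil {
		log.Fatal(err)
	}
	_ = c.SetWithTime("temp", []byte("short-lived"), 30*time.Second)

	// Read back; expired entries are deleted on access.
	value, err := c.Get("asong")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("len=%d capacity=%d value=%s\n", c.Len(), c.Capacity(), value)

	_ = c.Delete("temp")
}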

Step 2: Define the cache object

With the interface abstracted in the first step, we now define a cache instance that implements it.

type cache struct {
	// hashFunc represents used hash func
	hashFunc HashFunc
	// bucketCount represents the number of segments within a cache instance. value must be a power of two.
	bucketCount uint64
	// bucketMask is bitwise AND applied to the hashVal to find the segment id.
	bucketMask uint64
	// segment is shard
	segments []*segment
	// segment lock
	locks    []sync.RWMutex
	// close cache
	close chan struct{}
}
  • hashFunc: the hash function used for sharding. Users can supply their own by implementing the HashFunc interface; the fnv algorithm is used by default.
  • bucketCount: the number of shards, which must be a power of two. The default is 256.
  • bucketMask: because the shard count is a power of two, a bitwise AND can replace the modulo operation for better performance: hashValue % bucketCount == hashValue & (bucketCount - 1) (a small demonstration follows this list).
  • segments: the shard objects; the structure of each shard is described later.
  • locks: a read/write lock for each shard.
  • close: used to notify the cleanup goroutine to exit when the cache is closed.
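
A quick demonstration of the bucketMask trick mentioned above; this is a standalone snippet for illustration, not part of the cache code:

package main

import "fmt"

func main() {
	const bucketCount = 256            // power of two
	const bucketMask = bucketCount - 1 // 255, i.e. 0xFF

	hashVal := uint64(9876543210)
	fmt.Println(hashVal % bucketCount) // 234
	fmt.Println(hashVal & bucketMask)  // 234 -- same result without a division
}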

Next we write the constructor of the cache object:

// NewCache constructor cache instance
func NewCache(opts ...Opt) (ICache, error) {
	options := &options{
		hashFunc: NewDefaultHashFunc(),
		bucketCount: defaultBucketCount,
		maxBytes: defaultMaxBytes,
		cleanTime: defaultCleanTIme,
		statsEnabled: defaultStatsEnabled,
		cleanupEnabled: defaultCleanupEnabled,
	}
	for _, each := range opts{
		each(options)
	}

	if !isPowerOfTwo(options.bucketCount) {
		return nil, errShardCount
	}

	if options.maxBytes <= 0 {
		return nil, ErrBytes
	}

	segments := make([]*segment, options.bucketCount)
	locks := make([]sync.RWMutex, options.bucketCount)

	maxSegmentBytes := (options.maxBytes + options.bucketCount - 1) / options.bucketCount
	for index := range segments {
		segments[index] = newSegment(maxSegmentBytes, options.statsEnabled)
	}

	c := &cache{
		hashFunc: options.hashFunc,
		bucketCount: options.bucketCount,
		bucketMask: options.bucketCount - 1,
		segments: segments,
		locks: locks,
		close: make(chan struct{}),
	}

	if options.cleanupEnabled {
		go c.cleanup(options.cleanTime)
	}
	
	return c, nil
}

Here we use the functional options pattern for better extensibility (a sketch of the option helpers follows the list below). Our constructor does three main things:

  • Parameter checks: even for external parameters we still do basic validation.
  • Initialization of the shard objects.
  • Construction of the cache object.
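
For reference, the functional options pattern could be wired up roughly as below. The Opt type and the options struct appear in the constructor above; the helper names WithBucketCount and WithMaxBytes, and the uint64 field types, are illustrative assumptions rather than the repository's exact definitions:

// Opt mutates the options struct before the cache is built.
type Opt func(*options)

// WithBucketCount overrides the default shard count (must remain a power of two).
// Hypothetical helper shown only to illustrate the pattern.
func WithBucketCount(count uint64) Opt {
	return func(o *options) { o.bucketCount = count }
}

// WithMaxBytes overrides the default total cache size in bytes.
// Hypothetical helper shown only to illustrate the pattern.
func WithMaxBytes(maxBytes uint64) Opt {
	return func(o *options) { o.maxBytes = maxBytes }
}

With helpers like these a caller can write NewCache(WithBucketCount(512), WithMaxBytes(512*1024*1024)) and still get validated defaults for everything else.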

When constructing the cache object we first calculate the capacity of each shard. By default the whole local cache holds 256 MB of data, which is divided evenly across the shards; users can choose the cache size themselves.

Step 3: Define the sharding structure

The structure of each fragment is as follows:

type segment struct {
	hashmap map[uint64]uint32
	entries buffer.IBuffer
	clock   clock
	evictList  *list.List
	stats IStats
}
  • hashmap: maps a key's hash to its storage index.
  • entries: the underlying structure that stores the key/value data. It is introduced in Step 4 and is the core of the code.
  • clock: abstracts the time-related methods.
  • evictList: a queue that records old indexes so they can be deleted when capacity runs out (a temporary solution; the current storage structure is not well suited to an LRU eviction algorithm).
  • stats: cache monitoring data.

Let’s look at the constructor for each fragment:

func newSegment(bytes uint64, statsEnabled bool) *segment {
	if bytes == 0 {
		panic(fmt.Errorf("bytes cannot be zero"))}if bytes >= maxSegmentSize{
		panic(fmt.Errorf("too big bytes=%d; should be smaller than %d", bytes, maxSegmentSize))
	}
	capacity := (bytes + segmentSize - 1) / segmentSize
	entries := buffer.NewBuffer(int(capacity))
	entries.Reset()
	return &segment{
		entries: entries,
		hashmap: make(map[uint64]uint32),
		clock:   &systemClock{},
		evictList: list.New(),
		stats: newStats(statsEnabled),
	}
}

The main point to note here:

We calculate each buffer's capacity from the segment's byte budget (dividing by segmentSize and rounding up), which corresponds to the initialization step of the cache object above. For example, with the default 256 MB cache split across 256 shards, each segment is capped at 1 MB.

Step 4: Define the cache structure

Now that the cache object is constructed, we come to the core of the local cache: defining the storage structure.

BigCache, FastCache and FreeCache all store cached data in byte arrays instead of maps, which reduces GC pressure, so we borrow the same idea here and use a two-dimensional byte slice to store the cached key/value data.

Compared to BigCache, the advantage of a two-dimensional slice is that an entry can be deleted directly by its index. This does leave "wormholes" (holes from deleted entries), but we can record the wormhole indexes and keep filling them.

Each cache entry is wrapped into a single byte slice that records the expiration timestamp, the key length, the key hash, the key itself and the value.
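
The exact byte layout is not spelled out in this article, so the following is only a sketch that is consistent with the helper functions used later (wrapEntry, readKeyFromEntry, readExpireAtFromEntry, readEntry). The field order and widths are my own assumptions, hence the Sketch suffix on the function names:

package cache // sketch only; the "Sketch" suffix marks functions that are not in the repo

import "encoding/binary"

// wrapEntrySketch packs one cache entry into a single byte slice:
// | expireAt (8B) | hash (8B) | key length (2B) | key | value |
func wrapEntrySketch(expireAt uint64, key string, hashKey uint64, value []byte) []byte {
	buf := make([]byte, 8+8+2+len(key)+len(value))
	binary.LittleEndian.PutUint64(buf[0:8], expireAt)
	binary.LittleEndian.PutUint64(buf[8:16], hashKey)
	binary.LittleEndian.PutUint16(buf[16:18], uint16(len(key)))
	copy(buf[18:], key)
	copy(buf[18+len(key):], value)
	return buf
}

// readExpireAtFromEntrySketch returns the expiration timestamp.
func readExpireAtFromEntrySketch(entry []byte) uint64 {
	return binary.LittleEndian.Uint64(entry[0:8])
}

// readKeyFromEntrySketch recovers the key so hash collisions can be detected.
func readKeyFromEntrySketch(entry []byte) string {
	keyLen := binary.LittleEndian.Uint16(entry[16:18])
	return string(entry[18 : 18+int(keyLen)])
}

// readEntrySketch returns only the stored value.
func readEntrySketch(entry []byte) []byte {
	keyLen := binary.LittleEndian.Uint16(entry[16:18])
	return entry[18+int(keyLen):]
}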

Now that the basic idea is clear, let’s look at our encapsulation of the storage layer:

type Buffer struct {
	array [][]byte
	capacity int
	index int
	// maxCount = capacity - 1
	count int
	// availableSpace If any objects are removed after the buffer is full, the idle index is logged.
	// Avoid array "wormhole"
	availableSpace map[int]struct{}
	// placeholder record the index that buffer has stored.
	placeholder map[int]struct{}
}
  • array [][]byte: the two-dimensional slice that stores the cached objects.
  • capacity: the maximum capacity of the buffer.
  • index: records the position in the buffer where the next cache entry is written.
  • count: records the number of cached entries.
  • availableSpace: records the "wormholes": when a cached object is deleted after the buffer is full, its index is logged here so the free slot can be reused later.
  • placeholder: records the indexes the buffer has stored, which can be used to iterate over and clear stale entries.

The process of writing data into the buffer (no code was attached in the original, so a sketch follows): append at the next free index while there is room; once the buffer is full, reuse a slot recorded in availableSpace; otherwise report that the buffer is full so the caller can evict and retry.
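
Here is a sketch of what Push could look like under those rules. It assumes array is pre-allocated to capacity by NewBuffer, and ErrBufferFull is an assumed error name; the real implementation in the repository may differ in its details:

package buffer

import "errors"

// ErrBufferFull is an assumed sentinel error; the repository may name it differently.
var ErrBufferFull = errors.New("buffer is full")

// Push stores an entry and returns the slot index it was written to (sketch).
// It appends sequentially while unused slots remain, falls back to a "wormhole"
// (a slot freed by Remove) once the buffer is full, and errors out otherwise.
func (b *Buffer) Push(entry []byte) (int, error) {
	// Sequential append while unused slots remain.
	if b.index < b.capacity {
		i := b.index
		b.array[i] = entry
		b.placeholder[i] = struct{}{}
		b.index++
		b.count++
		return i, nil
	}
	// Reuse a slot freed by a previous Remove.
	for freeIndex := range b.availableSpace {
		b.array[freeIndex] = entry
		delete(b.availableSpace, freeIndex)
		b.placeholder[freeIndex] = struct{}{}
		b.count++
		return freeIndex, nil
	}
	return 0, ErrBufferFull
}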

Step 5: Refine the method of writing data to the cache

Now that we’ve defined all the structures we need, it’s time to populate our write cache method:

func (c *cache) Set(key string, value []byte) error  {
	hashKey := c.hashFunc.Sum64(key)
	bucketIndex := hashKey&c.bucketMask
	c.locks[bucketIndex].Lock()
	defer c.locks[bucketIndex].Unlock()
	err := c.segments[bucketIndex].set(key, hashKey, value, defaultExpireTime)
	return err
}

func (s *segment) set(key string, hashKey uint64, value []byte, expireTime time.Duration) error {
	if expireTime <= 0 {
		return ErrExpireTimeInvalid
	}
	expireAt := uint64(s.clock.Epoch(expireTime))

	if previousIndex, ok := s.hashmap[hashKey]; ok {
		if err := s.entries.Remove(int(previousIndex)); err != nil {
			return err
		}
		delete(s.hashmap, hashKey)
	}

	entry := wrapEntry(expireAt, key, hashKey, value)
	for {
		index, err := s.entries.Push(entry)
		if err == nil {
			s.hashmap[hashKey] = uint32(index)
			s.evictList.PushFront(index)
			return nil
		}
		ele := s.evictList.Back()
		if err := s.entries.Remove(ele.Value.(int)); err != nil {
			return err
		}
		s.evictList.Remove(ele)
	}
}

The process analysis is as follows:

  • Compute the hash of the key, then locate the corresponding shard using the shard mask.
  • If the same key already exists in the cache, delete it first; reinserting refreshes the expiration time.
  • Wrap the entry from the expiration timestamp, the key length, the hash, the key and the value.
  • Push the entry into the cache; if that fails because the buffer is full, remove the oldest entry and retry.

Step 6: Refine the method of reading data from the cache

The first step is to compute the hash of the key and locate the corresponding shard using the shard count:

func (c *cache) Get(key string) ([]byte, error)  {
	hashKey := c.hashFunc.Sum64(key)
	bucketIndex := hashKey&c.bucketMask
	c.locks[bucketIndex].RLock()
	defer c.locks[hashKey&c.bucketMask].RUnlock()
	entry, err := c.segments[bucketIndex].get(key, hashKey)
	if err != nil {
		return nil, err
	}
	return entry, nil
}

The second step is to execute the sharding method to obtain the cached data:

  • Look up the key's hash first; if it is not in the cache, return a not-found error.
  • Read the entry from the cache and compare the stored key with the requested key to detect hash collisions.
  • Check whether the entry has expired; expired data is deleted (whether expired data can still be returned is a business-level decision).
  • Record monitoring data at each step.
func (s *segment) getWarpEntry(key string, hashKey uint64) ([]byte, error) {
	index, ok := s.hashmap[hashKey]
	if !ok {
		s.stats.miss()
		return nil, ErrEntryNotFound
	}
	entry, err := s.entries.Get(int(index))
	if err != nil {
		s.stats.miss()
		return nil, err
	}
	if entry == nil {
		s.stats.miss()
		return nil, ErrEntryNotFound
	}

	if entryKey := readKeyFromEntry(entry); key != entryKey {
		s.stats.collision()
		return nil, ErrEntryNotFound
	}
	return entry, nil
}

func (s *segment) get(key string, hashKey uint64) ([]byte, error) {
	currentTimestamp := s.clock.TimeStamp()
	entry, err := s.getWarpEntry(key, hashKey)
	if err != nil {
		return nil, err
	}
	res := readEntry(entry)

	expireAt := int64(readExpireAtFromEntry(entry))
	if currentTimestamp-expireAt >= 0 {
		_ = s.entries.Remove(int(s.hashmap[hashKey]))
		delete(s.hashmap, hashKey)
		return nil, ErrEntryNotFound
	}
	s.stats.hit(key)

	return res, nil
}

Step 7: Experiment with a test case

Let’s start with a simple test case:

func (h *cacheTestSuite) TestSetAndGet() {
	cache, err := NewCache()
	assert.Equal(h.T(), nil, err)
	key := "asong"
	value := []byte("Public account: Golang Dream Factory")

	err = cache.Set(key, value)
	assert.Equal(h.T(), nil, err)

	res, err := cache.Get(key)
	assert.Equal(h.T(), nil, err)
	assert.Equal(h.T(), value, res)
	h.T().Logf("get value is %s", string(res))
}

Running results:

=== RUN   TestCacheTestSuite
=== RUN   TestCacheTestSuite/TestSetAndGet
    cache_test.go:33: get value is Public account: Golang Dream Factory
--- PASS: TestCacheTestSuite (0.00s)
    --- PASS: TestCacheTestSuite/TestSetAndGet (0.00s)
PASS

Once you’re done with the basics, all that’s left is benchmarking, tuning, and iterating.
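
As a starting point for the benchmarking, here is a minimal benchmark sketch. It imports testing and strconv and assumes it sits in the same package as the cache (like the test suite above) so that NewCache is visible:

// BenchmarkSetGet measures a simple Set-then-Get round trip per iteration.
func BenchmarkSetGet(b *testing.B) {
	c, err := NewCache()
	if err != nil {
		b.Fatal(err)
	}
	defer c.Close()

	value := []byte("Golang Dream Factory")
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		key := "asong-" + strconv.Itoa(i)
		if err := c.Set(key, value); err != nil {
			b.Fatal(err)
		}
		if _, err := c.Get(key); err != nil {
			b.Fatal(err)
		}
	}
}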

References

  • Github.com/allegro/big…
  • Github.com/VictoriaMet…
  • Github.com/coocood/fre…
  • Github.com/patrickmn/g…

Conclusion

The implementation chapter ends here, but the coding of this project is not finished yet. I will continue to iterate and optimize the local cache based on this version.

  • The implementation is simple and the methods exposed to users are easy to understand.
  • Using a two-dimensional slice as the storage structure avoids the drawback of not being able to delete the underlying data, and mitigates the "wormhole" problem to some extent.
  • Complete test cases make it a suitable entry-level project for beginners.

Points to be optimized:

  • Without an efficient eviction algorithm, hot data may be deleted frequently.
  • Periodically deleting expired data holds the lock for a long time and needs to be optimized.
  • The handling of closing a cache instance needs to be improved.
  • Optimizations for specific business scenarios.

Iteration points:

  • Add asynchronous load caching
  • …… (still thinking)

The code has been uploaded to GitHub: Github.com/asong2020/g…

Well, that's the end of this article. I am asong, and we'll see you next time.

Welcome to our official account: [Golang Dream Factory]