The problem

We ran into a problem while building the project: the base data (about 8 MB, and it can change) is used very heavily, and reading it from a Redis cache is not efficient enough. There are several such sets of base data.

A home-grown two-level cache has a consistency problem. Even with a periodic refresh, the data cached locally by different instances of the same application can be inconsistent for some time, because an update made on one instance cannot update the local caches of all the other instances.

For example, application A implements two-level caching with JetCache and is deployed as two instances. Instance 1 modifies a piece of data and writes it to the database, its own local cache, and the remote cache. On instance 2, the local cache keeps the old value until that entry is refreshed or expires. The maximum duration of the inconsistency therefore depends on the execution cycle of the cache refresh job.
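The scenario can be reproduced without any cache library. The following is a minimal sketch in plain Java, assuming a shared map stands in for Redis and each instance keeps its own local map. The class names (AppInstance, StaleLocalCacheDemo) and the key are hypothetical, and this is not JetCache code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StaleLocalCacheDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new ConcurrentHashMap<>(); // stand-in for the remote cache
        AppInstance instance1 = new AppInstance(redis);
        AppInstance instance2 = new AppInstance(redis);

        instance1.update("airport:XMN", "v1");
        System.out.println(instance2.get("airport:XMN")); // v1, now also cached locally on instance 2

        instance1.update("airport:XMN", "v2");
        System.out.println(instance1.get("airport:XMN")); // v2
        System.out.println(instance2.get("airport:XMN")); // v1: stale local copy

        instance2.expireLocal("airport:XMN");
        System.out.println(instance2.get("airport:XMN")); // v2, reloaded from the remote cache
    }
}

// Hypothetical stand-in for one application instance with a two-level cache.
class AppInstance {
    private final Map<String, String> local = new HashMap<>(); // level 1: per instance
    private final Map<String, String> remote;                  // level 2: shared

    AppInstance(Map<String, String> remote) {
        this.remote = remote;
    }

    // Read path: local first, then remote; a remote hit is copied into the local cache.
    String get(String key) {
        String value = local.get(key);
        if (value == null) {
            value = remote.get(key);
            if (value != null) {
                local.put(key, value);
            }
        }
        return value;
    }

    // Write path: only this instance's local cache and the shared remote cache are updated.
    void update(String key, String value) {
        local.put(key, value);
        remote.put(key, value);
    }

    // Simulates local expiry or a periodic refresh job.
    void expireLocal(String key) {
        local.remove(key);
    }
}
```

Nothing in the write path reaches the other instance's local map, which is exactly the gap the rest of this article tries to close.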

Redisson (Redis) VS JetCache (Redis) VS JetCache (Redis + Caffeine)

  • Configuration file:
  • Unit test method:
    @Test
    public void compareEfficiency() throws IOException {
        // Warm up: make sure the data is already loaded into the cache
        airportCacheDao.getAllFromRedisson();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            airportCacheDao.getAllFromRedisson();
        }
        long end = System.currentTimeMillis();
        System.out.println(String.format("Query redis via Redisson; 20 times full data, time: %s ms", end - start));

        // Warm up: make sure the data is already loaded into the cache
        airportCacheDao.getAllFromDbOrJetCacheRemote();
        long start1 = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            airportCacheDao.getAllFromDbOrJetCacheRemote();
        }
        long end1 = System.currentTimeMillis();

        System.out.println(String.format("Query redis with JetCache; 20 times full data, time: %s ms", end1 - start1));

        // Warm up: make sure the data is already loaded into the cache
        airportCacheDao.getAllFromDbOrJetCacheBoth();
        long start2 = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            airportCacheDao.getAllFromDbOrJetCacheBoth();
        }
        long end2 = System.currentTimeMillis();

        System.out.println(String.format("Query via JetCache level 2 cache; 20 times full data, time: %s ms", end2 - start2));
        // Block on stdin so the test process stays alive
        System.in.read();
    }
  • Methods in DAO
    public List<MdBmdmAirportDO> getAllFromRedisson() {
        List<MdBmdmAirportDO> airports;
        RBucket<List<MdBmdmAirportDO>> airportRList = redissonClient.getBucket(RedisKey.MDMAirportDataList+"_Redisson");
        if (airportRList.isExists()) {
            //System.out.println("get from cache");
            airports = airportRList.get();
        } else {
            System.out.println("get from db");
            airports = iMdBmdmAirportDao.list(null);
            airportRList.set(airports);
        }
        return airports;
    }

    @Cached(name = RedisKey.MDMAirportDataList+"_JetCache_Remote", cacheType = CacheType.REMOTE, expire = 1000)
    public List<MdBmdmAirportDO> getAllFromDbOrJetCacheRemote() {
        System.out.println("get from db");
        List<MdBmdmAirportDO> airports;
        airports = iMdBmdmAirportDao.list(null);
        return airports;
    }

    @Cached(name = RedisKey.MDMAirportDataList+"_JetCache_Both", cacheType = CacheType.BOTH, expire = 1000)
    public List<MdBmdmAirportDO> getAllFromDbOrJetCacheBoth() {
        System.out.println("get from db");
        List<MdBmdmAirportDO> airports;
        airports = iMdBmdmAirportDao.list(null);
        return airports;
    }

The comparison shows that query efficiency differs greatly between the three approaches.

Description:

  1. Caffeine was chosen as the local cache component for JetCache.
  2. In JetCache's @Cached annotation, cacheType = CacheType.BOTH means a two-level cache (local + remote) and cacheType = CacheType.REMOTE means a single-level remote cache.
  3. Redisson PRO supports two-level caching, but it is a paid product, so it was not considered.
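The unit test above repeats the same warm-up, loop, and timing pattern three times. As a sketch only, the pattern could be factored into a small helper like the one below. TimingSketch and timeRuns are hypothetical names that do not appear in the original project, and the counter in main merely stands in for a DAO call.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TimingSketch {
    // Runs the task once as a warm-up, then `runs` more times,
    // and returns the elapsed milliseconds of the timed runs only.
    static long timeRuns(int runs, Runnable task) {
        task.run(); // warm-up, so the first (cache-filling) call is not measured
        long start = System.nanoTime();
        for (int i = 0; i < runs; i++) {
            task.run();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger(); // stand-in for a DAO call
        long ms = timeRuns(20, calls::incrementAndGet);
        System.out.println("calls=" + calls.get() + ", time=" + ms + " ms");
    }
}
```

In the test, each of the three measurements would then be a call such as timeRuns(20, () -> airportCacheDao.getAllFromRedisson()), assuming the DAO from the article is available.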

JetCache overview and features

JetCache is a Java-based caching system package that provides a unified API and annotations to simplify the use of caches. JetCache natively supports TTL, two-level caching, distributed auto-refresh, and provides a Cache interface for manual caching operations. Currently there are four implementations, RedisCache, TairCache, CaffeineCache(in memory) and a simple LinkedHashMapCache(in memory).

  • All features:
      • Unified API for accessing cache systems
      • Declarative method caching through annotations, with support for TTL and two-level caching
      • Create and configure Cache instances through annotations
      • Automatic statistics for all Cache instances and method caches
      • Configurable key generation strategy and value serialization strategy
      • Distributed cache auto-refresh and distributed lock (2.2+)
      • Asynchronous Cache API (2.2+, when using Redis's lettuce client)
      • Spring Boot support

Please refer to the official JetCache wiki

Simple usage

JetCache provides easy-to-use annotations. Compared with the annotations provided by Spring Cache, JetCache adds some new ones:

  • @CacheRefresh: periodically refreshes the cache.
  • @CacheUpdate: updates the cache.
  • @CachePenetrationProtect: protects against concurrent loads when a cache access misses. The current version implements the protection within a single JVM: for a given key, only one thread in the JVM loads the value and the other threads wait for the result.

I will not list them all here; please refer to the JetCache annotation documentation.
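To make the idea behind @CachePenetrationProtect concrete, here is a minimal plain-Java sketch of single-JVM load protection. It relies on ConcurrentHashMap.computeIfAbsent, which applies the loader at most once per absent key while concurrent callers for that key wait. This illustrates the behaviour described above and is not how JetCache implements it; SingleLoadCache and SingleLoadDemo are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class SingleLoadDemo {
    public static void main(String[] args) {
        System.out.println("loader calls: " + runConcurrentReads(8));
    }

    // Starts `threads` concurrent readers of the same key and
    // returns how many times the loader was actually invoked.
    static int runConcurrentReads(int threads) {
        AtomicInteger loads = new AtomicInteger();
        SingleLoadCache<String, String> cache = new SingleLoadCache<>(key -> {
            loads.incrementAndGet();
            return "value-of-" + key;
        });
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        Runnable reader = () -> {
            try {
                start.await(); // release all readers at the same moment
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            cache.get("UK");
        };
        for (int i = 0; i < threads; i++) {
            pool.submit(reader);
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return loads.get();
    }
}

class SingleLoadCache<K, V> {
    private final ConcurrentHashMap<K, V> store = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    SingleLoadCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key) {
        // The loader runs at most once per absent key; other threads
        // asking for the same key block until the value is stored.
        return store.computeIfAbsent(key, loader);
    }
}
```

As in the annotation's description, the protection here only holds inside one JVM; two application instances could still load the same key at the same time.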

With method caching implemented through JetCache annotations, the cached value is stored as a String. There is nothing wrong with that, and it is more efficient than reading the data from a complex data structure. (By contrast, the Spring cache implementation based on RedissonSpringCacheManager stores method return values in a Hash structure.)

The JetCache local cache holds at most 100 elements by default. The limit is configurable (for example, through the localLimit attribute of @Cached). Entries are evicted on an LRU basis.
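For readers unfamiliar with size-limited LRU eviction, the sketch below shows the idea with a LinkedHashMap in access order, using a limit of 2 instead of 100 to keep the demo short. It is an illustration of the principle only: LruCache is a hypothetical class, it is not thread-safe, and Caffeine's own eviction policy is more elaborate than strict LRU.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LruLocalCacheDemo {
    public static void main(String[] args) {
        LruCache<String, String> cache = new LruCache<>(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touch "a", so "b" becomes the least recently used entry
        cache.put("c", "3"); // exceeds the limit and evicts "b"
        System.out.println(cache.keySet()); // [a, c]
    }
}

// Hypothetical size-limited cache; not thread-safe.
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int limit;

    LruCache(int limit) {
        super(16, 0.75f, true); // accessOrder = true: iteration order is least to most recently used
        this.limit = limit;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each insertion; returning true drops the least recently used entry.
        return size() > limit;
    }
}
```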

Problems with JetCache

In the two-level cache structure, the local caches have a consistency problem. This was illustrated with an example at the beginning of the article, so I will not repeat it here. JetCache does not natively support automatically refreshing the two-level cache across instances.

Extension implementation

Clone the JetCache source code. The JetCache architecture is well thought out: it already defines a publishing mechanism for cache update messages (the CacheMessagePublisher interface), which keeps the architecture extensible.
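Before the real classes, here is the overall idea as a minimal in-process sketch: every write publishes the affected key, and every instance subscribes and drops that key from its local cache. InMemoryTopic stands in for the Redis pub/sub channel and Node for an application instance. Both are hypothetical and greatly simplified compared with the Lettuce-based code that follows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class BroadcastInvalidationDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new ConcurrentHashMap<>(); // stand-in for the remote cache
        InMemoryTopic topic = new InMemoryTopic();
        Node node1 = new Node(redis, topic);
        Node node2 = new Node(redis, topic);

        node1.put("UK", "v1");
        System.out.println(node2.get("UK")); // v1, now cached locally on node2
        node1.put("UK", "v2");               // the broadcast makes node2 drop its local copy
        System.out.println(node2.get("UK")); // v2
    }
}

// Hypothetical in-process replacement for a Redis pub/sub channel (synchronous delivery).
class InMemoryTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String key) {
        subscribers.forEach(s -> s.accept(key));
    }
}

class Node {
    private final Map<String, String> local = new HashMap<>();
    private final Map<String, String> remote;
    private final InMemoryTopic topic;

    Node(Map<String, String> remote, InMemoryTopic topic) {
        this.remote = remote;
        this.topic = topic;
        // Listener role: invalidate the local entry named in the message.
        topic.subscribe(key -> local.remove(key));
    }

    String get(String key) {
        // Local miss falls back to the remote cache and fills the local cache.
        return local.computeIfAbsent(key, remote::get);
    }

    void put(String key, String value) {
        remote.put(key, value);
        topic.publish(key); // publisher role: tell every instance the key changed
        local.put(key, value);
    }
}
```

Note that the writing node also receives its own message here, which is the redundancy discussed later in the article.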

Custom implementation of CacheMessagePublisher:

package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.autoconfigure.LettuceFactory;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.CacheMessagePublisher;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;

/**
 * @author Ryanlee
 * @updateTime 2020-04-27 17:26
 */
@Service
@Primary
public class RedisMessagePublisher implements CacheMessagePublisher {

    public Logger logger = LoggerFactory.getLogger(RedisMessagePublisher.class);

    @Autowired
    @Qualifier("defaultClient")
    public LettuceFactory lettuceFactory;

    private StatefulRedisPubSubConnection<String, String> connection;


    @PreDestroy
    public void destroy() {
        // The connection is created lazily, so it may still be null here
        if (connection != null) {
            connection.close();
        }
    }


    @Value("${jetcache.cacheMessagePublisher.topic}")
    String topicName;

    @Override
    public void publish(String area, String cacheName, CacheMessage cacheMessage) {
        try {
            // Lazily open (or reopen) the pub/sub connection
            if (null == connection || !connection.isOpen()) {
                System.out.println("Initialize connection");
                RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
                connection = client.connectPubSub();
            }
        } catch (Exception e) {
            // The connection is null if the very first connect attempt failed
            if (connection != null) {
                connection.close();
            }
            logger.error("Failed to open the pub/sub connection; cache update message not sent", e);
            return;
        }
        // Wrap the message together with area and cache name so the listener can locate the cache
        CacheMessageWithName cacheMessageWithName = new CacheMessageWithName();
        cacheMessageWithName.setArea(area);
        cacheMessageWithName.setCacheName(cacheName);
        cacheMessageWithName.setCacheMessage(cacheMessage);

        RedisPubSubAsyncCommands<String, String> async = connection.async();

        async.publish(topicName, JSON.toJSONString(cacheMessageWithName));
        logger.info(String.format("Send cache update message: message:%s", cacheMessageWithName));
    }
}

A custom RedisPubSubListener implementation:

package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.anno.support.CacheContext;
import com.alicp.jetcache.support.CacheMessage;
import io.lettuce.core.pubsub.RedisPubSubListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author ryanlee
 * @updateTime 2020-04-27 20:09
 */
public class CusRedisPubSubListener implements RedisPubSubListener<String, String> {
    public Logger logger = LoggerFactory.getLogger(CusRedisPubSubListener.class);
    private CacheContext cacheContext;
    private ILocalCacheInvalidateStrategy localCacheInvalidateStrategy;
    public CusRedisPubSubListener(CacheContext cacheContext,ILocalCacheInvalidateStrategy localCacheInvalidateStrategy){
        this.cacheContext = cacheContext;
        this.localCacheInvalidateStrategy =localCacheInvalidateStrategy;
    }
    @Override
    public void message(String channel, String message) {
        consumeMessage(message);
    }

    @Override
    public void message(String pattern, String channel, String message) {
        consumeMessage(message);
    }

    private void consumeMessage(String message) {
        logger.info("CusRedisPubSubListener receives a local cache update message:"+message);
        CacheMessageWithName cacheMessageWithName = JSON.parseObject(message,CacheMessageWithName.class);
        String area = cacheMessageWithName.getArea();
        String cacheName = cacheMessageWithName.getCacheName();
        CacheMessage cacheMessage = cacheMessageWithName.getCacheMessage();
        localCacheInvalidateStrategy.invalidateLocalCache(cacheContext,area,cacheName,cacheMessage);
    }

    @Override
    public void subscribed(String channel, long count) {
        System.out.println(String.format("Subscribe topic: %s",channel));

    }

    @Override
    public void psubscribed(String pattern, long count) {

    }

    @Override
    public void unsubscribed(String channel, long count) {

    }

    @Override
    public void punsubscribed(String pattern, long count) {

    }
}

Register the listener service:

package com.xmair.core.jetcache;

import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.autoconfigure.LettuceFactory;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Registers the RedisPubSubListener on the channel
 *
 * @author Ryanlee
 * @updateTime 2020-04-27 17:43
 */
@Service
public class LocalCacheUpdateInitService {

    @Value("${jetcache.cacheMessagePublisher.topic}")
    String topicName;

    @Autowired
    @Qualifier("defaultClient")
    LettuceFactory lettuceFactory;

    @Autowired
    ConfigProvider configProvider;

    @PostConstruct
    public void initConsumer() throws Exception {
        RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
        StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
        CaffeineInvalidateStrategy caffeineInvalidateStrategy = new CaffeineInvalidateStrategy();
        RedisPubSubListener<String, String> listener =
                new CusRedisPubSubListener(configProvider.getCacheContext(), caffeineInvalidateStrategy);
        connection.addListener(listener);
        // Subscribe to the same topic the publisher writes to
        RedisPubSubCommands<String, String> sync = connection.sync();
        sync.subscribe(topicName);
    }
}

Custom Caffeine local cache invalidation strategy:

package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.anno.support.CacheContext;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.FastjsonKeyConvertor;
import com.github.benmanes.caffeine.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caffeine local cache invalidation strategy
 *
 * @author ryanlee
 * @updateTime 2020-04-29 08:10
 */
public class CaffeineInvalidateStrategy implements ILocalCacheInvalidateStrategy {

    public Logger logger = LoggerFactory.getLogger(CaffeineInvalidateStrategy.class);

    @Override
    public void invalidateLocalCache(CacheContext cacheContext, String area, String cacheName, CacheMessage cacheMessage) {
        System.out.println(String.format("area:%s; cacheName:%s; cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));

        Cache localCache = cacheContext.getCache(area, cacheName).unwrap(Cache.class);
        // We wanted to compare the local and remote cached values during TYPE_PUT and skip the
        // invalidation when they are the same. However, the JetCache source code does not
        // provide a way to obtain the remote cache.
        int type = cacheMessage.getType();

        switch (type) {
            case CacheMessage.TYPE_PUT:
            case CacheMessage.TYPE_REMOVE: {
                Object[] keys = cacheMessage.getKeys();
                invalidateLocalCaches(localCache, keys);
                break;
            }
            case CacheMessage.TYPE_REMOVE_ALL: {
                localCache.invalidateAll();
                break;
            }
        }
        logger.info(String.format("Invalidated local cache: area:%s; cacheName:%s; cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));

    }

    private void invalidateLocalCaches(Cache localCache, Object[] keys) {
        for (Object key : keys) {
            // Convert the key the same way JetCache does before looking it up
            Object fastJsonKey = FastjsonKeyConvertor.INSTANCE.apply(key);
            Object valueBefore = localCache.getIfPresent(fastJsonKey);
            if (null != valueBefore) {
                invalidateKey(localCache, fastJsonKey);
            }
        }
    }

    private void invalidateKey(Cache localCache, Object fastJsonKey) {
        Object valueBefore = localCache.getIfPresent(fastJsonKey);
        System.out.println("local cache value before invalidate:" + JSON.toJSONString(valueBefore));
        localCache.invalidate(fastJsonKey);
        System.out.println("invalidate local cache key:" + fastJsonKey);
        Object valueAfter = localCache.getIfPresent(fastJsonKey);
        System.out.println("local cache value after invalidate:" + JSON.toJSONString(valueAfter));
    }
}

Verifying the effect:

  1. First run whileGet, which simulates an application instance reading the cached value in a loop:
    @Test
    public void whileGet() {
        while (true) {
            System.out.println(airportCacheDao.testCachedOneDay("UK"));
            sleep(1000);
        }
    }

    @Cached(name = "STA_JET", cacheType = CacheType.BOTH, expire = 1, timeUnit = TimeUnit.DAYS)
    public String testCachedOneDay(String value) {
        String result = String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
        System.out.println("get from testCached method:" + result);
        return result;
    }
  2. Then run testInvalidate, which simulates an application instance invalidating the cached value:
    @Test
    public void testInvalidate() throws IOException {
        airportCacheDao.testCacheInvalidate("UK");
    }

    @CacheInvalidate(name = "STA_JET")
    public String testCacheInvalidate(String value) {
        String result = String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
        System.out.println("get from testCacheInvalidate method:" + result);

        return result;
    }

Execution Result:

  3. Then run testUpdate, which simulates an application instance updating the cached value:
    @Test
    public void testUpdate() throws IOException {
        airportCacheDao.testCacheUpdate("UK");
        sleep(1000);
        sleep(1000);
    }

    @CacheUpdate(name = "STA_JET", value = "#value")
    public String testCacheUpdate(String value) {
        System.out.println("run testCacheUpdate method:" + value);
        return String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
    }

Other tests were also run to verify the completeness and effectiveness of the mechanism; they are not listed one by one here.

Problems with the above implementation

The application instance consumes the messages broadcast by itself, causing some logic to be repeatedly executed.

Suppose there are four application instances: A, B, C, and D.

  1. Instance A invalidates a JetCache entry (in both the local and remote caches) and broadcasts the change.
  2. Instances A, B, C, and D each invalidate their local cache value after receiving the cache update message (type: remove). Instance A therefore performs two local invalidations, and the second one is unnecessary.
  3. Instance C is the first to receive a query. It calls the target method to get the return value, puts it into both cache levels, and broadcasts a cache update message (type: new or updated).
  4. Instances A, B, C, and D invalidate their local cache values again after receiving the message sent by C. Instance C receives its own broadcast and invalidates the value it has just put into its local cache, which is unnecessary.
  5. Instance D receives a query and finds that its local cache has no value but the remote cache does. It loads the value from the remote cache into its local cache. No cache update event is triggered at this point.
  6. Instance A receives a query and finds that its local cache has no value but the remote cache does. It loads the value from the remote cache into its local cache. No cache update event is triggered at this point.

The scenario above shows the problem stated at the beginning of this section: an application instance consumes its own broadcast messages, so some logic is executed redundantly. Preliminary idea: filter out the messages an instance sent itself, using a unique identifier for the application instance. TODO
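The preliminary idea can be sketched as follows. This is only one possible shape for the still-open TODO, under the assumption that the broadcast envelope can carry an extra sender identifier. InvalidationMessage, FilteringListener, and the senderId field are hypothetical and not part of JetCache or of the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class SelfMessageFilterDemo {
    public static void main(String[] args) {
        // Four instances (A, B, C, D), each with a random unique identifier.
        List<FilteringListener> instances = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            instances.add(new FilteringListener(UUID.randomUUID().toString()));
        }
        FilteringListener sender = instances.get(0);
        InvalidationMessage message = sender.newMessage("STA_JET", "UK");

        // A broadcast reaches every instance, including the sender.
        for (FilteringListener instance : instances) {
            instance.onMessage(message);
        }
        for (FilteringListener instance : instances) {
            System.out.println(instance.handledCount()); // the sender prints 0, the other three print 1
        }
    }
}

// Hypothetical envelope: cache name and key plus the id of the sending instance.
class InvalidationMessage {
    final String senderId;
    final String cacheName;
    final String key;

    InvalidationMessage(String senderId, String cacheName, String key) {
        this.senderId = senderId;
        this.cacheName = cacheName;
        this.key = key;
    }
}

class FilteringListener {
    private final String instanceId;
    private int handled;

    FilteringListener(String instanceId) {
        this.instanceId = instanceId;
    }

    InvalidationMessage newMessage(String cacheName, String key) {
        return new InvalidationMessage(instanceId, cacheName, key);
    }

    // Returns true if the message was processed, false if it was skipped as a self-sent message.
    boolean onMessage(InvalidationMessage message) {
        if (instanceId.equals(message.senderId)) {
            return false; // own broadcast: the local cache was already handled by the sender
        }
        handled++; // a real listener would invalidate the local cache entry here
        return true;
    }

    int handledCount() {
        return handled;
    }
}
```

In the real listener, the identifier would have to be generated once per application instance at startup and serialized into the published JSON alongside the area and cache name.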

About the JetCache cache update message

A cache update event is triggered when an application instance updates or deletes a cached value. If an instance receives a query and neither the local cache nor the remote cache holds a value, the body of the target method is executed, the return value is saved to both cache levels, and a cache update event is triggered. If the remote cache (Redis) holds a value but the local cache (Caffeine) does not, the instance loads the value from the remote cache into its local cache, and no cache update event is triggered.
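These rules can be written down as a small plain-Java model that records which operations would publish an event. It is a sketch of the behaviour described in this section, not JetCache's actual code path; TwoLevelCache and its event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class UpdateEventRulesDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new ConcurrentHashMap<>(); // stand-in for the remote cache
        TwoLevelCache instanceC = new TwoLevelCache(redis);
        TwoLevelCache instanceD = new TwoLevelCache(redis);

        instanceC.get("UK", key -> "loaded-" + key); // both levels miss: load, store, publish
        instanceD.get("UK", key -> "loaded-" + key); // remote hit: fill local cache, no event

        System.out.println(instanceC.publishedEvents()); // [PUT:UK]
        System.out.println(instanceD.publishedEvents()); // []
    }
}

class TwoLevelCache {
    private final Map<String, String> local = new HashMap<>();
    private final Map<String, String> remote;
    private final List<String> events = new ArrayList<>(); // stands in for published messages

    TwoLevelCache(Map<String, String> remote) {
        this.remote = remote;
    }

    String get(String key, Function<String, String> targetMethod) {
        String value = local.get(key);
        if (value != null) {
            return value;              // local hit: no event
        }
        value = remote.get(key);
        if (value != null) {
            local.put(key, value);     // remote hit: fill the local cache, no event
            return value;
        }
        value = targetMethod.apply(key); // both levels miss: run the target method
        put(key, value);                 // store in both levels and publish an update
        return value;
    }

    void put(String key, String value) {
        local.put(key, value);
        remote.put(key, value);
        events.add("PUT:" + key);
    }

    void remove(String key) {
        local.remove(key);
        remote.remove(key);
        events.add("REMOVE:" + key);
    }

    List<String> publishedEvents() {
        return events;
    }
}
```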

The extension described above still needs improvement, and more real-world scenarios need to be tested. I offer it as a rough starting point in the hope of drawing out better ideas. If you are interested, let's discuss and improve it together.