The problem
While building the project we ran into a problem: the base data (about 8 MB, and mutable) is read very frequently. Caching it in Redis alone is not efficient enough, and there are several such base data sets.
A hand-rolled two-level cache suffers from inconsistency. Even with a periodic refresh, the data cached locally by different instances of the same application can differ for a period of time, because there is no way to update the local cache of every application instance at once.
For example, application A implements a two-level cache with JetCache and is deployed as two instances. Instance 1 modifies a piece of data and writes it to the database, its local cache, and the remote cache. On instance 2, the local cache keeps the old data until it is refreshed or expires. The maximum duration of the inconsistency depends on the execution interval of the cache refresh job.
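The scenario above can be reproduced with a few lines of plain Java. This is only a sketch under simplifying assumptions: plain HashMaps stand in for Caffeine and Redis, and AppInstance and StaleLocalCacheDemo are hypothetical names introduced here, not JetCache classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for one application instance: a private local cache in front of a shared remote cache. */
class AppInstance {
    private final Map<String, String> localCache = new HashMap<>();
    private final Map<String, String> remoteCache; // shared by all instances (plays the role of Redis)

    AppInstance(Map<String, String> remoteCache) {
        this.remoteCache = remoteCache;
    }

    /** Read path: local cache first, then the remote cache (filling the local cache on a hit). */
    String get(String key) {
        String value = localCache.get(key);
        if (value == null) {
            value = remoteCache.get(key);
            if (value != null) {
                localCache.put(key, value);
            }
        }
        return value;
    }

    /** Write path: updates only this instance's local cache and the remote cache. */
    void update(String key, String value) {
        localCache.put(key, value);
        remoteCache.put(key, value);
    }
}

class StaleLocalCacheDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new HashMap<>();
        AppInstance instance1 = new AppInstance(redis);
        AppInstance instance2 = new AppInstance(redis);

        instance1.update("UK", "v1");
        System.out.println(instance2.get("UK")); // v1 (now also cached locally on instance 2)

        instance1.update("UK", "v2");
        System.out.println(instance1.get("UK")); // v2
        System.out.println(instance2.get("UK")); // v1: instance 2 still serves its stale local copy
    }
}
```

Nothing in the write path tells instance 2 that its local copy is out of date, which is exactly the gap the rest of this post tries to close.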
Redisson (Redis) VS JetCache (Redis) VS JetCache (Redis + Caffeine)
- Configuration file:
- Unit test method:
@Test
public void compareEfficiency() throws IOException {
    // Make sure the data is loaded into the cache before timing
    airportCacheDao.getAllFromRedisson();
    long start = System.currentTimeMillis();
    for (int i = 0; i < 20; i++) {
        airportCacheDao.getAllFromRedisson();
    }
    long end = System.currentTimeMillis();
    System.out.println(String.format("Query redis via Redisson; 20 times full data, time: %s ms", end - start));
    // Make sure the data is loaded into the cache before timing
    airportCacheDao.getAllFromDbOrJetCacheRemote();
    long start1 = System.currentTimeMillis();
    for (int i = 0; i < 20; i++) {
        airportCacheDao.getAllFromDbOrJetCacheRemote();
    }
    long end1 = System.currentTimeMillis();
    System.out.println(String.format("Query redis with JetCache; 20 times full data, time: %s ms", end1 - start1));
    // Make sure the data is loaded into the cache before timing
    airportCacheDao.getAllFromDbOrJetCacheBoth();
    long start2 = System.currentTimeMillis();
    for (int i = 0; i < 20; i++) {
        airportCacheDao.getAllFromDbOrJetCacheBoth();
    }
    long end2 = System.currentTimeMillis();
    System.out.println(String.format("Query via JetCache level 2 cache; 20 times full data, time: %s ms", end2 - start2));
    // Block until input arrives on stdin so the test process stays alive
    System.in.read();
}
- Methods in DAO
// Manual caching with a Redisson bucket: read from Redis if present, otherwise load from the DB
public List<MdBmdmAirportDO> getAllFromRedisson() {
    List<MdBmdmAirportDO> airports;
    RBucket<List<MdBmdmAirportDO>> airportRList = redissonClient.getBucket(RedisKey.MDMAirportDataList + "_Redisson");
    if (airportRList.isExists()) {
        //System.out.println("get from cache");
        airports = airportRList.get();
    } else {
        System.out.println("get from db");
        airports = iMdBmdmAirportDao.list(null);
        airportRList.set(airports);
    }
    return airports;
}

// JetCache, remote (Redis) cache only; the method body runs only on a cache miss
@Cached(name = RedisKey.MDMAirportDataList + "_JetCache_Remote", cacheType = CacheType.REMOTE, expire = 1000)
public List<MdBmdmAirportDO> getAllFromDbOrJetCacheRemote() {
    System.out.println("get from db");
    List<MdBmdmAirportDO> airports;
    airports = iMdBmdmAirportDao.list(null);
    return airports;
}

// JetCache, local (Caffeine) + remote (Redis) cache; the method body runs only on a cache miss
@Cached(name = RedisKey.MDMAirportDataList + "_JetCache_Both", cacheType = CacheType.BOTH, expire = 1000)
public List<MdBmdmAirportDO> getAllFromDbOrJetCacheBoth() {
    System.out.println("get from db");
    List<MdBmdmAirportDO> airports;
    airports = iMdBmdmAirportDao.list(null);
    return airports;
}
The comparison experiment shows that query efficiency differs greatly between the three approaches.
Description:
- Caffeine was selected for the JetCache local cache component.
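- In the JetCache @Cached annotation, cacheType = CacheType.BOTH means a two-level cache (local + remote), and cacheType = CacheType.REMOTE means a single-level cache (remote only).
- Redisson PRO supports two-level caching, but it is a paid product, so it was not considered.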
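The three measurements in the unit test share one pattern: a warm-up call so the data is already cached, followed by 20 timed calls. A minimal sketch of that pattern as a reusable helper is shown here; QueryTimer and the fake query are hypothetical names used only for illustration, and a lambda would be replaced by a real DAO call such as airportCacheDao::getAllFromRedisson.

```java
import java.util.function.Supplier;

class QueryTimer {
    /**
     * Calls the query once to warm the cache, then times `rounds` further calls.
     * Returns the elapsed wall-clock time of the timed calls in milliseconds.
     */
    static long timeMillis(Supplier<?> query, int rounds) {
        query.get(); // warm-up: make sure the data is already in the cache
        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            query.get();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a DAO call
        Supplier<String> fakeQuery = () -> "airports";
        long elapsed = timeMillis(fakeQuery, 20);
        System.out.println("20 full reads took " + elapsed + " ms");
    }
}
```

A single wall-clock measurement like this is coarse; it is enough to see order-of-magnitude differences between the approaches, but not small ones.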
JetCache overview and features
JetCache is a Java cache abstraction that provides a unified API and annotations to simplify cache usage. It natively supports TTL, two-level caching, and distributed auto-refresh, and it provides a Cache interface for manual cache operations. There are currently four implementations: RedisCache, TairCache, CaffeineCache (in memory), and a simple LinkedHashMapCache (in memory).
All features:
- Unified API for accessing the cache system
- Declarative method caching through annotations, with TTL and two-level cache support
- Creating and configuring Cache instances through annotations
- Automatic statistics for all Cache instances and method caches
- Configurable key generation and value serialization strategies
- Distributed cache auto-refresh and distributed lock (2.2+)
- Asynchronous Cache API (2.2+, when using the Redis lettuce client)
- Spring Boot support
Please refer to the official JetCache wiki
Simple usage
JetCache provides easy-to-use annotations. Compared with the annotations provided by Spring Cache, JetCache adds several new ones:
- @CacheRefresh: periodically refreshes the cache.
- @CacheUpdate: updates the cache.
- @CachePenetrationProtect: protects against concurrent loads on a cache miss. The current version implements this protection within a single JVM: for a given key, only one thread in the JVM loads the value while the other threads wait for the result.
I will not list all of them here; please refer to the JetCache annotation documentation.
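To make the single-JVM load protection described for @CachePenetrationProtect more concrete, here is a plain-Java sketch of the idea. It is not JetCache's implementation; SingleFlightLoader is a hypothetical class written only to illustrate "one thread loads, the others wait".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** On a miss, only one thread per key runs the loader; concurrent callers wait for its result. */
class SingleFlightLoader<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    V load(K key, Function<K, V> loader) {
        CompletableFuture<V> mine = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, mine);
        if (existing != null) {
            return existing.join(); // another thread is already loading this key: wait for it
        }
        try {
            V value = loader.apply(key);
            mine.complete(value);
            return value;
        } catch (RuntimeException | Error e) {
            mine.completeExceptionally(e); // waiting threads see the failure as well
            throw e;
        } finally {
            inFlight.remove(key, mine); // later misses start a fresh load
        }
    }
}

class SingleFlightDemo {
    public static void main(String[] args) {
        SingleFlightLoader<String, String> loader = new SingleFlightLoader<>();
        System.out.println(loader.load("UK", key -> "value-" + key)); // value-UK
    }
}
```

The sketch only deduplicates loads that overlap in time; storing the loaded value is left to the surrounding cache.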
With method caching implemented through JetCache annotations, the cached value is stored in Redis as a String. There is nothing wrong with that; it is more efficient than reading the data from a more complex data structure. (By contrast, the Spring cache implemented by RedissonSpringCacheManager stores method return values in a Hash structure.)
By default, the JetCache local cache holds at most 100 elements. This limit is configurable. Eviction is LRU-based.
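For readers unfamiliar with LRU eviction under a fixed element limit, the following sketch shows the idea with the JDK's LinkedHashMap. It is an illustration only, not JetCache's own code, and LruCache is a hypothetical name; the demo uses a limit of 2 instead of 100 to keep the output short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** A size-bounded map that evicts the least recently used entry once the limit is exceeded. */
class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int limit;

    LruCache(int limit) {
        super(16, 0.75f, true); // accessOrder = true: entries are ordered from least to most recently used
        this.limit = limit;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > limit; // called after each insertion; true evicts the eldest entry
    }
}

class LruCacheDemo {
    public static void main(String[] args) {
        LruCache<String, Integer> cache = new LruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" becomes the most recently used entry
        cache.put("c", 3); // exceeds the limit, so "b" (least recently used) is evicted
        System.out.println(cache.keySet()); // [a, c]
    }
}
```

This class is not thread-safe; a real local cache would need synchronization or a concurrent implementation such as Caffeine.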
Problems with JetCache
With the two-level cache structure, the local caches can become inconsistent. This was illustrated with an example at the beginning of this article, so I will not repeat it here. Out of the box, JetCache does not automatically refresh the local caches of other instances in a two-level cache.
The extension implementation
After cloning and reading the JetCache source code, I found that its architecture is cleverly designed: it already defines a publishing mechanism for cache update messages (the CacheMessagePublisher interface), which keeps the architecture extensible.
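Before looking at the real classes, the overall idea can be sketched in plain Java: every write publishes the changed key, and every instance drops its local copy when a message arrives. This is a simplified model under stated assumptions, with an in-memory list of listeners standing in for a Redis pub/sub channel; InvalidationBus and BroadcastingInstance are hypothetical names, not JetCache or Lettuce classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for a Redis pub/sub channel: every subscriber receives every message. */
class InvalidationBus {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> listener) {
        subscribers.add(listener);
    }

    void publish(String key) {
        for (Consumer<String> listener : subscribers) {
            listener.accept(key);
        }
    }
}

/** One application instance whose local cache is invalidated by messages from the bus. */
class BroadcastingInstance {
    private final Map<String, String> localCache = new HashMap<>();
    private final Map<String, String> remoteCache;
    private final InvalidationBus bus;

    BroadcastingInstance(Map<String, String> remoteCache, InvalidationBus bus) {
        this.remoteCache = remoteCache;
        this.bus = bus;
        bus.subscribe(localCache::remove); // drop the local copy when any instance reports a change
    }

    /** Local cache first; on a miss, load from the remote cache and keep a local copy. */
    String get(String key) {
        return localCache.computeIfAbsent(key, remoteCache::get);
    }

    /** Write to the remote cache, then tell every instance (including this one) to drop its local copy. */
    void update(String key, String value) {
        remoteCache.put(key, value);
        bus.publish(key);
    }
}

class BroadcastDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new HashMap<>();
        InvalidationBus bus = new InvalidationBus();
        BroadcastingInstance instance1 = new BroadcastingInstance(redis, bus);
        BroadcastingInstance instance2 = new BroadcastingInstance(redis, bus);

        instance1.update("UK", "v1");
        System.out.println(instance2.get("UK")); // v1
        instance1.update("UK", "v2");
        System.out.println(instance2.get("UK")); // v2: the stale local copy was invalidated
    }
}
```

The bus here delivers messages synchronously; with a real Redis channel delivery is asynchronous, so a short window of staleness remains.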
Custom implementation of CacheMessagePublisher:
package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.autoconfigure.LettuceFactory;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.CacheMessagePublisher;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;

/**
 * @author Ryanlee
 * @updateTime 2020-04-27 17:26
 */
@Service
@Primary
public class RedisMessagePublisher implements CacheMessagePublisher {
    public Logger logger = LoggerFactory.getLogger(RedisMessagePublisher.class);
    @Autowired
    @Qualifier("defaultClient")
    public LettuceFactory lettuceFactory;
    private StatefulRedisPubSubConnection<String, String> connection;

    @PreDestroy
    public void destroy() {
        // The connection is created lazily, so it may never have been opened
        if (connection != null) {
            connection.close();
        }
    }

    @Value("${jetcache.cacheMessagePublisher.topic}")
    String topicName;

    @Override
    public void publish(String area, String cacheName, CacheMessage cacheMessage) {
        try {
            // Lazily open (or reopen) the pub/sub connection
            if (null == connection || !connection.isOpen()) {
                System.out.println("Initialize connection");
                RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
                connection = client.connectPubSub();
            }
        } catch (Exception e) {
            logger.error("Failed to initialize the pub/sub connection", e);
            // connection may still be null here, so guard before closing
            if (connection != null) {
                connection.close();
            }
            // Without a usable connection there is nothing to publish on
            return;
        }
        // Wrap the message together with its area and cache name so that listeners can locate the cache
        CacheMessageWithName cacheMessageWithName = new CacheMessageWithName();
        cacheMessageWithName.setArea(area);
        cacheMessageWithName.setCacheName(cacheName);
        cacheMessageWithName.setCacheMessage(cacheMessage);
        RedisPubSubAsyncCommands<String, String> async = connection.async();
        String payload = JSON.toJSONString(cacheMessageWithName);
        async.publish(topicName, payload);
        logger.info("Send cache update message: message:{}", payload);
    }
}
A custom RedisPubSubListener implementation:
package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.anno.support.CacheContext;
import com.alicp.jetcache.support.CacheMessage;
import io.lettuce.core.pubsub.RedisPubSubListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author ryanlee
 * @updateTime 2020-04-27 20:09
 */
public class CusRedisPubSubListener implements RedisPubSubListener<String, String> {
    public Logger logger = LoggerFactory.getLogger(CusRedisPubSubListener.class);
    private CacheContext cacheContext;
    private ILocalCacheInvalidateStrategy localCacheInvalidateStrategy;

    public CusRedisPubSubListener(CacheContext cacheContext, ILocalCacheInvalidateStrategy localCacheInvalidateStrategy) {
        this.cacheContext = cacheContext;
        this.localCacheInvalidateStrategy = localCacheInvalidateStrategy;
    }

    @Override
    public void message(String channel, String message) {
        consumeMessage(message);
    }

    @Override
    public void message(String pattern, String channel, String message) {
        consumeMessage(message);
    }

    // Deserialize the message and delegate the local cache invalidation to the strategy
    private void consumeMessage(String message) {
        logger.info("CusRedisPubSubListener receives a local cache update message:" + message);
        CacheMessageWithName cacheMessageWithName = JSON.parseObject(message, CacheMessageWithName.class);
        String area = cacheMessageWithName.getArea();
        String cacheName = cacheMessageWithName.getCacheName();
        CacheMessage cacheMessage = cacheMessageWithName.getCacheMessage();
        localCacheInvalidateStrategy.invalidateLocalCache(cacheContext, area, cacheName, cacheMessage);
    }

    @Override
    public void subscribed(String channel, long count) {
        System.out.println(String.format("Subscribe topic: %s", channel));
    }

    @Override
    public void psubscribed(String pattern, long count) {
    }

    @Override
    public void unsubscribed(String channel, long count) {
    }

    @Override
    public void punsubscribed(String pattern, long count) {
    }
}
Register listening service:
package com.xmair.core.jetcache;

import com.alicp.jetcache.anno.support.ConfigProvider;
import com.alicp.jetcache.autoconfigure.LettuceFactory;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;

/**
 * Registers the RedisPubSubListener on the channel
 *
 * @author Ryanlee
 * @updateTime 2020-04-27 17:43
 */
@Service
public class LocalCacheUpdateInitService {
    @Value("${jetcache.cacheMessagePublisher.topic}")
    String topicName;
    @Autowired
    @Qualifier("defaultClient")
    LettuceFactory lettuceFactory;
    @Autowired
    ConfigProvider configProvider;

    @PostConstruct
    public void initConsumer() throws Exception {
        RedisClusterClient client = (RedisClusterClient) lettuceFactory.getObject();
        StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
        CaffeineInvalidateStrategy caffeineInvalidateStrategy = new CaffeineInvalidateStrategy();
        RedisPubSubListener<String, String> listener = new CusRedisPubSubListener(configProvider.getCacheContext(), caffeineInvalidateStrategy);
        // Register the listener first, then subscribe to the cache update topic
        connection.addListener(listener);
        RedisPubSubCommands<String, String> sync = connection.sync();
        sync.subscribe(topicName);
    }
}
Custom Caffeine local cache invalidation strategy:
package com.xmair.core.jetcache;

import com.alibaba.fastjson.JSON;
import com.alicp.jetcache.anno.support.CacheContext;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.FastjsonKeyConvertor;
import com.github.benmanes.caffeine.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caffeine local cache invalidation strategy
 *
 * @author ryanlee
 * @updateTime 2020-04-29 08:10
 */
public class CaffeineInvalidateStrategy implements ILocalCacheInvalidateStrategy {
    public Logger logger = LoggerFactory.getLogger(CaffeineInvalidateStrategy.class);

    @Override
    public void invalidateLocalCache(CacheContext cacheContext, String area, String cacheName, CacheMessage cacheMessage) {
        System.out.println(String.format("area:%s; cacheName:%s; cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));
        Cache localCache = cacheContext.getCache(area, cacheName).unwrap(Cache.class);
        // We wanted to compare the local and remote cached values on TYPE_PUT and skip the
        // invalidation when they are the same. However, the JetCache source code does not
        // provide a way to obtain the remote cache.
        int type = cacheMessage.getType();
        switch (type) {
            case CacheMessage.TYPE_PUT:
            case CacheMessage.TYPE_REMOVE: {
                Object[] keys = cacheMessage.getKeys();
                invalidateLocalCaches(localCache, keys);
                break;
            }
            case CacheMessage.TYPE_REMOVE_ALL: {
                localCache.invalidateAll();
                break;
            }
            default:
                break;
        }
        logger.info(String.format("Invalid local cache: area:%s; cacheName:%s; cacheMessage:%s", area, cacheName, JSON.toJSONString(cacheMessage)));
    }

    private void invalidateLocalCaches(Cache localCache, Object[] keys) {
        for (Object key : keys) {
            // Convert the key the same way as when it was stored, so the lookup matches
            Object fastJsonKey = FastjsonKeyConvertor.INSTANCE.apply(key);
            Object valueBefore = localCache.getIfPresent(fastJsonKey);
            if (null != valueBefore) {
                invalidateKey(localCache, fastJsonKey);
            }
        }
    }

    private void invalidateKey(Cache localCache, Object fastJsonKey) {
        Object valueBefore = localCache.getIfPresent(fastJsonKey);
        System.out.println("local cache value before invalidate:" + JSON.toJSONString(valueBefore));
        localCache.invalidate(fastJsonKey);
        System.out.println("invalidate local cache key:" + fastJsonKey);
        Object valueAfter = localCache.getIfPresent(fastJsonKey);
        System.out.println("local cache value after invalidate:" + JSON.toJSONString(valueAfter));
    }
}
Verifying the effect:
- First run whileGet, which simulates an application instance reading the cached value in a loop:
@Test
public void whileGet() throws InterruptedException {
    while (true) {
        System.out.println(airportCacheDao.testCachedOneDay("UK"));
        Thread.sleep(1000);
    }
}
@Cached(name = "STA_JET", cacheType = CacheType.BOTH, expire = 1, timeUnit = TimeUnit.DAYS)
public String testCachedOneDay(String value) {
    String result = String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
    System.out.println("get from testCached method:" + result);
    return result;
}
- Then run testInvalidate, which simulates an application instance invalidating the cached value:
@Test
public void testInvalidate() throws IOException {
    airportCacheDao.testCacheInvalidate("UK");
}
@CacheInvalidate(name = "STA_JET")
public String testCacheInvalidate(String value) {
    String result = String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
    System.out.println("get from testCacheInvalidate method:" + result);
    return result;
}
Execution Result:
- Then run testUpdate, which simulates an application instance updating the cached value:
@Test
public void testUpdate() throws IOException, InterruptedException {
    airportCacheDao.testCacheUpdate("UK");
    Thread.sleep(1000);
}
@CacheUpdate(name = "STA_JET", value = "#value")
public String testCacheUpdate(String value) {
    System.out.println("run testCacheUpdate method:" + value);
    return String.format("%s; timestamp:%s", value, Calendar.getInstance().getTime());
}
Other tests were also run to verify the completeness and validity of the mechanism; they are not listed one by one here.
Problems with the above implementation
An application instance also consumes the messages it broadcasts itself, so some logic is executed redundantly.
Suppose there are four application instances: A, B, C, and D.
- A invalidates a JetCache cache entry (in both the local and the remote cache) and broadcasts the change.
- Instances A, B, C, and D each invalidate their local cache value after receiving the cache update message (type: remove). Instance A has now invalidated its local cache twice; the second time was unnecessary.
- Instance C is the first to receive a query request. It calls the target method to get the return value, puts it into both cache levels, and broadcasts a cache update message (type: put, i.e. new or updated).
- Instances A, B, C, and D invalidate their local cache values again after receiving the cache update message sent by C. Instance C thus receives its own broadcast and invalidates the value it has just stored in its local cache, which is unnecessary.
- D receives a query request and finds that the local cache has no value but the remote cache does. It loads the value from the remote cache into the local cache. No cache update event is triggered at this point.
- A receives a query request and finds that the local cache has no value but the remote cache does. It loads the value from the remote cache into the local cache. No cache update event is triggered at this point.
The scenario above shows the problem stated at the beginning of this section: an application instance consumes its own broadcast messages, so some logic runs twice. Preliminary idea: filter out the messages an instance has broadcast itself by means of a unique identifier for each application instance. TODO
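The preliminary idea of filtering self-sent messages by a unique instance identifier could look roughly like the sketch below. It is an untested outline of the TODO, not part of the implementation above: TaggedMessage, its sourceId field, and SelfFilteringListener are all hypothetical, and in the real code the identifier would have to be added to the published message (for example to CacheMessageWithName).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Invalidation message tagged with the id of the instance that sent it (hypothetical field). */
class TaggedMessage {
    final String sourceId;
    final String key;

    TaggedMessage(String sourceId, String key) {
        this.sourceId = sourceId;
        this.key = key;
    }
}

/** Listener that ignores messages published by its own instance. */
class SelfFilteringListener {
    private final String instanceId;
    private final List<String> invalidatedKeys = new ArrayList<>();

    SelfFilteringListener(String instanceId) {
        this.instanceId = instanceId;
    }

    /** Returns true if the message was handled, false if it was skipped as self-sent. */
    boolean onMessage(TaggedMessage message) {
        if (instanceId.equals(message.sourceId)) {
            return false; // our own broadcast: the sender has already updated its local cache
        }
        invalidatedKeys.add(message.key); // stand-in for invalidating the local cache entry
        return true;
    }

    List<String> invalidatedKeys() {
        return invalidatedKeys;
    }
}

class SelfFilterDemo {
    public static void main(String[] args) {
        // One random id per application instance, generated once at startup
        String myId = UUID.randomUUID().toString();
        SelfFilteringListener listener = new SelfFilteringListener(myId);

        System.out.println(listener.onMessage(new TaggedMessage(myId, "UK")));         // false
        System.out.println(listener.onMessage(new TaggedMessage("other-id", "UK")));   // true
    }
}
```

A random UUID generated at startup is one simple choice of identifier; a host name plus port would work as well, as long as it is unique per running instance.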
About the JetCache cache update message
A cache update event is triggered when an application instance updates or deletes a cached value. It is also triggered when an instance receives a query request and neither the local cache nor the remote cache contains a value: the target method body is executed, the return value is saved to both cache levels, and this write triggers the cache update event. If the remote cache (Redis) has a value but the local cache (Caffeine) does not, the instance loads the value from the remote cache into the local cache, and no cache update event is triggered.
The extension described above still needs improvement and must be tested in more real-world scenarios. This post is only a starting point meant to invite better ideas. If you are interested, let's discuss and improve it together.
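The three read-path cases described above can be summarized in a small sketch. It models only the behavior described in this section, with HashMaps standing in for Caffeine and Redis and a counter standing in for the broadcast; TwoLevelReadPath is a hypothetical class, not JetCache code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TwoLevelReadPath {
    private final Map<String, String> local = new HashMap<>();
    private final Map<String, String> remote; // shared between instances
    private int publishedMessages = 0;        // cache update messages this instance would broadcast

    TwoLevelReadPath(Map<String, String> remote) {
        this.remote = remote;
    }

    String get(String key, Function<String, String> targetMethod) {
        String value = local.get(key);
        if (value != null) {
            return value;                 // local hit: no message
        }
        value = remote.get(key);
        if (value != null) {
            local.put(key, value);        // remote hit: fill the local cache, no message
            return value;
        }
        value = targetMethod.apply(key);  // miss in both levels: run the target method
        local.put(key, value);
        remote.put(key, value);
        publishedMessages++;              // the write to both levels triggers a cache update message
        return value;
    }

    int publishedMessages() {
        return publishedMessages;
    }
}

class ReadPathDemo {
    public static void main(String[] args) {
        Map<String, String> redis = new HashMap<>();
        TwoLevelReadPath instanceC = new TwoLevelReadPath(redis);
        TwoLevelReadPath instanceD = new TwoLevelReadPath(redis);

        instanceC.get("UK", key -> "value-" + key);
        instanceD.get("UK", key -> "value-" + key);
        System.out.println(instanceC.publishedMessages()); // 1: C loaded the value and wrote both levels
        System.out.println(instanceD.publishedMessages()); // 0: D only copied the value from the remote cache
    }
}
```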