This is day 31 of my participation in the August Writing Challenge.
Recently we ran into a troublesome requirement: a single microservice application needs to access two different Redis clusters at the same time. We would not normally use Redis this way, but the two clusters belong to different business lines, and one microservice now has to access both.
Similar scenarios do come up in real business development. One example is Redis read-write separation, which spring-data-redis also does not provide. The underlying clients, such as Lettuce or Jedis, provide APIs for obtaining read-only connections, but there are two drawbacks:
- The upper layer, spring-data-redis, does not wrap this interface.
- Depending on the Redis architecture, Sentinel mode requires configuring the Sentinel addresses, and Cluster mode requires awareness of the cluster topology. In cloud-native environments, cloud providers hide these by default and expose only a single dynamic VIP domain name.
Therefore, we need to implement a mechanism on top of spring-data-redis that switches Redis connections dynamically.
The configuration class for spring-data-redis is org.springframework.boot.autoconfigure.data.redis.RedisProperties. It can configure a connection to either a single Redis instance or a Redis cluster. From this configuration, a unified Redis connection factory, RedisConnectionFactory, is generated.
The core interfaces of spring-data-redis and the connection-related abstractions behind them are related as follows:
From this diagram, we can see that it is enough to implement a RedisConnectionFactory that dynamically returns different Redis connections. According to the auto-configuration source code of spring-data-redis, all RedisConnectionFactory beans in the framework are declared with @ConditionalOnMissingBean, which means we can replace them with our own RedisConnectionFactory implementation.
Project address: github.com/JoJoTec/spr…
We can wrap RedisProperties in an outer configuration for multiple Redis connections, namely MultiRedisProperties:
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "spring.redis")
public class MultiRedisProperties {
    /**
     * Name (map key) of the default connection
     */
    public static final String DEFAULT = "default";

    private boolean enableMulti = false;
    private Map<String, RedisProperties> multi;
}
This configuration builds on the original one: users can keep using the original single-Redis configuration, or switch to the multi-Redis configuration by setting spring.redis.enable-multi=true and adding entries under spring.redis.multi. Each key in the Map is the name of a data source, which the user can specify to choose which Redis to use before calling RedisTemplate or ReactiveRedisTemplate.
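As an illustration, a configuration with two data sources might look like the fragment below. The data source names default and test and the addresses are simply the ones used by the unit test later in this article; real deployments would use their own names and hosts.

```properties
# Enable the multi-Redis connection factory
spring.redis.enable-multi=true
# The "default" entry is used when no data source has been selected
spring.redis.multi.default.host=127.0.0.1
spring.redis.multi.default.port=6379
# A second data source, selected by its key "test"
spring.redis.multi.test.host=127.0.0.1
spring.redis.multi.test.port=6380
```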
Next, we implement MultiRedisLettuceConnectionFactory, a RedisConnectionFactory that can switch Redis connections dynamically. The Redis client used in our project is Lettuce:
public class MultiRedisLettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
private final Map<String, LettuceConnectionFactory> connectionFactoryMap;
private static final ThreadLocal<String> currentRedis = new ThreadLocal<>();
public MultiRedisLettuceConnectionFactory(Map<String, LettuceConnectionFactory> connectionFactoryMap) {
this.connectionFactoryMap = connectionFactoryMap;
}
public void setCurrentRedis(String currentRedis) {
if (!connectionFactoryMap.containsKey(currentRedis)) {
throw new RedisRelatedException("invalid currentRedis: " + currentRedis + ", it does not exist in configuration");
}
MultiRedisLettuceConnectionFactory.currentRedis.set(currentRedis);
}
@Override
public void destroy() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::destroy);
}
@Override
public void afterPropertiesSet() throws Exception {
connectionFactoryMap.values().forEach(LettuceConnectionFactory::afterPropertiesSet);
}
private LettuceConnectionFactory currentLettuceConnectionFactory() {
String currentRedis = MultiRedisLettuceConnectionFactory.currentRedis.get();
if (StringUtils.isNotBlank(currentRedis)) {
// The selection is one-shot: clear it once it has been read
MultiRedisLettuceConnectionFactory.currentRedis.remove();
return connectionFactoryMap.get(currentRedis);
}
// Nothing selected on this thread: fall back to the default connection
return connectionFactoryMap.get(MultiRedisProperties.DEFAULT);
}
@Override
public ReactiveRedisConnection getReactiveConnection() {
return currentLettuceConnectionFactory().getReactiveConnection();
}
@Override
public ReactiveRedisClusterConnection getReactiveClusterConnection() {
return currentLettuceConnectionFactory().getReactiveClusterConnection();
}
@Override
public RedisConnection getConnection() {
return currentLettuceConnectionFactory().getConnection();
}
@Override
public RedisClusterConnection getClusterConnection() {
return currentLettuceConnectionFactory().getClusterConnection();
}
@Override
public boolean getConvertPipelineAndTxResults() {
return currentLettuceConnectionFactory().getConvertPipelineAndTxResults();
}
@Override
public RedisSentinelConnection getSentinelConnection() {
return currentLettuceConnectionFactory().getSentinelConnection();
}
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return currentLettuceConnectionFactory().translateExceptionIfPossible(ex);
}
}
The logic is simple: the factory exposes a method for selecting the Redis data source and stores the selection in a ThreadLocal. The selection is valid only for the next operation on the current thread, because it is cleared as soon as it is read.
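The one-shot selection described above can be illustrated without Spring or Redis. The following is a minimal sketch in plain Java, not the project's code: OneShotRouter, select, and current are hypothetical names, and plain strings stand in for the LettuceConnectionFactory instances.

```java
import java.util.Map;

/**
 * Illustrative sketch only: a one-shot, thread-local selector over named targets.
 * The class and method names are hypothetical and not part of the project.
 */
public class OneShotRouter<T> {
    public static final String DEFAULT = "default";

    private final Map<String, T> targets;
    // Holds the name chosen by the current thread for its next lookup only
    private final ThreadLocal<String> selected = new ThreadLocal<>();

    public OneShotRouter(Map<String, T> targets) {
        this.targets = Map.copyOf(targets);
    }

    /** Choose the target for the next call to current() on this thread. */
    public void select(String name) {
        if (!targets.containsKey(name)) {
            throw new IllegalArgumentException("unknown target: " + name);
        }
        selected.set(name);
    }

    /** Return the selected target and clear the selection; otherwise return the default. */
    public T current() {
        String name = selected.get();
        if (name != null) {
            selected.remove();
            return targets.get(name);
        }
        return targets.get(DEFAULT);
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotRouter<String> router = new OneShotRouter<>(Map.of(
                "default", "redis://127.0.0.1:6379",
                "test", "redis://127.0.0.1:6380"));

        router.select("test");
        System.out.println(router.current()); // the "test" target
        System.out.println(router.current()); // the selection was consumed: default target

        // A selection made on one thread is not visible to another thread
        router.select("test");
        Thread other = new Thread(() -> System.out.println(router.current()));
        other.start();
        other.join();
        System.out.println(router.current()); // the main thread still gets "test"
    }
}
```

Because the selection is consumed by the first lookup, every call that should go to a non-default target must be preceded by its own select call on the same thread, which is why the tests later in this article call setCurrentRedis before each operation on the test connection.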
Then we register MultiRedisLettuceConnectionFactory as a bean in our ApplicationContext:
@ConditionalOnProperty(prefix = "spring.redis", value = "enable-multi", matchIfMissing = false)
@Configuration(proxyBeanMethods = false)
public class RedisCustomizedConfiguration {

    /**
     * @param builderCustomizers
     * @param clientResources
     * @param multiRedisProperties
     * @return
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
     */
    @Bean
    public MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory(
            ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
            ClientResources clientResources,
            MultiRedisProperties multiRedisProperties,
            ObjectProvider<RedisSentinelConfiguration> sentinelConfigurationProvider,
            ObjectProvider<RedisClusterConfiguration> clusterConfigurationProvider) {
        // Read the configuration
        Map<String, LettuceConnectionFactory> connectionFactoryMap = Maps.newHashMap();
        Map<String, RedisProperties> multi = multiRedisProperties.getMulti();
        multi.forEach((k, v) -> {
            // We are just wrapping a layer around RedisProperties
            LettuceConnectionConfiguration lettuceConnectionConfiguration = new LettuceConnectionConfiguration(
                    v,
                    sentinelConfigurationProvider,
                    clusterConfigurationProvider
            );
            LettuceConnectionFactory lettuceConnectionFactory =
                    lettuceConnectionConfiguration.redisConnectionFactory(builderCustomizers, clientResources);
            connectionFactoryMap.put(k, lettuceConnectionFactory);
        });
        return new MultiRedisLettuceConnectionFactory(connectionFactoryMap);
    }
}
Let's test it using embedded-redis, which launches local Redis instances for unit testing. We start two Redis instances, put different keys into each, verify which keys exist where, and test the synchronous interface, multi-threaded calls to the synchronous interface, and many asynchronous (reactive) calls subscribed without waiting:
import com.github.jojotech.spring.boot.starter.redis.related.lettuce.MultiRedisLettuceConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Mono;
import redis.embedded.RedisServer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
"spring.redis.enable-multi=true",
"spring.redis.multi.default.host=127.0.0.1",
"spring.redis.multi.default.port=6379",
"spring.redis.multi.test.host=127.0.0.1",
"spring.redis.multi.test.port=6380",
})
public class MultiRedisTest {
// start two Redis instances
private static RedisServer redisServer;
private static RedisServer redisServer2;
@BeforeAll
public static void setUp() throws Exception {
System.out.println("start redis");
redisServer = RedisServer.builder().port(6379).setting("maxheap 200m").build();
redisServer2 = RedisServer.builder().port(6380).setting("maxheap 200m").build();
redisServer.start();
redisServer2.start();
System.out.println("redis started");
}
@AfterAll
public static void tearDown() throws Exception {
System.out.println("stop redis");
redisServer.stop();
redisServer2.stop();
System.out.println("redis stopped");
}
@EnableAutoConfiguration
@Configuration
public static class App {
}
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ReactiveStringRedisTemplate reactiveRedisTemplate;
@Autowired
private MultiRedisLettuceConnectionFactory multiRedisLettuceConnectionFactory;
private void testMulti(String suffix) {
// using the default connection, set the key-value pair "testDefault" + suffix, "testDefault"
redisTemplate.opsForValue().set("testDefault" + suffix, "testDefault");
// using the test connection, set the key-value pair "testSecond" + suffix, "testSecond"
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
redisTemplate.opsForValue().set("testSecond" + suffix, "testSecond");
// using the default connection, verify that "testDefault" + suffix exists and "testSecond" + suffix does not
Assertions.assertTrue(redisTemplate.hasKey("testDefault" + suffix));
Assertions.assertFalse(redisTemplate.hasKey("testSecond" + suffix));
// using the test connection, verify that "testDefault" + suffix does not exist and "testSecond" + suffix does
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertFalse(redisTemplate.hasKey("testDefault" + suffix));
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
Assertions.assertTrue(redisTemplate.hasKey("testSecond" + suffix));
}
// single verification
@Test
public void testMultiBlock() {
testMulti("");
}
// multi-threaded verification
@Test
public void testMultiBlockMultiThread() throws InterruptedException {
Thread thread[] = new Thread[50];
AtomicBoolean result = new AtomicBoolean(true);
for (int i = 0; i < thread.length; i++) {
int finalI = i;
thread[i] = new Thread(() -> {
try {
testMulti("" + finalI);
} catch (Exception e) {
e.printStackTrace();
result.set(false);
}
});
}
for (int i = 0; i < thread.length; i++) {
thread[i].start();
}
for (int i = 0; i < thread.length; i++) {
thread[i].join();
}
Assertions.assertTrue(result.get());
}
// verification of the reactive interface
private Mono<Boolean> reactiveMulti(String suffix) {
return reactiveRedisTemplate.opsForValue().set("testReactiveDefault" + suffix, "testReactiveDefault")
.flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.opsForValue().set("testReactiveSecond" + suffix, "testReactiveSecond");
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveDefault" + suffix);
}).map(b -> {
Assertions.assertFalse(b);
System.out.println(Thread.currentThread().getName());
return b;
}).flatMap(b -> {
multiRedisLettuceConnectionFactory.setCurrentRedis("test");
return reactiveRedisTemplate.hasKey("testReactiveSecond" + suffix);
}).map(b -> {
Assertions.assertTrue(b);
return b;
});
}
// call the reactive verification many times and subscribe; this is inherently multi-threaded
@Test
public void testMultiReactive() throws InterruptedException {
for (int i = 0; i < 10000; i++) {
reactiveMulti("" + i).subscribe(System.out::println);
}
TimeUnit.SECONDS.sleep(10);
}
}
Run the tests, and they pass.