Make writing a habit together! This is the fourth day of my participation in the “Gold Digging Day New Plan · April More text Challenge”. Click here for more details.
Yesterday was a simple demo of Reactor, today we will take a look at how to operate Redis. The operation method is the same as before, but the original synchronous method is changed to asynchronous implementation. ,
To configure
We need to operate redis, so we need REDis support, so we need JDBC support, we need to import dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
Copy the code
You need to specify the location of redis: spring.redis.host.
To get the code
We need the support of ReactiveStringRedisTemplate, but does not provide, so we need to define your own one
@Bean
ReactiveStringRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
return new ReactiveStringRedisTemplate(factory);
}
Copy the code
We can see that the factory has the redis connection we need.
Use the shared lock CountDownLatch. Since we have a group of two threads doing this, the number of tasks is one. Then, to see the effect, print a wait message.
CountDownLatch cdl = new CountDownLatch(1);
log.info("Waiting");
cdl.await();
Copy the code
Query data, this one is simple: it is a functional interface mapping object
List<Coffee> list = jdbcTemplate.query(
"select * from t_coffee", (rs, i) ->
Coffee.builder()
.id(rs.getLong("id"))
.name(rs.getString("name"))
.price(rs.getLong("price"))
.build()
);
Copy the code
To traverse the list as a stream and execute in a single thread:
Flux.fromIterable(list)
.publishOn(Schedulers.single())
.doOnComplete(() -> log.info("list ok"))
Copy the code
One – to – one mapping is stored in Redis as a stream
ReactiveHashOperations<String, String, String> hashOps = redisTemplate.opsForHash()
Copy the code
.flatMap(c -> {
log.info("try to put {},{}", c.getName(), c.getPrice());
return hashOps.put(KEY, c.getName(), c.getPrice().toString());
})
Copy the code
When done, print OK, followed by setting the expiration time for the key
.doOnComplete(() -> log.info("set ok"))
.concatWith(redisTemplate.expire(KEY, Duration.ofMinutes(1)))
.doOnComplete(() -> log.info("expire ok"))
Copy the code
Then we subscribe, see if the subscription is successful, and the task is complete.
.subscribe(b -> log.info("Boolean: {}", b),
e -> log.error("Exception {}", e.getMessage()),
() -> cdl.countDown());
Copy the code
You can see that the subscription is successful and the status is correct.
In fact, and we mentioned before the use of redis is not much different, just start another thread, and then the original synchronous operation becomes asynchronous, even a simple example of responsive programming, have you learned?