Redis5 Stream

Reference: redis.io/topics/streams-intro

SpringBoot integration

First, pom.xml

<!-- redis: Spring Data Redis starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- commons-pool2: needed for the Lettuce connection pool configured under spring.redis.lettuce.pool -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
Copy the code

Second, the configuration

# Redis connection settings for a local, password-less instance.
spring:
    redis:
        database: 0
        # Loopback address of the local Redis server
        host: 127.0.0.1
        port: 6379
        # Left empty: no password is sent
        password:
        # Presumably milliseconds (Spring Boot treats a bare number here as ms) - confirm for your Boot version
        timeout: 2000
        lettuce:
          pool:
            max-active: 8
            # A negative value means callers wait without limit for a pooled connection
            max-wait: -1
            max-idle: 8
            min-idle: 0
Copy the code

Push message to Stream with StringRedisTemplate


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    public void sendExchangeMsg(TestDto testDto) {
        // Assemble send message parameters
        try {
            ObjectRecord objectRecord = ObjectRecord.create(ContainerKey.STEAM_KEY, testDto);
            RecordId recordId = this.stringRedisTemplate.opsForStream().add(objectRecord);
         recordId);
        }catch (Exception ex){
            log.error("Error sending message, testDto={}", testDto, ex); }}Copy the code

Consume messages from Stream

RedisStreamRunner uses ApplicationRunner to initialize the listener (OrderMsgListener) after the application starts, and then starts listening for messages to consume.

@Slf4j
@Component
public class RedisStreamRunner implements ApplicationRunner.DisposableBean {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private OrderMsgListener orderMessageListener;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String port;

    private StreamMessageListenerContainer<String, ObjectRecord<String, TestDto>> streamMessageListenerContainer;

    private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, TestDto>> options() {
        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                .batchSize(400)
                .executor(this.threadPoolTaskExecutor)
                .errorHandler(t -> log.error("redis msg listener error, e:{}", t.getMessage()))
                .pollTimeout(Duration.ZERO)
                .serializer(new StringRedisSerializer())
                .targetType(TestDto.class)
                .build();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Initialize the container
        streamMessageListenerContainer = StreamMessageListenerContainer.create(this.redisConnectionFactory, options());
        String streamKey = ContainerKey.STEAM_KEY;
        String groupId = ContainerKey.STEAM_GROOP;
        String consumerName = Inet4Address.getLocalHost().getHostName() + ":" + port;
        log.info("streamKey:{},groupId:{},consumerName{}",streamKey,groupId,consumerName);
        // Check whether the stream is initialized or not
        if (!Boolean.TRUE.equals(stringRedisTemplate.hasKey(streamKey))) {
            // If you send a message to a stream, the stream is automatically created
            RecordId recordId = stringRedisTemplate.opsForStream().add(streamKey, Collections.singletonMap("testKey"."testV"));
            // Create a consumer group
            stringRedisTemplate.opsForStream().createGroup(streamKey, groupId);
            // Delete create
            stringRedisTemplate.opsForStream().delete(streamKey, recordId);
        }
        // Use a listener container to listen for messages and respond automatically
        streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(groupId, consumerName),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                orderMessageListener);
        // Start the listener
        this.start();
    }

    /** * The subscription container starts */
    public void start() {
        streamMessageListenerContainer.start();
    }

    @Override
    public void destroy() throws Exception {
        this.streamMessageListenerContainer.stop();
    }
Copy the code

OrderMsgListener implements the functional interface StreamListener<K, V extends Record<K, ?>>, which is where the message-consumption logic is defined.

@Slf4j
@Component
public class OrderMsgListener implements StreamListener<String.ObjectRecord<String.TesyDto>> {


    @Autowired
    private IOrderService iOrderService;

    @Override
    public void onMessage(ObjectRecord<String, TestDto> testDto) {
        / / message ID
// RecordId messageId = testDto.getId();
        // Key and value of the message
        TestDto testDto = testDto.getValue();
        try {
            iOrderService.paySuccess(testDto);
        }catch (Exception e){
            log.error("Message processing exception order number :{}, exception message is {}",testDto.getOrderNo(),e.getMessage()); }}}Copy the code