Redis’s publish/subscribe model

Redis implements the publish/subscribe pattern through commands such as PUBLISH and SUBSCRIBE. The feature provides two messaging mechanisms: publish/subscribe to a channel and publish/subscribe to a pattern. This article discusses publish/subscribe to a channel

This model is similar to RocketMQ's broadcast model, in which every consumer subscribed to a Topic receives each message

The figure shows that after a message is published to Channel1, every subscribed client receives it

Although Redis can implement publish/subscribe functionality, it has the following disadvantages, so be careful before choosing it

  • 1. The message cannot be persisted and may be lost

Unlike a regular MQ, Redis publish/subscribe cannot persist messages. Once a message is published it is gone, even if no subscriber has processed it

  • 2. There is no ACK mechanism

That is, the publisher has no guarantee that a subscriber received the message successfully

  • 3. Messages are broadcast, so downstream consumption capacity depends on each consumer itself

A broadcast mechanism cannot increase consumption throughput by adding more consumers, because that would conflict with the purpose of the publish/subscribe model itself. The purpose of broadcasting is to let one published message be processed differently by multiple subscribers
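
The three drawbacks above can be illustrated with a small in-memory sketch. It is only a stand-in for the behaviour and not how Redis is implemented: the `BroadcastSketch` and `Broker` names are hypothetical, and the return value of `publish` merely imitates the integer reply of the Redis PUBLISH command (the number of clients that received the message).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory stand-in for fire-and-forget broadcast; NOT how Redis is implemented.
class BroadcastSketch {

    // Hypothetical broker: keeps subscribers per channel and never stores messages.
    static class Broker {
        private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

        void subscribe(String channel, Consumer<String> subscriber) {
            subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscriber);
        }

        // Delivers to whoever is subscribed right now and returns that count.
        // Nothing is stored and nothing is acknowledged.
        int publish(String channel, String message) {
            List<Consumer<String>> current = subscribers.getOrDefault(channel, List.of());
            current.forEach(s -> s.accept(message));
            return current.size();
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        List<String> one = new ArrayList<>();
        List<String> two = new ArrayList<>();

        // Nobody is subscribed yet, so this message is simply dropped.
        int receivers = broker.publish("Channel1", "early");
        System.out.println("receivers of early message: " + receivers); // prints 0

        broker.subscribe("Channel1", one::add);
        broker.subscribe("Channel1", two::add);

        // Both subscribers get the same message; adding consumers does not split the load.
        broker.publish("Channel1", "hello");
        System.out.println("subscriber 1: " + one + ", subscriber 2: " + two);
    }
}
```

The "early" message never reaches either subscriber, and "hello" is handled once by each of them, which is why adding subscribers does not raise throughput.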

Redis publish/subscribe application scenario

Due to the flaws of the Redis publish/subscribe model, there are a few things to consider before using it

  • 1. The reliability requirement for message processing is not strict
  • 2. Consumption throughput does not need to be increased by adding consumers

Considering the above two points, the following scenarios come to mind

  • 1. After a user registers, relevant promotional information is sent
  • 2. A user changes their user name. Because some service tables hold a redundant copy of the user name, each service can subscribe to the user-name-change channel and update the field in its own table

Whether to use it depends on the requirements of the specific business scenario

Using Redis publish/subscribe in Spring Boot

For Redis operations in Spring Boot, the officially recommended library is currently spring-data-redis from the Spring Data project, so it is used in the following sections

The following assumes a basic understanding of Spring Boot projects

1. Maven dependency for Redis

<dependency>
	<groupId>org.springframework.data</groupId>
	<artifactId>spring-data-redis</artifactId>
</dependency>

2. Redis serialization configuration

Serialization uses GenericJackson2JsonRedisSerializer, which can serialize null objects correctly. With Jackson2JsonRedisSerializer, a null object is serialized to an empty array.

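import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisSerializer<Object> redisSerializer, RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // Keys are written as plain strings
        template.setKeySerializer(RedisSerializer.string());
        // Everything else, including published messages, is written as JSON
        template.setDefaultSerializer(redisSerializer);
        return template;
    }

    @Bean
    public RedisSerializer<Object> redisSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}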

3. Configure the publisher

For simplicity, Spring's scheduling support (@Scheduled) is used here to publish messages periodically

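import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@EnableScheduling
@Component
public class RedisPublisher {

    private static final Logger log = LoggerFactory.getLogger(RedisPublisher.class);

    @Autowired
    private RedisTemplate<Object, Object> redisTemplate;

    private final AtomicInteger incrInteger = new AtomicInteger();

    // First run 500 ms after startup, then 10 s after each run finishes
    @Scheduled(initialDelay = 500, fixedDelay = 10000)
    public void publish() {
        int incrementAndGet = incrInteger.incrementAndGet();
        String topic = "redis/test";
        String message = "current num : " + incrementAndGet;
        log.info("Publishing message. Topic: {}, content: {}", topic, message);
        redisTemplate.convertAndSend(topic, message);
    }
}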

4. Subscription configuration

Two classes matter most on the subscriber side: MessageListenerAdapter and RedisMessageListenerContainer. MessageListenerAdapter is a MessageListener that adapts a custom consumer class. The consumer class must have a public consuming method. The adapter passes the message as the first argument and, when the method declares a second parameter, the channel as the second. The reason can be found in the MessageListenerAdapter source code

4.1 MessageListenerAdapter and a custom consumer

MessageListenerAdapter.onMessage calls the consumer method through reflection, and the arguments and their order are hard-coded, so the consumer must provide a matching public method
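
The reflective call described above can be sketched in plain Java. This is a simplified stand-in and not Spring's actual source: the `Adapter` class is hypothetical, and the `RedisConsumer` shown here is only an assumed shape for the custom consumer used later in the subscription configuration (a public `onMessage` method taking the message and then the channel).

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the reflective dispatch idea; NOT Spring's actual source.
class ReflectiveAdapterSketch {

    // Hypothetical consumer: a plain class with a public two-argument method.
    public static class RedisConsumer {
        final List<String> received = new ArrayList<>();

        public void onMessage(String message, String channel) {
            received.add(channel + " -> " + message);
        }
    }

    // Hypothetical adapter: looks the method up by name and invokes it reflectively.
    static class Adapter {
        private final Object delegate;
        private final String methodName;

        Adapter(Object delegate, String methodName) {
            this.delegate = delegate;
            this.methodName = methodName;
        }

        void dispatch(String message, String channel) {
            try {
                // getMethod only finds public methods, hence the "public" requirement.
                Method method = delegate.getClass().getMethod(methodName, String.class, String.class);
                // The argument order (message first, channel second) is fixed by the adapter.
                method.invoke(delegate, message, channel);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot invoke " + methodName, e);
            }
        }
    }

    public static void main(String[] args) {
        RedisConsumer consumer = new RedisConsumer();
        new Adapter(consumer, "onMessage").dispatch("current num : 1", "redis/test");
        System.out.println(consumer.received); // prints [redis/test -> current num : 1]
    }
}
```

If the consumer method were missing, non-public, or declared its parameters in another order, the lookup in this sketch would fail, which mirrors why the consumer has to match what the adapter expects.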

4.2 RedisMessageListenerContainer

According to the official documentation, RedisMessageListenerContainer receives messages and dispatches them to the listeners asynchronously through an internal thread pool (a custom thread pool and failure-handling strategy can also be configured)
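
The idea of dispatching through a thread pool can be sketched without Spring. The `Container` class and the `dispatchAndWait` helper below are hypothetical and only illustrate the pattern, assuming a fixed pool of two threads and a log-and-continue failure policy. They do not reproduce what RedisMessageListenerContainer does internally.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified stand-in for asynchronous listener dispatch; NOT Spring's implementation.
class AsyncDispatchSketch {

    // Hypothetical container: hands each received message to every listener on a pool thread.
    static class Container {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        private final ExecutorService executor;

        Container(ExecutorService executor) {
            this.executor = executor; // the pool is injected, so callers can supply their own
        }

        void addListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        // Returns right away; the listeners run later on the executor's threads.
        void onMessage(String message) {
            for (Consumer<String> listener : listeners) {
                executor.execute(() -> {
                    try {
                        listener.accept(message);
                    } catch (RuntimeException e) {
                        // Stand-in for a failure policy: log and carry on.
                        System.err.println("listener failed: " + e.getMessage());
                    }
                });
            }
        }
    }

    // Dispatches one message to listenerCount listeners and waits until all have handled it.
    static List<String> dispatchAndWait(String message, int listenerCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(listenerCount);
        Container container = new Container(pool);
        for (int i = 0; i < listenerCount; i++) {
            int id = i;
            container.addListener(m -> {
                handled.add("listener-" + id + ": " + m);
                done.countDown();
            });
        }
        try {
            container.onMessage(message);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listeners did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return handled;
    }

    public static void main(String[] args) {
        // The order of the entries may vary between runs because listeners run concurrently.
        System.out.println(dispatchAndWait("current num : 1", 2));
    }
}
```

Because the receiving side only submits tasks, a slow listener does not block the receipt of the next message, at the cost of listeners possibly finishing in a different order than they were registered.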

4.3 Complete Subscription Configuration

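import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;

@Configuration
public class ConsumerConfig {

    @Bean
    public MessageListenerAdapter processorOne(RedisSerializer<Object> serializer, RedisConsumer redisConsumer) {
        // Delegate incoming messages to RedisConsumer.onMessage
        MessageListenerAdapter adapter = new MessageListenerAdapter(redisConsumer, "onMessage");
        adapter.setSerializer(serializer);
        return adapter;
    }

    /**
     * Supports dynamic listening.
     *
     * @param redisConnectionFactory the Redis connection factory
     * @param adapter the listener adapter to register
     * @return the listener container
     */
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        // Specify the topic serialization method, String
        container.setTopicSerializer(RedisSerializer.string());
        // Add a listener for every channel that matches the pattern
        container.addMessageListener(adapter, new PatternTopic("redis/**"));
        return container;
    }
}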

Start the publisher and subscribers and view the logs

Start one publisher and two subscribers

One drawback of the Redis publish/subscribe model shows up here: because the publisher is started first, the messages it publishes before the subscribers connect are discarded

Publisher log

Subscriber 1 log

Subscriber 2 log