Redis publishing and subscription usage scenarios and JAVA code implementation (including source code)

Introduction:

Redis is a noSQL database product we are very commonly used, we usually use Redis to match the use of relational database, make up for the deficiency of relational database.

Redis’ publish and subscribe feature is also a highlight. Although it is not a publish-subscription product, it has publish-subscription functionality that meets our daily needs.

So how does Redis publish and subscribe and what scenarios can it be used in? Today we are going to explore that question.

What is publish subscription

Publish subscription is when a publisher publishes a message and a subscriber receives a message, and the two are related through some medium. This is similar to the old “subscription”, when we subscribed to a certain newspaper (such as the Financial newspaper), when a new issue of the newspaper was published, a postman would deliver it to us. In other words, only those who subscribe to the newspaper will receive the new newspaper published by the publisher.

Redis has a similar publish-and-subscribe feature, with publishers first and subscribers second. Once you have publishers and subscribers, what’s missing?

That is, the “certain newspaper” mentioned above, not every newspaper published by the publishing house (such as People’s Daily, financial news, sports news) will be sent to you, but it is clear that you want to decide which kind of newspaper, you decide which kind will be sent to you.

Back to Redis publication subscription, the above “certain newspaper” is abstracted into a channel. After the client subscribs to a channel, when the publisher publishes the news through this channel, all subscribers will receive the news published by this channel.

PUBLISH and subscribe When a client sends information to subscribers through a PUBLISH command, the client is called a publisher. When a client receives information using a SUBSCRIBE or PSUBSCRIBE command, we call the client a subscriber. To decouple the relationship between publisher and subscriber, Redis uses a channel as an intermediary between the two — the publisher issues information directly to the channel, which takes care of sending information to the appropriate subscriber. Publishers and subscribers have no relationship with each other and are unaware of each other’s existence.

Redis client A
Redis client B
channel
Financial newspapers
Redis client C
channel
Financial newspapers
Stocks are up today!
Redis client A
Redis client B

The principle of

Redis is realized by USING C. By analyzing the pubsub. C file in Redis source code, we can understand the underlying implementation of the publishing and subscription mechanism and deepen our understanding of Redis.

Redis implements PUBLISH and SUBSCRIBE through commands such as PUBLISH, SUBSCRIBE, and PSUBSCRIBE.

After subscribing to a channel with the SUBSCRIBE command, a dictionary is maintained in Redis-server. The key of the dictionary is a channel, and the value of the dictionary is a linked list, which stores all the clients that SUBSCRIBE to this channel. The key to the SUBSCRIBE command is to add a client to the subscription list for a given channel.

Using a PUBLISH command to send a message to a subscriber, redis-server uses the given channel as a key, looks up a list of clients that have subscribed to the channel in the channel dictionary it maintains, iterates through the list, and publishes the message to all subscribers.

Detailed reference: Redis publish/subscribe mechanism analysis

The business scenario

Now that we know the principles and basic flow of Redis publish and subscribe, let’s take a look at what Redis publish and subscribe can do.

1. Asynchronous message notification

For example, when the channel calls the payment platform, we can give the payment platform a callback interface to inform us of the payment status, and we can also use the Redis publish subscription to achieve this. For example, when we initiate a payment and subscribe to pay_notice_ + WK (if our channel id is WK, we can’t get other channels to subscribe to this channel), when the payment platform finishes processing the channel, the payment platform sends a message to the channel’s subscribers informing them of the payment information and status of the order. After receiving the message, update the order information and subsequent operations based on the message content.

When a lot of people call a payment platform, it can be a problem to subscribe to the same channel when they pay. For example, user A pays for pay_notice_wk, and then user B pays for pay_notice_wk while the payment platform is still processing it. After user A receives the notification, user B’s pay notification is also announced, and the channel does not receive A second notification. Because when the same channel receives a message, the subscription is automatically cancelled, that is, the subscription is one-time.

So the order status we subscribe to has to be unique, one channel per order, and we can add the order number Pay_notice_wk +orderNo to make the channel unique. In this way, we can send the channel number as a parameter when we pay, and the payment platform can use this channel to release messages to us after processing. (In practice, interface callback notifications are mostly used, because publishing subscriptions with Redis is very restrictive and systems must share a set of Redis)

2. Task notification

For example, the batching system tells the application system to do something (when the batching system can’t get user data and the application system can’t do scheduled tasks). If the user data of some users are loaded to Redis at 3 o ‘clock in the morning every day, the application system cannot do scheduled tasks. Instead, the batch running system can issue tasks to the application system through the system’s public Redis, and the application system receives instructions to do corresponding operations.

Is it important to note that in the case of online cluster deployment, all service instances are notified to do the same? It’s totally unnecessary. You can use Redis to implement the locking mechanism, and one of the instances takes the lock and executes the task. In addition, if the task is time-consuming, it can not be locked, you can consider task fragmentation execution. Of course, this is beyond the scope of this article, not to be repeated here.

3. Refresh and load parameters

As we all know, we use Redis is nothing more than the system does not change, query and more frequent data cache, such as our system home page of the wheel broadcast graph ah, dynamic links to the page ah, some system parameters ah, public data ah are loaded into Redis, and then there is a background management system to configure and modify these data.

For example, if we want to add another chart to the round-robin chart on our home page, we should add it to the back-end system, and that’s it? Of course not, because Redis is still old data. Would you say there is no expiration date? Yes, but what if the expiration time is set to a longer time like 24 hours and we want it to take effect immediately? At this point, we can use Redis publish and subscribe mechanism to achieve real-time data refresh. When we are finished modifying the data, click the refresh button, and through the publish and subscribe mechanism, subscribers can call the reload method after receiving the message.

Code implementation

The theory of publish and subscribe and its usage scenarios are well understood, but how to implement publish and subscribe in code? Here is the implementation to share with you.

Let’s take the third usage scenario as an example and look at the overall implementation class diagram first.

To clarify, we first define a unified interface, ICacheUpdate, with only one update method. We have the Service layer implement this method to perform specific update operations. Let’s look at RedisMsgPubSub, it inherits redis. Clients. Jedis. JedisPubSub, main rewrite its onMessage () method (subscription channel is message arrival triggered when this method). In this method we call the update method of RedisMsgPubSub to perform the update operation. When we have multiple services implementing ICacheUpdate, there is an urgent need for a manager to centrally manage these services and to tell the onMessage method which ICacheUpdate implementation class to call when the onMessage method is fired. So we have PubSubManager. And we start a separate Thread to maintain publish subscriptions, so the manager inherits the Thread class.

Specific code:

Unified interface

ICacheUpdate.java

public interface ICacheUpdate {
    public void update(a);
}
Copy the code

The Service layer

Implement ICacheUpdate update method to perform specific update operations

InfoService.java

public class InfoService implements ICacheUpdate {
	private static Logger logger = LoggerFactory.getLogger(InfoService.class);
	@Autowired
	private RedisCache redisCache;
	@Autowired
	private InfoMapper infoMapper;
	/** * Query information by information type *@return* /
	public Map<String, List<Map<String, Object>>> selectAllInfo(){
		Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>();
		List<String> infoTypeList = infoMapper.selectInfoType();// All types of information involved in the information table
		logger.info("------- Find public information by message type start ----"+infoTypeList);
		if(infoTypeList! =null && infoTypeList.size()>0) {
			for(String infoType : infoTypeList) { List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType); resultMap.put(infoType, result); }}return resultMap;
	}
	@Override
	public void update(a) {
		// Cache home page information
		logger.info(InfoService selectAllInfo flushes cache);
		Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo();
		Set<String> keySet = resultMap.keySet();
		for(String key:keySet){ List<Map<String, Object>> value = resultMap.get(key); redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value); }}}Copy the code

Redis publish subscribe extension class

Function:

1. Uniformly manage ICacheUpdate and add all classes implementing ICacheUpdate interface to the Updates container

2. Rewrite the onMessage method to refresh the cache after subscribes to the message

RedisMsgPubSub.java

/** * Redis updates the ICacheUpdate interface by adding all ICacheUpdate classes to the updates container */
public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
    private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>();
    //1. ICacheUpdate is centrally managed by Updates
    public boolean addListener(String key , ICacheUpdate update) {
        if(update == null) 
            return false;
	updates.put(key, update);
	return true;
    }
    / * * * 2, rewrite the onMessage method, subscribe to news refresh cache operation * * / subscription channels received news
    @Override  
    public void onMessage(String channel, String message) {
        logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message);
        ICacheUpdate updater = null;
        if(StringUtil.isNotEmpty(message)) 
            updater = updates.get(message);
        if(updater! =null)
            updater.update();
    }
    //other code...
}
Copy the code

Publish subscription manager

Operations performed:

1. Add all Service classes that need to be refreshed (implementing the ICacheUpdate interface) to updates of RedisMsgPubSub

2. Start thread to subscribe to pubsub_config channel, subscribe again five seconds after receiving the message (avoid ending the subscription after subscribing to a message)

PubSubManager.java

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    public static Jedis jedis;
    RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
    / / channel
    public static final String PUNSUB_CONFIG = "pubsub_config";
    //1. Add all Service classes that need to be refreshed (implementing the ICacheUpdate interface) to the updates of RedisMsgPubSub
    public boolean addListener(String key, ICacheUpdate listener){
        return msgPubSub.addListener(key,listener);
    }
    @Override
    public void run(a){
        while (true) {try {
                JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
                if(jedisPool! =null){
                    jedis = jedisPool.getResource();
                    if(jedis! =null) {//2. Start thread subscription pubsub_config channel blockedjedis.subscribe(msgPubSub,PUNSUB_CONFIG); }}}catch (Exception e) {
                logger.error("redis connect error!");
            } finally {
                if(jedis! =null)
                    jedis.close();
            }
            try {
                //3. Subscribe again five seconds after receiving a message (avoid ending the subscription after subscribing to a message)
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("InterruptedException in redis sleep!"); }}}}Copy the code

At this point, Redis publish subscription is largely implemented. When do we turn it on? We can choose to subscribe and completion of the starting project based data loading, so we by implementing javax.mail. Servlet. SevletContextListener to complete this operation. Then add listeners to web.xml.

CacheInitListener.java

/** * Load system parameters */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent arg0) {}@Override
    public void contextInitialized(ServletContextEvent arg0) {
        logger.info("-- CacheListener initialization starts --");
        init();
        logger.info("-- CacheListener initialization completed --");
    }

    public void init(a) {
        try {
            // Get the manager
            PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);

            InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
            // Add to the manager
            pubSubManager.addListener("infoService", infoService);
            //other service...

            // Start the thread to execute the subscription operation
            pubSubManager.start();
            // Initialize the load
            loadParamToRedis();
        } catch(Exception e) { logger.info(e.getMessage(), e); }}private void loadParamToRedis(a) {
        InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
        infoService.update();
        //other service...}}Copy the code

web.xml

<listener>
	<listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>
Copy the code

“The end”

The article was first published on the public account @Programdadao