Preface

In many Internet applications, processing requests asynchronously is a common way to improve performance, and asynchronous processing built on a messaging system is widely used in high-concurrency systems because of its reliability and throughput. This approach also complicates the call chain, however. Most callers expect a synchronous response that carries the real-time processing result, but once request processing has been decoupled through messages, the whole call chain becomes asynchronous. Letting the initiator of a synchronous request still obtain the response then requires additional system design.

To make the problem concrete, Xiao Xiao has drawn a diagram of the shared-bike IoT system he has been working on recently, as shown in the figure:

In the flow above, the terminal devices connect to the server over MQTT, which is essentially an asynchronous messaging protocol. After the business application (the order system in the figure) initiates an unlock request, the IoT application system sends the unlock command down to the lock as an MQTT message through the IoT platform (the AWS IoT service is used here). The synchronous part of the call chain ends once the IoT application system has finished interacting with the IoT platform, because the platform and the lock device are decoupled asynchronously through the MQTT messaging service. The IoT platform does use a series of reliable messaging mechanisms to ensure that the unlock message reaches the MQTT queue that the target device listens on, but whether the lock receives the message in time depends on its mobile network conditions at that moment.

After receiving the MQTT unlock message, the lock's embedded software triggers the hardware to perform the unlock, and then reports the result back to the server through an MQTT uplink message so that the server can decide whether to create a riding order and calculate the fee. This requires the IoT platform to listen on the MQTT uplink queue of the lock in question. Because the IoT platform is connected to tens of thousands of devices, it is impractical to forward the uplink messages of each lock to the IoT application system one device at a time. Instead, the IoT platform can forward the MQTT messages of a whole class of devices to a specific business message queue. For example, the unlock response uplink messages of all devices can be forwarded to one IoT business message queue that represents unlock responses, such as “iOT_UPSTREAM_lock_response”. The IoT business system then no longer needs to care about the MQTT messages of the underlying devices and can process unlock results in a form that is closer to the business.

The problem now is that the downlink and uplink unlock messages travel over two entirely separate asynchronous network links, while the initiator of the call still expects to wait synchronously for the unlock result. In reality, the synchronous link has already ended by the time the IoT application system hands the unlock message to the IoT platform. To satisfy the caller's synchronous request/response expectation, the IoT application system must therefore block after sending the unlock message and listen for the matching uplink message on the IoT business message queue “iOT_upSTREAM_LOCK_response”. When that message arrives, the blocking wait ends and the real-time unlock result is returned to the business caller, which gives the effect of a synchronous call. So how can this extra blocking wait and callback listening be implemented? The rest of this article discusses a concrete implementation.

Solution Analysis

This problem is common wherever message services are used for asynchronous decoupling. Because the asynchronous call chain is long, the usual solution is to block synchronously at the start of the chain and to trigger a callback at the end of the chain, as shown in the following figure:

In the figure above, after the start of the chain sends the first asynchronous message, it creates a temporary queue and blocks while listening on that temporary callback queue. When the end of the chain finishes its processing, it sends a callback message to the temporary queue created at the start. Because the requesting thread has been blocked listening on the temporary queue, it is released as soon as the callback message arrives, which completes the synchronous response for the whole chain. This logic can be implemented with common message-oriented middleware. For example, Codeman's previous company implemented synchronous calls over a message chain with temporary queues on RabbitMQ. The approach is somewhat cumbersome, though. For common message middleware such as RocketMQ and RabbitMQ, asynchronous messaging is the strength. Making a message call chain synchronous at the cost of creating and destroying a large number of temporary queues is not only awkward to use but can also hurt the stability of the middleware.

Therefore, in the IoT system described above, we use the publish/subscribe feature of Redis to turn the asynchronous message chain into a synchronous call. Redis is fast and versatile, and it is well suited to data that changes frequently. It can serve as a NoSQL database in the system and also supports distributed locks and other functions, so it offers good value for the maintenance effort. Next, we use the Spring Boot framework to demonstrate how to implement a synchronous callback over an asynchronous message chain with Redis publish/subscribe.

Redis publish/subscribe mechanism

Redis can provide some message queue functionality through its publish/subscribe mechanism, which is exposed through commands such as SUBSCRIBE and PUBLISH. With this mechanism, the processing flow of the original IoT system becomes the following:

As shown in the figure above, after the IoT application sends the asynchronous MQTT message, it subscribes to a channel whose key is built from the message ID, and the requesting thread blocks listening on that channel until the unlock result uplink message arrives from the IoT business message queue. The consumer of that message queue publishes the uplink message to the channel built from the same message requestId. In this way, the Redis publish/subscribe mechanism gives the asynchronous message system the effect of a synchronous call.
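The correlation idea can be sketched without Redis at all. The following is a minimal in-memory illustration, not the article's implementation: a map of futures stands in for Redis channels, and the class name `ChannelWaitSketch`, the device name `bike001`, and the request ID `req-42` are hypothetical. Only the key format `IOT_<thingName>_<requestId>` comes from the code later in this article.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for Redis publish/subscribe: one waiting future per channel. */
final class ChannelWaitSketch {

    // Channel name -> future completed by the first message published on that channel
    private final Map<String, CompletableFuture<String>> subscribers = new ConcurrentHashMap<>();

    /** Builds the channel key from the device name and the request ID. */
    static String channelKey(String thingName, String requestId) {
        return "IOT_" + thingName + "_" + requestId;
    }

    /** Subscribes to a channel and returns the future that the caller blocks on. */
    CompletableFuture<String> subscribe(String channel) {
        return subscribers.computeIfAbsent(channel, c -> new CompletableFuture<>());
    }

    /** Publishes a message; returns false when nobody is subscribed and the message is dropped. */
    boolean publish(String channel, String message) {
        CompletableFuture<String> waiting = subscribers.remove(channel);
        return waiting != null && waiting.complete(message);
    }

    public static void main(String[] args) throws Exception {
        ChannelWaitSketch bus = new ChannelWaitSketch();
        String key = channelKey("bike001", "req-42"); // hypothetical device and request

        // Request side: subscribe first, then block on the future
        CompletableFuture<String> reply = bus.subscribe(key);

        // Consumer side: simulate the uplink message being published from another thread
        CompletableFuture.runAsync(() -> bus.publish(key, "unlock ok"));

        System.out.println(reply.get(5, TimeUnit.SECONDS));
    }
}
```

As with Redis publish/subscribe, a message published to a channel that nobody is subscribed to is simply dropped in this sketch, which is why the request side subscribes before it starts waiting.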

Spring Boot code implementation

The following demonstrates the implementation in code, based on Spring Boot. After creating a Spring Boot project, we add the Spring Boot Redis integration dependency as follows:

<!-- Redis dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Then add the Redis connection settings to the project's configuration file, as shown below:

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password: 123456
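The listening code later in this article injects a `RedisMessageListenerContainer`. As far as we know, the Redis starter auto-configures the connection factory and `StringRedisTemplate` but not this container, so the project probably needs to declare it as a bean. The sketch below is one way to do that. The class name `RedisListenerConfig` is our own assumption and does not appear in the original project.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

// Hypothetical configuration class; the name is an assumption for illustration
@Configuration
public class RedisListenerConfig {

    // Expose the listener container as a bean so that it can be injected with @Autowired
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // Reuse the connection factory that Spring Boot builds from the connection settings
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
```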

At this point, the project can access Redis. Next, we demonstrate the feature with concrete code. The subscription listening code is as follows:

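import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/iot")
public class IotController {

    // Inject the Redis message listener container
    @Autowired
    RedisMessageListenerContainer redisMessageListenerContainer;

    @RequestMapping(value = "/unLock", method = RequestMethod.POST)
    public boolean unLock(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId)
            throws InterruptedException, ExecutionException, TimeoutException {
        // The asynchronous message call is made here....

        // Generate the key of the listening channel
        String key = "IOT_" + thingName + "_" + requestId;
        // Create the topic to listen on
        ChannelTopic channelTopic = new ChannelTopic(key);
        // Create the task and store its listener so that the same instance can be removed later
        IotMessageTask<Message> iotMessageTask = new IotMessageTask<>();
        iotMessageTask.setMessageListener(new IotMessageListener(iotMessageTask));
        try {
            // Add the listener and the topic to the message listener container
            redisMessageListenerContainer.addMessageListener(iotMessageTask.getMessageListener(), channelTopic);
            System.out.println("start redis subscribe listener->" + key);
            // Enter the synchronous blocking wait; the timeout is set to 60 seconds
            Message message = iotMessageTask.getIotMessageFuture().get(60000, TimeUnit.MILLISECONDS);
            System.out.println("receive redis callback message->" + message.toString());
        } finally {
            // Remove the message listener whether the wait succeeded or timed out
            redisMessageListenerContainer.removeMessageListener(iotMessageTask.getMessageListener());
        }
        return true;
    }
}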

In the code above, we simulate a lock unlock request. After the asynchronous message has been sent, the Redis subscription listener is registered. To implement the synchronous blocking wait, we also need a message task object, defined as follows:

import java.util.concurrent.CompletableFuture;

import org.springframework.data.redis.connection.MessageListener;

public class IotMessageTask<T> {

    // Future that the requesting thread blocks on (CompletableFuture was added in JDK 1.8)
    private CompletableFuture<T> iotMessageFuture = new CompletableFuture<>();
    // Message listener bound to this task
    private MessageListener messageListener;
    // Whether the wait has timed out
    private boolean isTimeout;

    public IotMessageTask() {
    }

    public CompletableFuture<T> getIotMessageFuture() {
        return iotMessageFuture;
    }

    public void setIotMessageFuture(CompletableFuture<T> iotMessageFuture) {
        this.iotMessageFuture = iotMessageFuture;
    }

    public MessageListener getMessageListener() {
        return messageListener;
    }

    public void setMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    public boolean isTimeout() {
        return isTimeout;
    }

    public void setTimeout(boolean timeout) {
        isTimeout = timeout;
    }
}

In the message task object, we use the CompletableFuture class introduced in JDK 1.8 to block the thread, and we round out the mechanism with fields for the message listener and a timeout flag. The Controller code also relies on a custom message listener, defined as follows:
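The blocking behaviour of CompletableFuture can be tried on its own, without Redis or Spring. The sketch below is only an illustration under our own assumptions: the class `BlockingWaitSketch` and the helper `awaitOrFallback` are hypothetical names, and returning a fallback value on timeout is one possible policy, not necessarily what the project does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingWaitSketch {

    /** Blocks until the future completes or the timeout elapses; returns the fallback on timeout. */
    static String awaitOrFallback(CompletableFuture<String> future, long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No callback arrived in time
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the wait as unanswered
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Case 1: another thread completes the future, as the message listener would
        CompletableFuture<String> answered = new CompletableFuture<>();
        new Thread(() -> answered.complete("unlocked")).start();
        System.out.println(awaitOrFallback(answered, 1000, "timeout"));

        // Case 2: nobody completes the future, so the wait ends after 100 ms
        CompletableFuture<String> silent = new CompletableFuture<>();
        System.out.println(awaitOrFallback(silent, 100, "timeout"));
    }
}
```

In the Controller, a timeout currently propagates as a `TimeoutException`. A helper along these lines would be one place to set the task's timeout flag instead, if that behaviour is wanted.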

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class IotMessageListener implements MessageListener {

    private final IotMessageTask<Message> iotMessageTask;

    public IotMessageListener(IotMessageTask<Message> iotMessageTask) {
        this.iotMessageTask = iotMessageTask;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        System.out.println("subscribe redis iot task response:" + message.toString());
        // Complete the future so that the blocked request thread is released
        iotMessageTask.getIotMessageFuture().complete(message);
    }
}

This completes the Redis subscription logic. For the blocking wait to end normally, a later step must publish a message to the channel. Next, we write some code to simulate that message publishing:

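import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/iot")
public class IotCallBackController {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @RequestMapping(value = "/unLockCallBack", method = RequestMethod.POST)
    public boolean unLockCallBack(@RequestParam(value = "thingName") String thingName,
            @RequestParam(value = "requestId") String requestId) {
        // Generate the key of the listening channel
        String key = "IOT_" + thingName + "_" + requestId;
        // Simulate the message callback by publishing to the channel
        stringRedisTemplate.convertAndSend(key, "this is a redis callback");
        return true;
    }
}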

At this point, when the unlock simulation endpoint of the Spring Boot application is called, the request blocks in the subscription wait. When the unlock callback endpoint is then called to simulate publishing the Redis message, the listener receives the callback and the blocked request returns synchronously.