Original link: blog.csdn.net/yelvgou9995…

Redis is a high-performance distributed key-value store and one of the most widely used NoSQL databases. Because Redis executes commands on a single thread, we can also use it as a message queue. This article shows how to integrate Redis into Spring Boot and use it as a message queue.

1. What is a message queue

A message queue is a container that holds messages during their transmission. — Baidu Encyclopedia

A message can be thought of as data that passes through a computer or an entire computer network.

A queue is one of the basic data structures; its defining property is first-in, first-out (FIFO) ordering.

So a message queue is a container that holds messages and hands them out in first-in, first-out order.

Why do message queues exist?

  1. Asynchronous processing: In a B/S (browser/server) architecture, the client sends a request to the server, and the server may take a long time to process it. If the client waits for the server to finish, client-side resources are wasted. With a message queue, the server pushes the request onto the queue and returns immediately, and a dedicated message handler processes it later, so the client does not spend a long time waiting for the server's response;
  2. Decoupling: In traditional development, modules call each other directly. Such a system is hard to extend, sharing data between modules is problematic, and each module must always consider whether the modules it depends on might fail. With a message queue, modules no longer call each other directly but communicate through data, and when a module goes down, its data is still stored in the queue. The most typical example is the producer-consumer model, which this article uses;
  3. Peak shaving: When the number of concurrent requests at some moment exceeds the system's maximum processing capacity, the system will crash if nothing is done. With a message queue, the server pushes requests onto the queue and a dedicated message handler consumes them at a sustainable rate, reducing the pressure on the server.
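
The three points above can be illustrated without Redis at all. The following is a minimal in-memory sketch of the producer-consumer model using `java.util.concurrent.BlockingQueue`; it is not the article's Redis setup, and the class name `QueueDemo` and the `__STOP__` marker are made up for this illustration. The producer returns as soon as a message is enqueued, while a separate consumer thread handles messages at its own pace, in FIFO order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a message queue (illustration only, not Redis).
class QueueDemo {
    // Hypothetical end-of-stream marker so the consumer thread can stop.
    private static final String STOP = "__STOP__";

    static List<String> run(List<String> requests) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // Consumer: takes messages one by one, in the order they were enqueued.
        Thread consumer = new Thread(() -> {
            try {
                String message;
                while (!STOP.equals(message = queue.take())) {
                    processed.add("handled:" + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer: enqueues and moves on without waiting for processing.
            for (String request : requests) {
                queue.put(request);
            }
            queue.put(STOP);
            consumer.join(); // wait here only so the result can be returned
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("order-1", "order-2", "order-3")));
    }
}
```

The producer and the consumer share only the queue, which is the decoupling described in point 2; a slow consumer simply lets the queue grow instead of blocking the producer.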

Here’s a quick look at message queues

As can be seen from the figure above, the message queue acts as a middleman and we can manipulate the message queue to ensure the stability of our system.

2. Environment preparation

Java environment: JDK 1.8

Spring Boot version: 2.2.1.RELEASE

Redis-server version: 3.2.100

3. Related dependencies

Only the Redis-related dependencies are shown here.

The two dependencies are:

  • The first dependency provides support for Redis as a NoSQL store
  • The second dependency, spring-integration-redis, combines Spring Integration with Redis; it is added here primarily for distributed locking
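
The original listing is not available, so the following is a reconstruction rather than the author's exact file. It assumes a Maven project that inherits from the Spring Boot parent POM (so versions are managed for you) and uses the two artifacts named elsewhere in this article, spring-boot-starter-data-redis and spring-integration-redis.

```xml
<!-- Assumption: versions are managed by the Spring Boot parent / BOM -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
</dependency>
```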

4. Configuration files

Only the Redis-related configuration is shown here.
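
The original configuration file is not available. As a rough sketch, an `application.properties` for Spring Boot 2.2 might contain entries like the ones below; every value (host, port, empty password, database index, timeout) is an assumption for a local test server and should be adapted to your environment.

```properties
# All values are placeholders for a local Redis instance
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=
spring.redis.database=0
spring.redis.timeout=5000ms
```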

5. Code configuration

When Redis is used as a message queue in Spring Boot, the two main pieces are the RedisTemplate.convertAndSend() method and the MessageListener interface. So we will register a RedisTemplate and a class that implements the MessageListener interface as beans in the IoC container. Without further ado, let's look at the code.

Configuring RedisTemplate

The main purpose of configuring RedisTemplate is to set the serialization method, which solves the garbled-character problem and also reduces performance overhead.

On line 12 of the code, we set GenericJackson2JsonRedisSerializer as the default serializer.

On line 13, we set StringRedisSerializer as the key serializer.

On line 14, we set GenericJackson2JsonRedisSerializer as the serializer for hash values.
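
The original listing is missing, so here is a minimal sketch of what such a configuration could look like, assuming spring-boot-starter-data-redis is on the classpath. The class name `RedisConfig` and the `RedisTemplate<String, Object>` type parameters are choices made for this illustration, and the line numbers cited in the text refer to the author's listing, not to this sketch. It cannot run without a Spring context and a Redis connection factory.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Default serializer: values are stored as JSON
        template.setDefaultSerializer(new GenericJackson2JsonRedisSerializer());
        // Keys are stored as plain strings, which avoids garbled key names
        template.setKeySerializer(new StringRedisSerializer());
        // Hash values are stored as JSON as well
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```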

A brief introduction to RedisTemplate's serialization methods

6. Redis queue listener (consumer)

As mentioned above, the type associated with the Redis queue listener is an interface named MessageListener. Here is the source code for this interface.

As you can see, the interface has only one method, onMessage(Message message, @Nullable byte[] pattern), which is the callback invoked when a message arrives on the queue. Let's explain the two parameters:

  • message: the Redis message, an interface with only two methods
    • byte[] getBody() returns the message body in binary form
    • byte[] getChannel() returns the message channel in binary form
  • pattern: the pattern that matched the channel, in binary form (it may be null, hence @Nullable); in this article's setup it has the same value as message.getChannel()

Having introduced the interface, let’s implement a simple Redis queue listener

The code is simple: it just prints the key information contained in the parameters. Note that the RedisSerializer implementation used here must match the serializer configured above.
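
Since the listing itself is missing, the following is a minimal sketch of such a listener, assuming Spring Data Redis is on the classpath and that values were published with GenericJackson2JsonRedisSerializer. The class name `DemoMessageListener` is hypothetical, and the sketch needs a Spring context and a Redis server to actually receive messages.

```java
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

@Component
public class DemoMessageListener implements MessageListener {

    // Must match the serializer used when the message was published
    private final RedisSerializer<Object> valueSerializer = new GenericJackson2JsonRedisSerializer();
    // Channel names are plain strings
    private final RedisSerializer<String> channelSerializer = new StringRedisSerializer();

    @Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
        Object body = valueSerializer.deserialize(message.getBody());
        String channel = channelSerializer.deserialize(message.getChannel());
        System.out.println("channel=" + channel + ", body=" + body);
    }
}
```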

Once the queue listener is implemented, we need to add this listener to the Redis queue listener container as follows:

These lines of code create a new Redis message listener container, bind the listener to a channel name, and return the container.

Note that this channel name must be the same as the channel name used to push messages (described below); otherwise the listener will not receive them.
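
A minimal sketch of that container configuration, under the assumption that a single MessageListener bean exists in the context. The class name `RedisListenerConfig` and the channel name `demo-channel` are placeholders invented for this illustration; the article does not state the real channel name.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    // Hypothetical channel name; the producer must publish to the same one
    public static final String CHANNEL = "demo-channel";

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory, MessageListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Bind the listener to the channel
        container.addMessageListener(listener, new ChannelTopic(CHANNEL));
        return container;
    }
}
```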

7. Redis Queue Push Service (producer)

This is where the RedisTemplate we configured earlier comes into play.

The code is as follows:

The key code is line 7, where redisTemplate.convertAndSend() pushes a message (the second argument) to a channel (the first argument).

Again, the producer and the consumer must use the same channel name.
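
As the listing is missing, here is a minimal sketch of a producer service, assuming a `RedisTemplate<String, Object>` bean is available. The class name `DemoPublisher` and the channel name `demo-channel` are placeholders for this illustration, and line numbers in the text refer to the author's listing, not to this sketch.

```java
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DemoPublisher {

    // Hypothetical channel name; it must equal the one the listener is bound to
    private static final String CHANNEL = "demo-channel";

    private final RedisTemplate<String, Object> redisTemplate;

    public DemoPublisher(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void publish(Object message) {
        // First argument: channel; second argument: message (serialized by the template)
        redisTemplate.convertAndSend(CHANNEL, message);
    }
}
```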

At this point, the producers and consumers of the message queue are all written.

8. Problems encountered and solutions

1. Spring Boot uses log4j2 logging framework

After I added the spring-boot-starter-log4j2 dependency and excluded spring-boot-starter-logging from spring-boot-starter-web, I still received the following error when running the project:

<pre style="box-sizing: border-box; outline: 0px; margin: 0px 0px 24px; padding: 8px; position: relative; font-family: Consolas, Inconsolata, Courier, monospace; white-space: pre-wrap; word-wrap: break-word; overflow-x: auto; font-size: 14px; line-height: 22px; color: rgb(0, 0, 0);">SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]</pre>

This error is caused by multiple logging frameworks being present in the Maven dependencies. Dependency analysis shows that spring-boot-starter-data-redis also depends on spring-boot-starter-logging. The solution is simple: exclude spring-boot-starter-logging from spring-boot-starter-data-redis as well.
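
A sketch of that exclusion in the Maven POM, assuming the version is managed by the Spring Boot parent:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <!-- Remove the default Logback-based logging starter -->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

Running `mvn dependency:tree` afterwards is one way to check that logback-classic no longer appears.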

2. Redis queue listener thread safety issues

The Redis queue listener works like this: one thread listens on the queue, and whenever there is an unconsumed message, a new thread is created to consume it. As I said at the start, Redis is used as a message queue because of its single-threaded nature. But if the listener creates a new thread for every message it receives, that single-threaded advantage is lost and thread-safety problems appear.

A single consumer (one consumer per channel) solution

The simplest approach is to synchronize the onMessage() method. This is simple and effective, but it does not control the rate at which the listener consumes, and unbounded thread creation can eventually exhaust system resources.

So how do we solve this? With a thread pool.

In the configuration that adds the listener to the container, the RedisMessageListenerContainer class has a setTaskExecutor(Executor taskExecutor) method for giving the listener container a thread pool. Once the thread pool is configured, all consumer threads come from the pool, so we can tune the pool to control the rate at which the listener consumes.
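
To make the effect of a bounded pool concrete, here is a standalone sketch using only the JDK. The pool sizes, queue capacity and rejection policy are illustrative assumptions, not tuned values, and the class name `ListenerPoolDemo` is made up. In the Spring configuration, an executor like the one returned by `newListenerPool()` would be handed to the container via `setTaskExecutor(...)`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ListenerPoolDemo {

    // 2 core threads, at most 4 threads, up to 100 queued tasks (assumed values).
    // CallerRunsPolicy runs overflow tasks on the submitting thread, which slows
    // the submitter down instead of dropping messages.
    static ThreadPoolExecutor newListenerPool() {
        return new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits simulated message handlers and returns the highest number
    // that were ever running at the same time.
    static int peakConcurrency(int messages) {
        ThreadPoolExecutor pool = newListenerPool();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(5); // simulate message handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent handlers: " + peakConcurrency(50));
    }
}
```

However many messages are submitted, the number of handlers running at once stays within the pool's limits, which is the rate control described above.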

Multiple consumers (multiple consumers on one channel) solution

The single-consumer problem is relatively simple compared with the multi-consumer one, because Java's built-in locks can only control execution within their own program and cannot affect other programs. Today, however, we often develop for distributed environments, so handling multiple consumers matters.

So how do we solve this problem? With distributed locks.

Here’s a brief overview of what distributed locks are:

A distributed lock means that, in a distributed environment, only one client at a time can obtain the lock from a shared store (such as Redis), and only the client holding the lock may run the protected code.

A distributed lock should generally meet the following requirements: exclusivity (only one client can hold the lock at a time), deadlock avoidance (the lock is released automatically after a timeout), and high availability (the mechanism for acquiring and releasing the lock must be highly available and perform well).

In the dependencies section, we imported the spring-integration-redis dependency, which contains a number of useful utility classes. The distributed-lock utility we use next is RedisLockRegistry.

First, how to use it. After importing the dependency, configure a bean.

The RedisLockRegistry constructor takes three parameters. The first is the Redis connection factory. The second is the lock prefix, so a lock is stored under the key name "demo-lock:KEY_NAME". The third is the lock expiration time in milliseconds; the default is 60000 ms (60 seconds).
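
A minimal sketch of that bean, assuming spring-integration-redis is on the classpath. The class name `RedisLockConfig` is hypothetical; the prefix `demo-lock` is taken from the key name quoted above, and the expiry is passed in milliseconds, as the `RedisLockRegistry` Javadoc describes the `expireAfter` parameter.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Configuration
public class RedisLockConfig {

    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
        // Lock keys are stored as "demo-lock:<lockKey>" and expire after 60 seconds
        return new RedisLockRegistry(connectionFactory, "demo-lock", 60_000L);
    }
}
```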

To use the lock, the listener is changed as follows.

Compared with the earlier listener, this version only adds an injected RedisLockRegistry, a call to redisLockRegistry.obtain() to get the lock, a call to acquire it, and a call to release it. That is all it takes to use the distributed lock.

Note that redisLockRegistry.obtain() returns a lock of type RedisLock, a private inner class that implements the Lock interface. We therefore cannot create an instance of it ourselves; the only way to get one is through the obtain() method.
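
Because the returned object is a plain `java.util.concurrent.locks.Lock`, the usual obtain, lock, try/finally, unlock pattern applies. The sketch below shows that pattern with an in-memory stand-in for the registry so that it runs without Redis; `InMemoryLockRegistry` and `LockedHandlerDemo` are invented for this illustration, and `ReentrantLock` only protects threads inside one JVM, unlike the Redis-backed lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedHandlerDemo {

    // Stand-in for RedisLockRegistry: obtain(key) returns one Lock per key.
    static class InMemoryLockRegistry {
        private final Map<Object, Lock> locks = new ConcurrentHashMap<>();

        Lock obtain(Object lockKey) {
            return locks.computeIfAbsent(lockKey, key -> new ReentrantLock());
        }
    }

    private final InMemoryLockRegistry registry = new InMemoryLockRegistry();
    private int handled = 0; // shared state that needs protection

    // Mirrors what the body of onMessage() would do with the lock.
    void handle(String channel) {
        Lock lock = registry.obtain(channel);
        lock.lock();
        try {
            handled++; // critical section: consume the message
        } finally {
            lock.unlock(); // always release, even if handling throws
        }
    }

    // Calls handle() from several threads and returns the final counter.
    static int runConcurrently(int threads, int perThread) {
        LockedHandlerDemo demo = new LockedHandlerDemo();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.handle("demo-channel");
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.handled;
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 1000));
    }
}
```

Releasing the lock in `finally` matters even more with a distributed lock: if a consumer fails without unlocking, other consumers stay blocked until the lock expires.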
