Message queues are an important component of application systems. They are mainly used for application decoupling, asynchronous messaging and traffic peak shaving, and they help build architectures that are high-performance, highly available, scalable and eventually consistent. Popular message queues include ActiveMQ, RabbitMQ, ZeroMQ, Kafka, MetaMQ and RocketMQ.
If you don't want to introduce an MQ that is heavyweight compared with Redis, but still want features such as decoupling and asynchronous messaging, this article shows how to implement a simple MQ on top of Redis.
Why Redis
- Redis is usually introduced as a caching service, so most systems will have Redis
- The resource consumption of Redis itself is minimal and meets our lightweight requirements
- Redis is fast and rarely becomes a speed bottleneck
- Redis supports persistence, and its configuration items let you trade off data safety against speed (see article) [segmentfault.com/a/119000000…].
How to implement
The message queue is implemented with the Redis list structure. A Redis list can hold up to 2^32 - 1 elements, which is sufficient for most applications.
To put it simply:
- There is a queue for each topic
- Data is written at one end of the queue and read from the other
- If consumption fails, the message is re-queued
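The three points above can be sketched in memory. This is only an illustration: each `java.util.ArrayDeque` stands in for one Redis list, and the class name `TopicQueues` and its methods are hypothetical names chosen to mirror the Redis `LPUSH`, `RPOP` and `RPUSH` commands. They are not part of the project's code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of "one list per topic"; not the article's code. */
class TopicQueues {
    // topic name -> queue; each Deque stands in for one Redis list
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Producer side: add at the head, like Redis LPUSH. */
    void leftPush(String topic, String message) {
        queues.computeIfAbsent(topic, t -> new ArrayDeque<>()).addFirst(message);
    }

    /** Consumer side: take from the tail, like Redis RPOP; null when empty. */
    String rightPop(String topic) {
        Deque<String> queue = queues.get(topic);
        return (queue == null) ? null : queue.pollLast();
    }

    /** Put a message back at the tail after a failed consumption, like Redis RPUSH. */
    void rightPush(String topic, String message) {
        queues.computeIfAbsent(topic, t -> new ArrayDeque<>()).addLast(message);
    }

    public static void main(String[] args) {
        TopicQueues mq = new TopicQueues();
        mq.leftPush("orders", "m1");
        mq.leftPush("orders", "m2");
        String first = mq.rightPop("orders"); // the oldest message, m1
        mq.rightPush("orders", first);        // pretend consumption failed
        System.out.println(mq.rightPop("orders")); // prints m1 again
    }
}
```

Because the failed message goes back to the same end it was read from, it is the next one to be consumed, ahead of newer messages.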
Note: The code is for personal use only; do not use it in a real production environment.
The code runs only in a Spring Boot environment.
Start by defining the annotations and interface classes
The annotation code is as follows:
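    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import org.springframework.stereotype.Component;

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface MqConsumer {
        /** Queue topic */
        String topic() default "default_es_topic";
    }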
A class decorated with this annotation will receive the messages under its topic.
The interface code is as follows:
    public interface RedisConsumer {
        /**
         * Consume method. Every consumer class must implement this method.
         *
         * @param message data carrier
         * @author 123
         * @date 2020/3/28 22:41
         */
        void deal(String message);
    }
This interface defines the method that processes the messages a consumer receives.
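As an illustration of how the two pieces fit together, here is a minimal sketch of a consumer class. The annotation and interface are copied from above, minus Spring's `@Component`, so the example compiles with the JDK alone. `OrderConsumer`, `ConsumerDemo`, `topicOf` and the topic name `order_topic` are hypothetical names introduced only for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Copies of the article's annotation and interface, without @Component,
// so that this sketch does not need Spring on the classpath.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface MqConsumer {
    String topic() default "default_es_topic";
}

interface RedisConsumer {
    void deal(String message);
}

/** Hypothetical consumer bound to a made-up topic named "order_topic". */
@MqConsumer(topic = "order_topic")
class OrderConsumer implements RedisConsumer {
    final List<String> handled = new ArrayList<>();

    @Override
    public void deal(String message) {
        handled.add(message); // a real consumer would run business logic here
    }
}

class ConsumerDemo {
    /** Reads the topic a consumer is bound to from its annotation, or null if absent. */
    static String topicOf(RedisConsumer consumer) {
        MqConsumer annotation = consumer.getClass().getAnnotation(MqConsumer.class);
        return (annotation == null) ? null : annotation.topic();
    }

    public static void main(String[] args) {
        OrderConsumer consumer = new OrderConsumer();
        System.out.println(topicOf(consumer)); // prints order_topic
        consumer.deal("order created");
        System.out.println(consumer.handled.size()); // prints 1
    }
}
```

In the real project the class would also be a Spring bean, because `@MqConsumer` is itself annotated with `@Component`.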
Scan the annotated classes
This is the core part of the code. First we obtain the annotated classes, then we start a loop that fetches data from the Redis queues, and finally we call the deal method of the matching object to consume each message.
- The scanning code is as follows:
    /** MqConfiguration.java */
    @Override
    public void run(ApplicationArguments args) {
        // Collect every bean whose class carries @MqConsumer
        Map<String, Object> map = context.getBeansWithAnnotation(MqConsumer.class);
        map.values().forEach(item -> {
            if (!(item instanceof RedisConsumer)) {
                log.warn("Notice that class {} annotated by @MqConsumer does not implement the RedisConsumer interface.", item.getClass().getCanonicalName());
                return;
            }
            MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);
            MqConsumer annotation = annotations[0];
            String topic = annotation.topic();
            if (topicMap.containsKey(topic)) {
                log.error("Multiple consumers {}, consuming the same message :{}, ignored", item.getClass().getCanonicalName(), topic);
            } else {
                topicMap.put(topic, (RedisConsumer) item);
            }
        });
        log.info("Redis subscription information summary completed !!!!!!");
        // A single thread keeps iterating over the queue data
        threadPoolExecutor.execute(loop());
    }
The class implements the ApplicationRunner interface, so its run method is called once Spring has finished scanning. It fetches all beans annotated with MqConsumer through Spring's own methods (otherwise you would need to write your own class-loading code). After the consumers are collected, a thread loops endlessly to fetch data from the Redis queues.
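The registration rule used by run (skip beans that do not implement RedisConsumer, and keep only the first consumer for each topic) can be tried without Spring. The sketch below is an assumption-laden stand-in: a plain collection of objects replaces `context.getBeansWithAnnotation`, and `TopicRegistry`, `FirstConsumer`, `DuplicateConsumer` and `NotAConsumer` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface MqConsumer {
    String topic() default "default_es_topic";
}

interface RedisConsumer {
    void deal(String message);
}

// Three hypothetical beans used only for this illustration
@MqConsumer(topic = "topic_a")
class FirstConsumer implements RedisConsumer {
    @Override public void deal(String message) { }
}

@MqConsumer(topic = "topic_a") // same topic as FirstConsumer, so it is ignored
class DuplicateConsumer implements RedisConsumer {
    @Override public void deal(String message) { }
}

@MqConsumer(topic = "topic_b") // annotated, but does not implement RedisConsumer
class NotAConsumer { }

class TopicRegistry {
    /** Builds the topic -> consumer map from a plain collection of candidate objects. */
    static Map<String, RedisConsumer> register(Collection<?> beans) {
        Map<String, RedisConsumer> topicMap = new LinkedHashMap<>();
        for (Object bean : beans) {
            MqConsumer annotation = bean.getClass().getAnnotation(MqConsumer.class);
            if (annotation == null || !(bean instanceof RedisConsumer)) {
                continue; // the article's code logs a warning in this case
            }
            // The first consumer registered for a topic wins; later ones are ignored
            topicMap.putIfAbsent(annotation.topic(), (RedisConsumer) bean);
        }
        return topicMap;
    }

    public static void main(String[] args) {
        Map<String, RedisConsumer> topicMap = register(
                List.of(new FirstConsumer(), new DuplicateConsumer(), new NotAConsumer()));
        System.out.println(topicMap.keySet()); // prints [topic_a]
    }
}
```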
- The code executed by the polling thread is as follows:
    private Runnable loop() {
        return () -> {
            while (true) {
                // Counts how many topics had no message in this pass
                AtomicInteger count = new AtomicInteger(0);
                topicMap.forEach((k, v) -> {
                    try {
                        String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);
                        if (message == null) {
                            count.getAndIncrement();
                        } else {
                            pushTask(v, message, k);
                        }
                    } catch (RedisConnectionFailureException connException) {
                        log.error("Redis cannot connect. Try again 10 seconds later.", connException);
                        sleep(10);
                    } catch (Exception e) {
                        log.error("Redis message queue exception", e);
                    }
                });
                if (count.get() == topicMap.keySet().size()) {
                    // Sleep for 1s when all queues are empty
                    sleep(1);
                }
            }
        };
    }

    private void pushTask(RedisConsumer item, String value, String key) {
        threadPoolExecutor.execute(() -> {
            try {
                item.deal(value);
            } catch (Exception e) {
                log.error("Error executing consumption task", e);
                // Re-queue the message (non-broadcast messages only)
                mqUtil.getRedisTemplate().opsForList().rightPush(key, value);
            }
        });
    }
The loop method fetches data from Redis topic by topic in an infinite loop. When a message is retrieved, the pushTask method is called to consume it. If consumption throws an error, the message is pushed back onto the queue.
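To see the retry behaviour in isolation, a single pass of the loop can be sketched without Redis or a thread pool. Everything here is a simplification under stated assumptions: `PollingSketch` and `pollOnce` are hypothetical names, each `Deque` stands in for a Redis list, and the handler runs synchronously instead of being submitted to an executor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class PollingSketch {
    /**
     * One pass over all topics: pops at most one message per topic from the tail
     * and hands it to the topic's handler. A message whose handler throws is put
     * back at the tail. Returns true when every queue was empty, which is the
     * condition under which the article's loop sleeps for one second.
     */
    static boolean pollOnce(Map<String, Deque<String>> queues,
                            Map<String, Consumer<String>> handlers) {
        int emptyCount = 0;
        for (Map.Entry<String, Consumer<String>> entry : handlers.entrySet()) {
            Deque<String> queue = queues.get(entry.getKey());
            String message = (queue == null) ? null : queue.pollLast();
            if (message == null) {
                emptyCount++;
                continue;
            }
            try {
                entry.getValue().accept(message);
            } catch (RuntimeException e) {
                queue.addLast(message); // re-queue, as pushTask does with rightPush
            }
        }
        return emptyCount == handlers.size();
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        queues.put("orders", new ArrayDeque<>(List.of("m1")));

        int[] attempts = {0};
        Map<String, Consumer<String>> handlers = new LinkedHashMap<>();
        handlers.put("orders", message -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("first attempt fails");
            }
        });

        pollOnce(queues, handlers); // handler throws, m1 is re-queued
        pollOnce(queues, handlers); // handler succeeds, queue is now empty
        System.out.println(pollOnce(queues, handlers)); // prints true
    }
}
```

One consequence of this design is that a message whose handler always throws is retried on every pass. A retry limit or a separate list for failed messages would be one possible extension, but neither is described in the article.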
See the complete code at the end of this article
Test
After starting the project, call the interface in MainController to test it.
Full code: GitHub
A step-by-step guide to implementing a simple MQ message queue using Redis