A Listener listens for changes in a program and responds to them. We often use various Listener annotations when implementing MQ consumers with SpringBoot.
For example, RocketMQ consumers use @RocketMQMessageListener to listen for messages in RocketMQ, and Kafka consumers use @KafkaListener to listen for new messages in Kafka.
This example implements a simple SpringBoot Listener annotation to show how such an annotation works. Implementing a Listener annotation generally takes the following steps:
- Define the Listener annotation.
- Define a message processing interface (classes decorated with the Listener implement it to process received messages).
- Scan for objects decorated with the Listener and save them.
- Upon receiving a message, invoke the message processing interface.
- Add the logic of step 3 to a SpringBoot Bean (make sure this logic runs when SpringBoot starts, because it is the entry point of the Listener implementation).
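The first four steps can be sketched in plain Java before any Spring wiring is involved. This is only an illustration: the names DemoListener, Handler, OrderHandler, scan and dispatch are hypothetical, and a plain list of objects stands in for the beans that the Spring container would supply.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; plain Java stands in for the Spring container.
class ListenerFlowSketch {

    // Step 1: the listener annotation.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoListener {
        String topic();
    }

    // Step 2: the message processing interface.
    interface Handler {
        void onMessage(String message);
    }

    // A listener: annotated and implementing the interface.
    @DemoListener(topic = "orders")
    static class OrderHandler implements Handler {
        final List<String> received = new ArrayList<>();

        @Override
        public void onMessage(String message) {
            received.add(message);
        }
    }

    // Step 3: scan candidate objects and save the annotated handlers by topic.
    static Map<String, Handler> scan(List<Object> candidates) {
        Map<String, Handler> routes = new HashMap<>();
        for (Object candidate : candidates) {
            DemoListener annotation = candidate.getClass().getAnnotation(DemoListener.class);
            if (annotation != null && candidate instanceof Handler) {
                routes.put(annotation.topic(), (Handler) candidate);
            }
        }
        return routes;
    }

    // Step 4: on receiving a message, invoke the handler registered for its topic.
    static boolean dispatch(Map<String, Handler> routes, String topic, String message) {
        Handler handler = routes.get(topic);
        if (handler == null) {
            return false; // nobody listens on this topic
        }
        handler.onMessage(message);
        return true;
    }

    public static void main(String[] args) {
        OrderHandler orderHandler = new OrderHandler();
        Map<String, Handler> routes = scan(List.of(orderHandler, "not a listener"));
        dispatch(routes, "orders", "order-1 created");
        System.out.println(orderHandler.received);
    }
}
```

The remaining step, running the scan at startup, is what the Spring callbacks in the sections below take care of.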
1. The implementation
The directory structure is as follows:
1.1 Defining Listener annotations
We need to define the Listener annotation and define its properties:
- @Target(ElementType.TYPE) specifies that the annotation can be applied to classes, interfaces (including annotation types), and enum declarations.
- @Retention(RetentionPolicy.RUNTIME) specifies that the annotation is retained at RUNTIME (i.e. it can be read through reflection while the program is running).
For details on Java meta-annotations, see the article "Java meta-annotations".
package com.omg.starter.my.annotation;

import java.lang.annotation.*;

/**
 * @Description: Defines the listener annotation
 * @Author omg
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyTestListener {
    /**
     * @Description: Topic the listener subscribes to
     */
    String topic();

    /**
     * @Description: Message filtering rule
     */
    String filter();
}
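To illustrate why the retention policy matters for the scanning step, here is a small sketch with two hypothetical marker annotations, RuntimeMarker and ClassMarker. Only the one declared with RetentionPolicy.RUNTIME is visible to reflection, which is what a listener scan relies on.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotations used only to compare retention policies.
class RetentionSketch {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface RuntimeMarker {
        String topic();
    }

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.CLASS) // kept in the class file, but not exposed to reflection
    @interface ClassMarker {
    }

    @RuntimeMarker(topic = "demo")
    @ClassMarker
    static class Annotated {
    }

    // Returns the topic if the RUNTIME annotation is present, otherwise null.
    static String topicOf(Class<?> type) {
        RuntimeMarker marker = type.getAnnotation(RuntimeMarker.class);
        return marker == null ? null : marker.topic();
    }

    // Reflection cannot see CLASS-retained annotations, so this is false for Annotated.
    static boolean hasClassMarker(Class<?> type) {
        return type.isAnnotationPresent(ClassMarker.class);
    }

    public static void main(String[] args) {
        System.out.println(topicOf(Annotated.class));        // demo
        System.out.println(hasClassMarker(Annotated.class)); // false
    }
}
```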
1.2 Defining the MessageHandler interface
This interface specifies that every class decorated with the Listener must implement MessageHandler, so that the object (the one decorated with @Listener) can be called when a new event arrives. Without it, a message would be received with no way of knowing how to deliver it (which method to call, which parameters to pass, what to return).
package com.omg.starter.my.core;

/**
 * @Description: Defines the interface for message processing
 * @Author omg
 */
public interface MessageHandler {
    /**
     * @Description: Message handler
     */
    void onMessage(String message);
}
1.3 Scan for objects modified by the Listener and save them
1.3.1 Save objects decorated with @Listener
package com.omg.starter.my.core;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Description: Stores the topic-to-handler and topic-to-container mappings
 * @author omg
 */
public class MessageHandlerRouter {
    // Concurrent maps, because container threads read them while startup code writes
    private static final Map<String, MessageHandler> routesMap = new ConcurrentHashMap<>();
    private static final Map<String, MessageContainer> containerMap = new ConcurrentHashMap<>();

    public static void putHandler(String topic, MessageHandler handler) {
        // putIfAbsent returns the existing handler when the topic is already taken
        if (routesMap.putIfAbsent(topic, handler) != null) {
            throw new IllegalStateException("multi handler destination is " + topic);
        }
    }

    public static MessageHandler getHandler(String topic) {
        return routesMap.get(topic);
    }

    public static void putContainer(String topic, MessageContainer container) {
        if (containerMap.putIfAbsent(topic, container) != null) {
            throw new IllegalStateException("multi container destination is " + topic);
        }
    }

    public static Map<String, MessageContainer> getAllContainer() {
        return containerMap;
    }
}
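The router's one-handler-per-topic rule can be tried out in isolation. The sketch below is a simplified, instance-based stand-in (TopicRegistrySketch and its Handler interface are hypothetical names, not part of the starter) that shows a second registration for the same topic being rejected.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the router; names are illustrative only.
class TopicRegistrySketch {

    interface Handler {
        void onMessage(String message);
    }

    private final Map<String, Handler> routes = new ConcurrentHashMap<>();

    // Registers a handler; a topic may have at most one.
    void put(String topic, Handler handler) {
        if (routes.putIfAbsent(topic, handler) != null) {
            throw new IllegalStateException("multi handler destination is " + topic);
        }
    }

    Handler get(String topic) {
        return routes.get(topic);
    }

    public static void main(String[] args) {
        TopicRegistrySketch registry = new TopicRegistrySketch();
        registry.put("orders", message -> System.out.println("first: " + message));
        try {
            registry.put("orders", message -> System.out.println("second: " + message));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // The first handler is still the one registered for the topic.
        registry.get("orders").onMessage("hello");
    }
}
```

Failing fast on a duplicate topic surfaces a configuration mistake at startup instead of silently replacing one listener with another.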
1.3.2 Scan for objects modified by the Listener and save them
Prerequisites:
- ApplicationContextAware: Spring calls the setApplicationContext method of a bean that implements ApplicationContextAware to hand it the application context.
- SmartInitializingSingleton: once all singleton beans have been initialized, the Spring IoC container calls back the afterSingletonsInstantiated() method of beans that implement this interface.
- DisposableBean: allows a bean to receive a callback when the container destroys it. The DisposableBean interface declares a single method: destroy().
package com.omg.starter.my.properties;

import com.omg.starter.my.annotation.MyTestListener;
import com.omg.starter.my.core.MessageContainer;
import com.omg.starter.my.core.MessageHandler;
import com.omg.starter.my.core.MessageHandlerRouter;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.Map;
import java.util.Objects;

/**
 * @Description: Core logic
 * 1. Scan for beans annotated with @MyTestListener and register them
 * 2. Start the listeners (the simulated message producers)
 * 3. Release resources on shutdown
 * @Author omg
 */
public class MyStarterListenerContainerConfiguration
        implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

    private ConfigurableApplicationContext applicationContext;

    /**
     * @Description: Called after all singleton beans are initialized.
     * Scans for beans carrying the custom annotation and registers each one.
     */
    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(MyTestListener.class);
        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    /**
     * @Description:
     * 1. Parse the custom annotation
     * 2. Save the topic-to-handler mapping
     * 3. Start the message listener (in this case, MessageContainer)
     */
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (!MessageHandler.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + MessageHandler.class.getName());
        }
        MyTestListener annotation = clazz.getAnnotation(MyTestListener.class);
        String topic = annotation.topic();
        String filter = annotation.filter();
        MessageHandlerRouter.putHandler(topic, (MessageHandler) bean);
        // Save the container as well, so that destroy() can close it later
        MessageContainer container = new MessageContainer(topic, filter);
        MessageHandlerRouter.putContainer(topic, container);
        container.start();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    /**
     * @Description: Close all containers when the bean is destroyed
     */
    @Override
    public void destroy() throws Exception {
        MessageHandlerRouter.getAllContainer().values().forEach(MessageContainer::close);
    }
}
1.4 After receiving a message, invoke the message processing interface
In this example, to simulate receiving a message, the implementation starts a separate thread, produces one message per second, and calls MessageHandler to process the message.
package com.omg.starter.my.core;

import java.time.LocalDateTime;

/**
 * @Description: Simulates receiving a message and notifies the MessageHandler.
 * Message filtering can be implemented here later if needed.
 * @author omg
 */
public class MessageContainer extends Thread {
    // The topic property of the @Listener annotation, which defines the type of subscribed message
    private final String topic;
    // The filter property of the @Listener annotation, which defines the message filtering rule
    private final String filter;

    public MessageContainer(String topic, String filter) {
        this.topic = topic;
        this.filter = filter;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            // Get the current time (simulates a received message)
            String time = LocalDateTime.now().toString();
            // Message filtering logic can be implemented here
            if (filter != null) {
                // todo message filtering logic
            }
            // Get the MessageHandler for the topic
            MessageHandler messageHandler = MessageHandlerRouter.getHandler(topic);
            // Invoke message processing
            messageHandler.onMessage(topic + ":" + time);
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                // close() interrupts the thread; restore the flag so the loop exits
                Thread.currentThread().interrupt();
            }
        }
    }

    public void close() {
        // Stop the loop; other resources (connections, etc.) can also be released here
        interrupt();
    }
}
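The filtering step is left as a todo above. One possible shape for it, sketched here under the assumption (not stated in the original) that the filter string is a regular expression a message must match somewhere to be delivered, and that a null or empty filter accepts everything. MessageFilterSketch is a hypothetical helper name.

```java
import java.util.regex.Pattern;

// Hypothetical helper: treats the listener's filter string as a regular expression.
class MessageFilterSketch {

    // null means "no filter configured", so every message is accepted
    private final Pattern pattern;

    MessageFilterSketch(String filter) {
        this.pattern = (filter == null || filter.isEmpty()) ? null : Pattern.compile(filter);
    }

    // True when the message should be passed on to the handler.
    boolean accepts(String message) {
        return pattern == null || pattern.matcher(message).find();
    }

    public static void main(String[] args) {
        MessageFilterSketch ordersOnly = new MessageFilterSketch("^orders:");
        System.out.println(ordersOnly.accepts("orders:2024-01-01T10:00")); // true
        System.out.println(ordersOnly.accepts("users:2024-01-01T10:00"));  // false
    }
}
```

With such a helper, the container could build the filter once in its constructor and skip the onMessage call whenever accepts returns false.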
1.5 Add the logic in Step 3 to the SpringBoot Bean
This step builds on the starter from the earlier article "SpringBootStarter (2): Implementing a simple SpringBootStarter".
package com.omg.starter.my.properties;

import com.omg.starter.my.service.MyStarterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * @Description: Spring Boot auto-configuration
 * @Author: omg
 */
@Configuration
@ConditionalOnProperty(prefix = "my", name = "enable", havingValue = "true")
@EnableConfigurationProperties(MyStarterProperties.class)
// !!!! This line adds the logic from step 3 to a SpringBoot Bean.
// !!!! Make sure this logic runs during SpringBoot startup, because it is the entry point of the Listener logic.
@Import(MyStarterListenerContainerConfiguration.class)
public class MyStarterAutoConfiguration {

    @Autowired
    private MyStarterProperties properties;

    @Bean
    public MyStarterService myStarterService() {
        return new MyStarterService(properties);
    }
}
2. Test and verify
package com.hunliji.essync.listener;

import com.omg.starter.my.annotation.MyTestListener;
import com.omg.starter.my.core.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @Description: Test listener that logs every message it receives
 * @author omg
 */
@Slf4j
@Component
@MyTestListener(topic = "MyListenerTest", filter = "filter")
public class MyTestMessageListener implements MessageHandler {

    @Override
    public void onMessage(String message) {
        log.info(message);
    }
}