This article is for the blogger to study notes, the content comes from the Internet, if there is infringement, please contact delete.
Personal Note: github.com/dbses/TechN…
01 | use KafkaTemplate integrated Kafka
Like JdbcTemplate and RestTemplate, Spring Boot, an integrated framework for rapid development, also provides a set of Template utility classes named -template for message communication.
For Kafka, this utility class is KafkaTemplate.
Send messages using KafkaTemplate
Introducing dependencies:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Copy the code
KafkaTemplate provides a series of send methods to send messages. A typical send method is defined as follows:
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
}
Copy the code
Consume the message using the @kafKalistener annotation
When consuming a message in Kafka, a consumer needs to provide a Listener to listen on a Topic to retrieve the message.
Spring provides a listener for the @kafKalistener annotation, which defines the following code:
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
String id(a) default "";
String containerFactory(a) default "";
/ / message Topic
String[] topics() default {};
//Topic pattern matching expression
String topicPattern(a) default "";
/ / the Topic partition
TopicPartition[] topicPartitions() default {};
String containerGroup(a) default "";
String errorHandler(a) default "";
// Message group Id
String groupId(a) default "";
boolean idIsGroup(a) default true;
String clientIdPrefix(a) default "";
String beanRef(a) default "__listener";
}
Copy the code
When using the @kafkalistener annotation, we can add it directly to the method that handles the message, as shown in the following code:
@ KafkaListener (switchable viewer = "demo. The topic")
public void handlerEvent(DemoEvent event) {
//TODO: adds message processing logic
}
Copy the code
In addition, configuration items for message consumption need to be specified in the consumer’s profile:
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: demo.topic
consumer:
group-id: demo.group
Copy the code
02 | using JmsTemplate integrated ActiveMQ
The JMS specification
The JMS specification provides a core set of interfaces for developers to use, and these interfaces form the client-side API architecture, as shown in the following figure:
JMS specification exists ActiveMQ, WMQ, TIBCO and other third-party implementation methods, among which ActiveMQ is the more mainstream.
For ActiveMQ, there are currently two implementation projects to choose from, one is the classic 5.x version and the other is the next generation Artemis.
Integrate ActiveMQ with JmsTemplate
Introducing dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
Copy the code
The consumption mode of ActiveMQ includes Push Consumer and Pull Consumer.
Send messages using JmsTemplate
JmsTemplate has a batch of send methods for sending messages, as shown in the following code:
@Override
public void send(MessageCreator messageCreator) throws JmsException {}@Override
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {}@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {}Copy the code
These methods provide a MessageCreator interface for creating message objects. A typical implementation of sending a message through the send method is as follows:
public void sendDemoObject(DemoObject demoObject) {
jmsTemplate.send("demo.queue".new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
returnsession.createObjectMessage(demoObject); }}Copy the code
JmsTemplate also provides a simpler set of methods for sending messages, called convertAndSend methods, as shown in the following code:
public void sendDemoObject(DemoObject demoObject) {
jmsTemplate.convertAndSend("demo.queue", demoObject, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
// Processing for Message
returnmessage; }});Copy the code
Consume messages using JmsTemplate
Let’s first look at how to implement the pull consumption model.
JmsTemplate provides a set of receive methods for pulling messages from Artemis, as shown in the following code:
public Message receive(a) throws JmsException {}public Message receive(Destination destination) throws JmsException {}public Message receive(String destinationName) throws JmsException {}Copy the code
The message consumption method in push mode is shown in the following code:
@ JmsListener (the queues = "demo. Queue")
public void handlerEvent(DemoEvent event) {
//TODO: adds message processing logic
}
Copy the code
03 | use RabbitTemplate integrated the RabbitMQ
The Advanced Message Queuing Protocol (AMQP) is an application-layer standard Advanced Message Queuing specification that provides unified messaging services.
Closer specification
There are three core components in the AMQP specification: Exchange, Queue and Binding.
If there are multiple queues, how does Exchange know which Queue to send messages to?
The message contains a Routing Key, which is generated by the message sender and provided to the Exchange standard for Routing the message. Exchange checks the Routing Key and uses the Routing algorithm to determine which Queue to route the message to.
In the figure above, different routing algorithms have different Exchange types. Direct exchanges, Fanout exchanges, Topic exchanges, and Header exchanges are specified in the AMQP specification.
Use RabbitTemplate to send messages
Lead dependence:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
Configure the RabbitMQ server address, port, user name, and password as follows:
spring:
rabbitmq:
host: 127.0. 01.
port: 5672
username: guest
password: guest
virtual-host: DemoHost
Copy the code
When integrating with the business code, we need to convert the business object to a Message object, as shown in the sample code below:
public void sendDemoObject(DemoObject demoObject) {
MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(demoObject, props);
rabbitTemplate.send("demo.queue", message);
}
Copy the code
This can also be done using the RabbitTemplate convertAndSend method group, as shown in the following code:
public void sendDemoObject(DemoObject demoObject) {
rabbitTemplate.convertAndSend("demo.queue", demoObject);
}
Copy the code
Sometimes we need to add some attributes to the message while it is being sent, as shown in the following code:
RabbitTemplate. ConvertAndSend (" demo. Queue ", the event,new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// Processing for Message
returnmessage; }});Copy the code
Consume messages using RabbitTemplate
A typical example of using RabbitTemplate in pull mode is as follows:
public DemoEvent receiveEvent(a) {
return(DemoEvent) rabbitTemplate. ReceiveAndConvert (" demo. Queue "); }Copy the code
The implementation of push mode is also very simple, as shown in the following code:
@RabbitListener(queues = "demo.queue")
public void handlerEvent(DemoEvent event) {
//TODO: adds message processing logic
}
Copy the code