These are the blogger's study notes. The content comes from the Internet; if anything here infringes your rights, please contact me to have it removed.

Personal Note: github.com/dbses/TechN…

01 | Integrating Kafka with KafkaTemplate

Just as it offers JdbcTemplate and RestTemplate, Spring Boot, an integration framework for rapid development, also provides a set of utility classes for messaging whose names end in -Template.

For Kafka, this utility class is KafkaTemplate.

Send messages using KafkaTemplate

Add the dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

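KafkaTemplate provides a series of send methods for sending messages. A typical one is defined as follows (this is the 2.x signature; from Spring for Apache Kafka 3.0 onward the return type is CompletableFuture instead of ListenableFuture):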

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
}

Consume messages using the @KafkaListener annotation

To consume messages from Kafka, a consumer needs to provide a listener that listens on a topic and receives the messages published to it.

Spring provides the @KafkaListener annotation for this purpose. It is defined as follows:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
    String id() default "";
    String containerFactory() default "";
    // Message topics
    String[] topics() default {};
    // Topic pattern-matching expression
    String topicPattern() default "";
    // Topic partitions
    TopicPartition[] topicPartitions() default {};
    String containerGroup() default "";
    String errorHandler() default "";
    // Consumer group id
    String groupId() default "";
    boolean idIsGroup() default true;
    String clientIdPrefix() default "";
    String beanRef() default "__listener";
}

To use the @KafkaListener annotation, we can add it directly to the method that handles the message, as shown in the following code:

@KafkaListener(topics = "demo.topic")
public void handlerEvent(DemoEvent event) {
    // TODO: add message processing logic
}

In addition, the settings for message consumption need to be specified in the consumer's configuration file:

spring:      
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: demo.topic
    consumer:
      group-id: demo.group

02 | Integrating ActiveMQ with JmsTemplate

The JMS specification

The JMS specification provides a core set of interfaces for developers to use, and these interfaces form the client-side API architecture, as shown in the following figure:

The JMS specification has several third-party implementations, such as ActiveMQ, WMQ, and TIBCO, of which ActiveMQ is one of the most widely used.

For ActiveMQ, there are currently two implementations to choose from: the classic 5.x line and the next-generation Artemis.

Integrate ActiveMQ with JmsTemplate

Add the dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

ActiveMQ supports two consumption modes: push, where the broker delivers messages to a registered consumer, and pull, where the consumer asks the broker for messages.
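To make the difference between the two modes concrete, here is a minimal plain-Java sketch. It is not ActiveMQ or Spring code: the class PushPullSketch and its send, receive, and subscribe methods are hypothetical names invented for this illustration, and the "broker" is just an in-memory queue used from a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy in-memory destination (hypothetical, not an ActiveMQ API). */
final class PushPullSketch {

    private final Queue<String> pending = new ArrayDeque<>();
    private Consumer<String> listener; // set only once a push consumer subscribes

    /** Push mode: register a callback and hand it anything already waiting. */
    void subscribe(Consumer<String> callback) {
        this.listener = callback;
        String message;
        while ((message = pending.poll()) != null) {
            callback.accept(message);
        }
    }

    /** Producer side: deliver immediately if a listener exists, otherwise store. */
    void send(String message) {
        if (listener != null) {
            listener.accept(message);
        } else {
            pending.add(message);
        }
    }

    /** Pull mode: the consumer asks for the next message; null means none is waiting. */
    String receive() {
        return pending.poll();
    }

    public static void main(String[] args) {
        // Pull: nothing happens until the consumer calls receive().
        PushPullSketch pullQueue = new PushPullSketch();
        pullQueue.send("a");
        System.out.println("pulled: " + pullQueue.receive());

        // Push: the callback runs as soon as a message is sent.
        PushPullSketch pushQueue = new PushPullSketch();
        List<String> handled = new ArrayList<>();
        pushQueue.subscribe(handled::add);
        pushQueue.send("b");
        System.out.println("pushed: " + handled);
    }
}
```

In the sections that follow, the receive methods of the template classes play the role of the pull side, and the listener annotations play the role of the push side.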

Send messages using JmsTemplate

JmsTemplate has a batch of send methods for sending messages, as shown in the following code:

@Override
public void send(MessageCreator messageCreator) throws JmsException {
}

@Override
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
}

@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
}

Each of these methods takes a MessageCreator, an interface for creating message objects. A typical implementation of sending a message through the send method is as follows:

public void sendDemoObject(DemoObject demoObject) {
    jmsTemplate.send("demo.queue", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            // DemoObject must implement Serializable to be sent as an ObjectMessage
            return session.createObjectMessage(demoObject);
        }
    });
}

JmsTemplate also provides a simpler set of methods for sending messages, called convertAndSend methods, as shown in the following code:

public void sendDemoObject(DemoObject demoObject) {
    jmsTemplate.convertAndSend("demo.queue", demoObject, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws JMSException {
            // Process the message here, for example by setting properties
            return message;
        }
    });
}

Consume messages using JmsTemplate

Let’s first look at how to implement the pull consumption model.

JmsTemplate provides a set of receive methods for pulling messages from Artemis, as shown in the following code:

public Message receive() throws JmsException {
}

public Message receive(Destination destination) throws JmsException {
}

public Message receive(String destinationName) throws JmsException {
}

The message consumption method in push mode is shown in the following code:

@JmsListener(destination = "demo.queue")
public void handlerEvent(DemoEvent event) {
    // TODO: add message processing logic
}

03 | Integrating RabbitMQ with RabbitTemplate

The Advanced Message Queuing Protocol (AMQP) is an application-layer standard for message queuing that provides unified messaging services.

The AMQP specification

There are three core components in the AMQP specification: Exchange, Queue and Binding.

If there are multiple queues, how does Exchange know which Queue to send messages to?

Each message carries a Routing Key, which is set by the message sender and gives the Exchange the criterion for routing the message. The Exchange checks the Routing Key and applies its routing algorithm to decide which Queue to route the message to.

Different routing algorithms correspond to different Exchange types. The AMQP specification defines four of them: Direct, Fanout, Topic, and Headers exchanges.
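The routing rules can be sketched in plain Java. This is an illustration only, not RabbitMQ client code: the class ExchangeRoutingSketch, its Binding type, and the route and topicMatches methods are hypothetical names made up for these notes. It covers the Direct, Fanout, and Topic types and leaves out Headers exchanges, which match on message headers rather than on the Routing Key. For Topic exchanges it assumes the usual convention that keys are dot-separated words, that * stands for exactly one word, and that # stands for zero or more words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of exchange routing rules (hypothetical, not a RabbitMQ API). */
final class ExchangeRoutingSketch {

    enum ExchangeType { DIRECT, FANOUT, TOPIC }

    /** A binding links a queue to the exchange under a binding key. */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Returns the queues that should receive a message with the given routing key. */
    static List<String> route(ExchangeType type, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding binding : bindings) {
            boolean matches;
            switch (type) {
                case DIRECT: // exact match between binding key and routing key
                    matches = binding.bindingKey.equals(routingKey);
                    break;
                case FANOUT: // routing key is ignored, every bound queue gets a copy
                    matches = true;
                    break;
                default:     // TOPIC: word-by-word wildcard match
                    matches = topicMatches(binding.bindingKey.split("\\."), 0,
                                           routingKey.split("\\."), 0);
            }
            if (matches) {
                targets.add(binding.queue);
            }
        }
        return targets;
    }

    /** Topic matching: '*' is exactly one word, '#' is zero or more words. */
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return topicMatches(pattern, p + 1, key, k)
                    || (k < key.length && topicMatches(pattern, p, key, k + 1));
        }
        if (k == key.length) {
            return false;
        }
        return (pattern[p].equals("*") || pattern[p].equals(key[k]))
                && topicMatches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        List<Binding> bindings = Arrays.asList(
                new Binding("orders", "order.created"),
                new Binding("audit", "order.#"),
                new Binding("created-events", "*.created"));
        System.out.println(route(ExchangeType.DIRECT, bindings, "order.created"));
        System.out.println(route(ExchangeType.FANOUT, bindings, "anything"));
        System.out.println(route(ExchangeType.TOPIC, bindings, "order.created.eu"));
    }
}
```

The same set of bindings yields different target queues depending only on the Exchange type, which is the point of keeping the routing algorithm in the Exchange rather than in the sender.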

Use RabbitTemplate to send messages

Add the dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Configure the RabbitMQ server address, port, user name, and password as follows:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: DemoHost

When integrating with the business code, we need to convert the business object to a Message object, as shown in the sample code below:

public void sendDemoObject(DemoObject demoObject) {
    MessageConverter converter = rabbitTemplate.getMessageConverter();
    MessageProperties props = new MessageProperties();
    Message message = converter.toMessage(demoObject, props);
    rabbitTemplate.send("demo.queue", message);
}

The same thing can be done more simply with RabbitTemplate's convertAndSend family of methods, as shown in the following code:

public void sendDemoObject(DemoObject demoObject) { 
    rabbitTemplate.convertAndSend("demo.queue", demoObject); 
}

Sometimes we need to add some attributes to the message while it is being sent, as shown in the following code:

rabbitTemplate.convertAndSend("demo.queue", event, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // Process the message here, for example by setting properties
        return message;
    }
});
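Conceptually, a post-processor runs after the business object has been converted to a message and before the message is handed to the broker. The plain-Java sketch below shows only that ordering. It does not use Spring AMQP: SimpleMessage, convertThenPostProcess, and the header names are hypothetical stand-ins chosen for this illustration, and the "conversion" is simply UTF-8 encoding of a string.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Sketch of the convert-then-post-process order (hypothetical types, not Spring AMQP). */
final class PostProcessSketch {

    /** Stand-in for a broker message: a byte body plus a header map. */
    static final class SimpleMessage {
        final byte[] body;
        final Map<String, Object> headers = new HashMap<>();

        SimpleMessage(byte[] body) {
            this.body = body;
        }
    }

    /** Step 1 converts the payload; step 2 lets the caller decorate the result. */
    static SimpleMessage convertThenPostProcess(String payload,
                                                UnaryOperator<SimpleMessage> postProcessor) {
        SimpleMessage message = new SimpleMessage(payload.getBytes(StandardCharsets.UTF_8));
        message.headers.put("contentType", "text/plain"); // set by the "converter"
        return postProcessor.apply(message);              // caller adds its own attributes
    }

    public static void main(String[] args) {
        SimpleMessage ready = convertThenPostProcess("hello", message -> {
            message.headers.put("source", "demo-service"); // illustrative header name
            return message;
        });
        System.out.println(ready.headers.get("source"));
        System.out.println(new String(ready.body, StandardCharsets.UTF_8));
    }
}
```

Because the post-processor receives the already converted message, it can add or override headers without knowing anything about how the business object was serialized.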

Consume messages using RabbitTemplate

A typical example of using RabbitTemplate in pull mode is as follows:

public DemoEvent receiveEvent() {
    return (DemoEvent) rabbitTemplate.receiveAndConvert("demo.queue");
}

The implementation of push mode is also very simple, as shown in the following code:

@RabbitListener(queues = "demo.queue")
public void handlerEvent(DemoEvent event) {
    // TODO: add message processing logic
}