【MQ Series】SpringBoot + RabbitMQ: basic usage for sending messages

The previous two posts covered the core concepts of RabbitMQ and a demo application built with SpringBoot (see "How to play RabbitMQ in SpringBoot").

This article focuses on sending messages and covers the following points:

  • Basic usage of RabbitTemplate for sending messages
  • Customizing basic message properties
  • Custom message converters via AbstractMessageConverter
  • A failure case when sending an Object message

I. Basic usage

1. Configuration

We use SpringBoot 2.2.1.RELEASE + RabbitMQ 3.7.5 to build and test the whole project.

The project's pom.xml includes the following dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

The contents of the application.yml configuration file are as follows

spring:
  rabbitmq:
    virtual-host: /
    username: admin
    password: admin
    port: 5672
    host: 127.0.0.1

2. Configuration class

In RabbitMQ, the basic flow is that a message is sent to an exchange, which then routes it to queues according to its routing policy.

This post focuses on sending messages. For the examples below, we define a topic exchange and bind a queue to it. (The exchange type makes little difference to how the sender is used; it mainly affects the consumer side.)

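// MqConstants.java
public class MqConstants {

    public static final String exchange = "topic.e";

    public static final String routing = "r";

    public final static String queue = "topic.a";

}

// MqConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.exchange);
    }

    @Bean
    public Queue queue() {
        // Create a persistent (durable) queue
        return new Queue(MqConstants.queue, true);
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        // Bind the queue to the topic exchange with the routing key
        return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}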
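The binding above uses the plain routing key "r", so only messages published with exactly that key reach the queue. To illustrate how a topic exchange decides this, here is a minimal sketch of topic pattern matching, assuming the standard AMQP topic rules (words are separated by dots, `*` matches exactly one word, `#` matches zero or more words). The class and method names are hypothetical, and this is not the broker's actual implementation.

```java
public class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("r", "r"));                   // true
        System.out.println(matches("r", "r.x"));                 // false
        System.out.println(matches("order.*", "order.created")); // true
        System.out.println(matches("order.#", "order"));         // true
    }
}
```

With a wildcard-free pattern such as "r", a topic exchange behaves like an exact-match lookup, which is why the sender code in this post looks the same regardless of exchange type.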

3. Sending messages

Messages are sent with the RabbitTemplate#convertAndSend method.

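import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class BasicPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Push a message
     *
     * @param ans text appended to the message body
     * @return the message that was sent
     */
    private String publish2mq1(String ans) {
        String msg = "Durable msg = " + ans;
        System.out.println("publish: " + msg);
        // Send msg to the exchange with the given routing key
        rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg;
    }
}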

The key line above is rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);

  • It sends msg to the specified exchange and sets the routing key of the message

Please note

Messages sent this way are persistent by default. When a persistent message is sent to a persistent queue, it is written to disk.

In some cases we are less strict about message integrity and care more about MQ performance, so some data loss is acceptable. In that case we may need to customize the properties of the message being sent (for example, making it non-persistent).

Here are two approaches; the second is recommended.

/**
 * Push a non-persistent message. When it is pushed to a persistent queue and MQ restarts, the message is lost;
 * the persistent message above is not lost.
 *
 * @param ans text appended to the message body
 * @return string form of the message that was sent
 */
private String publish2mq2(String ans) {
    MessageProperties properties = new MessageProperties();
    properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties);

    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message);

    System.out.println("publish: " + message);
    return message.toString();
}


private String publish2mq3(String ans) {
    String msg = "Define msg = " + ans;
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // Adjust the properties after conversion and before sending
            message.getMessageProperties().setHeader("ta", "Test");
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        }
    });
    return msg;
}

Note

  • In real projects, it is recommended to use a MessagePostProcessor to customize message properties
  • Also, avoid creating a new MessagePostProcessor object every time a message is sent; define a shared instance and reuse it wherever possible
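The reuse advice can be sketched as follows. To keep the example runnable without any dependencies, `Props` and `PostProcessor` are hypothetical stand-ins for the message properties and the post-processing callback; they are not the Spring AMQP classes. The point is only the pattern: a processor that holds no per-message state can be created once and shared by every send.

```java
import java.util.HashMap;
import java.util.Map;

public class ReusableProcessorSketch {

    // Hypothetical stand-in for a message's properties (not the Spring AMQP class)
    static class Props {
        String deliveryMode = "PERSISTENT";
        final Map<String, Object> headers = new HashMap<>();
    }

    // Hypothetical stand-in for the post-processing callback
    interface PostProcessor {
        Props process(Props props);
    }

    // Created once; it keeps no per-message state, so every send can share it
    static final PostProcessor NON_PERSISTENT = props -> {
        props.deliveryMode = "NON_PERSISTENT";
        props.headers.put("ta", "Test");
        return props;
    };

    // Simulated send: builds fresh properties, then applies the shared processor
    static Props send(String body, PostProcessor processor) {
        return processor.process(new Props());
    }

    public static void main(String[] args) {
        Props first = send("a", NON_PERSISTENT);
        Props second = send("b", NON_PERSISTENT);
        System.out.println(first.deliveryMode + " / " + second.deliveryMode);
    }
}
```

In the real code, the same idea would mean holding the anonymous MessagePostProcessor from publish2mq3 in a field or constant instead of instantiating it inside the method.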

4. Exception when sending a non-serializable object

From the signature of RabbitTemplate#convertAndSend, we know that the message can be of type Object. Does that mean any object can be pushed to MQ?

Here is a test case

private String publish2mq4(String ans) {
    NonSerDO nonSerDO = new NonSerDO(18, ans);
    System.out.println("publish: " + nonSerDO);
    rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
    return nonSerDO.toString();
}


@Data
public static class NonSerDO {
    private Integer age;
    private String name;

    public NonSerDO(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

When we call the publish2mq4 method above, it does not succeed as hoped; instead, an illegal-argument exception is thrown.

Why does this happen? From the stack trace, we can see that RabbitTemplate uses SimpleMessageConverter by default to build the Message.

// The following code is from org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
	byte[] bytes = null;
	if (object instanceof byte[]) {
		bytes = (byte[]) object;
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
	}
	else if (object instanceof String) {
		try {
			bytes = ((String) object).getBytes(this.defaultCharset);
		}
		catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"failed to convert to Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setContentEncoding(this.defaultCharset);
	}
	else if (object instanceof Serializable) {
		try {
			bytes = SerializationUtils.serialize(object);
		}
		catch (IllegalArgumentException e) {
			throw new MessageConversionException(
					"failed to convert to serialized Message content", e);
		}
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
	}
	if (bytes != null) {
		messageProperties.setContentLength(bytes.length);
		return new Message(bytes, messageProperties);
	}
	throw new IllegalArgumentException(getClass().getSimpleName()
			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

The logic above makes it clear that only byte arrays, strings, and serializable objects are accepted.

  • So passing a non-serializable object throws an illegal-argument exception
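The type check in createMessage can be reproduced with the JDK alone. The following is a minimal sketch that mirrors the three accepted payload kinds; the class names `PlainUser` and `SerUser` and the helper methods are hypothetical, and the code uses plain Java serialization rather than Spring's SerializationUtils.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class PayloadCheckSketch {

    // Plain object: does not implement Serializable, like NonSerDO
    static class PlainUser {
        final int age = 18;
    }

    // Same data, but marked Serializable
    static class SerUser implements Serializable {
        private static final long serialVersionUID = 1L;
        final int age = 18;
    }

    /** Converts a payload to bytes, accepting only byte[], String and Serializable. */
    static byte[] toBytes(Object payload) {
        if (payload instanceof byte[]) {
            return (byte[]) payload;
        }
        if (payload instanceof String) {
            return ((String) payload).getBytes(StandardCharsets.UTF_8);
        }
        if (payload instanceof Serializable) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(payload);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new IllegalStateException("failed to serialize payload", e);
            }
        }
        // Anything else is rejected, as in the converter shown above
        throw new IllegalArgumentException(
                "only supports String, byte[] and Serializable payloads, received: "
                        + payload.getClass().getName());
    }

    /** Returns true if the payload kind is accepted. */
    static boolean accepted(Object payload) {
        try {
            toBytes(payload);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepted("hello"));         // true
        System.out.println(accepted(new SerUser()));   // true
        System.out.println(accepted(new PlainUser())); // false
    }
}
```

One way around the exception is therefore simply to make the payload class implement Serializable; the sections below take the other route and change the converter instead.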

Naturally, we wonder whether some other MessageConverter can handle objects of any type.

5. Custom MessageConverter

Next, we solve this problem with a custom MessageConverter that serializes to JSON.

A simple implementation (using FastJson for serialization/deserialization):

public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

Define another RabbitTemplate and set its message converter to the custom SelfConverter

@Bean
public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new SelfConverter());
    return rabbitTemplate;
}

Then test it again

@Service
public class JsonPublisher {
    @Autowired
    private RabbitTemplate jsonRabbitTemplate;
      
    private String publish1(String ans) {
        Map<String, Object> msg = new HashMap<>(8);
        msg.put("msg", ans);
        msg.put("type", "json");
        msg.put("version", 123);
        System.out.println("publish: " + msg);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
        return msg.toString();
    }

    private String publish2(String ans) {
        BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans);
        System.out.println("publish: " + nonSerDO);
        jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO);
        return nonSerDO.toString();
    }
}

The pushed messages received in MQ are as follows

6. Jackson2JsonMessageConverter

The implementation above does convert messages to JSON, but it is rather basic. For such a common, general-purpose feature, the Spring ecosystem, true to form, already ships a ready-made implementation: Jackson2JsonMessageConverter.

So we can also do the following

// Define the RabbitTemplate
@Bean
public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}


// Test the code
@Autowired
private RabbitTemplate jacksonRabbitTemplate;
private String publish3(String ans) {
    Map<String, Object> msg = new HashMap<>(8);
    msg.put("msg", ans);
    msg.put("type", "jackson");
    msg.put("version", 456);
    System.out.println("publish: " + msg);
    jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
    return msg.toString();
}

A message serialized by Jackson looks a little different from our custom one: it carries headers and a content_encoding.

7. Summary

The main points of this post are as follows:

  • Use RabbitTemplate#convertAndSend to publish messages
  • Use MessagePostProcessor to customize message properties (note that messages are persistent by default)
  • The default message converter is SimpleMessageConverter, which only supports byte arrays, strings, and serializable objects; calls with any other payload throw an exception
  • We can solve this by implementing the MessageConverter interface to define our own message converter

To ensure that messages are correctly received by the broker, RabbitMQ provides both message confirmation and transactions. What should a message producer do when using these two mechanisms?

Due to limited space, the next post will cover how to send messages with the message confirmation and transaction mechanisms.

II. Other

Series of blog posts & project source code

Series of blog posts

  • 【MQ Series】SpringBoot + RabbitMQ
  • RabbitMq Series RabbitMq

Project source code

  • Project: github.com/liuyueyi/sp…
  • Source: github.com/liuyueyi/sp…

1. A Grey Blog

Better to have no books than to believe everything in them. The content above is purely my own view. Given my limited ability, omissions and mistakes are inevitable; if you find bugs or have better suggestions, criticism and corrections are welcome and much appreciated.

Below is my personal blog, which records all of my study and work posts. You are welcome to visit.

  • A Grey Blog, personal blog: blog.hhui.top
  • A Grey Blog, Spring topic blog: spring.hhui.top