In this article, we begin our practical journey of Spring AMQP project.
Send email to blog.720uenterp and choose Spring AMQP to install RabbitMQ
introduce
Through this project you will learn how to use Spring Boot to integrate Spring AMQP and send emails using RabbitMQ’s message queue mechanism. The message producer is responsible for sending the user’s mail message to the message queue, and the message consumer gets the mail message from the message queue for sending. This process, you can think of as the post office: when the mail you’re about to post is in the mailbox, you can be sure that the mailman will eventually deliver it to the recipient.
To prepare
This tutorial assumes RabbitMQ is already installed and running on localhost on standard port 5672. If you use different hosts, ports, connection Settings will need to be adjusted.
host = localhost
username = guest
password = guest
port = 5672
vhost = /Copy the code
Field trip
The preparatory work
This tutorial will build two projects: email-server-producer and email-server-consumer. Email-server-producer is a message producer project, and email-server-consumer is a message consumer project.
At the end of the tutorial, I’ll submit the complete code to Github, so you can read the tutorial with the source code and get better results.
Now begin the journey. We used Spring Boot to integrate Spring AMQP and built dependencies through Maven. (Due to space issues, I won’t paste the full pop.xml configuration, you can check out the full configuration file at Github source.)
<dependencies>
<! -- spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${javax.mail.version}</version>
</dependency>
</dependencies>Copy the code
Building message Producers
We configure message producers using Java Config.
@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}
@Bean
Queue queue() {
String name = env.getProperty("mq.queue").trim();
// Whether to persist
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())?
Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true;
// A private queue that can only be used by the creator
boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())?
Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false;
// Whether the queue is automatically deleted when all consumer clients are disconnected
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false;
return new Queue(name, durable, exclusive, autoDelete);
}
@Bean
TopicExchange exchange() {
String name = env.getProperty("mq.exchange").trim();
// Whether to persist
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true;
// Whether the queue is automatically deleted when all consumer clients are disconnected
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false;
return new TopicExchange(name, durable, autoDelete);
}
@Bean
Binding binding() {
String routekey = env.getProperty("mq.routekey").trim();
returnBindingBuilder.bind(queue()).to(exchange()).with(routekey); }}Copy the code
Queues, switches, and bindings are defined. In fact, this way Spring AMQP automatically creates a queue or exchange when it doesn’t exist. If you do not want to create them automatically, you can enable queues and switches in the RabbitMQ admin background and comment out the queue() and exchange() methods. In addition, we have removed the configuration information for creating queues or switches from the configuration file application.properties for better scaling. RabbitMQ configuration information is also included.
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/
mq.exchange=email_exchange
mq.exchange.durable=true
mq.exchange.autoDelete=false
mq.queue=email_queue
mq.queue.durable=true
mq.queue.exclusive=false
mq.queue.autoDelete=false
mq.routekey=email_routekeyCopy the code
In addition, suppose a producer sends to a switch and a consumer receives messages from a queue. At this point, binding queues to the exchange is critical to connecting these producers and consumers. In Spring AMQP, we define a Binding class to represent these connections. We use BindingBuilder to build a “streaming API” style.
BindingBuilder.bind(queue()).to(exchange()).with(routekey);Copy the code
Now that we’re close to the end, we need to define a way to put the sending mail task into the message queue. At this point, in order to better extend, we define an interface and an implementation class, programming based on the interface.
public interface EmailService {
/** * The sending task stores the message queue * @param message * @throws Exception */
void sendEmail(String message) throws Exception;
}Copy the code
Its implementation class overrides the sendEmail() method to transcode the message and write it to the message queue.
@Service
public class EmailServiceImpl implements EmailService{
private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);
@Resource( name = "rabbitTemplate" )
private RabbitTemplate rabbitTemplate;
@Value("${mq.exchange}")
private String exchange;
@Value("${mq.routekey}")
private String routeKey;
@Override
public void sendEmail(String message) throws Exception {
try {
rabbitTemplate.convertAndSend(exchange, routeKey, message);
}catch (Exception e){
logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e)); }}}Copy the code
So, let’s simulate another RESTful API call scenario to simulate a real scenario.
@RestController()
@RequestMapping(value = "/v1/emails")
public class EmailController {
@Resource
private EmailService emailService;
@RequestMapping(method = RequestMethod.POST)
public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception {
emailService.sendEmail(jsonObject.toJSONString());
returnjsonObject; }}Copy the code
Finally, write a main method to get the Spring Boot service up and running.
@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class WebMain {
public static void main(String[] args) throws Exception { SpringApplication.run(WebMain.class, args); }}Copy the code
At this point, we’re done. We can send an HTTP request via Postman. (Postman is a powerful Chrome plugin for debugging and sending HTTP requests to web pages.)
{
"to":"[email protected]"."subject":"email-server-producer"."text":"< HTML > Mail test
Hello! This is mail test.
"
}Copy the code
See the illustration.
Take a look at RabbitMQ’s administrative background, where an unprocessed message appears. (Address: http://localhost:15672/#/queues)
And, just be sure not to send test messages to my E-mail inbox, which will remain “buoyed” by mail.
Building message consumers
After the message producer is complete, let’s build a message consumer project. Similarly, message consumers are configured using Java Config.
@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}
@Bean
public SimpleMessageListenerContainer listenerContainer(
@Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception {
String queueName = env.getProperty("mq.queue").trim();
SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(cachingConnectionFactory());
simpleMessageListenerContainer.setQueueNames(queueName);
simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter);
// Set the manual ACK
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
returnsimpleMessageListenerContainer; }}Copy the code
You’re smart enough to see the difference. There is an additional listenerContainer() method in this code. Yes, it is a listener container that listens to message queues for message processing. Notice how we set up the manual ACK. By default, it uses auto reply, in which the message queue deletes the message from the message queue immediately after it is sent. If the consumer does not send an ACK due to an outage or connection failure, RabbitMQ will re-send the message to the next consumer listening on the queue to ensure the reliability of the message.
Of course, we also define the application.properties configuration file.
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/
mq.queue=email_queueCopy the code
In addition, we created a MailMessageListenerAdapter class to consume news.
@Component("mailMessageListenerAdapter")
public class MailMessageListenerAdapter extends MessageListenerAdapter {
@Resource
private JavaMailSender mailSender;
@Value("${mail.username}")
private String mailUsername;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// Parse the RabbitMQ message body
String messageBody = new String(message.getBody());
MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class);
// Send an email
String to = mailMessageModel.getTo();
String subject = mailMessageModel.getSubject();
String text = mailMessageModel.getText();
sendHtmlMail(to, subject, text);
/ / ACK manually
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){ e.printStackTrace(); }}/** * Send email * @param to * @param subject * @param text * @throws Exception */
private void sendHtmlMail(String to, String subject, String text) throws Exception {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
mimeMessageHelper.setFrom(mailUsername);
mimeMessageHelper.setTo(to);
mimeMessageHelper.setSubject(subject);
mimeMessageHelper.setText(text, true);
// Send an emailmailSender.send(mimeMessage); }}Copy the code
In the onMessage() method, we do three things:
- Parse the message body from the RabbitMQ message queue.
- Sends an email to the target mailbox based on the message body.
- Manually acknowledge the ACK and let the message queue delete the message.
Here, the jsonObject.toJavaObject () method uses FastJSON to convert the JSON string into the entity object MailMessageModel. Note that @data is an annotation to the Lombok class library.
@Data
public class MailMessageModel {
@JSONField(name = "from")
private String from;
@JSONField(name = "to")
private String to;
@JSONField(name = "subject")
private String subject;
@JSONField(name = "text")
private String text;
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Email{from:").append(this.from).append(",");
sb.append("to:").append(this.to).append(",");
sb.append("subject:").append(this.subject).append(",");
sb.append("text:").append(this.text).append("}");
returnsb.toString(); }}Copy the code
Spring has good support for Java Mail. There are several types of messages: simple text messages, HTML text messages, embedded images messages, messages containing attachments. Here, we’ve wrapped a simple sendHtmlMail() to send the mail.
Oh, and we’re missing a mail configuration class.
@Configuration
@PropertySource(value = {"classpath:mail.properties"})
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class EmailConfig {
@Autowired
private Environment env;
@Bean(name = "mailSender")
public JavaMailSender mailSender() {
// Create a Mail sender, which mainly provides the Mail sending interface, transparent creation of Java Mail MimeMessage, and Mail sending configuration
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
// If the email address is a common email address, for example, non-SSL authentication
mailSender.setHost(env.getProperty("mail.host").trim());
mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim()));
mailSender.setUsername(env.getProperty("mail.username").trim());
mailSender.setPassword(env.getProperty("mail.password").trim());
mailSender.setDefaultEncoding("utf-8");
// Configure the mail server
Properties props = new Properties();
// Ask the server to authenticate the user name and password
props.put("mail.smtp.auth"."true");
props.put("mail.smtp.timeout"."25000");
mailSender.setJavaMailProperties(props);
returnmailSender; }}Copy the code
This configuration information is maintained in the configuration file mail.properties.
mail.host=smtp163..com
mail.port=25Mail. username= username mail.password= passwordCopy the code
Finally, let’s write a main method to get the Spring Boot service running.
At this point, we have also completed a message consumer project that will continuously process mail messages from the message queue.
The source code
Related examples Complete code: github.com/lianggzone/…
(after)
More wonderful articles, all in the “server-side thinking” wechat public account!