preface
Message queue middleware is an important component in distributed system, which mainly solves application coupling, asynchronous message, traffic cutting and other problems
Achieve high-performance, highly available, scalable and ultimately consistent architectures. RabbitMQ is an open source message that implements the Advanced Message Queuing Protocol (AMQP). It has the advantages of high system throughput, reliability, message persistence and free of charge, and is widely used in software projects.
Project introduction
This project uses Springboot to integrate RabbitMQ, showing you how to design and gracefully integrate RabbitMQ-related components, and implement a dead letter queue to implement delayed message queues.
Project design and practice
configuration
Maven rely on
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1. RELEASE</version>
<relativePath/>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code
The configuration file
Spring. The rabbitmq. Host = 192.168.202.128 spring. The rabbitmq. Port = 5672 spring. The rabbitmq. Username = guest spring.rabbitmq.password=guestCopy the code
Component design and implementation
Exchange
Define switch name, type, persistence, delayed switch name, and other properties.
public interface IRabbitMqExchange {
/** * Exchange name ** /
String exchangeName(a);
/** * Exchange type DIRECT(" DIRECT "), FANOUT(" FANOUT "), TOPIC(" TOPIC "), HEADERS(" HEADERS ") ** /
default String type(a){return "topic"; }/** * Whether to persist */
default boolean durable(a){return true; }/** * Delete */ when all queues are finished using exchange
default boolean autoDelete(a){return false; }/** * Whether direct binding is allowed * if true, direct binding to the exchange is not allowed */
default boolean internal(a){ return false; }/** * Other parameters */
default Map<String, Object> arguments(a){ return null; }
/** * delay Exchange ** /
default String delayExchangeName(a) {return "delay."+exchangeName();}
}
Copy the code
Routing (Routing)
public interface IRabbitMqRouting {
/** * rabbitmq route key ** /
String routingKey(a);
}
Copy the code
Queue (Queue)
Define attributes such as queue name, persistence, delay queue name, and so on
public interface IRabbitMqQueue {
/** * Queue name */
String queueName(a);
/** * whether to persist ** /
default boolean durable(a) {return true; }/** ** exclusive ** /
default boolean exclusive(a){return false; }/** * Whether to automatically delete ** /
default boolean autoDelete(a){return false; }/**
* 其他属性设置
* */
default Map<String, Object> arguments(a) { return null; }
/** * Default delay queue name ** /
default String delayQueueName(a){return "delay."+this.queueName();}
}
Copy the code
Binding
The Exchange – Routing – message Queue binding is defined and whether delayed messages are supported.
public interface IRabbitMqBinding {
/** * The exchange to be bound ** /
IRabbitMqExchange exchange(a);
/** * The routing to be bound ** /
IRabbitMqRouting routing(a);
/** * The queue to bind ** /
IRabbitMqQueue queue(a);
/** * Whether the message queue allows delay ** /
boolean allowDelay(a);
}
Copy the code
Default registrar
The registration of switch, message queue and binding relationship is realized. If the binding is defined to support delayed messages, an additional delayed switch and a dead letter queue are registered to implement delayed message push.
public class DefaultRabbitMqRegister implements IRabbitMqRegister.SmartLifecycle {
ConnectionFactory connectionFactory;
Channel channel;
public DefaultRabbitMqRegister(a) {}public DefaultRabbitMqRegister(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@PostConstruct
public void init(a) {
channel = connectionFactory.createConnection().createChannel(false);
}
@Override
public void registerExchange(IRabbitMqExchange... exchanges) throws IOException {
for(IRabbitMqExchange exchange : exchanges) { channel.exchangeDeclare(exchange.exchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments()); }}@Override
public void registerQueue(IRabbitMqQueue... queues) throws IOException {
for(IRabbitMqQueue queue : queues) { channel.queueDeclare(queue.queueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), queue.arguments()); }}@Override
public void registerBinding(IRabbitMqBinding... bindings) throws IOException {
for (IRabbitMqBinding binding : bindings) {
channel.queueBind(binding.queue().queueName(), binding.exchange().exchangeName(), binding.routing().routingKey());
if(binding.allowDelay()) { registerDelayBinding(binding); }}}/** * create an internal dead-letter queue to implement the delay queue */
private void registerDelayBinding(IRabbitMqBinding binding) throws IOException {
IRabbitMqExchange exchange = binding.exchange();
// Register a delayed message switch
channel.exchangeDeclare(exchange.delayExchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());
// Register a dead letter queue and forward the message to the original Router queue
IRabbitMqQueue queue = binding.queue();
Map<String, Object> arguments = queue.arguments();
if (arguments == null) {
arguments = new HashMap<>(4);
}
arguments.put("x-dead-letter-exchange", binding.exchange().exchangeName());
arguments.put("x-dead-letter-routing-key", binding.routing().routingKey());
channel.queueDeclare(queue.delayQueueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), arguments);
// Bind the switch to the queue
channel.queueBind(queue.delayQueueName(), exchange.delayExchangeName(), binding.routing().routingKey());
}
private List<MessageListenerContainer> listenerContainers = new LinkedList<>();
@Override
public void listenerQueue(IRabbitMqListener listener, IRabbitMqQueue... queues) {
String[] queueNames = new String[queues.length];
for (int idx = 0; idx < queues.length; idx++) {
queueNames[idx] = queues[idx].queueName();
}
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// Configure manual confirmation
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(queueNames);
container.setMessageListener(listener);
listenerContainers.add(container);
}
@Override
public void start(a) {
for(MessageListenerContainer container : listenerContainers) { container.start(); }}@Override
public void stop(a) {}@Override
public boolean isRunning(a) {
return false;
}
@Override
public boolean isAutoStartup(a) {
return true;
}
@Override
public void stop(Runnable runnable) {}@Override
public int getPhase(a) {
return 9999; }}Copy the code
Message listener
public interface IRabbitMqListener {
/** * Process rabbitMq messages ** /
boolean handleMessage(Object obj);
}
Copy the code
Abstract implementation class (the concrete consumer inherits this abstract class and overrides the handleMessage() method to implement the consumption logic)
public abstract class AbstractMessageListener implements ChannelAwareMessageListener.IRabbitMqListener {
private Logger logger = LoggerFactory.getLogger(AbstractMessageListener.class);
private MessageConverter messageConverter = new Jackson2JsonMessageConverter();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long tag = message.getMessageProperties().getDeliveryTag();
try {
Object obj = messageConverter.fromMessage(message);
boolean handleResult = handleMessage(obj);
if (handleResult) {
channel.basicAck(tag, false);
} else {
logger.error("Message: {}", message);
channel.basicNack(tag, false.false); }}catch (Exception e) {
channel.basicNack(tag, false.false);
logger.error("Message:" + message + ""+ e.getMessage(), e); }}}Copy the code
Message sending service class
It can send messages and delay messages
public class RabbitMqServiceImpl implements IRabbitMqService.RabbitTemplate.ConfirmCallback.RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitMqServiceImpl.class);
@Autowired
protected RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(a) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
@Override
public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);
}
@Override
public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg, long delay) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
if (delay > 0) {
MessagePostProcessor processor = (Message message) -> {
message.getMessageProperties().setExpiration(delay + "");
return message;
};
rabbitTemplate.convertAndSend(exchange.delayExchangeName(), routing.routingKey(), msg, processor, correlationId);
} else{ rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId); }}/** * Message sent callback **@paramCorrelationId Message Id *@paramIndicates whether the ACK was successful@paramCause Error cause */
@Override
public void confirm(CorrelationData correlationId, boolean ack, String cause) {
if (ack) {
logger.info("Message sent successfully correlationId: {} cause: {}", correlationId, cause);
} else {
logger.error("Failed to send message correlationId: {} cause: {}", correlationId, cause); }}@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("returnedMessage message: {} replyCode: {} exchange: {} routingKey: {}", message, replyCode, exchange, routingKey); }}Copy the code
In actual combat
Use enumerations to define the message queue configuration
Define the test Exchange: mq.exchange.test
/** * RabbitMq Exchange definition ** /
public enum RabbitMqExchange implements IRabbitMqExchange {
MQ_EXCHANGE_TEST("mq.exchange.test");private String exchangeName;
@Override
public String exchangeName(a) {
return this.exchangeName;
}
RabbitMqExchange(String exchangeName){
this.exchangeName = exchangeName; }}Copy the code
Define test Queue:mq.queue.test
public enum RabbitMqQueue implements IRabbitMqQueue {
MQ_QUEUE_TEST("mq.queue.test");
private String queueName;
@Override
public String queueName(a) {
return this.queueName;
}
RabbitMqQueue(String queueName){
this.queueName = queueName; }}Copy the code
Define the test Routing:mq.routing.test
/** * RabbitMq routing (route definition) ** /
public enum RabbitMqRouting implements IRabbitMqRouting {
MQ_ROUTING_TEST("mq.routing.test");
private String routingKey;
@Override
public String routingKey(a) {
return this.routingKey;
}
RabbitMqRouting(String routingKey){
this.routingKey = routingKey; }}Copy the code
Define binding relationships:
/** * RabbitMq Exchange Routing Queue binding ** /
public enum RabbitMqBinding implements IRabbitMqBinding {
MQ_BINDING_TEST(RabbitMqExchange.MQ_EXCHANGE_TEST,RabbitMqRouting.MQ_ROUTING_TEST,RabbitMqQueue.MQ_QUEUE_TEST,true);
/** * exchange */
IRabbitMqExchange exchange;
/** * routing */
IRabbitMqRouting routing;
/** * queue */
IRabbitMqQueue queue;
/** * Whether to allow delay */
boolean allowDelay = false;
RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue){
this.exchange = exchange;
this.routing = routing;
this.queue = queue;
}
RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue,boolean allowDelay){
this.exchange = exchange;
this.routing = routing;
this.queue = queue;
this.allowDelay = allowDelay;
}
@Override
public IRabbitMqExchange exchange(a) {
return this.exchange;
}
@Override
public IRabbitMqRouting routing(a) {
return this.routing;
}
@Override
public IRabbitMqQueue queue(a) {
return this.queue;
}
@Override
public boolean allowDelay(a) {
return this.allowDelay; }}Copy the code
Test Consumer class
public class TestConsumer extends AbstractMessageListener {
Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
public boolean handleMessage(Object obj) {
logger.info("Rabbitmq consumers start spending, message content:" +obj.toString());
return true; }}Copy the code
Start the project
Log in to the RabbitMQ console and you have automatically created switches and delay switches, message queues and dead-letter queues
Test sending a message
@Test
public void testSendMq(a){
logger.info("Producer sends message to MQ");
rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"Test send message");
}
Copy the code
Test sending delay messages (60 seconds)
@Test
public void testSendDelayMq(a){
logger.info("Producer sends delayed message to MQ");
rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"Test sending delayed message 60s".60*1000);
}
Copy the code
The code for
This project contains many instances of wheels, kneel star
Github.com/pengziliu/G…