Message queue middleware is an important component of distributed systems. It mainly addresses application decoupling, asynchronous messaging, and traffic peak shaving, and it helps an architecture achieve high performance, high availability, scalability, and eventual consistency.
Besides ActiveMQ, message queues include many other competitors, such as RabbitMQ, RocketMQ, ZeroMQ, and Kafka. This article does not compare them; it only takes a detailed look at ActiveMQ and its use in .NET.
Application scenarios of message queues
Asynchronous tasks
For example, many websites and apps now send a verification code when a user registers. When the server receives a client's request for a verification code, it can handle it in one of the following ways:
- Send the SMS immediately in the current thread (blocks the current thread for a short while)
- Create a new thread to send the SMS (in .NET, just create a Task)
- Delegate the task to another service (forward it to a message queue and let the queue handle it)
So, which of these approaches is better?
- First: the best real-time behavior. The request is processed as soon as it is received, but this blocks the current thread, so requests from other clients may be held up (with few requests we may not notice at all).
- Second: processing happens on a new thread in the current process. It is not as immediate as the first, but it does not block requests from other clients. However, the number of threads a process can create is limited, so there is a bottleneck.
- Third: a dedicated service handles the task. This has the worst real-time behavior (although with a well-provisioned server we may not notice the difference), but it is the most widely used and performs best in production in terms of stability and scalability.
Therefore, for an officially launched service with high peak traffic, the third solution is undoubtedly the best, because stability is very important for online services. (A version built to validate the market at the start of a project often favors speed over architecture, so the first or second solution may be chosen there.)
For tasks such as sending SMS messages, which do not have strict real-time requirements, a message queue is quite appropriate. After handing the task to the message queue, the main service no longer needs to be involved in actually sending the SMS. If required, it is enough for the main service to subscribe to the result of the task (success or failure). The main service can then continue to handle requests from other clients, and with the message queue sharing the work, it is under less load.
Of course, real projects have many more scenarios like this, such as logging. As we all know, writing files (disk I/O) is time-consuming, so many large services now have dedicated log servers that handle logs sent by other servers. Kafka can be used for this (it was originally designed for log processing).
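The trade-off between the three options can be sketched in a few lines of C#. This is only an illustration under stated assumptions: SendSms is a hypothetical stand-in for a real SMS gateway call, and an in-process BlockingCollection plays the role of the message queue, whereas in production the queue would be a broker such as ActiveMQ and the worker would run in a separate service.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SmsDispatchSketch
{
    // Hypothetical stand-in for a real SMS gateway call; it only simulates latency.
    public static void SendSms(string phone, string code)
    {
        Thread.Sleep(50);
        Console.WriteLine($"SMS to {phone}: {code}");
    }

    // Option 1: send in the current thread; the caller waits for the gateway.
    public static void SendInline(string phone, string code) => SendSms(phone, code);

    // Option 2: hand the work to the thread pool; the caller returns immediately.
    public static Task SendInBackground(string phone, string code) =>
        Task.Run(() => SendSms(phone, code));

    // Option 3: enqueue the request and let a separate worker drain the queue.
    // BlockingCollection stands in for the message queue here.
    public static readonly BlockingCollection<(string Phone, string Code)> Queue =
        new BlockingCollection<(string Phone, string Code)>();

    public static void Enqueue(string phone, string code) => Queue.Add((phone, code));

    // Worker loop; it returns once CompleteAdding has been called and the queue is empty.
    public static int DrainQueue()
    {
        int sent = 0;
        foreach (var (phone, code) in Queue.GetConsumingEnumerable())
        {
            SendSms(phone, code);
            sent++;
        }
        return sent;
    }
}
```

With option 3 the request handler only pays the cost of Enqueue; the slow SendSms call happens in whichever worker runs DrainQueue.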
Message service
For example, in today's microservices and distributed clusters, communication between nodes can be handled by message queues. Depending on the scenario, choose between the following two modes:
- P2P (Point-to-Point) mode
- Pub/Sub (Publish/Subscribe) mode
Both modes are explained in detail in the examples later.
ActiveMQ
ActiveMQ is a popular and powerful open source message bus from Apache. It is a JMS provider implementation that fully supports the JMS 1.1 and J2EE 1.4 specifications. Although the JMS specification was published long ago, JMS still plays an important role in today's J2EE applications, and ActiveMQ is used in many large websites and services.
It has the following characteristics:
- Clients can be written in multiple languages and use multiple protocols. Languages: Java, C, C++, C#, Ruby, Perl, Python, PHP. Application protocols: OpenWire, STOMP, REST, WS-Notification, XMPP, AMQP
- Full support for the JMS 1.1 and J2EE 1.4 specifications (persistence, XA messaging, transactions)
- Spring support: ActiveMQ can be easily embedded into systems that use Spring, and it also supports newer Spring features
- Tested on common J2EE servers such as Geronimo, JBoss 4, GlassFish, and WebLogic. With the JCA 1.5 resource adapter configured, ActiveMQ can be deployed automatically to any J2EE 1.4 compliant commercial server
- Supports multiple transport protocols: in-VM, TCP, SSL, NIO, UDP, JGroups, and JXTA
- Supports high-speed message persistence through JDBC and the journal
- Designed for high-performance clustering, client-server, and point-to-point communication
- Supports Ajax
- Supports integration with Axis
- The embedded JMS provider can easily be used for testing
Its advantages:
- Stability: automatic reconnection on failure, persistence, fault tolerance, and multiple recovery mechanisms
- High efficiency: supports a variety of transport protocols such as TCP, SSL, NIO, and UDP; forwards messages between the brokers of a cluster to prevent message loss; supports very fast JDBC message persistence and an efficient journal
- Extensibility: advanced features such as cursors, fault tolerance, message groups, and monitoring are well implemented and can be enabled through configuration. It also integrates with many mature frameworks
- Advanced features: Message Groups, Virtual Destinations, Wildcards, Composite Destinations
Installing and configuring ActiveMQ on Windows
There are plenty of tutorials on the web, so we won't repeat them here. The following links should help with installation and configuration
- www.cnblogs.com/yangw/p/591…
- www.cnblogs.com/chy123/p/87…
- www.cnblogs.com/donsenChen/…
- Blog.csdn.net/j080624/art…
ActiveMQ in C#
First, download the .NET client from the Apache website. It can also be downloaded from the following link
Mirrors.hust.edu.cn/apache/acti…
To use ActiveMQ in your project, reference two DLL files from the package downloaded above: Apache.NMS.ActiveMQ.dll and Apache.NMS.dll
P2P mode example
P2P mode involves three roles: Queue, Sender, and Receiver. Each message is sent to a specific queue, from which a receiver retrieves it. The queue holds messages until they are consumed or they expire
P2P features:
- Each message has only one consumer (once consumed, the message is removed from the queue): even when several consumers are running, a message is received by only one of them, and no other consumer can receive it
- The receiver must acknowledge the message to the queue after receiving it: this behavior can be changed by specifying the acknowledgement mode, which is automatic by default
Therefore, use P2P mode if you want every message sent to be processed successfully
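As a rough sketch of changing the acknowledgement mode, the consumer below uses client acknowledgement, so a message counts as consumed only after Acknowledge() is called. It assumes an already started IConnection; the class and method names (ClientAckSketch, Listen) are made up for illustration.

```csharp
using System;
using Apache.NMS;

public static class ClientAckSketch
{
    // Creates a consumer whose messages stay unacknowledged until Acknowledge() is called.
    // The session created here is closed when the caller closes the connection.
    public static IMessageConsumer Listen(IConnection connection, string queueName)
    {
        ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IQueue queue = session.GetQueue(queueName);
        IMessageConsumer consumer = session.CreateConsumer(queue);
        consumer.Listener += message =>
        {
            if (message is ITextMessage text)
            {
                Console.WriteLine($"Processing: {text.Text}");
            }
            // Confirm the message only after processing has finished.
            message.Acknowledge();
        };
        return consumer;
    }
}
```

If processing throws before Acknowledge() is reached, the message remains unacknowledged, which is the point of choosing this mode over the automatic default.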
The base class for the sample code is as follows
    // Namespaces used by the samples in this article
    using System;
    using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using Apache.NMS.ActiveMQ.Commands;

    public abstract class ActiveMQBase {
        protected IConnectionFactory factory;
        protected IConnection connection;
        protected ISession session;

        public virtual void Init() {
            try {
                // The default port is 61616
                factory = new ConnectionFactory("tcp://localhost:61616");
                connection = factory.CreateConnection();
                connection.Start();
                session = connection.CreateSession();
            } catch (Exception e) {
                Console.WriteLine($"Error: {e.Message}");
            }
        }

        public abstract void Run();

        // Release related resources
        public virtual void Release() {
            try {
                if (session != null) session.Close();
                if (connection != null) connection.Close();
            } finally {
                session = null;
                connection = null;
                factory = null;
            }
        }
    }
The producer is as follows
    public class ActiveMQP2PDemoProducer : ActiveMQBase {
        private IMessageProducer messageProducer;
        private ActiveMQQueue demoQueue;

        public override void Init() {
            base.Init();
            try {
                // Specify a queue for point-to-point communication
                demoQueue = new ActiveMQQueue("DEMO_QUEUE");
                // Create a producer object
                messageProducer = session.CreateProducer(demoQueue);
            } catch (Exception e) {
                Console.WriteLine($"Error: {e.Message}");
            }
        }

        public override void Run() {
            while (true) {
                Console.WriteLine("Enter a message, or type exit to quit.");
                string line = Console.ReadLine();
                // ReadLine returns null at end of input; treat that like exit
                if (line == null || line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) {
                    break;
                }
                // Create a text message. The producer offers several ways to create messages;
                // choose whichever fits the actual project
                ITextMessage message = messageProducer.CreateTextMessage(line);
                // Send the message. Other overloads set persistence, priority, etc.
                messageProducer.Send(message);
            }
        }

        public override void Release() {
            try {
                // Close the producer before the base class closes its session
                if (messageProducer != null) messageProducer.Close();
                if (demoQueue != null) demoQueue.Dispose();
            } finally {
                demoQueue = null;
                messageProducer = null;
                base.Release();
            }
        }
    }
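The comment about other Send overloads can be illustrated as follows. This is a minimal sketch that assumes an existing IMessageProducer, such as the messageProducer field of the producer class; the helper name SendImportant and the chosen priority and time-to-live are arbitrary examples, not recommendations.

```csharp
using System;
using Apache.NMS;

public static class SendOptionsSketch
{
    // Sends one text message as persistent, with high priority, expiring after ten minutes.
    public static void SendImportant(IMessageProducer producer, string text)
    {
        ITextMessage message = producer.CreateTextMessage(text);
        producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.High, TimeSpan.FromMinutes(10));
    }
}
```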
The consumer is as follows
    public class ActiveMQP2PDemoComsumer : ActiveMQBase {
        private IMessageConsumer messageConsumer;
        private ActiveMQQueue demoQueue;

        public override void Init() {
            base.Init();
            try {
                demoQueue = new ActiveMQQueue("DEMO_QUEUE");
                // Create the message consumer
                messageConsumer = session.CreateConsumer(demoQueue);
                // Add a listener; it is invoked whenever a message arrives
                messageConsumer.Listener += this.MessageConsumer_Listener;
            } catch (Exception e) {
                Console.WriteLine($"Error: {e.Message}");
            }
        }

        private void MessageConsumer_Listener(IMessage message) {
            // Parse the received message
            if (message is ITextMessage msg) {
                Console.WriteLine($"Received Message: {msg.Text}");
            }
        }

        public override void Run() {
            // Keep the console process alive so that incoming messages can be handled
            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();
        }

        public override void Release() {
            try {
                // Detach the listener and close the consumer before the base class closes its session
                if (messageConsumer != null) {
                    messageConsumer.Listener -= this.MessageConsumer_Listener;
                    messageConsumer.Close();
                }
                if (demoQueue != null) demoQueue.Dispose();
            } finally {
                demoQueue = null;
                messageConsumer = null;
                base.Release();
            }
        }
    }
Use as follows
    // Initialize the producer
    ActiveMQBase demo = new ActiveMQP2PDemoProducer();
    // To run the consumer instead:
    // ActiveMQBase demo = new ActiveMQP2PDemoComsumer();
    demo.Init();
    demo.Run();
    demo.Release();
The ActiveMQ management console then shows that the messages sent by the producer have been consumed by the consumer
The Pub/Sub model
Pub/Sub mode involves three roles: Topic, Publisher, and Subscriber. Multiple publishers send messages to a topic, and the system delivers these messages to multiple subscribers, so producers and consumers can be thought of as having a many-to-many relationship
Pub/Sub features:
- Each message can have multiple consumers
- To consume messages, a subscriber must be running when they are published
- To relax this strict timing dependency, JMS allows a subscriber to create a durable subscription. Then, even if the subscriber was not running when a message was published, it receives that message once it runs again
Therefore, the Pub/Sub model fits when it is acceptable for a message to be consumed by one consumer, by several consumers, or by none at all
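A durable subscription might be created as sketched below. The sketch assumes a connection that has not been started yet, because the client ID has to be set first; the class name and all parameter values passed in are placeholders chosen for illustration.

```csharp
using System;
using Apache.NMS;

public static class DurableSubscriberSketch
{
    // The client ID and the subscription name together identify the durable
    // subscription, so both must stay the same across restarts of the subscriber.
    public static IMessageConsumer Subscribe(
        IConnection connection, string clientId, string topicName, string subscriptionName)
    {
        connection.ClientId = clientId;   // must be set before the connection is started
        connection.Start();

        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
        ITopic topic = session.GetTopic(topicName);

        // selector = null (no filtering), noLocal = false
        IMessageConsumer consumer =
            session.CreateDurableConsumer(topic, subscriptionName, null, false);
        consumer.Listener += message =>
        {
            if (message is ITextMessage text)
            {
                Console.WriteLine($"Received: {text.Text}");
            }
        };
        return consumer;
    }
}
```

Messages published while this subscriber is offline should be delivered the next time Subscribe is called with the same client ID and subscription name.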
In C#, usage differs little from P2P. In the producer and consumer initialization code shown above, replace this line
    demoQueue = new ActiveMQQueue("DEMO_QUEUE");
with this one (and declare the field as ActiveMQTopic accordingly)
    demoTopic = new ActiveMQTopic("DEMO_TOPIC");
The management console then shows the corresponding data for the topic
As the examples show, P2P is queue-based, while Pub/Sub is topic-based.
Pub/Sub mode provides many-to-many communication: there can be multiple producers and multiple consumers, and every consumer receives each message as soon as it arrives.
P2P mode also allows multiple producers and consumers. Unlike Pub/Sub, however, when there are several consumers they take turns consuming the incoming messages instead of each receiving every message. That is, each message has only one consumer
In C#, the two modes are used in almost the same way, but their runtime behavior is quite different. Therefore, in real projects, pay attention to the distinction between the two to avoid unnecessary confusion
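The difference in delivery behavior can be made concrete with a small in-memory simulation. It does not talk to ActiveMQ at all, and it assumes strict round-robin distribution for the queue, which is a simplification of what a real broker does; all names are illustrative.

```csharp
using System;
using System.Collections.Generic;

public static class DeliverySemanticsSketch
{
    // Queue (P2P): each message goes to exactly one consumer, in round-robin order.
    public static List<string>[] DeliverToQueue(IList<string> messages, int consumerCount)
    {
        var inboxes = CreateInboxes(consumerCount);
        for (int i = 0; i < messages.Count; i++)
        {
            inboxes[i % consumerCount].Add(messages[i]);
        }
        return inboxes;
    }

    // Topic (Pub/Sub): every subscriber receives its own copy of each message.
    public static List<string>[] DeliverToTopic(IList<string> messages, int consumerCount)
    {
        var inboxes = CreateInboxes(consumerCount);
        foreach (string message in messages)
        {
            foreach (var inbox in inboxes) inbox.Add(message);
        }
        return inboxes;
    }

    private static List<string>[] CreateInboxes(int count)
    {
        var inboxes = new List<string>[count];
        for (int i = 0; i < count; i++) inboxes[i] = new List<string>();
        return inboxes;
    }
}
```

With four messages and two consumers, the queue gives each consumer two messages, while the topic gives each consumer all four.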
Some problems in the actual project
ActiveMQ server outage
If we want to recover data after a server outage, we need to persist messages
In general, non-persistent messages are stored in memory, and persistent messages are stored in files. Their maximum limits are configured in the corresponding node of the configuration file
However, when non-persistent messages accumulate to the point that memory runs short, ActiveMQ writes the non-persistent messages in memory to temporary files to free up memory. The difference is that persistent messages are recovered from their files after a restart, while the temporary files for non-persistent messages are simply deleted.
Therefore, to keep data reliable:
- Use persistent messages whenever possible (unimportant messages need not be persistent)
- Increase the limits on the persistent store and the temporary files to maximize service availability
Lost messages
This also comes down to message persistence. In this case, we can
- Persist messages whenever possible
- If you do not want to persist messages, consume non-persistent messages as quickly as possible
- Use transactions, so that messages are not lost when the connection is closed
Persistent messages are slow
By default, non-persistent messages are sent asynchronously and persistent messages are sent synchronously. With a slow hard disk, sending persistent messages is therefore slow as well
However, when transactions are enabled, messages are sent asynchronously, which greatly improves efficiency. So when sending persistent messages, make sure transactions are enabled. We also recommend enabling transactions when sending non-persistent messages
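Sending persistent messages inside a transaction could look roughly like this. The sketch assumes an already started IConnection; the class and method names are hypothetical, and error handling is reduced to a rollback.

```csharp
using Apache.NMS;

public static class TransactedSendSketch
{
    // Sends a batch of persistent text messages inside one transaction.
    public static void SendBatch(IConnection connection, string queueName, string[] lines)
    {
        using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
        using (IMessageProducer producer = session.CreateProducer(session.GetQueue(queueName)))
        {
            producer.DeliveryMode = MsgDeliveryMode.Persistent;
            try
            {
                foreach (string line in lines)
                {
                    producer.Send(producer.CreateTextMessage(line));
                }
                session.Commit();     // the batch becomes visible to consumers here
            }
            catch
            {
                session.Rollback();   // discard everything sent since the last commit
                throw;
            }
        }
    }
}
```

Committing once per batch rather than once per message is a design choice in this sketch; the right batch size depends on how much work you can afford to resend after a failure.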
User-defined ActiveMQ Redelivery Policy
Set it through the ConnectionFactory.RedeliveryPolicy property. The parameters are:
- CollisionAvoidancePercent: default 0.15. The percentage range used for collision avoidance. Takes effect only when UseCollisionAvoidance is enabled
- MaximumRedeliveries: default 6. The maximum number of redeliveries; an exception is thrown when the redelivery count reaches this maximum. A value of -1 means unlimited redeliveries, and 0 means no redelivery
- InitialRedeliveryDelay: default 1000. The initial redelivery delay
- UseCollisionAvoidance: default false. Enables collision avoidance
- UseExponentialBackOff: default false. Increases the delay exponentially
- BackOffMultiplier: default 5. The back-off multiplier; it must be greater than 1. Takes effect only when UseExponentialBackOff is enabled
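Configuring these parameters might look like the following sketch. It assumes the RedeliveryPolicy class from the Apache.NMS.Policies namespace; the values are arbitrary examples, and CreateFactory is a hypothetical helper name.

```csharp
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;

public static class RedeliveryPolicySketch
{
    // Builds a connection factory with a custom redelivery policy.
    // Creating the factory does not open a connection to the broker.
    public static ConnectionFactory CreateFactory(string brokerUri)
    {
        var factory = new ConnectionFactory(brokerUri);
        factory.RedeliveryPolicy = new RedeliveryPolicy
        {
            MaximumRedeliveries = 3,        // give up after three redeliveries
            InitialRedeliveryDelay = 2000,  // delay before the first redelivery
            UseExponentialBackOff = true,   // grow the delay on each attempt
            BackOffMultiplier = 2           // factor applied to the delay each time
        };
        return factory;
    }
}
```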
Multi-consumer concurrent processing
When there are multiple consumers and a large backlog of messages has accumulated in ActiveMQ, it can happen that only one consumer is consuming while the others do no work
In this case, set ActiveMQ's prefetch value to something smaller. The default is 1000 for queues and 32766 for topics. It is set through ConnectionFactory.PrefetchPolicy
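Lowering the prefetch value could be done as in this sketch. The value 1 is just an example for slow, long-running consumers, and CreateFactory is a hypothetical helper name.

```csharp
using Apache.NMS.ActiveMQ;

public static class PrefetchSketch
{
    // Builds a connection factory whose queue consumers prefetch one message at a time.
    public static ConnectionFactory CreateFactory(string brokerUri)
    {
        var factory = new ConnectionFactory(brokerUri);
        // A small prefetch keeps one consumer from buffering the whole backlog,
        // so pending messages can be spread across all connected consumers.
        factory.PrefetchPolicy.QueuePrefetch = 1;
        return factory;
    }
}
```

A very small prefetch costs throughput for fast consumers, because each message then needs its own round trip to the broker, so the value is worth tuning per workload.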
This is the end of this article. Later articles will cover other ActiveMQ scenarios, such as distributed clustering. You are welcome to follow the public account [hey hey learning diary]. Thank you!