This is the fourth day of my participation in the August Text Challenge.More challenges in August

PS: The function and characteristics of message queue have been introduced previously. This chapter realizes a simple message queue by referring to distributed Message Middleware Practice – Ni Wei.

Message processing center

Let’s take this message queue diagram, put it here, and the main idea is:

At the heart of the message queue is the Broker, which is the data processing center. So start with the Broker implementation

The Broker implementation

/ * * *@Package: com.anzhi.simplemq
 * @ClassName: Broker
 * @Author: AZ
 * @CreateTime: 2021/8/2 and *@Description: * /
public class Broker {
    // Set the maximum size of the message queue
    private final static int MAX_SIZE = 3;

    ArrayBlockingQueue: a bounded blocking queue
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);

    // Production message
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("Successfully delivered message to message processing center:" + msg + ", the current number of temporary messages is:" + messageQueue.size());
        }else{
            System.out.println("Temporary messages in the message processing center have reached the maximum load. No more messages can be added!");
        }
        System.out.println("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =");
    }
    
    // Consume messages
    public static String consume(a) {
        String msg = messageQueue.poll();
        if(msg ! =null) {// If the consumption condition is met, a message is fetched from the container
            System.out.println("Already consumed message:" + msg + ", the current number of temporary messages is:" + messageQueue.size());
        }else{
            System.out.println("There are no messages to consume in the message processing center! ");
        }
        System.out.println("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =");
        
        returnmsg; }}Copy the code

The logic of the code is very simple, so I won’t repeat it,

BrokerServer

With the logic to process the data, we also need to provide a service to expose the method for others to use, so we need to implement the BrokerServer class to provide the Broker service externally.

/ * * *@Package: com.anzhi.simplemq
 * @ClassName: BrokerServer
 * @Author: AZ
 * @CreateTime: 2021/8/2 10:53
 * @Description: * /
public class BrokerServer implements Runnable{
    public static int SERVICE_PORT = 9999;

    private final Socket socket;

    public BrokerServer(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run(a) {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());
            while (true) {
                String str = in.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("Received raw data:" + str);

                if (str.equals("CONSUME")) {   //CONSUME means to CONSUME a message
                    Consume a message from the message queue
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                } else {
                    // All other cases indicate that the production message is placed in the message queueBroker.produce(str); }}}catch (Exception e){
            System.out.println("Message queue service exception"+ e.getMessage()); }}public static void main(String[] args) {
        try {
            ServerSocket server = new ServerSocket(SERVICE_PORT);
            while (true) {
                BrokerServer brokerServerv = new BrokerServer(server.accept());
                newThread(brokerServerv).start(); }}catch (Exception e){
            System.out.println("Calling message service exception:"+ e.getMessage()); }}}Copy the code

Socket is used here to provide services without using a framework. By implementing the Runnable interface, the thread is started and an infinite loop is used to keep the program from exiting. When the main method is started, a message queue center service is started

Client Consumer, Produce

With a server, it is natural to need a corresponding client to send and receive messages.

/ * * *@Package: com.anzhi.SimpleMQTest
 * @ClassName: ProduceClinet
 * @Author: AZ
 * @CreateTime: 2021/8/2 all *@Description: Production message */
public class ProduceClinet {
    public static void main(String[] args) throws Exception{
        MqClient.produce("Hello MQ !"); }}Copy the code

Error: Connection reset This is because the client is disconnected after sending the message, and the server is reading and writing, causing an exception. Look at TCP three or four times. This is executed multiple times and the queue is full. Connection reset error description

Connected to the target VM, address: '127.0.0.1:9237', transport: 'socket' Successfully delivered message to message processing center: Hello MQ! , the current staging is the number of messages: 1 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = message queue service exceptions Connection reset to receive the raw data: Hello MQ! Successfully delivered message to message processing center: Hello MQ! , the current staging is the number of messages: 2 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = message queue service exceptions Connection reset to receive the raw data: Hello MQ! Successfully delivered message to message processing center: Hello MQ! , the current staging is the number of messages: 3 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = message queue service exceptions Connection reset to receive the raw data: Hello MQ! Temporary messages in the message processing center reached the maximum load, can not continue to add messages! = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = receive the raw data: Hello MQ! Temporary messages in the message processing center reached the maximum load, can not continue to add messages! = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = receive the raw data: Hello MQ! Temporary messages in the message processing center reached the maximum load, can not continue to add messages! = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = message queue service exceptions Connection reset to receive the raw data: Hello MQ! Temporary messages in the message processing center reached the maximum load, can not continue to add messages! = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =Copy the code

We then start consuming the messages in the queue

/ * * *@Package: com.anzhi.SimpleMQTest
 * @ClassName: ConsumerClient
 * @Author: AZ
 * @CreateTime: 2021/8/2 14:58
 * @Description: * /
public class ConsumerClient {
    public static void main(String[] args) {
        MqClient.consume();
        while (true) {}}}Copy the code

Consumption results are as follows

Raw data received: CONSUME CONSUME messages: Hello MQ! , the current staging is the number of messages: 2 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = receive the raw data: CONSUME has consumption news: Hello MQ! , the current staging is the number of messages: 1 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = receive the raw data: CONSUME has consumption news: Hello MQ! , the current staging is the number of messages: 0 = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = receive the raw data: CONSUME no news for consumption in the message processing center! = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =Copy the code

A simple message queue implementation is now complete. (PS: not considering other too complex issues and business scenarios)

Summary: The above code execution order is roughly as follows: