This is the 22nd day of my participation in the August More Text Challenge

Introduction to the

Rabbitmq is one of the open source message queues developed using Erlang

  • Messaging: Also known as messaging, this refers to communication between degrees using message data rather than programs calling programs directly
  • Queue: Queue meaning, the message in the order of first in first out, in order

Application scenarios

  • Asynchronous processing: Results that are not returned in real time are returned by other methods, such as self-service shopping counters, where shopping is done first and settlement is notified later
  • Application decoupling: Services call each other for notifications, which can be used to decouple messages and reduce interface stress
  • Peak traffic clipping: Generally, in the second kill activity, selective discarding part of the request, according to the service capacity to accept the number of requests

The installation

For quick and convenient, here use docker quick installation, previous docker quick installation software portal: juejin.cn/post/684490…

Install the rabbitMQ UI image (3.8.5-management) and use a variable to set the administrator account secret

docker run -itd --rm --hostname rabbitmq-server \
-v /data/docker/rabbitmq:/var/lib/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin \
-p 15672:15672 -p 5672:5672 rabbitmq:3.8. 5-management
Copy the code

The client

The rabbitmq Golang client used to be github.com/streadway/a… Maintenance. It’s now in the hands of the authorities, and the name has been changed.

Official client: github.com/rabbitmq/am…

The client is very mature, has been a project for over 10 years and has been maintained, currently supports mq version 3.9.3, and the installed VERSION 3.8 mq is certainly fine

example

The following example is from the official website, and some of the methods have been modified

Official website example address: github.com/rabbitmq/ra…

The global variable

To avoid code redundancy and decoupling, some variables and methods are named generically or globally

var (
   conn *amqp.Connection
   err error
)
func init() {
   conn, err = amqp.Dial("amqp://admin:admin@ip:5672/")
   iferr ! = nil { log.Fatal("Failed to connect to MQ:", err)
   }
}
Copy the code

Production, consumption

One in, one out, one on one workflow, this is a simple message queue pattern

“P” is our producer and “C” is our consumer. The middle box is a queue – RabbitMQ represents the message buffer reserved for consumers. Images from RabbitMQ official

For testing purposes, the production and message functions are written as two functions that run directly in main

Send a message

Send a message with the queue name hello, or/if no MQ vm is specified

func send() {
   ch, _ := conn.Channel()
   defer ch.Close()
   // Queue configuration
   q, _ := ch.QueueDeclare("hello".false.false.false.false, nil)
   body: ="Hello World!"
   // Send a message
   err = ch.Publish("", q.Name, false.false, amqp.Publishing{
      ContentType: "test/plain".Body:        []byte(body),
   })
   iferr ! = nil { log.Println("Failed to send message:", err)
   }
   log.Println("Successful delivery of the message, message content:", body)
}
Copy the code

Receives the message

A coroutine is used to receive messages, because there is only one, and it is read quickly, but the program does not exit because an unbuffered chan is used and it is not closed. If you want to automatically exit after spending, you can comment forever

func receive() {
   ch, _ := conn.Channel()
   defer ch.Close()
   // Queue configuration
   q, _ := ch.QueueDeclare("hello".false.false.false.false, nil)
   // Consume messages
   msgs, _ := ch.Consume(q.Name, "".true.false.false.false, nil)
   forever := make(chan bool)
   // Start a coroutine, a function without arguments
   go func() {
      for d := range msgs {
         log.Printf("Received message: %s",d.Body)
      }
   }()
   log.Printf("Waiting for message, press Ctrl+ C to exit")
   // block wait
   <- forever
}
Copy the code

Task queue

When requests are very dense, tasks can be encapsulated into messages and sent to the queue, and then multiple workers share a queue at the same time and return after processing

Multiple consumers share a work queue – image from RabbitMQ website

Multitasking needs to be packaged, and arguments are passed at startup, otherwise the default message is Hello, defined in the bodyFrom function

go run newTask.go First message.
go run newTask.go Second message..
go run newTask.go Third message...
go run newTask.go Fourth message....
go run newTask.go Fifth message.....
Copy the code

multitasking

func newTask() {
   ch, _ := conn.Channel()
   defer ch.Close()

   q, _ := ch.QueueDeclare("task_queue".true.false.false.false, nil)
   body := bodyFrom(os.Args)
   log.Println(body)
   err := ch.Publish("", q.Name, false.false, amqp.Publishing{
      DeliveryMode: amqp.Persistent,
      ContentType:  "test/plain".Body:         []byte(body),
   })
   iferr ! = nil { log.Println("Failed to send message:", err)
   }
   log.Println("Message sent successfully, message content:", body)
}
Copy the code

Multiple consumers need to start more than one in IDEA, otherwise it cannot be reflected. After starting two workers, it will be found that messages come in rotation, and messages will be given to one in sequence. Two workers always receive the same number of messages (deviation 1), which is due to amqp.Persistent set in message mode

Ack (false) is a manual message confirmation mechanism to ensure consumption. If the worker dies and the message is not processed, it will be handed over to another worker for processing. The automatic confirmation mechanism needs to pass true into the third party parameter in Consume method

Many consumers

func worker() {
   ch, _ := conn.Channel()
   defer ch.Close()
   q, _ := ch.QueueDeclare("task_queue".true.false.false.false,nil,)
   _ = ch.Qos(1.0.false)
   msgs, _ := ch.Consume(q.Name, "".false.false.false.false, nil)
   forever := make(chan bool)
   go func() {
      for d := range msgs {
         log.Printf("Received message content: %s", d.Body)
         dotCount := bytes.Count(d.Body,[]byte("."))
         t := time.Duration(dotCount)
         time.Sleep(t * time.Second)
         log.Printf("Complete")
         d.Ack(false)
      }
   }()
   <- forever
}
Copy the code

Release subscription

Rabbitmq complete messaging model:

  1. The producer never sends any messages directly to the queue
  2. Producers can only send messages to the switch
  3. The exchange receives the message from the producer and pushes it to the queue

The delivery type can be Direct, Topic, headers, or FANout

Fanout type producer

Now it’s time to call the method that declares the exchange

This approach is similar to broadcasting in that any consumer listening on the switch can receive a message

func emitLog() {
   ch, _ := conn.Channel()
   defer ch.Close()

   _ = ch.ExchangeDeclare("logs"."fanout".true.false.false.false, nil)
   body := bodyFrom(os.Args)
   err = ch.Publish("logs"."".false.false, amqp.Publishing{ContentType: "test/plain".Body: []byte(body)})
   iferr ! = nil { log.Println("Failed to send message:", err)
   }
   log.Println("Message sent successfully, message content:", body)
}
Copy the code

consumers

The main thing here is to bind the queue to the switch.

The consumer needs to start the listener first and then the producer, otherwise the message is safely discarded because there is no listener

Multiple workers are allowed to start and listen to the same switch at the same time and consume the same data

func reveiveLogs() {
   ch, _ := conn.Channel()
   defer ch.Close()
   _ = ch.ExchangeDeclare("logs"."fanout".true.false.false.false, nil)
   q, _ := ch.QueueDeclare("".false.false.true.false, nil)
   _ = ch.QueueBind(q.Name, ""."logs".false, nil)
   msgs, _ := ch.Consume(q.Name, "".true.false.false.false, nil)
   forever := make(chan bool)

   go func() {
      for d := range msgs {
         log.Printf("[x] %s", d.Body)
      }
   }()
   <-forever
}
Copy the code

After running, you can see related information on the administration page

routing

Routing in RabbitMQ is the process of distributing messages from the switch to queues, depending on the key

Q1 and Q2 are bound to exchange X. Q1 is bound to orage, and Q2 is bound to black and green. Messages with other keys will be discarded

There is also a binding similar to the Fanout exchange that implements the Fanout exchange on the Direct exchange, as shown in the figure below

producers

Message production can be written in a loop that runs several times

func emitLogDirect() {
   ch, _ := conn.Channel()
   defer ch.Close()
   _ = ch.ExchangeDeclare("logs_direct"."direct".true.false.false.false,nil)
   body := bodyFrom(os.Args)
   err = ch.Publish("logs_direct"."green".false.false,amqp.Publishing{ContentType:"text/plan".Body:[]byte(body)})
   iferr ! = nil { log.Println("Failed to send message:", err)
   }
   log.Println("Message sent successfully, message content:", body)
   _ = ch.Publish("logs_direct"."black".false.false,amqp.Publishing{ContentType:"text/plan".Body:[]byte(body)})
   _ = ch.Publish("logs_direct"."orange".false.false,amqp.Publishing{ContentType:"text/plan".Body:[]byte(body)})
}
Copy the code

consumers

The consumer needs to start before the producer in order to monitor the logs_direct exchange, otherwise the message is automatically discarded. Additionally, the following elimination keys have only green and black, and all orange will be discarded

func reveiveLogsDirect() {
   ch, _ := conn.Channel()
   defer ch.Close()
   _ = ch.ExchangeDeclare("logs_direct"."direct".true.false.false.false,nil)
   q, _ := ch.QueueDeclare("".false.false.true.false, nil)
   _ = ch.QueueBind(q.Name, "green"."logs_direct".false, nil)
   _ = ch.QueueBind(q.Name, "black"."logs_direct".false, nil)
   msgs, _ := ch.Consume(q.Name, "".true.false.false.false, nil)
   forever := make(chan bool)
   go func() {
      for d := range msgs {
         log.Printf(" [x] %s", d.Body)
      }
   }()
   <-forever
}
Copy the code

The logs_direct binding Routing key is displayed on the management interface

conclusion

  • Queue patterns, while possible, are not the core messaging idea of RabbitMQ

  • Fanout exchange mode is suitable for broadcasting, notifying all monitors that the same message has been received

  • The Direct exchange mode, combined with routing, can flexibly define which consumers receive which messages, or only receive the messages they need

  • The topic exchange pattern, which is not mentioned in this article, can be more flexible, supporting routing keys as well as message content separation or specific string identification for consumption