“This article has participated in the call for good writing activities, click to view: the back end, the big front end double track submission, 20,000 yuan prize pool waiting for you to challenge!”

Let’s learn RabbitMQ, part II: basic usage of the six RabbitMQ modes

Hi, everyone, I am Nezha. We will start our study of open source components from today. We will summarize and share while learning

The article is outlined as follows:

  • RabbitMQ member composition
  • Six modes of operation for RabbitMQ

RabbitMQ member composition

  • Producer
  • Consumer
  • Exchange

Used to receive and distribute messages

  • Message
  • Queue

Used to store messages for producers

  • Channel

Channel used by message push

  • Connection

TCP connection established between the producer or consumer and Rabbit

  • The routing key routingKey

Sent by the producer with each message; the exchange uses it to decide which queue(s) the message is routed to

  • The binding key BindingKey

Used to bind a queue to an exchange

  • Connection manager ConnectionFactory

A manager used in application code to establish a connection between the application and Rabbit

Six modes of operation for RabbitMQ

Single mode

  • The message producer queues the message
  • The consumer of the message listens to the message queue and consumes it if there is a message in the queue

The directory is as follows:

.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go

The actual encoding is as follows:

The coding idea of each mode is as follows:

Producer/consumer

  • Connect to the RabbitMQ server
  • Initialize the connection connection
  • Initialize the channel channel
  • Initialize the Exchange switch
  • Initialize the queue
  • Bind the queue to the exchange using the routing key
  • Messages are produced/consumed

xmtmq.go

package xmtmq

import (
   "github.com/streadway/amqp"
   "log"
)
// single mode
// Define the RabbitMQ data structure
// go get github.com/streadway/amqp

type RabbitMQ struct {
   conn      *amqp.Connection / / the connection
   channel   *amqp.Channel    / / channel
   QueueName string           / / the queue name
   Exchange  string           / / switches
   Key       string           / / routing key
   MQUrl     string           // VIRTUAL machine address of MQ
}

// New a RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.QueueName == ""  || rbt.MQUrl == "" {
      log.Panic("please check QueueName,Exchange,MQUrl ...")
   }

   conn, err := amqp.Dial(rbt.MQUrl)
   iferr ! =nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn

   channel, err := rbt.conn.Channel()
   iferr ! =nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}


// RabbitMQFree closes the channel and then the connection held by rbt.
//
// A nil rbt is logged and ignored. A channel or connection that was never
// opened is skipped, and close errors are logged rather than returned.
func RabbitMQFree(rbt *RabbitMQ) {
	if rbt == nil {
		log.Printf("rbt is nil,free failed")
		return
	}
	if rbt.channel != nil {
		if err := rbt.channel.Close(); err != nil {
			log.Printf("rbt.channel.Close error : %v", err)
		}
	}
	if rbt.conn != nil {
		if err := rbt.conn.Close(); err != nil {
			log.Printf("rbt.conn.Close error : %v", err)
		}
	}
}
func (rbt *RabbitMQ) Init(a) {
   // Request a queue
   _, err := rbt.channel.QueueDeclare(
      rbt.QueueName, / / the queue name
      true.// Whether to persist
      false.// Whether to delete it automatically
      false.// Whether exclusive
      false.// Whether to block
      nil.// Other parameters
   )
   iferr ! =nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return}}// Production message

func (rbt *RabbitMQ) Produce(data []byte) {

   // Add data to the queue
   err := rbt.channel.Publish(
      rbt.Exchange,        / / switches
      rbt.QueueName,       / / the queue name
      false.// If true, the message will be returned to the sender if the exchange type and routekey rules cannot find a matching queue
      false.// If true, when Exchange sends a message to a queue and finds no consumers on the queue, the message is returned to the sender
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   iferr ! =nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   return
}

// Consume messages
func (rbt *RabbitMQ) Consume(a) {

   // Consumption data
   msg, err := rbt.channel.Consume(
      rbt.QueueName,    / / the queue name
      "xmt".// The name of the consumer
      true.// Whether to answer automatically
      false.// Whether exclusive
      false.If true, messages sent by producers in the same Conenction cannot be passed to consumers in the Connection
      false.// Whether to block
      nil.// Other attributes
   )

   iferr ! =nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }

   for data := range msg {
      log.Printf("received data is %v".string(data.Body))
   }

}
Copy the code

main.go

package main

import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)

/* RabbimtMQ Single mode case application scenario: simple message queue use, one producer one consumer produces messages */

func main(a) {
	// Set the log
   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }
    
   xmtmq.NewRabbitMQ(rbt)

   var index = 0

   for {
       // Production message
      rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
      log.Println("Sent successfully", index)
      index++
      time.Sleep(1 * time.Second)
   }

}
Copy the code

consumer.go

package main

import (
   "log"
   "xmt/xmtmq"
)

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.Consume()
}
Copy the code

When running, open two terminals

Terminal 1: go run main.go

Terminal 2: go run consumer.go

The work mode

Multiple consumers consume messages in the same queue, and queues send messages to consumers on average in the form of polling. Resources here are competitive

When producers can produce messages faster than consumers can consume them, consider using the Work mode to increase processing speed and load

Work is similar to Single, but with a few more consumers

Starting from the single-mode setup, open a third terminal and run: go run consumer.go

Publish/subscribe mode

Publish/subscribe, with one more switch than the Work Queues, where resources are shared

Used in the scene

  • E-mail group
  • Group chat
  • Broadcasting (advertising, etc)

The directory is the same as the above code:

xmtmq.go

This mode starts using an exchange, of type fanout

The production end sends messages to the switch and the switch sends messages to bound queues. Each bound queue can receive messages sent by the production end

package xmtmq

import (
   "github.com/streadway/amqp"
   "log"
)

// publish/subscribe mode
// Define the RabbitMQ data structure
// go get github.com/streadway/amqp

type RabbitMQ struct {
   conn      *amqp.Connection / / the connection
   channel   *amqp.Channel    / / channel
   QueueName string           / / the queue name
   Exchange  string           / / switches
   Key       string           / / routing key
   MQUrl     string           // VIRTUAL machine address of MQ
}

// New a RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
      log.Panic("please check Exchange,MQUrl ...")
   }

   conn, err := amqp.Dial(rbt.MQUrl)
   iferr ! =nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn

   channel, err := rbt.conn.Channel()
   iferr ! =nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}

// RabbitMQFree closes the channel and then the connection held by rbt.
//
// A nil rbt is logged and ignored. A channel or connection that was never
// opened is skipped, and close errors are logged rather than returned.
func RabbitMQFree(rbt *RabbitMQ) {
	if rbt == nil {
		log.Printf("rbt is nil,free failed")
		return
	}
	if rbt.channel != nil {
		if err := rbt.channel.Close(); err != nil {
			log.Printf("rbt.channel.Close error : %v", err)
		}
	}
	if rbt.conn != nil {
		if err := rbt.conn.Close(); err != nil {
			log.Printf("rbt.conn.Close error : %v", err)
		}
	}
}

func (rbt *RabbitMQ) Init(a) {
   1. Create a switch
   err := rbt.channel.ExchangeDeclare(
      rbt.Exchange,        / / switches
      amqp.ExchangeFanout, // Switch type
      true.// Whether to persist
      false.// Whether to delete it automatically
      false.//true indicates that the exchange cannot be used by the client to push messages. It is only used for exchange binding
      false.// Whether to block
      nil.// Other attributes
   )
   iferr ! =nil {
      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
      return}}// The production message publish

func (rbt *RabbitMQ) PublishMsg(data []byte) {

   // add data to queue
   err := rbt.channel.Publish(
      rbt.Exchange, / / switches
      ""./ / the queue name
      false.// If true, the message will be returned to the sender if the exchange type and routekey rules cannot find a matching queue
      false.// If true, when Exchange sends a message to a queue and finds no consumers on the queue, the message is returned to the sender
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   iferr ! =nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }
   return

}

// Consume messages
func (rbt *RabbitMQ) SubscribeMsg(a) {

   // create a queue
   q, err := rbt.channel.QueueDeclare(
      "".// The name of the queue is generated randomly
      true.false.false.false.nil.)iferr ! =nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }

   // 2
   err = rbt.channel.QueueBind(
      q.Name,       // Queue name
      "".// In publish mode, key is null
      rbt.Exchange, // Switch name
      false.// Whether to block
      nil.// Other attributes
   )
   iferr ! =nil {
      log.Printf("rbt.channel.QueueBind error : %v", err)
      return
   }

   // 3. Consumption data
   msg, err := rbt.channel.Consume(
      q.Name, / / the queue name
      "xmt".// The name of the consumer
      true.// Whether to answer automatically
      false.// Whether exclusive
      false.If true, messages sent by producers in the same Conenction cannot be passed to consumers in the Connection
      false.// Whether to block
      nil.// Other attributes
   )

   iferr ! =nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }

   for data := range msg {
      log.Printf("received data is %v".string(data.Body))
   }

}
Copy the code

main.go

package main

import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)

/* RabbimtMQ publish mode publish mode */

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      Exchange:  "xmtPubEx",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.Init()

   var index = 0

   for {
      rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
      log.Println("Sent successfully", index)
      index++
      time.Sleep(1 * time.Second)
   }

   xmtmq.RabbitMQFree(rbt)

}
Copy the code

consumer.go

package main

import (
   "log"
   "xmt/xmtmq"
)

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.SubscribeMsg()
   xmtmq.RabbitMQFree(rbt)
}
Copy the code

Perform the same operations as above

Terminal 1: go run main.go

Terminal 2: go run consumer.go

Terminal 3: go run consumer.go

The visible difference from the single and work modes above is that in publish/subscribe mode every subscribed consumer receives each message the producer publishes, instead of the messages being divided among the consumers

Routing mode

The message producer sends the message to the switch according to the route, which is a string. The message currently generated carries the route character (object method). The switch can only match the message queue corresponding to the key according to the route key

** Application scenarios: ** Obtain the corresponding function string from the system code logic, and throw the message task into the corresponding queue business scenarios, such as error processing, processing specific messages, etc

Producer processing process:

Declare a queue and an exchange -> Create a connection -> Create a channel -> Declare the exchange on the channel -> Declare the queue on the channel -> Bind the queue to the exchange through the channel and specify a routing key for that queue -> Build a message -> Send the message and specify its routing key

Consumer processing process:

Declare a queue and an exchange -> Create a connection -> Create a channel -> Declare the exchange on the channel -> Declare the queue on the channel -> Bind the queue to the exchange through the channel and specify a routing key (wildcards allowed) -> Override the message consumption method -> Execute the message method

The directory structure is as follows:

.
├── consumer2.go
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go

xmtmq.go

  • The switch type is Direct

  • Using routing keys

package xmtmq

import (
   "github.com/streadway/amqp"
   "log"
)

// routing mode
// Define the RabbitMQ data structure
// go get github.com/streadway/amqp

type RabbitMQ struct {
   conn      *amqp.Connection / / the connection
   channel   *amqp.Channel    / / channel
   QueueName string           / / the queue name
   Exchange  string           / / switches
   Key       string           / / routing key
   MQUrl     string           // VIRTUAL machine address of MQ
}

// New a RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
      log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
   }

   conn, err := amqp.Dial(rbt.MQUrl)
   iferr ! =nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn

   channel, err := rbt.conn.Channel()
   iferr ! =nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}

// RabbitMQFree closes the channel and then the connection held by rbt.
//
// A nil rbt is logged and ignored. A channel or connection that was never
// opened is skipped, and close errors are logged rather than returned.
func RabbitMQFree(rbt *RabbitMQ) {
	if rbt == nil {
		log.Printf("rbt is nil,free failed")
		return
	}
	if rbt.channel != nil {
		if err := rbt.channel.Close(); err != nil {
			log.Printf("rbt.channel.Close error : %v", err)
		}
	}
	if rbt.conn != nil {
		if err := rbt.conn.Close(); err != nil {
			log.Printf("rbt.conn.Close error : %v", err)
		}
	}
}

func (rbt *RabbitMQ) Init(a) {
   1. Create a switch
   err := rbt.channel.ExchangeDeclare(
      rbt.Exchange, / / switches
      amqp.ExchangeDirect,     // Switch type
      true.// Whether to persist
      false.// Whether to delete it automatically
      false.//true indicates that the exchange cannot be used by the client to push messages. It is only used for exchange binding
      false.// Whether to block
      nil.// Other attributes
   )
   iferr ! =nil {
      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
      return
   }

   // create a queue
   _, err = rbt.channel.QueueDeclare(
      rbt.QueueName, // The name of the queue is generated randomly
      true.false.false.false.nil.)iferr ! =nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }

   // 3
   err = rbt.channel.QueueBind(
      rbt.QueueName, // Queue name
      rbt.Key,       // routing, where key is required
      rbt.Exchange,  // Switch name
      false.// Whether to block
      nil.// Other attributes
   )
   iferr ! =nil {
      log.Printf("rbt.channel.QueueBind error : %v", err)
      return}}// The production message publish

func (rbt *RabbitMQ) ProduceRouting(data []byte) {

   // add data to queue
   err := rbt.channel.Publish(
      rbt.Exchange, / / switches
      rbt.Key,      // key
      false.// If true, the message will be returned to the sender if the exchange type and routekey rules cannot find a matching queue
      false.// If true, when Exchange sends a message to a queue and finds no consumers on the queue, the message is returned to the sender
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        data,
      },
   )
   iferr ! =nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }

   return
}

// Consume messages
func (rbt *RabbitMQ) ConsumeRoutingMsg(a) {

   // 4. Consumption data
   msg, err := rbt.channel.Consume(
      rbt.QueueName, / / the queue name
      "".// The name of the consumer
      true.// Whether to answer automatically
      false.// Whether exclusive
      false.If true, messages sent by producers in the same Conenction cannot be passed to consumers in the Connection
      false.// Whether to block
      nil.// Other attributes
   )

   iferr ! =nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }


   for data := range msg {
      log.Printf("received data is %v".string(data.Body))
   }

}
Copy the code

main.go

package main

import (
   "fmt"
   "log"
   "time"
   "xmt/xmtmq"
)

/* RabbimtMQ Routing case Application scenarios: Obtain the function string from the system code logic and throw the message task to the corresponding queue service scenarios, such as processing errors, processing production messages such as specific messages */

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt1 := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt1",
      QueueName: "Routingqueuexmt1",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt1)
   rbt1.Init()


   rbt2 := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt2",
      QueueName: "Routingqueuexmt2",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt2)
   rbt2.Init()


   var index = 0

   for {
      rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index)))
      log.Println("Xmt1 sent successfully", index)

      rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index)))
      log.Println("Xmt2 sent successfully", index)


      index++
      time.Sleep(1 * time.Second)
   }


   xmtmq.RabbitMQFree(rbt1)
   xmtmq.RabbitMQFree(rbt2)

}
Copy the code

consumer.go

package main

import (
   "log"
   "xmt/xmtmq"
)

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key: "xmt1",
      QueueName: "Routingqueuexmt1",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.ConsumeRoutingMsg()
   xmtmq.RabbitMQFree(rbt)
}
Copy the code

consumer2.go

package main

import (
   "log"
   "xmt/xmtmq"
)

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      Exchange: "xmtPubEx2",
      Key:      "xmt2",
      QueueName: "Routingqueuexmt2",
      MQUrl:    "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.ConsumeRoutingMsg()
   xmtmq.RabbitMQFree(rbt)
}
Copy the code

Topic schema

Topic mode, where a message is fetched by multiple consumers and the destination queue of the message can be a wildcard of the BindingKey

The Topics pattern is actually one of the routing patterns

The biggest difference between the two is that the Topics mode sends and consumes messages matched by wildcards

  • The asterisk means you can match a word
  • The # sign means zero or more words can be matched

The encoded case remains the same as the above routing pattern, except that exchange is of topic type

Here are the exchanges and queues used by the modes mentioned above

The RPC model

RPC remote procedure call, the client remote call server method, using MQ can achieve RPC asynchronous call

The directory structure is:

.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
    └── xmtmq.go

  • The client is both a producer and a consumer: it sends the RPC call message to the RPC request queue while listening on the RPC response queue
  • The server listens for messages on the RPC request queue, executes the server-side method when a message arrives, and obtains the result returned by the method
  • The server sends the result of the RPC method to the RPC response queue
  • The client listens on the RPC response queue and receives the RPC result

xmtmq.go

package xmtmq

import (
   "github.com/streadway/amqp"
   "log"
   "math/rand"
)

// RPC mode
// Define the RabbitMQ data structure
// go get github.com/streadway/amqp

type RabbitMQ struct {
   conn      *amqp.Connection / / the connection
   channel   *amqp.Channel    / / channel
   QueueName string           / / the queue name
   Exchange  string           / / switches
   Key       string           / / routing key
   MQUrl     string           // VIRTUAL machine address of MQ
}

// New a RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
   if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
      log.Panic("please check QueueName,Exchange,MQUrl ...")
   }

   conn, err := amqp.Dial(rbt.MQUrl)
   iferr ! =nil {
      log.Panicf("amqp.Dial error : %v", err)
   }
   rbt.conn = conn

   channel, err := rbt.conn.Channel()
   iferr ! =nil {
      log.Panicf("rbt.conn.Channel error : %v", err)
   }
   rbt.channel = channel
}

// RabbitMQFree closes the channel and then the connection held by rbt.
//
// A nil rbt is logged and ignored. A channel or connection that was never
// opened is skipped, and close errors are logged rather than returned.
func RabbitMQFree(rbt *RabbitMQ) {
	if rbt == nil {
		log.Printf("rbt is nil,free failed")
		return
	}
	if rbt.channel != nil {
		if err := rbt.channel.Close(); err != nil {
			log.Printf("rbt.channel.Close error : %v", err)
		}
	}
	if rbt.conn != nil {
		if err := rbt.conn.Close(); err != nil {
			log.Printf("rbt.conn.Close error : %v", err)
		}
	}
}

// Production message

func (rbt *RabbitMQ) Produce(data []byte) {

   // Request a queue
   q, err := rbt.channel.QueueDeclare(
      rbt.QueueName, / / the queue name
      true.// Whether to persist
      false.// Whether to delete it automatically
      false.// Whether exclusive
      false.// Whether to block
      nil.// Other parameters
   )
   iferr ! =nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }

   err = rbt.channel.Qos(1.0.false)
   iferr ! =nil {
      log.Printf("rbt.channel.Qos error : %v", err)
      return
   }

   d, err := rbt.channel.Consume(
      q.Name,
      "".false.false.false.false.nil)
   iferr ! =nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }

   for msg := range d {
      log.Println("received msg is ".string(msg.Body))
      err := rbt.channel.Publish(
         "",
         msg.ReplyTo,
         false.false,
         amqp.Publishing{
            ContentType:   "test/plain",
            CorrelationId: msg.CorrelationId,
            Body:          data,
         })
      iferr ! =nil {
         log.Printf("rbt.channel.Publish error : %v", err)
         return
      }
      msg.Ack(false)
      log.Println("svr response ok ")}return
}
// randomString returns a string of l random uppercase ASCII letters, or the
// empty string when l is zero or negative.
//
// The previous implementation filled the string with byte(rand.Intn(l)),
// i.e. raw byte values in [0, l), which are control characters rather than
// printable text, and it panicked in make for a negative l.
func randomString(l int) string {
	if l <= 0 {
		return ""
	}
	buf := make([]byte, l)
	for i := range buf {
		buf[i] = byte('A' + rand.Intn(26))
	}
	return string(buf)
}

// Consume messages
func (rbt *RabbitMQ) Consume(a) {

   // Request a queue
   q, err := rbt.channel.QueueDeclare(
      ""./ / the queue name
      true.// Whether to persist
      false.// Whether to delete it automatically
      false.// Whether exclusive
      false.// Whether to block
      nil.// Other parameters
   )
   iferr ! =nil {
      log.Printf("rbt.channel.QueueDeclare error : %v", err)
      return
   }

   // Consumption data
   msg, err := rbt.channel.Consume(
      q.Name, / / the queue name
      "xmt".// The name of the consumer
      true.// Whether to answer automatically
      false.// Whether exclusive
      false.If true, messages sent by producers in the same Conenction cannot be passed to consumers in the Connection
      false.// Whether to block
      nil.// Other attributes
   )
   iferr ! =nil {
      log.Printf("rbt.channel.Consume error : %v", err)
      return
   }
   id := randomString(32)
   err = rbt.channel.Publish(
      "",
      rbt.QueueName,
      false.false,
      amqp.Publishing{
         ContentType:   "test/plain",
         CorrelationId: id,
         ReplyTo:       q.Name,
         Body:          []byte("321"})),iferr ! =nil {
      log.Printf("rbt.channel.Publish error : %v", err)
      return
   }

   for data := range msg {
      log.Printf("received data is %v".string(data.Body))
   }
}
Copy the code

main.go

package main

import (
   "fmt"
   "log"
   "xmt/xmtmq"
)

/* RabbimtMQ RPC mode case Application scenario: Simple message queue usage, one producer one consumer produces messages */

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)

   rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))}Copy the code

consumer.go

package main

import (
   "log"
   "math/rand"
   "time"
   "xmt/xmtmq"
)

func main(a) {

   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
   rand.Seed(time.Now().UTC().UnixNano())
   rbt := &xmtmq.RabbitMQ{
      QueueName: "xmtqueue",
      MQUrl:     "It: / / guest: [email protected]:5672 / XMTMQ",
   }

   xmtmq.NewRabbitMQ(rbt)
   rbt.Consume()
}
Copy the code

Run the consumers first, starting several of them, and you can see that the request queue already holds data; here two consumers were started, so it holds 2 messages

Then run the producer (the RPC server): it consumes the requests sent by the consumers and, for each one, publishes its response to the queue named in the request's ReplyTo field, copying the request's CorrelationId so that the consumer can match the response to its request

When the queue that the consumer listens to has data, the consumer takes it out for consumption

conclusion

RabbitMQ works in six modes:

  • Single mode
  • The work mode
  • Publish/subscribe mode
  • Routing mode
  • Topic schema
  • The RPC model

References:

RabbitMQ Tutorials

Welcome to like, follow and favorites

Friends, your support and encouragement are what motivate me to keep sharing and to keep improving the quality

All right, that’s it for this time

Technology is open, our mentality, should be more open. Embrace change, live in the sun, and strive to move forward.

I am Nezha, welcome to like, see you next time ~