There is plenty of information on the web about getting started with RabbitMQ, so it is not covered here; see the resources at the end of this article. This article wraps RabbitMQ's Golang client a second time to decouple receiving messages from processing them in a publish/subscribe model, so that the same message can be handled in several ways, for example stored by one receiver and forwarded by another.
(1) Client-side encapsulation of RabbitMQ
1. Define the client-side interface
// RabbitmqClient defines our interface for connecting, publishing, and consuming messages.
type RabbitmqClient interface {
	ConnectToBroker(connectionString string)
	Publish(msg []byte, exchangeName, exchangeType, bindingKey string) error
	PublishOnQueue(msg []byte, queueName string) error
	Subscribe(exchangeName, exchangeType, consumerName,
		bindingKey string, handlerFunc func(amqp.Delivery)) error
	SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
	Close()
}
2. Class diagram of the interface implementation
See GitHub for the concrete implementation.
(2) Decoupling with receivers
A receiver bundles a message-handling function with the exchange and queue parameters it binds to. Because these settings live in the receiver rather than in the client, multiple receivers can be defined to perform different operations on a message asynchronously.
// Receiver is defined separately so that it is decoupled from the client.
type Receiver struct {
	// Information about the receiver
	ExchangeName string
	ExchangeType string
	QueueName    string
	BindingKey   string
	ConsumerName string
	Deliveries   chan amqp.Delivery
	handlerFunc  func(msg amqp.Delivery) // the message-processing function
}
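To illustrate why the handler lives on the receiver, here is a minimal sketch in which two receivers process the same message differently, one storing it and one forwarding it. It runs without a broker: `Delivery` is a hypothetical stand-in for `amqp.Delivery`, and `dispatch` and `runDemo` are illustrative helpers, not part of the article's implementation.

```go
package main

import "fmt"

// Delivery is a hypothetical stand-in for amqp.Delivery so that the
// sketch runs without a broker or the amqp package.
type Delivery struct {
	RoutingKey string
	Body       []byte
}

// Receiver mirrors the article's struct, reduced to the fields used here.
type Receiver struct {
	QueueName   string
	handlerFunc func(msg Delivery)
}

// dispatch (illustrative helper) hands one message to every receiver,
// the way a broker would deliver a copy to each bound queue.
func dispatch(msg Delivery, receivers ...*Receiver) {
	for _, r := range receivers {
		r.handlerFunc(msg)
	}
}

// runDemo wires a storing receiver and a forwarding receiver to the
// same message and returns what each of them produced.
func runDemo() (stored, forwarded []string) {
	store := &Receiver{QueueName: "store", handlerFunc: func(msg Delivery) {
		stored = append(stored, string(msg.Body))
	}}
	forward := &Receiver{QueueName: "forward", handlerFunc: func(msg Delivery) {
		forwarded = append(forwarded, "to-peer:"+string(msg.Body))
	}}
	msg := Delivery{RoutingKey: "xiaoming.waiwen", Body: []byte("hello waiwen")}
	dispatch(msg, store, forward)
	return stored, forwarded
}

func main() {
	stored, forwarded := runDemo()
	fmt.Println("stored:", stored)
	fmt.Println("forwarded:", forwarded)
}
```

Adding a third behavior only requires another `Receiver` value; the client code does not change.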
(3) Consumer-side encapsulation: combining the client and the receivers
The consumer implements the publish/subscribe model. It holds one RabbitMQ client and multiple receivers, so each message can be received by several receivers.
1. Class diagram structure
2. Code implementation
// Consumer holds the client and the receivers.
type Consumer struct {
	Client    RabbitmqClient // the client
	Receivers []*Receiver
}

// Add registers one or more receivers.
func (c *Consumer) Add(rec ...*Receiver) {
	c.Receivers = append(c.Receivers, rec...)
}

// Subscribe binds every receiver to its exchange.
func (c *Consumer) Subscribe() {
	for _, receiver := range c.Receivers {
		err := c.Client.Subscribe(receiver.ExchangeName,
			receiver.ExchangeType,
			receiver.ConsumerName,
			receiver.BindingKey,
			receiver.handlerFunc)
		if err != nil {
			log.Printf("Subscribe error: %s %s ", receiver.ExchangeName, err)
		}
	}
}

// SubscribeToQueue subscribes every receiver to its specific queue.
func (c *Consumer) SubscribeToQueue() {
	for _, receiver := range c.Receivers {
		err := c.Client.SubscribeToQueue(receiver.QueueName,
			receiver.ConsumerName,
			receiver.handlerFunc)
		if err != nil {
			log.Printf("SubscribeToQueue error: %s %s ", receiver.QueueName, err)
		}
	}
}
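Because the consumer depends only on an interface, it can be exercised without a running broker. The sketch below assumes a reduced interface, `Subscriber`, containing only the method `Consumer.Subscribe` needs. `fakeClient`, its `deliver` method, and `Delivery` (standing in for `amqp.Delivery`) are hypothetical names introduced for illustration; they do not come from the article's GitHub code.

```go
package main

import (
	"fmt"
	"log"
)

// Delivery is a hypothetical stand-in for amqp.Delivery.
type Delivery struct{ Body []byte }

// Subscriber is an assumed, reduced slice of the article's RabbitmqClient.
type Subscriber interface {
	Subscribe(exchangeName, exchangeType, consumerName, bindingKey string,
		handlerFunc func(Delivery)) error
}

type Receiver struct {
	ExchangeName, ExchangeType, ConsumerName, BindingKey string
	handlerFunc                                          func(Delivery)
}

type Consumer struct {
	Client    Subscriber
	Receivers []*Receiver
}

func (c *Consumer) Add(rec ...*Receiver) { c.Receivers = append(c.Receivers, rec...) }

// Subscribe registers every receiver's handler with the client.
func (c *Consumer) Subscribe() {
	for _, r := range c.Receivers {
		err := c.Client.Subscribe(r.ExchangeName, r.ExchangeType,
			r.ConsumerName, r.BindingKey, r.handlerFunc)
		if err != nil {
			log.Printf("Subscribe error: %s %v", r.ExchangeName, err)
		}
	}
}

// fakeClient (hypothetical) records handlers per exchange instead of
// talking to a broker. It ignores binding keys for simplicity.
type fakeClient struct {
	handlers map[string][]func(Delivery)
}

func (f *fakeClient) Subscribe(exchangeName, _, _, _ string, h func(Delivery)) error {
	if f.handlers == nil {
		f.handlers = make(map[string][]func(Delivery))
	}
	f.handlers[exchangeName] = append(f.handlers[exchangeName], h)
	return nil
}

// deliver simulates the broker pushing one message to every handler
// registered on the exchange and returns how many handlers ran.
func (f *fakeClient) deliver(exchangeName, body string) int {
	for _, h := range f.handlers[exchangeName] {
		h(Delivery{Body: []byte(body)})
	}
	return len(f.handlers[exchangeName])
}

// runConsumerDemo subscribes two receivers and delivers one message.
func runConsumerDemo() (delivered int, got []string) {
	client := &fakeClient{}
	consumer := Consumer{Client: client}
	consumer.Add(
		&Receiver{ExchangeName: "chat", ExchangeType: "topic", BindingKey: "*.waiwen",
			handlerFunc: func(d Delivery) { got = append(got, "store:"+string(d.Body)) }},
		&Receiver{ExchangeName: "chat", ExchangeType: "topic", BindingKey: "*.waiwen",
			handlerFunc: func(d Delivery) { got = append(got, "forward:"+string(d.Body)) }},
	)
	consumer.Subscribe()
	delivered = client.deliver("chat", "hello waiwen")
	return delivered, got
}

func main() {
	n, got := runConsumerDemo()
	fmt.Println(n, got)
}
```

The design choice this highlights is that the consumer never touches the connection directly, so swapping the real client for a fake requires no change to `Consumer` itself.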
(4) Test
Define a topic-type exchange that uses the RoutingKey to send messages of a specific type.
func TestConsumer(t *testing.T) {
	var receiver *Receiver
	var client *MessagingClient
	var consumer Consumer
	var exname = "chat" // exchange name
	var extype = amqp.ExchangeTopic // use the topic exchange type
	var routingkey = "*.waiwen" // binding key: matches messages sent to waiwen
	var queueName = "waiwen" // queue name
	client = &MessagingClient{}
	//defer client.Close()
	var connectionStr = "amqp://admin:[email protected]:5672/" // connection string
	client.ConnectToBroker(connectionStr)
	go func() {
		var body = []byte("hello waiwen")
		err := client.Publish(body, exname, extype, "xiaoming.waiwen")
		if err != nil {
			log.Printf("publish msg error : %s \n", err)
		} else {
			log.Printf("A message was sent: %s \n", body)
		}
	}()
	receiver = &Receiver{
		ExchangeType: extype,
		ExchangeName: exname,
		ConsumerName: "",
		QueueName:    queueName,
		BindingKey:   routingkey,
		Deliveries:   make(chan amqp.Delivery),
		handlerFunc: func(msg amqp.Delivery) {
			log.Printf("get message from queue: %v \n", string(msg.Body))
		},
	}
	consumer = Consumer{
		Client:    client,
		Receivers: []*Receiver{},
	}
	consumer.Add(receiver)
	consumer.Subscribe()
	select {} // block forever so the subscription keeps receiving
}
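The test relies on the binding key `*.waiwen` matching the routing key `xiaoming.waiwen`. As a rough sketch of the topic-exchange rules (words are separated by dots, `*` stands for exactly one word, `#` for zero or more words), the matching can be written as below. `topicMatch` and `matchWords` are illustrative names; this is a simplified model of the semantics, not how RabbitMQ implements routing internally.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routingKey matches a topic binding pattern.
func topicMatch(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares the pattern and the key word by word.
func matchWords(pat, key []string) bool {
	if len(pat) == 0 {
		return len(key) == 0
	}
	switch pat[0] {
	case "#":
		// "#" may absorb zero words, or consume one word and try again.
		if matchWords(pat[1:], key) {
			return true
		}
		return len(key) > 0 && matchWords(pat, key[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pat[1:], key[1:])
	default:
		// A literal word must match exactly.
		return len(key) > 0 && pat[0] == key[0] && matchWords(pat[1:], key[1:])
	}
}

func main() {
	for _, key := range []string{"xiaoming.waiwen", "waiwen", "a.b.waiwen"} {
		fmt.Printf("*.waiwen vs %s: %v\n", key, topicMatch("*.waiwen", key))
	}
}
```

Under these rules a message published with the key `xiaoming.waiwen` reaches the `waiwen` queue, while a key with no prefix word or with two prefix words would not; a binding of `#.waiwen` would accept all three.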
2. Test Results:
(5) Reference
Thanks for sharing!