Welcome to the Tencent Cloud+ Community, where you can find more of Tencent's large-scale technical practice articles.
This article is compiled by Tnecesoc.
Introduction
Microservices are about dividing the business domain of an application into bounded contexts with clearly separated domains and running them in separate processes, so that any persistent relationship that crosses a domain boundary has to rely on eventual consistency rather than ACID-like transactions or foreign key constraints. Many of these concepts derive from, or are inspired by, Domain-Driven Design (DDD). DDD is a topic that would take an entire blog series of its own to cover, so I won’t go into it here.
In the context of our Go microservices blog series and of microservice architecture in general, one way to achieve loose coupling between services is to let them communicate through messaging, for interactions that do not need a strict request/response exchange. Note that introducing messaging is just one of many strategies for achieving loose coupling between services.
As we saw in Part 8 of the blog series, the Spring Cloud Config server already uses RabbitMQ as a runtime dependency, so RabbitMQ is a natural choice of message broker.
In this part of the series, we will make our “Account Service” put a message on a RabbitMQ exchange when a specific account object is read. That message will be consumed by a brand new microservice that we will write in this post. We will also move some of the Go code into a common library so that it can be reused across microservices.
Remember the system picture from Part 1? Here’s what it will look like once this part is complete:
We have a lot of work to do before we can finish this part, but we can do it.
The source code
There’s a lot of new code in this part, and we can’t fit all of it into a blog post. To get the full source code, clone the repository with git clone and switch to the branch for Part 9:
    git checkout P9
Send a message
We will implement a simple, contrived use case: when certain “VIP” accounts are read in the “Account Service”, we want to trigger the “VIP Offer” service, which under certain circumstances generates an “offer” for the account holder. In a properly designed domain model, the account objects and the VIP offer objects belong to two separate domains that should know as little about each other as possible.
The Account Service should never access the VIP Service’s storage (that is, the offers) directly. Instead, we pass a message to the “VIP Service” over RabbitMQ and delegate both the business logic and the persistence entirely to the “VIP Service”.
We will use the AMQP protocol for all communication. AMQP is an ISO-standardized application-layer protocol for messaging, designed for interoperability between systems. We will use the streadway/amqp library, the same one we used for configuration updates in Part 8.
Let’s recap how exchanges in AMQP relate to publishers, consumers, and queues:
A publisher publishes a message to an exchange, which delivers copies of the message to one or more queues based on routing rules and on the bindings that consumers have registered. There is a good explanation of this in an answer on Quora.
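To make the publisher, exchange, binding, and queue relationship concrete, here is a minimal in-memory sketch of a direct exchange. It is a toy model only and does not talk to RabbitMQ; the names Exchange, Binding, "accounts", "auditQueue", and "billingQueue" are made up for illustration, and a real broker offers richer routing (topic patterns, fanout, the default exchange).

```go
package main

import "fmt"

// Binding links a queue to an exchange under a routing key.
type Binding struct {
	Queue string
	Key   string
}

// Exchange is a toy, in-memory stand-in for a direct AMQP exchange.
type Exchange struct {
	Name     string
	Bindings []Binding
	Queues   map[string][]string // queue name -> delivered message bodies
}

// NewExchange creates an exchange with no bindings.
func NewExchange(name string) *Exchange {
	return &Exchange{Name: name, Queues: make(map[string][]string)}
}

// Bind registers a queue to receive messages published with the given key.
func (e *Exchange) Bind(queue, key string) {
	e.Bindings = append(e.Bindings, Binding{Queue: queue, Key: key})
}

// Publish copies the message into every queue whose binding key matches
// and returns how many queues received it.
func (e *Exchange) Publish(key, body string) int {
	delivered := 0
	for _, b := range e.Bindings {
		if b.Key == key {
			e.Queues[b.Queue] = append(e.Queues[b.Queue], body)
			delivered++
		}
	}
	return delivered
}

func main() {
	ex := NewExchange("accounts")
	ex.Bind("vipQueue", "vip")
	ex.Bind("auditQueue", "vip")
	ex.Bind("billingQueue", "invoice")

	n := ex.Publish("vip", `{"accountId":"10000"}`)
	fmt.Println("queues reached:", n)
	fmt.Println("vipQueue:", ex.Queues["vipQueue"])
	fmt.Println("billingQueue:", ex.Queues["billingQueue"])
}
```

The publisher only knows the exchange and the routing key; which queues end up with a copy is decided entirely by the bindings.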
Code related to messaging
Since both the existing Account Service and the new VIP Service need the new messaging code as well as the existing code that loads configuration from the Spring Cloud Config server, we will create our first shared library here.
Start by creating a new folder common under /goblog to hold reusable content:
    mkdir -p common/messaging
    mkdir -p common/config
We put all the AMQP-related code in the messaging folder and the configuration code in the config folder. You can copy the contents of /goblog/accountservice/config into /goblog/common/config. Remember that this requires updating the import statements in the Account Service that previously referred to its own config package. Take a look at the full source code to see how this is done.
The messaging code is encapsulated in a single file that defines both the interface and the actual implementation our applications use to connect, publish, and subscribe. Using streadway/amqp involves quite a lot of boilerplate code for AMQP messaging, so we won’t go into every detail here.
Create a new .go file in /goblog/common/messaging: messagingClient.go.
Let’s take a look at the main features:
    // Defines the interface for connecting, publishing, and consuming messages.
    type IMessagingClient interface {
        ConnectToBroker(connectionString string)
        Publish(msg []byte, exchangeName string, exchangeType string) error
        PublishOnQueue(msg []byte, queueName string) error
        Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error
        SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
        Close()
    }
The code above defines the interface we use for messaging. It is what our “Account Service” and “VIP Service” will deal with when it comes to messaging, hiding most of the underlying complexity behind an abstraction. Note that I chose to provide two variants each of publish and subscribe: one pair for the topic-based publish/subscribe pattern and one pair for the direct/queue messaging pattern.
Next, we define a struct that holds a pointer to an amqp.Connection and add the methods it needs in order to implement the interface we just declared (implicitly, as always in Go).
    // The implementation of the interface; it wraps a pointer to an amqp.Connection.
    type MessagingClient struct {
        conn *amqp.Connection
    }
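Because Go interfaces are satisfied implicitly, nothing in the struct declaration says which interface it implements. A small sketch of how that works, and of a compile-time assertion that is often used to make the relationship explicit, is shown below. Publisher and InMemoryClient are hypothetical, trimmed-down stand-ins for IMessagingClient and MessagingClient so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Publisher is a trimmed-down, hypothetical version of IMessagingClient.
type Publisher interface {
	PublishOnQueue(msg []byte, queueName string) error
	Close()
}

// InMemoryClient stores messages in a map instead of talking to a broker.
type InMemoryClient struct {
	queues map[string][][]byte
	closed bool
}

// Compile-time check: the build fails if *InMemoryClient stops satisfying Publisher.
var _ Publisher = (*InMemoryClient)(nil)

// PublishOnQueue appends the message to the named in-memory queue.
func (c *InMemoryClient) PublishOnQueue(msg []byte, queueName string) error {
	if c.closed {
		return errors.New("client is closed")
	}
	if c.queues == nil {
		c.queues = make(map[string][][]byte)
	}
	c.queues[queueName] = append(c.queues[queueName], msg)
	return nil
}

// Close marks the client as closed; later publishes return an error.
func (c *InMemoryClient) Close() { c.closed = true }

// Len reports how many messages are stored on the named queue.
func (c *InMemoryClient) Len(queueName string) int { return len(c.queues[queueName]) }

func main() {
	var p Publisher = &InMemoryClient{}
	fmt.Println(p.PublishOnQueue([]byte("hello"), "vipQueue")) // no error expected
	p.Close()
	fmt.Println(p.PublishOnQueue([]byte("again"), "vipQueue")) // error after Close
}
```

Code that depends only on the interface, such as an HTTP handler, can then be handed either the real client or a stand-in like this one.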
The full implementation of the interface is quite verbose, so only two of the methods, ConnectToBroker() and PublishOnQueue(), are shown here:
    func (m *MessagingClient) ConnectToBroker(connectionString string) {
        if connectionString == "" {
            panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?")
        }
        var err error
        m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString))
        if err != nil {
            panic("Failed to connect to AMQP compatible broker at: " + connectionString)
        }
    }
This is how we obtain the connection pointer: by calling amqp.Dial. If the configuration is missing or we cannot connect to the broker, the microservice panics and leaves it to the container orchestrator to create a fresh instance. The connectionString argument passed in looks like this:
    amqp://guest:guest@rabbitmq:5672/
Note that the RabbitMQ broker runs as a service in Docker Swarm mode.
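To see which parts such a connection string is made of, it can be taken apart with the standard net/url package. This is only an illustration of the URL's structure, not a description of what amqp.Dial does internally; parseBrokerURL and BrokerInfo are hypothetical names.

```go
package main

import (
	"fmt"
	"net/url"
)

// BrokerInfo holds the parts of an AMQP connection string.
type BrokerInfo struct {
	Host, Port, User string
}

// parseBrokerURL splits an amqp:// or amqps:// URL into host, port, and user name.
func parseBrokerURL(connectionString string) (BrokerInfo, error) {
	u, err := url.Parse(connectionString)
	if err != nil {
		return BrokerInfo{}, err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return BrokerInfo{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	info := BrokerInfo{Host: u.Hostname(), Port: u.Port()}
	if u.User != nil {
		info.User = u.User.Username()
	}
	return info, nil
}

func main() {
	info, err := parseBrokerURL("amqp://guest:guest@rabbitmq:5672/")
	if err != nil {
		fmt.Println("invalid connection string:", err)
		return
	}
	// Prints: host=rabbitmq port=5672 user=guest
	fmt.Printf("host=%s port=%s user=%s\n", info.Host, info.Port, info.User)
}
```

The host part, "rabbitmq", is the Swarm service name, which is why the same string works from any container attached to the same Swarm network.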
The PublishOnQueue() function is quite long. It is largely the same as the official streadway sample, except that it has been simplified to take fewer parameters. To publish a message to a named queue, we need to pass these parameters:
- body: the message as a byte array. It can be JSON, XML, or some binary format.
- queueName: the name of the queue to send the message to.
For more details on exchanges, see the RabbitMQ documentation.
    func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error {
        if m.conn == nil {
            panic("Tried to send message before connection was initialized. Don't do that.")
        }
        ch, err := m.conn.Channel() // Get a channel from the connection.
        if err != nil {
            return err
        }
        defer ch.Close()

        // Declare the queue with some parameters. It is created if it does not exist yet.
        queue, err := ch.QueueDeclare(
            queueName, // queue name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            return err
        }

        // Publish the message onto the queue.
        err = ch.Publish(
            "",         // use the default exchange
            queue.Name, // routing key, here the queue name
            false,      // mandatory
            false,      // immediate
            amqp.Publishing{
                ContentType: "application/json",
                Body:        body, // the JSON body as []byte
            })
        fmt.Printf("A message was sent to queue %v: %v\n", queueName, body)
        return err
    }
There is a fair amount of boilerplate here, but it should be easy to follow. The code declares a queue (creating it if it does not exist yet) and publishes our message to it as a byte array.
The code for publishing a message to a named exchange is more complex, because it needs boilerplate for declaring the exchange, declaring the queue, and binding the two together. A complete source code sample is available here.
Next, since the actual user of our MessagingClient will be /goblog/accountservice/service/handlers.go, we add a field there and hard-code an “is VIP” check that sends a message whenever the requested account has the ID “10000”:
    var DBClient dbclient.IBoltClient
    var MessagingClient messaging.IMessagingClient // NEW

    func GetAccount(w http.ResponseWriter, r *http.Request) {
        ...
And then:
        ...
        notifyVIP(account) // Send the VIP notification concurrently.

        // If there is such an account, marshal it to JSON and write it with headers and a 200 status.
        data, _ := json.Marshal(account)
        writeJsonResponse(w, http.StatusOK, data)
    }

    // notifyVIP sends a message on a new goroutine if this is our hard-coded "VIP" account.
    func notifyVIP(account model.Account) {
        if account.Id == "10000" {
            go func(account model.Account) {
                vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()}
                data, _ := json.Marshal(vipNotification)
                err := MessagingClient.PublishOnQueue(data, "vipQueue")
                if err != nil {
                    fmt.Println(err.Error())
                }
            }(account)
        }
    }
This is a good opportunity to demonstrate an inline anonymous function that is started on a new goroutine with the go keyword. We should not block the goroutine that is executing the HTTP handler just to send a message, so this is also a good place to add a bit of concurrency.
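The effect of starting the publish on its own goroutine can be demonstrated without a broker. The sketch below uses a hypothetical slowPublisher whose publish call blocks until it is released; if notifyVIP called it synchronously, the program would hang. The publisher interface and the map-based JSON payload are simplifications made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher is the single method of the messaging interface this sketch needs.
type publisher interface {
	PublishOnQueue(msg []byte, queueName string) error
}

// slowPublisher is a hypothetical stand-in for a broker client whose publish
// call blocks until release is closed.
type slowPublisher struct {
	release chan struct{} // closed to let the publish proceed
	done    chan struct{} // closed once the publish has finished
}

func (p *slowPublisher) PublishOnQueue(msg []byte, queueName string) error {
	<-p.release
	close(p.done)
	return nil
}

// notifyVIP starts the publish on a new goroutine and returns immediately.
// The account ID is passed as an argument so the goroutine works on its own copy.
func notifyVIP(p publisher, accountID string) {
	if accountID != "10000" {
		return
	}
	go func(id string) {
		data, err := json.Marshal(map[string]string{"accountId": id})
		if err != nil {
			fmt.Println(err)
			return
		}
		if err := p.PublishOnQueue(data, "vipQueue"); err != nil {
			fmt.Println(err)
		}
	}(accountID)
}

// returnsBeforePublish reports whether notifyVIP came back while the
// publish was still blocked.
func returnsBeforePublish() bool {
	p := &slowPublisher{release: make(chan struct{}), done: make(chan struct{})}
	notifyVIP(p, "10000") // would hang here if the publish were synchronous
	select {
	case <-p.done:
		return false // cannot happen: the publish has not been released yet
	default:
	}
	close(p.release) // let the publish finish
	<-p.done
	return true
}

func main() {
	fmt.Println("handler returned before publish finished:", returnsBeforePublish())
}
```

The trade-off is that the HTTP response no longer reflects whether the message was actually sent, so errors can only be logged, as the handler above does.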
main.go also needs to be updated so that the AMQP connection is initialized at startup, using the configuration that has been loaded and injected into Viper.
    // Called from the main function.
    func initializeMessaging() {
        if !viper.IsSet("amqp_server_url") {
            panic("No 'amqp_server_url' set in configuration, cannot start")
        }
        service.MessagingClient = &messaging.MessagingClient{}
        service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))
        service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
    }
Nothing very interesting here: we create the service.MessagingClient instance from an empty messaging struct and then call ConnectToBroker with the property value fetched from Viper. If the configuration does not contain the broker URL, we panic, because the program cannot do its job without a connection to the broker.
Update the configuration
We added the amqp_broker_url property to our .yml configuration files in Part 8, so this step has already been done. Make sure the key in your configuration files matches the one the code looks up (amqp_server_url in the snippet above).
    broker_url: amqp://guest:guest@192.168.99.100:5672 (dev)
    broker_url: amqp://guest:guest@rabbitmq:5672 (test)
Note that in the “test” configuration file we use the Swarm service name “rabbitmq” rather than the LAN IP address of the Swarm as seen from my computer. (Your actual IP address may differ, but 192.168.99.100 seems to be the standard one when running Docker Toolbox.)
Putting the user name and password in a configuration file in plain text is not recommended. In a real-world environment, we would normally use the built-in encryption capabilities of the Spring Cloud Config server that we saw in Part 8.
Unit testing
Of course, we should write at least one unit test to make sure that the GetAccount function in handlers.go tries to send a message when someone requests the very special account identified by “10000”.
To do this, we need a mock implementation of IMessagingClient and a new test case in handlers_test.go. Let’s start with the mock. Here we use the third-party tool mockery to generate a mock implementation of the IMessagingClient interface (GOPATH must be set before running the following commands in the shell):
    > go get github.com/vektra/mockery/...
    > cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging
    > $GOPATH/bin/mockery -all -output .
    Generating mock for: IMessagingClient
Now there is a mock implementation file, IMessagingClient.go, in the current folder. I don’t like the name of this file, nor its camel casing, so let’s rename it to make it more obvious that it is a mock and to follow the file naming style used in this blog series:
    mv IMessagingClient.go mockmessagingclient.go
We may need to tweak the import statements in the generated file slightly and remove some aliases. Beyond that, we will treat the mock implementation as a black box and simply assume it works when we start testing.
Take a look at the source code of the generated mock as well. It is very similar to the one we wrote by hand in Part 4.
Add a new test case to handlers_test.go:
    // Declare mock argument matchers to make the test code a bit more readable.
    var anyString = mock.AnythingOfType("string")
    var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte

    func TestNotificationIsSentForVIPAccount(t *testing.T) {
        // Set up the DBClient mock.
        mockRepo.On("QueryAccount", "10000").Return(model.Account{Id: "10000", Name: "Person_10000"}, nil)
        DBClient = mockRepo

        mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil)
        MessagingClient = mockMessagingClient

        Convey("Given a HTTP req for a VIP account", t, func() {
            req := httptest.NewRequest("GET", "/accounts/10000", nil)
            resp := httptest.NewRecorder()
            Convey("When the request is handled by the Router", func() {
                NewRouter().ServeHTTP(resp, req)
                Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() {
                    So(resp.Code, ShouldEqual, 200)
                    time.Sleep(time.Millisecond * 10) // Sleep because the publish call happens in a separate goroutine.
                    So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue)
                })
            })
        })
    }
The details are in the comments. Here, too, I don’t like having to sleep artificially for 10 ms before asserting the number of calls, but since the mock is invoked in a goroutine separate from the “main” one, we need to give it a little time to complete. I hope there is a better, more idiomatic way of unit testing code that involves goroutines and channels.
I admit – the process of using this way of testing is more verbose than using Mockito when writing unit test cases for Java applications. Still, I found it readable and easy to write.
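One possible alternative to the fixed 10 ms sleep mentioned above is to let the test double signal on a channel when it is called, and have the test wait for that signal with a timeout. The sketch below is not the generated mockery mock; signalingClient, waitForPublish, and the other helpers are hand-written, hypothetical names used only to illustrate the idea.

```go
package main

import (
	"fmt"
	"time"
)

// signalingClient is a hand-written, hypothetical fake that reports each
// publish on a channel so that a test can wait for it.
type signalingClient struct {
	published chan string
}

func newSignalingClient() *signalingClient {
	return &signalingClient{published: make(chan string, 1)}
}

// PublishOnQueue records the queue name on the channel instead of sending anything.
func (c *signalingClient) PublishOnQueue(msg []byte, queueName string) error {
	c.published <- queueName
	return nil
}

// waitForPublish blocks until a publish is reported or the timeout expires.
func waitForPublish(c *signalingClient, timeout time.Duration) (string, bool) {
	select {
	case q := <-c.published:
		return q, true
	case <-time.After(timeout):
		return "", false
	}
}

// publishAndWait mimics the handler: publish on a goroutine, then wait for it.
func publishAndWait() (string, bool) {
	c := newSignalingClient()
	go c.PublishOnQueue([]byte(`{"accountId":"10000"}`), "vipQueue")
	return waitForPublish(c, time.Second)
}

// waitWithoutPublish shows the failure path: nothing is published,
// so the wait gives up after a short timeout.
func waitWithoutPublish() bool {
	_, ok := waitForPublish(newSignalingClient(), 20*time.Millisecond)
	return ok
}

func main() {
	q, ok := publishAndWait()
	fmt.Println("published to:", q, "seen:", ok)
	fmt.Println("seen without publish:", waitWithoutPublish())
}
```

With this approach the test returns as soon as the call happens and the timeout is only paid in the failure case, so it can be generous without slowing down passing runs.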
Then run the test and make sure it passes:
    go test ./...
Running
Start by running the SpringCloud.sh script to update the configuration server. Then run copyall.sh and wait a few seconds while it updates our “Account Service”. After that, we use curl to fetch our “special” account:
    > curl http://$ManagerIP:6767/accounts/10000
    {"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}
If all went well, we should be able to open the RabbitMQ admin console and check whether there is a message on the queue named vipQueue:
    open http://192.168.99.100:15672/#/queues
At the bottom of the figure above, we see that “vipQueue” has 1 message. If we use the “Get Message” function in the RabbitMQ management console, we should see this message:
Writing a consumer in Go: the “VIP Service”
Finally, it’s time to write a new microservice from scratch that shows how to consume messages from RabbitMQ. We’ll apply what we’ve learned so far in this series, including:
- The HTTP server
- Performance monitoring
- Centralized configuration
- Reuse messaging mechanism code
If you have run git checkout P9, you should see “vipservice” in the root /goblog folder.
I won’t go through every line here, because parts of it duplicate “accountservice”. We’ll focus on how to consume the message we just sent. A few things to note:
- Two new .yml files have been added to config-repo: vipservice-dev.yml and vipservice-test.yml.
- copyall.sh has been updated so that it builds and deploys both “accountservice” and our newly written “vipservice”.
Consume a message
We’ll use the code in /goblog/common/messaging, specifically the SubscribeToQueue function:
    SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
To do this we need to provide these parameters:
- The queue name (for example, “vipQueue”)
- The consumer name
- A callback function that is invoked whenever a message arrives on the given queue, just as when we consumed configuration updates in Part 8
There is not much to say about the implementation of SubscribeToQueue, which binds our callback function to the queue. Here’s the source code, if you want to take a look.
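The body of SubscribeToQueue is not shown in this post. A common shape for this kind of function is to obtain a channel of deliveries and range over it on a separate goroutine, calling the handler for each message. The sketch below assumes that shape; Delivery is a hypothetical stand-in for amqp.Delivery, and the deliveries channel is fed by hand rather than by a broker.

```go
package main

import "fmt"

// Delivery is a hypothetical stand-in for amqp.Delivery with only a body.
type Delivery struct {
	Body []byte
}

// consumeLoop calls handler for every delivery until the channel is closed.
func consumeLoop(deliveries <-chan Delivery, handler func(Delivery)) {
	for d := range deliveries {
		handler(d)
	}
}

// subscribe runs consumeLoop on its own goroutine so the caller is not blocked.
// The returned channel is closed when the loop has ended.
func subscribe(deliveries <-chan Delivery, handler func(Delivery)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		consumeLoop(deliveries, handler)
	}()
	return done
}

// collect feeds the given bodies through subscribe and returns what the handler saw.
func collect(bodies ...string) []string {
	deliveries := make(chan Delivery)
	var got []string
	done := subscribe(deliveries, func(d Delivery) {
		got = append(got, string(d.Body))
	})
	for _, b := range bodies {
		deliveries <- Delivery{Body: []byte(b)}
	}
	close(deliveries)
	<-done // wait until the handler has processed everything
	return got
}

func main() {
	for _, body := range collect("first", "second") {
		fmt.Println("Got a message:", body)
	}
}
```

Because the loop runs on its own goroutine, the subscribing service can go on to start its HTTP server while messages are handled in the background.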
Let’s take a quick look at the VIP service main.go to see how we set things up:
    var messagingClient messaging.IMessagingClient

    func main() {
        fmt.Println("Starting " + appName + "...")

        config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch"))
        initializeMessaging()

        // Make sure the connection is closed when the service exits.
        handleSigterm(func() {
            if messagingClient != nil {
                messagingClient.Close()
            }
        })
        service.StartWebServer(viper.GetString("server_port"))
    }

    // Called when a message arrives on "vipQueue".
    func onMessage(delivery amqp.Delivery) {
        fmt.Printf("Got a message: %v\n", string(delivery.Body))
    }

    func initializeMessaging() {
        if !viper.IsSet("amqp_server_url") {
            panic("No 'amqp_server_url' set in configuration, cannot start")
        }
        messagingClient = &messaging.MessagingClient{}
        messagingClient.ConnectToBroker(viper.GetString("amqp_server_url"))

        // Call the subscribe method with the queue name and the callback function.
        err := messagingClient.SubscribeToQueue("vipQueue", appName, onMessage)
        failOnError(err, "Could not start subscribe to vipQueue")

        err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent)
        failOnError(err, "Could not start subscribe to " + viper.GetString("config_event_bus") + " topic")
    }
Looks familiar, right? We will most likely repeat this way of setting up and starting the microservices we add in later parts. It is part of the basics by now.
The onMessage function only logs the body of whatever “VIP” message we receive. If we were to flesh out the contrived use case, we would have to add some fancy logic to decide whether the account holder qualifies for the “super-awesome Buy all our Stuff (TM)” treatment, and possibly write to a “VIP Offer database” as well. If you’re interested, try implementing this logic and submit a pull request.
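As a first step toward such logic, the handler would need to turn the message body back into a struct. A minimal sketch follows, assuming the JSON keys are accountId and readAt; the struct tags and the decodeNotification helper are assumptions made for this example and are not taken from the model package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// VipNotification mirrors the message put on the queue. The JSON tags are
// assumptions made for this sketch.
type VipNotification struct {
	AccountId string `json:"accountId"`
	ReadAt    string `json:"readAt"`
}

// decodeNotification parses a message body into a VipNotification.
func decodeNotification(body []byte) (VipNotification, error) {
	var n VipNotification
	if err := json.Unmarshal(body, &n); err != nil {
		return VipNotification{}, fmt.Errorf("decoding VIP notification: %w", err)
	}
	return n, nil
}

func main() {
	body := []byte(`{"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}`)
	n, err := decodeNotification(body)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("account %s was read at %s\n", n.AccountId, n.ReadAt)
}
```

Returning an error for malformed bodies gives the handler a chance to log or reject a bad message rather than act on a half-filled struct.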
One last word about this code. The following snippet lets us clean up when we press Ctrl+C to stop a service instance, or when Docker Swarm kills one.
    func handleSigterm(handleExit func()) {
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        signal.Notify(c, syscall.SIGTERM)
        go func() {
            <-c
            handleExit()
            os.Exit(1)
        }()
    }
This code is not the most readable, but all it does is register the channel c as a listener for os.Interrupt and syscall.SIGTERM, and then block on c in a goroutine until either signal arrives. This lets us be fairly sure that handleExit() is called whenever an instance of the microservice is shut down. If you want to see it for yourself, try Ctrl+C or scaling the service down with Docker Swarm. The kill command also works, but kill -9 does not, because SIGKILL cannot be caught. It is therefore best not to stop anything with kill -9 unless absolutely necessary.
The handleExit() function calls the Close() function declared on the IMessagingClient interface, which makes sure the AMQP connection is closed properly.
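The same idea can be written so that the waiting logic is testable without sending real signals to the process. In the sketch below, waitForShutdown and simulateShutdown are hypothetical helpers: the first takes the signal channel as a parameter and leaves the call to os.Exit to the caller, and the second feeds a SIGTERM into the channel by hand.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// waitForShutdown blocks until a signal arrives on c, runs cleanup, and
// returns the signal. It leaves calling os.Exit to the caller.
func waitForShutdown(c <-chan os.Signal, cleanup func()) os.Signal {
	sig := <-c
	cleanup()
	return sig
}

// simulateShutdown feeds a SIGTERM into the channel by hand instead of
// waiting for the operating system to deliver one.
func simulateShutdown() (cleanedUp bool, gotSigterm bool) {
	c := make(chan os.Signal, 1)
	c <- syscall.SIGTERM
	sig := waitForShutdown(c, func() { cleanedUp = true })
	return cleanedUp, sig == syscall.SIGTERM
}

func main() {
	// In a real service the channel would be registered like this, and
	// waitForShutdown would run on its own goroutine.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	signal.Stop(c) // unregister again; this demo only simulates the signal

	cleaned, ok := simulateShutdown()
	fmt.Println("cleanup ran:", cleaned, "signal was SIGTERM:", ok)
}
```

signal.Notify accepts several signals in one call, so the two registrations in handleSigterm could also be combined into one.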
Deploy and run
The copyall.sh script has been updated. If you have followed the steps above and your code is in sync with the P9 branch on GitHub, you’re ready to run it. After deployment, running docker service ls should print something like this:
    > docker service ls
    ID            NAME            REPLICAS  IMAGE
    kpb1j3mus3tn  accountservice  1/1       someprefix/accountservice
    n9xr7wm86do1  configserver    1/1       someprefix/configserver
    r6bhneq2u89c  rabbitmq        1/1       someprefix/rabbitmq
    sy4t9cbf4upl  vipservice      1/1       someprefix/vipservice
    u1qcvxm2iqlr  viz             1/1       manomarks/visualizer:latest
Or you can use the Dvizz Docker Swarm service renderer to check the status:
Check the log
Since the docker service logs feature was marked as experimental in 1.13.0, we have to look at the “vipservice” logs the old-fashioned way.
First, run docker ps to find the CONTAINER ID:
    > docker ps
    CONTAINER ID   IMAGE
    a39e6eca83b3   someprefix/vipservice:latest
    b66584ae73ba   someprefix/accountservice:latest
    d0074e1553c7   someprefix/configserver:latest
Make a note of the CONTAINER ID of vipservice and run docker logs -f to follow its logs:
    > docker logs -f a39e6eca83b3
    Starting vipservice...
    2017/06/06 19:27:22 Declaring Queue ()
    2017/06/06 19:27:22 declared Exchange, declaring Queue ()
    2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
    Starting HTTP service at 6868
Open another command line window and curl our special account object:
    > curl http://$ManagerIP:6767/accounts/10000
If all is well, we should see the message consumed from the queue in the log in the original window:
    Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
Work queues
The pattern for distributing work across several instances of a service relies on the concept of work queues. Each “VIP message” should be handled by exactly one “vipservice” instance.
So let’s see what happens when we scale “vipservice” to two instances using the docker service scale command:
    > docker service scale vipservice=2
The new “vipservice” instance should be deployed within a few seconds.
Since we use the direct/queue approach in AMQP, we should see round-robin distribution. Let’s use curl four times to trigger four VIP notifications:
    > curl http://$ManagerIP:6767/accounts/10000
    > curl http://$ManagerIP:6767/accounts/10000
    > curl http://$ManagerIP:6767/accounts/10000
    > curl http://$ManagerIP:6767/accounts/10000
Check the log of our original “vipservice” instance again:
    > docker logs -f a39e6eca83b3
    Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
    Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}
As expected, we see that the first instance processed two of the four messages. If we run docker logs on the other “vipservice” instance, we should see two messages there as well.
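The two-and-two split follows from round-robin dispatch among consumers competing on a single queue. A toy model of that dispatch order, not broker code, can be sketched as follows; distribute and the message names are made up for illustration.

```go
package main

import "fmt"

// distribute hands out messages to n consumers in round-robin order and
// returns what each consumer received.
func distribute(messages []string, n int) [][]string {
	if n <= 0 {
		return nil
	}
	received := make([][]string, n)
	for i, m := range messages {
		received[i%n] = append(received[i%n], m)
	}
	return received
}

func main() {
	msgs := []string{"read-1", "read-2", "read-3", "read-4"}
	for i, got := range distribute(msgs, 2) {
		fmt.Printf("vipservice instance %d got %d messages: %v\n", i+1, len(got), got)
	}
}
```

With four messages and two consumers, each instance ends up with two messages, which matches what the logs above show for the first instance.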
Testing the consumer
To be honest, I haven’t come up with a good way to unit test AMQP consumers without spending a lot of time mocking the AMQP library. There is a test in messagingClient_test.go that exercises the subscriber’s loop of waiting for and processing incoming messages, but it is not really worth describing here.
To test the messaging more thoroughly, I will probably return to the topic of integration testing in a future blog post, perhaps using the Docker Remote API or Docker Compose from go test. Such a test would start supporting services such as RabbitMQ, so that we can send and receive real messages in the test code.
Footprint and performance
We’re not doing any performance testing this time. A quick glance at memory usage after sending and receiving some messages will have to do:
    CONTAINER                                    CPU %   MEM USAGE / LIMIT
    vipservice.1.tt47bgnmhef82ajyd9s5hvzs1       0.00%   1.859 MiB / 1.955 GiB
    accountservice.1.w3l6okdqbqnqz62tg618szsoj   0.00%   3.434 MiB / 1.955 GiB
    rabbitmq.1.i2ixydimyleow0yivaw39xbom         0.51%   129.9 MiB / 1.955 GiB
This is the memory usage after serving a few requests. The new “vipservice” is not as complex as “accountservice”, so it is no surprise that it uses less memory after startup.
Summary
This is probably the longest part of the series so far! In this part we have:
- Taken a closer look at RabbitMQ and the AMQP protocol.
- Added a new “vipservice”.
- Extracted the messaging (and configuration) code into a reusable subproject.
- Published and consumed messages over AMQP.
- Generated mock code with mockery.
This article is published on the Tencent Cloud+ Community with the author’s authorization. Original link: https://cloud.tencent.com/developer/article/1149121?fromSource=waitui