This is the 21st day of my participation in the August More Text Challenge
This post fills a gap I left earlier when writing up a test-environment solution for running multiple versions in parallel.
This article focuses on service reuse and isolation in Kafka. The main problem to solve is how, with several iteration environments running side by side, both producers and consumers can send and consume messages correctly. This is somewhat more complicated than service routing and isolation in Dubbo.
1. Problem description
Concepts:
- Stable version: A, B and C are a set of stable services shared globally.
- Iteration version: A1, C1 and C2 are iteration versions of their corresponding systems. For example, a new iteration service A1 is deployed because of a requirement change in system A.
Requirement: messages sent by an MQ producer service should, as far as possible, be consumed by consumers of the same version.
1.1. The entry point is a stable service
In the figure above, suppose the entry point is stable service A, which sends a message. A, B and C then consume one another's messages along the message chain, and the iteration services are not involved.
1.2. The entry point is an iteration service
In the figure above, suppose the entry point is iteration service A1, which sends a message. Along the whole chain, messages should be consumed by services of the same iteration version wherever possible.
- A1 sends a message
System B subscribes to the message, but B only has a stable version and no iteration version, so B consumes it. System C also subscribes, and C has an iteration C1 with the same version as A1, so C1 consumes it; C and C2 do not.
- B sends a message
After consuming the message from A1, B sends a message of its own. System A subscribes to it, so this message should be consumed by A1 rather than A. In the same way, C1 should consume it rather than C or C2.
- C1 sends a message
For system A, the message from C1 is consumed by A1; for system B, it is consumed by B.
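The routing rule described in this section can be sketched in a few lines of plain Java. This is only an illustration of the requirement, not the mechanism used later in the article; the class name, the "stable" marker and the map of deployed iterations are assumptions made for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

/** Illustrative only: decides which deployed version of a subscribing system should consume a message. */
final class VersionRouter {
    /** Hypothetical marker for the stable version; the article does not name one. */
    static final String STABLE = "stable";

    /**
     * @param system             the subscribing system, e.g. "B"
     * @param messageVersion     version carried by the message, or null for a stable message
     * @param iterationsBySystem iteration versions currently deployed for each system
     * @return the version of the system that should consume the message
     */
    static String resolveConsumer(String system,
                                  String messageVersion,
                                  Map<String, Set<String>> iterationsBySystem) {
        if (messageVersion == null) {
            return STABLE; // stable messages stay on the stable chain
        }
        Set<String> iterations =
                iterationsBySystem.getOrDefault(system, Collections.<String>emptySet());
        // Prefer the same iteration version; fall back to stable when none is deployed.
        return iterations.contains(messageVersion) ? messageVersion : STABLE;
    }
}
```

With A1 and C1 on version "1" and C2 on version "2", a version "1" message resolves to "stable" for system B and to "1" for systems A and C.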
1.3. The entry point is a Dubbo call from an iteration service
In the figure above, D1 calls B's Dubbo interface and passes the version number. Any message B sends at this point is also an iteration message, so it is handled the same way as the message B sends in 1.2.
2. Solutions
A previous article explained how to do this in Dubbo: Dubbo calls are rerouted by version through SPI.
Kafka, however, has no notion of consumer routing, so there is no built-in way to control which service consumes a message.
Below is a solution of my own. If you see a problem with it, criticism and corrections are welcome.
Design scheme:
Key steps of the scheme:
- When sending a message, add the Version information to the message Header.
- Send the message twice, with the same body but different Topics. The Topic of the iteration message carries the prefix VERSION:<version>_ (for example VERSION:1_).
- The iteration service is started with a javaagent that rewrites all the Topics it listens to, adding the prefix VERSION:<version>_.
- The iteration service consumes the iteration messages that correspond to it.
- The stable service decides whether it needs to consume a message. If the message Header does not contain Version, it consumes the message directly. If the Header contains Version, it checks whether the corresponding iteration service exists and consumes only when it does not.
- When a message is consumed, the Version is saved to a ThreadLocal.
- With thread pools, a plain ThreadLocal does not carry the value across threads. This is solved with TransmittableThreadLocal together with its javaagent.
The whole process requires zero intrusion into business code: Kafka's two interceptors are enhanced and configured through the javaagent.
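The producer-side fan-out and the stable consumer's decision can be sketched as follows. It is a minimal, dependency-free illustration rather than the author's interceptor code: a small Outgoing class stands in for a Kafka ProducerRecord, the header key "Version" and all class and method names are assumptions, and sending a stable message only once is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class VersionedMessaging {
    /** Hypothetical header key; the article only says the Header carries the Version. */
    static final String VERSION_HEADER = "Version";

    /** Stand-in for a Kafka ProducerRecord: target topic, headers and body. */
    static final class Outgoing {
        final String topic;
        final Map<String, String> headers;
        final String body;

        Outgoing(String topic, Map<String, String> headers, String body) {
            this.topic = topic;
            this.headers = headers;
            this.body = body;
        }
    }

    /**
     * Prefix format as written in the article. Note that real Kafka topic names only allow
     * letters, digits, '.', '_' and '-', so a deployment may need a different separator than ':'.
     */
    static String iterationTopic(String topic, String version) {
        return "VERSION:" + version + "_" + topic;
    }

    /** Producer side: a message carrying a version goes to the original topic and to the prefixed topic. */
    static List<Outgoing> fanOut(String topic, String body, String version) {
        List<Outgoing> out = new ArrayList<>();
        if (version == null) {
            // Assumption: stable messages are sent once, without a Version header.
            out.add(new Outgoing(topic, Collections.<String, String>emptyMap(), body));
            return out;
        }
        Map<String, String> headers = Collections.singletonMap(VERSION_HEADER, version);
        out.add(new Outgoing(topic, headers, body));
        out.add(new Outgoing(iterationTopic(topic, version), headers, body));
        return out;
    }

    /** Stable consumer side: consume unless an iteration service exists for the message's version. */
    static boolean stableShouldConsume(Map<String, String> headers, boolean iterationServiceExists) {
        if (headers.get(VERSION_HEADER) == null) {
            return true; // plain stable message
        }
        return !iterationServiceExists;
    }
}
```

In a real setup this logic would live in the producer and consumer interceptors, and the consumer side would additionally store the version in a ThreadLocal before handing the message to business code.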
How to determine whether an iteration service exists
In the design above, the Kafka consumer interceptor has two ways of deciding whether to consume:
1. Method 1: Get the consumer group of the message currently being consumed, currentGroupId = KafkaUtils.getConsumerGroupId(), fetch all consumer groups with adminClient.listConsumerGroups(), and check whether they include a group named VERSION:1_currentGroupId. If they do, the message will be consumed by the iteration service and the stable environment does not need to consume it. An important part of this method is giving every consumer group name of the iteration service the prefix. This is again done by enhancing the service through javaagent at a suitable modification point, which is naturally wherever the consumer group name is configured. There is a unified consumer group name, and each Listener can also be configured with its own consumer group name, so the Listener annotation has to be enhanced as well. Disadvantage: if the iteration service is down, its messages end up being consumed by the stable service.
2. Method 2 (recommended): Read an external configuration that records which services have an iteration service. This is convenient. Advantages: it avoids the disadvantage of method 1, and there is no need to use javaagent to change the consumer group name. Disadvantage: the configuration has to be maintained.
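The two checks can be compared side by side in a small sketch. It assumes the consumer group IDs have already been collected (for example from adminClient.listConsumerGroups()) and that the external configuration has been loaded into a map. The class name, method names and the shape of the configuration are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

final class IterationLookup {
    /**
     * Method 1: an iteration service exists if a consumer group with the prefixed name
     * VERSION:<version>_<currentGroupId> is found among all consumer group IDs.
     */
    static boolean existsByConsumerGroup(Set<String> allGroupIds,
                                         String currentGroupId,
                                         String version) {
        return allGroupIds.contains("VERSION:" + version + "_" + currentGroupId);
    }

    /**
     * Method 2: an iteration service exists if the external configuration lists the version
     * for the service. The map shape (service name to iteration versions) is an assumption.
     */
    static boolean existsByConfig(Map<String, Set<String>> config,
                                  String serviceName,
                                  String version) {
        return config.getOrDefault(serviceName, Collections.<String>emptySet()).contains(version);
    }
}
```

Method 1 depends on the state of the Kafka cluster, while method 2 depends only on a configuration that someone has to keep up to date, which matches the trade-off described above.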
3. Problems needing attention
When the version is passed along, the entry point is usually an HTTP interface. If the entry point is not HTTP but something internal to the system, the external version information cannot be passed in.
Take a scenario from the travel industry:
- A: order service
- B: order service
- C: order/driver service
In one requirement, A and C are changed but B is not, so there are iteration services A1 and C1. Assuming they communicate over MQ, we expect the flow A1 -> B -> C1.
However, when A1 tells B that an order has come in, B saves the information from A1 into Redis. B has a thread that keeps pulling data from Redis to match orders with drivers, and it sends a message once a match succeeds. The chain breaks at B: the version held in ThreadLocal goes no further once the data is written to Redis. B's matching thread therefore sees the stable version, so the message sent after a successful match is a stable message, and it is received by C instead of C1.
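The break can be reproduced with a few lines of plain Java. In this sketch a BlockingQueue stands in for Redis and a separate thread plays B's matching thread. All names are hypothetical and the code only demonstrates why the version is lost; it is not B's real logic.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class VersionLossDemo {
    /** Version of the message currently being consumed, as set by the consumer interceptor. */
    static final ThreadLocal<String> CURRENT_VERSION = new ThreadLocal<>();

    /** Returns the version the matching thread sees ("stable" when it finds none). */
    static String versionSeenByMatcher(String consumedVersion) {
        BlockingQueue<String> store = new ArrayBlockingQueue<>(1); // stand-in for Redis
        String[] seen = new String[1];

        // B's matching thread: picks up stored orders independently of the consumer thread.
        Thread matcher = new Thread(() -> {
            try {
                store.take(); // the stored payload carries no version
                String v = CURRENT_VERSION.get(); // null here: the value belongs to another thread
                seen[0] = (v == null) ? "stable" : v;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        matcher.start();

        // Consumer thread: handles the message from A1 and stores the order.
        CURRENT_VERSION.set(consumedVersion);
        try {
            store.put("order-1");
            matcher.join(); // join makes the write to seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CURRENT_VERSION.remove();
        }
        return seen[0];
    }
}
```

Calling versionSeenByMatcher("1") returns "stable", which is why the message B sends after matching goes to C rather than C1.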
How can this type of problem be solved?
In this case we should also create an iteration of B, B1. The flow then becomes A1 -> B1 -> C1, which is correct.
Also pay attention to DB isolation.