Do you really understand data migration?

Daily operations and troubleshooting => Didi's open source LogiKM, a one-stop Kafka monitoring and management platform

If you don't want to read the article, you can watch the accompanying video instead. (The videos are uploaded under the same name, [Shi Zhenzhen's grocery store], to the WeChat official account, CSDN, Bilibili, and other platforms.)

This video covers:

  1. A tutorial on partition replica reassignment
  2. Things to watch out for during reassignment
  3. How LogiKM simplifies reassignment and makes it more efficient
  4. Cross-path replica migration

Partition replica reassignment + precautions + simplifying the operation with LogiKM

Script parameters

| Parameter | Description | Example |
| --- | --- | --- |
| --zookeeper | Connects to ZooKeeper | --zookeeper localhost:2181,localhost:2182 |
| --topics-to-move-json-file | Specifies a JSON file listing the topics to move | --topics-to-move-json-file config/move-json-file.json (JSON format shown below) |
| --generate | Prints a proposed replica reassignment plan; the command does not actually execute anything | |
| --broker-list | Specifies the brokers to generate an allocation plan for; used together with --generate | --broker-list 0,1,2,3 |
| --reassignment-json-file | Specifies the JSON file describing the reassignment; used together with --execute | (JSON format shown below) |
| --execute | Starts the reassignment task; used together with --reassignment-json-file | |
| --verify | Checks whether the task completed successfully; if --throttle was set, this also removes the throttle. This is important: leaving the throttle in place affects normal synchronization between replicas | |
| --throttle | Limits the rate, in bytes/sec, at which data is moved between brokers during the migration | --throttle 500000 |
| --replica-alter-log-dirs-throttle | Limits the bandwidth, in bytes/sec, for copying data from one log directory to another within the same broker (cross-path migration) | --replica-alter-log-dirs-throttle 100000 |
| --disable-rack-aware | Disables rack awareness; rack information is ignored during allocation | |
| --bootstrap-server | Mandatory for cross-path replica migration | |

1. Introduction to the script

This script is a tool that Kafka provides to reassign partitions;

1.1 Generating the Recommended Configuration Script

Key parameter: --generate

Before reassigning partition replicas, it is best to obtain a sensible allocation plan first. Write a move-json-file.json file; this file tells the tool which topics you want to reassign:

{
  "topics": [
    {"topic": "test_create_topic1"}
  ],
  "version": 1
}

Then run the following script; --broker-list "0,1,2,3" specifies the brokers you want the partitions assigned to:

sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate

When it finishes, it prints something like this:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Note that the partition movement has not started yet; the command only shows the current allocation and a proposed one. Save the current allocation in case you need to roll back.
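For example, a minimal rollback sketch: assuming you saved the "Current partition replica assignment" JSON printed by --generate into a hypothetical file config/rollback.json, re-running --execute with that file puts the original assignment back.

# config/rollback.json is a hypothetical file holding the saved current assignment
sh bin/kafka-reassign-partitions.sh --zookeeper xxx:2181 --reassignment-json-file config/rollback.json --execute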

1.2. Execute the Json file

Key parameter: --execute. Save the desired reassignment plan in a JSON file, for example reassignment-json-file.json:


{"version": 1,"partitions": [{"topic":"test_create_topic1"."partition": 2."replicas": [2]."log_dirs": ["any"] {},"topic":"test_create_topic1"."partition": 1,"replicas": [1]."log_dirs": ["any"] {},"topic":"test_create_topic1"."partition": 0."replicas": [0]."log_dirs": ["any"]}]}

Copy the code

Then run:

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute

Kafka can throttle replica traffic between brokers, limiting the bandwidth used to move replicas from one machine to another. This is useful when rebalancing the cluster or adding and removing brokers, because it limits the impact of these data-intensive operations on users. For example, the migration above can be run with the throttle option --throttle 50000000:

> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute --throttle 50000000

The output confirms that the throttle has been set to 50000000 B/s:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

Note that if the reassignment involves cross-path transfers (multiple log directories on the same broker), this throttle does not apply to them; you also need to add --replica-alter-log-dirs-throttle, which throttles migration between different paths within the same broker.

If you want to change the limit while the rebalance is running, for example to increase the throughput so it completes faster, you can re-run the execute command with the same reassignment-json-file and a new --throttle value.
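For instance, assuming the original 50000000 B/s is too slow, a sketch of raising the limit in place (700000000 is just an example value):

# Same reassignment-json-file as before; only the --throttle value changes
sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute --throttle 700000000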

1.3. Verify

Key parameter: --verify. It checks the status of the partition reassignment and removes the --throttle limit. Removing the throttle matters: otherwise regular replication traffic between replicas stays throttled.

sh bin/kafka-reassign-partitions.sh --zookeeper xxxx:2181 --reassignment-json-file config/reassignment-json-file.json --verify
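The output looks roughly like the following (the exact wording differs between Kafka versions); the last line confirms that the throttle was removed:

Status of partition reassignment:
Reassignment of partition test_create_topic1-0 completed successfully
Reassignment of partition test_create_topic1-1 completed successfully
Reassignment of partition test_create_topic1-2 completed successfully
Throttle was removed.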

Note: if a specified broker ID does not exist, the reassignment of that replica fails, but the others are not affected.

2. Replica expansion

Kafka does not provide a dedicated script for increasing the number of replicas the way kafka-topics.sh can expand partitions. To expand replicas you have to take a detour and use kafka-reassign-partitions.sh to reassign them.

2.1 Increasing the number of replicas

Say we have 3 partitions with 1 replica each. To improve availability, we want to increase the number of replicas to 2.

2.1.1 Calculating the replica allocation

Use --generate from step 1.1 to get the current allocation, which gives the following JSON:

{
  "version": 1,
  "partitions": [
    {"topic": "test_create_topic1", "partition": 2, "replicas": [2], "log_dirs": ["any"]},
    {"topic": "test_create_topic1", "partition": 1, "replicas": [1], "log_dirs": ["any"]},
    {"topic": "test_create_topic1", "partition": 0, "replicas": [0], "log_dirs": ["any"]}
  ]
}

To give every partition 2 replicas, we change the value of "replicas": []; the list contains broker IDs, with the leader first. Modify the JSON file according to the allocation we want:

{
  "version": 1,
  "partitions": [
    {"topic": "test_create_topic1", "partition": 2, "replicas": [2, 0], "log_dirs": ["any", "any"]},
    {"topic": "test_create_topic1", "partition": 1, "replicas": [1, 2], "log_dirs": ["any", "any"]},
    {"topic": "test_create_topic1", "partition": 0, "replicas": [0, 1], "log_dirs": ["any", "any"]}
  ]
}

The number of entries in log_dirs must match the number of replicas, or you can simply remove the log_dirs option. log_dirs holds the absolute destination paths used when migrating replicas across paths.

2.1.2 Executing with --execute

Execute the reassignment with the modified reassignment-json-file, exactly as in section 1.2.

2.1.3 Verifying with --verify

Once it completes, the number of replicas has increased.

2.2 Reducing the number of replicas

Reducing replicas works the same way as increasing them: when the newly assigned number of replicas is smaller than before, the extra replicas are deleted. For example, having just added a replica, suppose we want to go back to a single replica.

Execute the reassignment with the following JSON file:

{
  "version": 1,
  "partitions": [
    {"topic": "test_create_topic1", "partition": 2, "replicas": [2], "log_dirs": ["any"]},
    {"topic": "test_create_topic1", "partition": 1, "replicas": [1], "log_dirs": ["any"]},
    {"topic": "test_create_topic1", "partition": 0, "replicas": [0], "log_dirs": ["any"]}
  ]
}

After executing it, you can see that the extra replicas are marked for deletion; they are cleaned up shortly after.

This way we can expand or shrink the replicas, but we have to work out the replica allocation ourselves, including load balancing across brokers, and that is bound to be less reasonable than letting Kafka allocate it automatically. So is there a good way to get a properly allocated JSON file?

PS: We previously analyzed, in [Kafka source], how partitions and replicas are allocated when a topic is created. Can we reuse the same rules for our allocation? In fact, --generate itself calls that method: AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size).

For the specific implementation, see "Kafka Thinking": expanding replicas at minimal cost.

You can implement a similar approach in your own project, along the lines of the sketch below. If that is too much trouble, LogiKM's replica expansion feature can do it for you (coming soon).
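As a rough illustration only (this is not the rack-aware AdminUtils algorithm, just a naive sketch that appends one follower per partition, chosen as leader+1 modulo the broker count; it assumes jq is installed and that the --generate output was saved to a hypothetical file current-assignment.json):

# Naive sketch: give every partition a second replica on broker (leader + 1) % brokerCount,
# for a 3-broker cluster (IDs 0, 1, 2). Not the AdminUtils.assignReplicasToBrokers logic.
jq --argjson n 3 '
  .partitions |= map(
      .replicas += [ (.replicas[0] + 1) % $n ]   # append one follower per partition
      | del(.log_dirs)                           # or extend log_dirs to match the replica count
  )
' current-assignment.json > reassignment-json-file.json

Applied to the current assignment from section 2.1.1, this produces the same replica lists we wrote by hand above ([2, 0], [1, 2], [0, 1]).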

3. Partition expansion

Kafka partition expansion is done with the kafka-topics.sh script; shrinking the partition count is not supported.

For details on partition expansion, see [Kafka source] TopicCommand alter source code analysis (partition expansion).
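As a quick reference, the expansion itself is a one-liner (a sketch; exact flags depend on the Kafka version, and newer versions take --bootstrap-server instead of --zookeeper):

# Increase the example topic from 3 to 4 partitions; the partition count can only grow
sh bin/kafka-topics.sh --zookeeper xxx:2181 --alter --topic test_create_topic1 --partitions 4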

4. Partition migration

Partition migration works the same way as above; see sections 1.1, 1.2, and 1.3.

5. Migrating replicas across paths

Why do online Kafka machines so often end up lopsided, with disk usage unevenly distributed? Because Kafka only ensures that the number of partitions is spread evenly across the log directories; it has no way of knowing how much space each partition actually takes, so some partitions may accumulate a large number of messages and consume a lot of disk space. Before version 1.1 there was nothing you could do about this, because Kafka only supported redistributing partition data between brokers, not between the disks of the same broker. Version 1.1 added official support for migrating replicas between different paths.

How do you store partitions in multiple paths on a Broker?

You just need to configure multiple directories, separated by commas:

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=kafka-logs-5,kafka-logs-6,kafka-logs-7,kafka-logs-8


Note that different paths on the same broker only hold different partitions, never replicas of the same partition; replicas on the same broker would defeat their purpose (fault tolerance).

How about cross-path migration?

The reassignment JSON file has a log_dirs parameter. If it is not set, it defaults to "log_dirs": ["any"] (the number of entries must match the number of replicas). For cross-path migration, simply put the absolute destination paths here, as in the example below.

Example of the reassignment JSON file:

{
  "version": 1,
  "partitions": [
    {"topic": "test_create_topic4", "partition": 2, "replicas": [0], "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-5"]},
    {"topic": "test_create_topic4", "partition": 1, "replicas": [0], "log_dirs": ["/Users/xxxxx/work/IdeaPj/source/kafka/kafka-logs-6"]}
  ]
}

And then execute the script

sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx --reassignment-json-file config/reassignment-json-file.json --execute --bootstrap-server xxxxx:9092 --replica-alter-log-dirs-throttle 10000

Note: --bootstrap-server must be provided for cross-path migration.

If you need throttling, combine it with the --replica-alter-log-dirs-throttle parameter; unlike --throttle, it limits the migration traffic between different paths within a broker.

Source code analysis

For source code analysis, see [Kafka source] ReassignPartitionsCommand source code analysis (replica expansion, data migration, partition reassignment, cross-path replica migration).







Didi's open source Logi-Kafkamanager, a one-stop Kafka monitoring and management platform