This article, from the OPPO Internet Technology team, is the third in a series on Spark Streaming and TiSpark.
Series 1: Spark data partitioning: Hadoop splits
Series 2: Spark data partitioning: Spark RDD partitions
1. Kafka + Spark Streaming
Spark Streaming receives data from Kafka and converts it into a DStream (discretized stream).
There are two ways to receive data:
- The old approach, using a Receiver;
- The new approach, using Direct pull (introduced in Spark 1.3).
1.1 The Receiver approach
Newer versions of Spark no longer support this mode.
In the Receiver model, parallelism is determined by spark.streaming.blockInterval, which defaults to 200 ms.
In Receiver mode, received data is cut into blocks every blockInterval, and the data of one batch is encapsulated into an RDD; the blocks correspond to the partitions of that RDD.
With the batchInterval fixed (see the configuration sketch after this list):
- Reducing spark.streaming.blockInterval increases the number of partitions in the DStream's RDDs.
- spark.streaming.blockInterval should not be set below 50 ms.
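For illustration, here is a minimal Receiver-mode configuration sketch (the application name is a placeholder); it only shows how batchInterval and blockInterval together determine the number of blocks, and therefore partitions, per batch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Receiver mode: received data is cut into blocks every blockInterval,
// and each block becomes one partition of the batch RDD.
val conf = new SparkConf()
  .setAppName("receiver-partition-demo")          // placeholder app name
  .set("spark.streaming.blockInterval", "100ms")  // keep at or above the 50 ms minimum

// With a 2 s batch interval and 100 ms blocks, each receiver contributes
// roughly 2000 / 100 = 20 blocks (= partitions) per batch.
val ssc = new StreamingContext(conf, Seconds(2))
```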
1.2 The Direct approach
Spark creates as many RDD partitions as there are Kafka partitions and reads data from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
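For reference, creating a direct stream with the spark-streaming-kafka-0-10 integration looks roughly like the sketch below; the broker address, topic name, and group id are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("direct-stream-demo")
val ssc  = new StreamingContext(conf, Seconds(2))

// Placeholder connection settings.
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka-broker:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "direct-demo-group",
  "auto.offset.reset"  -> "latest"
)

// No Receiver: executors pull directly from Kafka.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("demo_topic"), kafkaParams)
)

// Each batch RDD has exactly one partition per Kafka partition of demo_topic.
stream.foreachRDD(rdd => println(s"partitions in this batch: ${rdd.getNumPartitions}"))
```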
DirectKafkaInputDStream periodically generates RDDs of type KafkaRDD.
Let's start by looking at how KafkaRDD is partitioned:
It generates KafkaRDDPartitions from the offset information passed in at initialization. Each partition corresponds to a segment of data of one topic partition in Kafka; this segment is represented by an OffsetRange, which records where the data is located.
The compute method of DirectKafkaInputDStream:
From the source code we can see that the partition calculation creates one OffsetRange for each partition of the topic, and all of the OffsetRanges together make up one KafkaRDD.
KafkaRDD’s getPartitions method
Each OffsetRange generates a Partition.
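A simplified sketch of that mapping is shown below; the types are illustrative stand-ins, not Spark's actual OffsetRange and KafkaRDDPartition classes:

```scala
import org.apache.spark.Partition

// Stand-in for Spark's OffsetRange: one slice of one Kafka topic partition.
case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Stand-in for KafkaRDDPartition: wraps exactly one OffsetRange.
class KafkaPartitionSketch(val index: Int, val range: OffsetRangeSketch) extends Partition

// One RDD partition per OffsetRange, i.e. per Kafka topic partition.
def getPartitionsSketch(ranges: Array[OffsetRangeSketch]): Array[Partition] =
  ranges.zipWithIndex.map { case (r, i) => new KafkaPartitionSketch(i, r): Partition }
```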
How to increase the number of RDD partitions so that each partition can process more data?
From the source-code analysis, the partition count of the KafkaRDD follows the Kafka topic directly: to increase the parallelism of the RDD, increase the number of partitions of the Kafka topic.
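One way to do that is with Kafka's AdminClient, as sketched below (equivalent to kafka-topics.sh --alter --partitions); the broker address, topic name, and target partition count are placeholders:

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

val props = new java.util.Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092")  // placeholder broker

val admin = AdminClient.create(props)
try {
  // Raise demo_topic to 8 partitions; Direct-mode batch RDDs will then have 8 partitions.
  admin
    .createPartitions(Collections.singletonMap("demo_topic", NewPartitions.increaseTo(8)))
    .all()
    .get()
} finally {
  admin.close()
}
```

Note that Kafka only allows increasing the partition count of an existing topic, not decreasing it.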
2. TiSpark
2.1 TiDB architecture
A TiDB cluster consists of three components:
TiDB Server is responsible for receiving SQL requests and handling the SQL logic. It locates, through PD, the TiKV addresses that hold the data needed for the computation, fetches the data by interacting with TiKV, and finally returns the result.
TiDB Server does not store data; it only performs computation. It can be scaled out horizontally without limit and exposes access addresses through load-balancing components such as LVS, HAProxy, or F5.
2.2 TiKV Server
TiKV Server is responsible for storing data. Externally, it is a distributed, transactional key-value storage engine.
The basic unit of data storage is the Region. Each Region stores the data of one Key Range (from StartKey to EndKey). Each TiKV node serves multiple Regions, and PD schedules data load balancing across TiKV nodes at Region granularity.
2.3 Region
TiDB data is distributed across Regions. A Region holds the data of a contiguous key range and is usually about 96 MB in size. A Region's meta information contains its StartKey and EndKey.
When StartKey <= key < EndKey, we know which Region the key belongs to; we can then look up that Region's TiKV address and read the key's data from it.
The Region where a key resides is obtained by sending a request to PD through the following interface:
GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error)
By calling this interface we can find the Region that contains a given key.
If we need all the Regions covering a key range, we call GetRegion repeatedly: starting from the range's StartKey, the EndKey of each returned Region is used as the key of the next request, until a returned Region's EndKey reaches or passes the EndKey of the requested range. A sketch of this loop follows.
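A minimal sketch of that scan loop, with keys simplified to strings and a hypothetical getRegion function standing in for the PD GetRegion RPC:

```scala
// Illustrative only: real keys are byte slices and the real call goes through tikv-client.
case class RegionSketch(startKey: String, endKey: String)

def scanRegions(rangeStart: String, rangeEnd: String,
                getRegion: String => RegionSketch): Seq[RegionSketch] = {
  val found = scala.collection.mutable.ArrayBuffer.empty[RegionSketch]
  var key = rangeStart
  var done = false
  while (!done) {
    val region = getRegion(key)                              // Region whose range contains `key`
    found += region
    if (region.endKey.compareTo(rangeEnd) >= 0) done = true  // this Region reaches the end of the range
    else key = region.endKey                                 // next request starts at this EndKey
  }
  found.toSeq
}
```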
There is an obvious problem with this process: every read has to query PD first, which puts heavy pressure on PD and hurts request performance.
To solve this problem, TiKV-Client implements a RegionCache component that caches Region information.
When we need to locate the Region for a key, we first check the RegionCache; on a cache hit, no PD access is required.
RegionCache contains two data structures to store Region information (a minimal sketch follows this list):
- a map, used to quickly look up a Region by its Region ID;
- a B-tree, used to find the Region whose range contains a given key.
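A minimal sketch of these two structures, with keys simplified to strings and a TreeMap playing the role of the B-tree; the real implementation lives in tikv-client, so the names here are illustrative:

```scala
import java.util.TreeMap
import scala.collection.mutable

case class CachedRegion(id: Long, startKey: String, endKey: String)

class RegionCacheSketch {
  private val byId  = mutable.Map.empty[Long, CachedRegion]  // Region ID -> Region
  private val byKey = new TreeMap[String, CachedRegion]()    // StartKey  -> Region (sorted)

  def put(r: CachedRegion): Unit = { byId(r.id) = r; byKey.put(r.startKey, r) }

  def getById(id: Long): Option[CachedRegion] = byId.get(id)

  // Find the cached Region whose [StartKey, EndKey) range contains `key`, if any.
  def getByKey(key: String): Option[CachedRegion] =
    Option(byKey.floorEntry(key)).map(_.getValue).filter(r => key.compareTo(r.endKey) < 0)

  // Drop a stale entry, e.g. after a Region error.
  def invalidate(id: Long): Unit =
    byId.remove(id).foreach(r => byKey.remove(r.startKey))
}
```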
Strictly speaking, the Region information stored on PD is itself a layer of cache: the latest Region information lives on the TiKV servers, because each TiKV server decides on its own when to split a Region.
When a Region changes, the TiKV server reports the new Region information to PD, and PD uses this information to serve the TiDB server's Region queries.
When we send a request using Region information taken from the cache, the TiKV server verifies this Region information to make sure the request is hitting the right Region.
If the Region has split or migrated, its information changes; the Region information carried in the request becomes stale, and the TiKV server returns a Region error.
When we get a Region error, we clean the stale entry out of the RegionCache, fetch the latest Region information, and resend the request (sketched below).
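A minimal sketch of this retry loop, reusing RegionCacheSketch from the previous sketch; resolveRegionFromPD and sendToTiKV are hypothetical stand-ins for the real PD and TiKV calls:

```scala
import scala.annotation.tailrec

def readWithRetry[T](key: String,
                     cache: RegionCacheSketch,
                     resolveRegionFromPD: String => CachedRegion,
                     sendToTiKV: (CachedRegion, String) => Either[String, T],
                     maxRetries: Int = 3): T = {

  @tailrec
  def attempt(retriesLeft: Int): T = {
    // A cache hit avoids a PD round trip; on a miss, ask PD and cache the answer.
    val region = cache.getByKey(key).getOrElse {
      val fresh = resolveRegionFromPD(key)
      cache.put(fresh)
      fresh
    }
    sendToTiKV(region, key) match {
      case Right(value) => value
      case Left(_) =>                       // Region error: the Region split or moved
        cache.invalidate(region.id)         // clean the stale cache entry
        if (retriesLeft <= 0) throw new RuntimeException("too many Region errors")
        attempt(retriesLeft - 1)            // re-resolve and resend
    }
  }

  attempt(maxRetries)
}
```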
2.4 TiSpark Architecture diagram
TiSpark integrates deeply with the Spark Catalyst engine, enabling Spark to efficiently read data stored in TiKV and perform distributed computation on it.
The getPartitions method in TiRDD is analyzed below:
From the source code: splitRangeByRegion is first used to obtain keyWithRegionTasks, and a TiPartition is created for each RegionTask (a simplified sketch follows).
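A simplified sketch of that mapping; RegionTaskSketch and TiPartitionSketch are illustrative stand-ins, not TiSpark's real RegionTask and TiPartition classes:

```scala
import org.apache.spark.Partition

// One scan task per TiKV Region covering the table's key range.
case class RegionTaskSketch(regionId: Long, startKey: String, endKey: String)

class TiPartitionSketch(val index: Int, val task: RegionTaskSketch) extends Partition

// One Spark partition per Region task, so the partition count equals the Region count of the scan.
def tiGetPartitionsSketch(tasks: Seq[RegionTaskSketch]): Array[Partition] =
  tasks.zipWithIndex.map { case (task, i) => new TiPartitionSketch(i, task): Partition }.toArray
```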
The number of TiSpark partitions therefore equals the number of TiKV Regions. To increase the parallelism of TiSpark tasks, the following TiKV parameters can be tuned:
- region-max-size;
- region-split-size;
3. Summary
From the analysis of the scenarios above: as long as we correctly understand the relationship between partitions and tasks in each scenario, and know which parameters actually affect partitioning, we can tune them so that jobs over large data sets still run fast, and we can clearly answer the questions data analysts raise.