Overview

In a database system, when a query request arrives, the executor is responsible for parsing the SQL statement, generating the execution plan, and then carrying out the query step by step. A distributed database produces distributed execution plans and offers higher scalability than a traditional single-node database. This article introduces the distributed plan generation mechanism of the NewSQL Cloud Stream database.

A database server can be divided into an execution engine and a storage engine. The executor is responsible for parsing SQL commands and executing queries. After receiving a query request, the database parses the SQL text into structured data that the program can work with, generates a logical execution plan from it, converts that into a physical execution plan tied to the physical storage structure of the data, and finally fetches the required data from the target nodes, completing the whole query process.
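To make these stages concrete, here is a minimal Go sketch of the pipeline; every type and function name in it is an illustrative assumption rather than the database's actual API.

```go
package main

import "fmt"

// Statement is the structured form produced by parsing the SQL text.
type Statement struct{}

// LogicalPlan describes what to compute, independent of where data lives.
type LogicalPlan struct{}

// PhysicalPlan describes how and where to compute it, bound to the physical
// storage structure and node placement of the data.
type PhysicalPlan struct{}

func Parse(query string) (*Statement, error)           { return &Statement{}, nil }
func BuildLogicalPlan(s *Statement) *LogicalPlan       { return &LogicalPlan{} }
func BuildPhysicalPlan(l *LogicalPlan) *PhysicalPlan   { return &PhysicalPlan{} }
func Execute(p *PhysicalPlan) ([][]interface{}, error) { return nil, nil }

// Run walks a query through the stages in order: parse, plan, execute.
func Run(query string) ([][]interface{}, error) {
	stmt, err := Parse(query)
	if err != nil {
		return nil, err
	}
	return Execute(BuildPhysicalPlan(BuildLogicalPlan(stmt)))
}

func main() {
	rows, err := Run("SELECT * FROM t1 JOIN t2 USING (id) WHERE t1.c = 10 AND t2.d = 20")
	fmt.Println(rows, err)
}
```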

Logical plan vs. physical plan

Before the executor can run, it needs a plan. Plans are divided into logical plans and physical plans. The relationship between the two is like planning a trip: choosing the means of transportation, say an airplane, corresponds to the logical plan; choosing which airline to fly corresponds to the physical plan; actually taking the trip corresponds to execution. Figure 1 shows the basic architecture of SQL statement execution, in which you can clearly see where the optimizer and executor fit into the overall process.

Figure 1: Basic framework of SQL statement execution

The logical and physical planning stages are responsible for tasks such as generating the execution plan and selecting indexes, and together they can be thought of as the optimizer. For example, consider a join between two tables:

SELECT * FROM t1 JOIN t2 USING (id) WHERE t1.c = 10 AND t2.d = 20;

The database can either first fetch from t1 the IDs of the records where c = 10, join them to t2 on id, and check whether t2.d equals 20; or it can first fetch from t2 the IDs of the records where d = 20, join them to t1 on id, and check whether t1.c equals 10. The two execution orders are logically equivalent, but their execution efficiency differs, and the optimizer decides which one to use based on estimated cost. In a distributed database, the physical plan also determines on which node each operator runs, based on where the data it needs resides, which is what makes distributed execution possible. The physical plans related to distribution are detailed in the distributed execution overview.
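As a rough illustration of how a cost-based choice between the two orders might look, here is a toy Go sketch; the row counts, cost formula, and names are assumptions made up for illustration, not the database's real cost model.

```go
package main

import "fmt"

// joinOrder describes one way to drive the join: scan the filtered side
// first, then probe the other table by id for each surviving row.
type joinOrder struct {
	drivingTable  string
	lookupTable   string
	estimatedRows float64 // rows expected to survive the driving filter
	lookupCost    float64 // relative cost of one probe into the lookup table
}

// cost is a toy model: scan the driving rows, plus one probe per row.
func (o joinOrder) cost() float64 {
	return o.estimatedRows * (1 + o.lookupCost)
}

func main() {
	candidates := []joinOrder{
		{"t1 (c = 10)", "t2", 1000, 1.2},
		{"t2 (d = 20)", "t1", 50000, 1.2},
	}
	best := candidates[0]
	for _, c := range candidates[1:] {
		if c.cost() < best.cost() {
			best = c
		}
	}
	fmt.Printf("chosen order: scan %s first, then join to %s on id (estimated cost %.0f)\n",
		best.drivingTable, best.lookupTable, best.cost())
}
```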

In terms of plan generation, the Cloud Stream database behaves like a stand-alone database when producing the logical plan, but differs significantly when turning the logical plan into a physical plan. When the physical plan is generated, table information and node information are added according to how the data is distributed across the nodes, producing a distributed physical plan.

Distributed execution

The key problem in distributed execution is how to turn the logical execution plan into a physical execution plan. This mainly involves two aspects: distributing the computation and distributing the data.

Once the physical plan is generated, the system needs to split it up and distribute it across nodes for execution. Each node is responsible for scheduling its local processors and inputs. Nodes also need to communicate with each other to connect output routers to inputs; in particular, a streaming interface is required to connect these components. To avoid extra synchronization cost, the execution environment must be flexible enough that different nodes can start their data-processing work independently once the execution plan has been scheduled, without waiting on further orchestration from the Gateway node.

The Gateway node in the database cluster creates a scheduler that takes a set of flows, sets up their input and output information, creates the local processors, and starts execution. As nodes process input and output data, we also need some control over each Flow so that certain requests can be rejected.

Each Flow represents a complete fragment of the overall physical plan executed on one node. It is made up of Processors and Streams, which pull data into the fragment, compute on it, and eventually output the results, as shown in Figure 2:

Figure 2: Flow execution flow chart
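One way to picture these pieces in code is the following Go sketch; the type names mirror the terms used above (Flow, Processor, Stream), but the fields and methods are assumptions for illustration rather than the actual implementation.

```go
package distexec

// Row is a single tuple moving between processors.
type Row []interface{}

// Stream carries rows from one processor to the next: a Go channel within a
// node, or a network-backed connection between nodes.
type Stream chan Row

// Processor pulls rows from its inputs, computes, and pushes results to its
// outputs (for example a table reader, a filter, or an aggregator stage).
type Processor interface {
	Run()
}

// Flow is the fragment of the physical plan that one node executes: a set of
// processors wired together by streams.
type Flow struct {
	Processors []Processor
	Inputs     []Stream // streams arriving from other nodes
	Outputs    []Stream // streams sent on to other nodes or back to the gateway
}

// Start launches every processor in the flow; each runs until its inputs are
// exhausted.
func (f *Flow) Start() {
	for _, p := range f.Processors {
		go p.Run()
	}
}
```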

For cross-node execution, the Gateway node serializes the corresponding FlowSpec into a SetupFlowRequest and sends it to the remote node over gRPC. After receiving it, the remote node restores the Flow and creates its processors and streams (TCP channels), completing the construction of the execution framework; the multi-node computation, initiated and driven by the Gateway node, then begins. Flows are scheduled asynchronously through the Box cache pool, enabling parallel execution across the entire distributed framework.
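A simplified gateway-side sketch of that setup step is shown below; SetupFlowRequest, FlowSpec, and DistExecClient are stand-ins modeled on the description above (in practice they would be protobuf-generated gRPC types), not a published API.

```go
package distexec

import "context"

// FlowSpec describes the fragment of the physical plan assigned to one node.
type FlowSpec struct {
	FlowID     string
	Gateway    string
	Processors []string // serialized processor specs for the target node
}

// SetupFlowRequest is the message the gateway sends to a remote node.
type SetupFlowRequest struct {
	Spec FlowSpec
}

// DistExecClient is a stand-in for the protobuf-generated gRPC client.
type DistExecClient interface {
	SetupFlow(ctx context.Context, req *SetupFlowRequest) error
}

// setupRemoteFlows sends each node its fragment of the plan; once a node
// receives the request it restores the Flow, creates its processors and
// streams, and starts executing, driven by the gateway.
func setupRemoteFlows(ctx context.Context, clients map[string]DistExecClient, specs map[string]FlowSpec) error {
	for node, spec := range specs {
		if err := clients[node].SetupFlow(ctx, &SetupFlowRequest{Spec: spec}); err != nil {
			return err
		}
	}
	return nil
}
```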

Local execution is parallel execution: each processor, synchronizer, and router can run as a goroutine, and they are connected by channels. These buffered channels keep producers and consumers in sync.
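A minimal sketch of that local wiring: each stage runs as a goroutine and hands rows to the next through a buffered channel, which is what keeps producer and consumer roughly in step. The stages themselves are made up for illustration.

```go
package main

import "fmt"

func main() {
	// Buffered channels connect the stages; the buffer lets the producer run
	// slightly ahead without the two goroutines drifting arbitrarily apart.
	scanned := make(chan int, 16)
	filtered := make(chan int, 16)

	// Stage 1: a scan processor producing rows (here just integers).
	go func() {
		defer close(scanned)
		for i := 0; i < 100; i++ {
			scanned <- i
		}
	}()

	// Stage 2: a filter processor consuming the scan output.
	go func() {
		defer close(filtered)
		for v := range scanned {
			if v%2 == 0 {
				filtered <- v
			}
		}
	}()

	// Final consumer: the synchronizer/output stage.
	count := 0
	for range filtered {
		count++
	}
	fmt.Println("rows passing the filter:", count)
}
```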

To enable distributed concurrent execution, the database introduces the concept of a Router and implements three data redistribution modes based on the data distribution characteristics of complex operators such as JOIN and AGGREGATOR: mirror_router, hash_router, and range_router. Through data redistribution, a processor operator is split into two stages of execution. In the first stage, each node processes the portion of the data that resides on it and redistributes the results according to the operator type; in the second stage, the single operator is then executed jointly across multiple nodes.
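For example, a hash_router can be sketched as follows: each row is sent to the output stream chosen by hashing its distribution key, so rows with the same key always land on the same downstream node. The hashing scheme and row layout here are illustrative assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashRouter redistributes rows across n output streams by hashing the
// row's distribution key, so rows with equal keys always reach the same
// downstream consumer.
type hashRouter struct {
	outputs []chan []string
}

func newHashRouter(n int) *hashRouter {
	outs := make([]chan []string, n)
	for i := range outs {
		outs[i] = make(chan []string, 16)
	}
	return &hashRouter{outputs: outs}
}

// route picks an output by hashing the key column of the row.
func (r *hashRouter) route(key string, row []string) {
	h := fnv.New32a()
	h.Write([]byte(key))
	r.outputs[h.Sum32()%uint32(len(r.outputs))] <- row
}

func main() {
	router := newHashRouter(3)
	go func() {
		// First-stage processors would call route for every row they produce.
		router.route("user-42", []string{"user-42", "10"})
		router.route("user-7", []string{"user-7", "20"})
		for _, out := range router.outputs {
			close(out)
		}
	}()
	// Each output stream feeds the second-stage processor on one node.
	for i, out := range router.outputs {
		for row := range out {
			fmt.Printf("output %d received row %v\n", i, row)
		}
	}
}
```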

Advantages of distributed execution planning

Compared with a stand-alone database, the distributed execution plan of the Cloud Stream database gives the database higher scalability: it can effectively manage more data and support reads and writes over larger data sets.

The scalability of a single-node database is limited. In a single-node scenario, the database is heavily constrained when managing large-scale data, and read and write efficiency decreases as table data grows.

With a distributed execution plan, however, table data is spread across multiple nodes, which greatly reduces the amount of data on any single node. When storing the same amount of data, the Cloud Stream database can make full use of the storage and computing resources of multiple nodes. In a single-node scenario, for example, a complex table may suffer severe efficiency degradation as its data grows from 5 million to 10 million rows; in a distributed scenario with a balanced load, the table is spread over multiple nodes, the actual data increase on each node is small, and the sudden performance drop seen in the single-node scenario does not occur.

A distributed execution plan also distributes reads, writes, and computation. Each node performs its part of the computation independently, and the partial results are combined into the final, accurate result. In most scenarios, distributed execution has significant advantages over stand-alone database execution.