
1. Components

Impala is a distributed, massively parallel processing (MPP) database engine that includes multiple processes.

Impala is similar to Hive in this respect: it is not a database but a data analysis tool.

Run the following command on the linux123 node:

$ ps -ef | grep impala
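# Expect the impalad process in the output; on the node that runs them, statestored and catalogd typically show up as well.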

As shown in the figure:

  1. impalad
  • The role name is Impala Daemon. It is the process that runs on every node and is the core component of Impala; its process name is impalad.
  • Role: it reads and writes data files, receives query requests from impala-shell, JDBC, and ODBC, executes the query in a parallel and distributed fashion together with the other Impalad processes in the cluster, and returns the query results to the central coordinator.
  • To make sure every Impalad knows the health state of the other Impalads, the Impalad process keeps communicating with the statestore.
  • The Impalad service consists of three modules: Query Planner, Query Coordinator, and Query Executor. The first two make up the front end, which receives the SQL query request, parses the SQL, converts it into an execution plan, and hands it to the back end for execution.
  2. statestored
  • The statestore monitors the health of the Impalad processes in the cluster and synchronizes the cluster health information to every Impalad.
  • The statestore process is named statestored.
  3. catalogd
  • When Impala executes a SQL statement that changes metadata, the catalog service is responsible for synchronizing these metadata changes to the other Impalad processes (it validates the logs and monitors the statestore process log); a short SQL sketch follows this list.
  • The process name of the catalog service is catalogd.
  • Because a cluster needs only one catalogd process and one statestored process, and the catalogd process sends all of its requests through the statestored process, the official recommendation is to deploy the statestored and catalogd processes on the same node.
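
To make the role of catalogd more concrete, here is a minimal SQL sketch (the table name t_demo is hypothetical and not from this article): DDL executed through Impala is propagated by catalogd automatically, while metadata changed outside Impala has to be refreshed explicitly.

```sql
-- DDL run through Impala goes through catalogd, which broadcasts the resulting
-- metadata change to every impalad via statestored (t_demo is a hypothetical table).
CREATE TABLE t_demo (id INT, n1 STRING) STORED AS PARQUET;

-- If metadata changes outside Impala (for example, a Hive job adds files or partitions),
-- the change has to be picked up explicitly:
REFRESH t_demo;              -- reload the file and block metadata of one table
INVALIDATE METADATA t_demo;  -- discard cached metadata; it is reloaded on next access
```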

2. Query Process

The query process is as follows:

  1. The client submits a task

The client sends a SQL query request to any Impalad node, which returns a queryId that the client uses for subsequent operations.

  2. Generate the single-node and distributed execution plans

After the SQL is submitted to an Impalad node, the Analyser performs lexical, syntax, and semantic analysis of the SQL in turn. The planner fetches table metadata from the MySQL-backed metastore and the data block locations from the HDFS NameNode, so that it knows all of the data nodes that store data relevant to this query.

  • Single-node execution plan: based on the analysis of the SQL statement in the previous step, the Planner first generates a single-node execution plan. The plan is a tree made up of PlanNodes, and during this step some SQL optimizations are applied as well, such as changing the join order and pushing down predicates (see the EXPLAIN sketch after this list).
  • Distributed parallel physical plan: the single-node plan is then converted into a distributed parallel physical execution plan. The physical plan consists of Fragments with data dependencies between them; this step also adds information such as ExchangeNodes and DataStreamSinks to the original plan.
  • Fragment: a subtask of the distributed execution plan generated from the SQL;
  • DataStreamSink: transfers the output data of the current Fragment to other nodes.
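
The plan the front end produces can be inspected with EXPLAIN. A minimal sketch, assuming tables like the t1/t2 used later in this article already exist; raising EXPLAIN_LEVEL makes the Fragments and the exchanges between them visible in the output.

```sql
-- Sketch only: a more verbose explain level shows the plan split into Fragments
-- together with the exchange (data movement) steps between them.
SET EXPLAIN_LEVEL=3;

EXPLAIN
SELECT t1.n1, COUNT(1) AS c
FROM t1 JOIN t2 ON t1.id = t2.id
GROUP BY t1.n1;
```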
  3. Task scheduling and distribution

Based on the data partition information, the Coordinator sends the Fragments to the appropriate Impalad nodes for execution. When an Impalad node receives a Fragment request, its Executor executes the Fragment.

  4. Data dependencies between Fragments

The execution output of each Fragment is sent to the next Fragment through its DataStreamSink. While a Fragment is running, it continuously reports its current execution status to the Coordinator node.

  5. Result summary

A SQL query usually needs a separate Fragment to summarize the results. This Fragment runs only on the Coordinator node and converts the final execution results from the multiple nodes into the ResultSet information.

  6. Fetch the results

The client invokes the interface that obtains the ResultSet to read the query result.

(1) Query plan example

select t1.n1, t2.n2, count(1) as c
from t1 join t2 on t1.id = t2.id
join t3 on t1.id = t3.id
where t3.n3 between 'a' and 'f'
group by t1.n1, t2.n2
order by c desc
limit 100;

(2) Single-node execution plan

The Query Planner generates a single-node execution plan.

As shown in the figure:

Analysis of the single-node execution plan above:

  1. The data required from table t1 is scanned first. If the data file is stored in a columnar format, only the required columns (id, n1) need to be scanned.

  2. Table t1 then needs to be joined with table t2. As with t1, only the required columns of t2 (id, n2) are read.

  3. The result of joining t1 and t2 is then joined with t3. Here Impala uses predicate pushdown, so the scan of t3 reads only the data needed for the join.

  4. Aggregation is performed according to the GROUP BY clause, and finally the data is sorted and the specified number of rows (LIMIT 100) is returned. (A sketch of how to view this plan follows.)
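
If you want to look at this single-node plan yourself, one way (a sketch, assuming the t1/t2/t3 tables from the example exist) is to temporarily force single-node execution with the NUM_NODES query option and run EXPLAIN:

```sql
-- Sketch only: NUM_NODES=1 restricts planning and execution to a single node,
-- which makes the scan -> join -> aggregate -> sort structure easier to read.
SET NUM_NODES=1;

EXPLAIN
SELECT t1.n1, t2.n2, COUNT(1) AS c
FROM t1 JOIN t2 ON t1.id = t2.id
JOIN t3 ON t1.id = t3.id
WHERE t3.n3 BETWEEN 'a' AND 'f'
GROUP BY t1.n1, t2.n2
ORDER BY c DESC
LIMIT 100;

SET NUM_NODES=0;  -- back to the default: use all available nodes
```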

(3) Distributed parallel planning

The so-called distributed parallel execution plan takes the single-node plan and, based on how the data is stored across the cluster and what the task needs to compute, splits it into multiple subtasks, each of which can be executed in parallel.

The single-node plan above is converted into a distributed parallel execution plan.

The distributed parallel execution plan is shown in the figure:

The flow chart is as follows:

When a distributed execution plan involves joining multiple tables, Impala chooses the join strategy based on the size of the tables.

There are two strategies: Hash Join and Broadcast Join.

As can be seen from the distributed execution plan above, tables t1 and t2 are larger while table t3 is smaller. Impala therefore joins t1 and t2 with a Hash Join, and joins t3 with a Broadcast Join, sending the whole t3 table directly to the nodes where the join takes place.
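
Impala makes this choice based on table statistics, so it helps to keep them up to date with COMPUTE STATS; if needed, the strategy can also be forced with join hints. A minimal sketch, assuming the example tables exist (the hints shown are standard Impala comment hints):

```sql
-- Keep statistics current so the planner can estimate table sizes correctly.
COMPUTE STATS t1;
COMPUTE STATS t2;
COMPUTE STATS t3;

-- Force the join strategies from the example explicitly (straight_join keeps the
-- table order as written so the hints apply to the intended joins).
SELECT STRAIGHT_JOIN t1.n1, t2.n2, COUNT(1) AS c
FROM t1
JOIN /* +SHUFFLE */ t2 ON t1.id = t2.id      -- hash (partitioned) join
JOIN /* +BROADCAST */ t3 ON t1.id = t3.id    -- broadcast join
WHERE t3.n3 BETWEEN 'a' AND 'f'
GROUP BY t1.n1, t2.n2
ORDER BY c DESC
LIMIT 100;
```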

Distributed parallel planning process:

  1. t1 and t2 use a Hash Join: the rows of t1 and t2 are redistributed to different Impalad processes by hashing their id values, so rows with the same id land on the same Impalad process. Each node therefore joins only part of the data, and together the nodes produce the complete join result.

  2. The result of joining t1 and t2 is then joined with t3. Here t3 uses Broadcast mode: all of its data (the id column) is broadcast to the Impalad nodes that hold the join results.

  3. After t1, t2, and t3 are joined, each node performs a local pre-aggregation according to the GROUP BY clause. The pre-aggregation result on each node is only part of the final result (different nodes may hold rows with the same GROUP BY value), so a global aggregation still has to be performed.

  4. For the global aggregation, the pre-aggregated data is redistributed to different nodes by hashing the GROUP BY columns, so the merge for each key happens on a single node. To reduce network transfer, Impala generally performs the global aggregation on the same nodes that did the local pre-aggregation.

  5. After the global aggregation, each key exists on only one node. Each global-aggregation node then sorts its data and computes the local TopN, and the results are sent to the Coordinator, which performs the final merge, sort, and LIMIT and returns the result to the user. (A conceptual SQL sketch of this two-phase aggregation follows below.)
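
To make the local pre-aggregation plus global aggregation concrete, the example query behaves roughly like the rewrite below. This is only a conceptual sketch of what the engine does internally; you never write the query this way yourself.

```sql
-- Conceptual sketch only: Impala performs this split internally.
SELECT n1, n2, SUM(partial_c) AS c            -- global (merge) aggregation
FROM (
  SELECT t1.n1, t2.n2, COUNT(1) AS partial_c  -- local pre-aggregation on each node
  FROM t1 JOIN t2 ON t1.id = t2.id
  JOIN t3 ON t1.id = t3.id
  WHERE t3.n3 BETWEEN 'a' AND 'f'
  GROUP BY t1.n1, t2.n2
) pre_agg
GROUP BY n1, n2
ORDER BY c DESC                               -- final sort and TopN at the coordinator
LIMIT 100;
```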