1. Data preparation

This document describes multi-table joins in Spark SQL. You need to prepare test data in advance. Create the employee and department DataFrames and register them as temporary views as follows:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()

val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")

val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
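To verify the setup, you can print the inferred schema and query one of the views (a quick sanity check, not part of the original steps):

empDF.printSchema()
spark.sql("SELECT * FROM dept").show()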

The main fields of the two tables are as follows:

emp table:

  • ENAME: employee name
  • DEPTNO: department number
  • EMPNO: employee number
  • HIREDATE: hire date
  • JOB: position
  • MGR: manager's employee number
  • SAL: salary
  • COMM: bonus

dept table:

  • DEPTNO: department number
  • DNAME: department name
  • LOC: the city where the department is located
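Spark's JSON reader expects one JSON object per line. As an illustration only (the values below are the classic emp/dept sample data; refer to the actual files in the repository for the real contents), dept.json would look like this:

{"DEPTNO": 10, "DNAME": "ACCOUNTING", "LOC": "NEW YORK"}
{"DEPTNO": 20, "DNAME": "RESEARCH", "LOC": "DALLAS"}
{"DEPTNO": 30, "DNAME": "SALES", "LOC": "CHICAGO"}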

emp.json and dept.json can be downloaded from the resources directory of this repository.

2. Join types

Spark supports multiple join types:

  • Inner Join
  • Full Outer Join
  • Left Outer Join
  • Right Outer Join
  • Left Semi Join
  • Left Anti Join
  • Natural Join
  • Cross (or Cartesian) Join

Inner joins, outer joins, and the Cartesian product behave the same as in an ordinary relational database.

The left semi join and the left anti join are equivalent to the IN and NOT IN subqueries of a relational database, respectively:

-- LEFT SEMI JOIN
SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno
-- equivalent to the following IN statement
SELECT * FROM emp WHERE deptno IN (SELECT deptno FROM dept)

-- LEFT ANTI JOIN
SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno
-- equivalent to the following NOT IN statement
SELECT * FROM emp WHERE deptno NOT IN (SELECT deptno FROM dept)

Sample code for all join types is given below. (Note that the NOT IN rewrite above matches the anti join only when the subquery returns no NULLs; a NULL in dept.deptno would make NOT IN return no rows at all, while LEFT ANTI JOIN would still return the unmatched emp rows.)

2.1 INNER JOIN

// 1. Define the join expression
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2. Perform the join and select the columns to display
empDF.join(deptDF, joinExpression).select("ename", "dname").show()

// The equivalent SQL is as follows:
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
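Because both DataFrames contain a deptno column, referencing deptno unqualified after the join is ambiguous. A minimal sketch of one way to keep a single copy, using the standard Dataset API:

empDF.join(deptDF, joinExpression)
  .drop(deptDF.col("deptno"))
  .show()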

2.2 FULL OUTER JOIN

empDF.join(deptDF, joinExpression, "outer").show()
spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
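Alternatively, joining on a sequence of column names keeps just one copy of the join key in the result; this variant of join is part of the standard API (shown here with the outer join):

empDF.join(deptDF, Seq("deptno"), "outer").show()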

2.3 LEFT OUTER JOIN

empDF.join(deptDF, joinExpression, "left_outer").show()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()

2.4 RIGHT OUTER JOIN

empDF.join(deptDF, joinExpression, "right_outer").show()
spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()

2.5 LEFT SEMI JOIN

empDF.join(deptDF, joinExpression, "left_semi").show()
spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()

2.6 LEFT ANTI JOIN

empDF.join(deptDF, joinExpression, "left_anti").show()
spark.sql("SELECT * FROM emp LEFT ANTI JOIN dept ON emp.deptno = dept.deptno").show()

2.7 CROSS JOIN

empDF.join(deptDF, joinExpression, "cross").show()
spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()
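Note that a cross join that carries a join condition, as above, behaves like an inner join. To get a true Cartesian product, call crossJoin (standard Dataset API) with no condition; be aware that the result has |emp| × |dept| rows:

empDF.crossJoin(deptDF).show()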

2.8 NATURAL JOIN

A natural join automatically matches columns that have the same name and data type in both tables, joins on them, and returns all rows that satisfy the resulting condition.

spark.sql("SELECT * FROM emp NATURAL JOIN dept").show()

The natural join above is equivalent to the following query:

spark.sql("SELECT * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()

Natural joins are not recommended because they often produce unexpected results.

3. Execution of joins

A shuffle join is triggered when a large table is joined with another large table: the partitions of both tables are exchanged across all nodes in an all-to-all fashion. This kind of query is usually expensive and puts a heavy burden on network I/O.

If a small table fits in the memory of the worker nodes, Spark will consider broadcasting the small table's data to every worker node and performing the join locally inside each node. This reduces network I/O but increases the load on each worker node.
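To see which strategy Spark actually chose for a given query, inspect the physical plan, which will contain an operator such as BroadcastHashJoin or SortMergeJoin:

empDF.join(deptDF, joinExpression).explain()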

Whether a broadcast join is used is normally decided by Spark itself, based on its size estimate of the tables (governed by the spark.sql.autoBroadcastJoinThreshold configuration, 10 MB by default). To request it explicitly, use the broadcast function from the DataFrame API to mark the small table to be broadcast:

import org.apache.spark.sql.functions.broadcast

empDF.join(broadcast(deptDF), joinExpression).show()
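The same request can be made in SQL with a broadcast hint:

spark.sql("SELECT /*+ BROADCAST(dept) */ * FROM emp JOIN dept ON emp.deptno = dept.deptno").show()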

References

  1. Bill Chambers, Matei Zaharia. Spark: The Definitive Guide [M]. O'Reilly Media, 2018-02.

For more articles in the big data series, see the GitHub open source project: Getting Started with Big Data.