1. Overview

1.1 Multi-data source support

Spark supports the following six core data sources, and the Spark community provides hundreds of additional connectors, which together cover most application scenarios.

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

Note: All of the following test files can be downloaded from the Resources directory of this repository

1.2 Read Data Format

All read APIs follow the same call format:

// Format: DataFrameReader.format(...).option("key", "value").schema(...).load()

// Example
spark.read.format("csv")
  .option("mode", "FAILFAST")          // read mode
  .option("inferSchema", "true")       // whether to infer the schema automatically
  .option("path", "path/to/file(s)")   // file path
  .schema(someSchema)                  // use a predefined schema
  .load()

The read mode has the following three options:

| Read mode | Description |
| --- | --- |
| permissive | When a corrupted record is encountered, set all of its fields to null and put the raw malformed text in a string column named _corrupt_record |
| dropMalformed | Drop malformed rows |
| failFast | Fail immediately when malformed data is encountered |
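
For illustration, here is a minimal sketch of permissive reading that also captures the raw text of bad rows. It assumes Spark 2.3+ behaviour, where the corrupt-record column must be declared in the schema, and it reuses the dept.csv sample used later in this article:

import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}

// Schema with an extra string column to hold the raw text of malformed rows
val schemaWithCorrupt = new StructType(Array(
  StructField("deptno", LongType, nullable = true),
  StructField("dname", StringType, nullable = true),
  StructField("loc", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)
))

spark.read.format("csv")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .load("/usr/file/csv/dept.csv")
  .show()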

1.3 Write Data Format

// Format: DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

// Example
dataframe.write.format("csv")
  .option("mode", "OVERWRITE")         // write mode
  .option("dateFormat", "yyyy-MM-dd")  // date format
  .option("path", "path/to/file(s)")   // output path
  .save()

The write mode has the following four options:

| Scala/Java | Description |
| --- | --- |
| SaveMode.ErrorIfExists | Throw an exception if data already exists at the given path; this is the default write mode |
| SaveMode.Append | Append the data to what already exists |
| SaveMode.Overwrite | Overwrite any existing data |
| SaveMode.Ignore | Do nothing if data already exists at the given path |
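
As a small sketch, the same modes can be passed either as the SaveMode enum or as the equivalent strings "error", "append", "overwrite" and "ignore"; the output path below is illustrative:

import org.apache.spark.sql.SaveMode

dataframe.write.format("csv")
  .mode(SaveMode.Append)          // equivalent to .mode("append")
  .save("/tmp/spark/csv/output")  // illustrative path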


2. CSV

CSV is a common text file format in which each line represents a record and each field in the record is separated by a comma.

2.1 Reading a CSV File

Example of reading with automatic type inference:

spark.read.format("csv")
.option("header"."false")        // Whether the first line in the file is the column name
.option("mode"."FAILFAST")      // Whether to fail quickly
.option("inferSchema"."true")   // Whether to automatically infer schema
.load("/usr/file/csv/dept.csv")
.show()

Using predefined types:

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

// Predefined schema
val myManualSchema = new StructType(Array(
  StructField("deptno", LongType, nullable = false),
  StructField("dname", StringType, nullable = true),
  StructField("loc", StringType, nullable = true)
))

spark.read.format("csv")
  .option("mode", "FAILFAST")
  .schema(myManualSchema)
  .load("/usr/file/csv/dept.csv")
  .show()

2.2 Writing a CSV file

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

You can also specify a specific delimiter:

df.write.format("csv").mode("overwrite").option("sep"."\t").save("/tmp/csv/dept2")

2.3 Optional Configuration

To save space, see section 9.1 at the end of this article for all read and write configuration items.


3. JSON

3.1 Reading a JSON File

spark.read.format("json").option("mode"."FAILFAST").load("/usr/file/json/dept.json").show(5)

Note that a single data record spanning multiple lines is not supported by default (as shown below). This can be changed by setting multiLine to true, which defaults to false.

// A single-line record is supported by default
{"DEPTNO": 10, "DNAME": "ACCOUNTING", "LOC": "NEW YORK"}

// A record spread over multiple lines is not supported by default
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}
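
A minimal sketch of reading such multi-line records, assuming the same dept.json sample file, simply turns multiLine on:

spark.read.format("json")
  .option("multiLine", true)   // allow a single record to span multiple lines
  .option("mode", "FAILFAST")
  .load("/usr/file/json/dept.json")
  .show(5)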

3.2 Writing a JSON File

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 Optional Configuration

To save space, see section 9.2 at the end of this article for all read and write configuration items.


4. Parquet

Parquet is an open source, column-oriented data store that provides a variety of storage optimizations to allow reading of individual columns rather than entire files, saving storage space and improving read efficiency. It is the default file format of Spark.

4.1 Reading the Parquet file

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

4.2 Writing Parquet Files

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

4.3 Optional Configuration

Parquet files carry their own schema and storage layout, so they have few optional configuration items. The commonly used ones are as follows:

| Read/write | Configuration item | Optional values | Default value | Description |
| --- | --- | --- | --- | --- |
| Write | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | Compression format of the output files |
| Read | mergeSchema | true, false | Value of spark.sql.parquet.mergeSchema | When true, the Parquet data source merges the schemas collected from all data files; otherwise the schema is taken from the summary file, or from a random data file if no summary file is available |
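
As a hedged illustration of mergeSchema, the sketch below writes two small Parquet datasets whose schemas differ by one column and reads them back with merging enabled. The paths and sample rows are made up, and the implicits import is only needed outside spark-shell:

import spark.implicits._

// Two Parquet datasets whose schemas differ by one column (illustrative paths)
Seq((10, "ACCOUNTING")).toDF("deptno", "dname")
  .write.mode("overwrite").parquet("/tmp/spark/parquet/merge/p1")
Seq((20, "RESEARCH", "DALLAS")).toDF("deptno", "dname", "loc")
  .write.mode("overwrite").parquet("/tmp/spark/parquet/merge/p2")

// mergeSchema = true unions the two schemas on read
spark.read.option("mergeSchema", "true")
  .parquet("/tmp/spark/parquet/merge/p1", "/tmp/spark/parquet/merge/p2")
  .printSchema()   // deptno, dname, loc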

For more options, see the official documentation: spark.apache.org/docs/latest…


5. ORC

ORC is a self-describing, type-aware columnar file format that is optimized for reading and writing large datasets and is widely used in big data processing.

5.1 Reading ORC Files

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

5.2 Writing ORC Files

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")


6. SQL Databases

Spark can also read from and write to traditional relational databases. However, Spark does not ship with database drivers by default, so you need to copy the driver JAR into the jars directory of the Spark installation. The following examples use MySQL; before running them, place mysql-connector-java-x.x.x.jar in that directory.

6.1 Reading Data

The following examples read from the help_keyword table of MySQL's built-in mysql database, which has two fields: help_keyword_id and name.

spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")            // JDBC driver
  .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   // database address
  .option("dbtable", "help_keyword")                    // table name
  .option("user", "root").option("password", "root")
  .load().show(10)

Read data from query results:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id < 20) AS help_keywords"""
spark.read.format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root").option("password", "root")
  .option("dbtable", pushDownQuery)
  .load().show()

// Output:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

You can also push filter predicates down to the database as follows:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'")   // filter predicates
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()

// Output:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

numPartitions can be used to specify the degree of parallelism for reading data:

option("numPartitions".10)

In addition to the number of partitions, you can also specify a partition column together with an upper and a lower bound: any value of that column below the lower bound goes to the first partition, and any value above the upper bound goes to the last partition (the bounds only control how rows are distributed across partitions; they do not filter rows).

val colName = "help_keyword_id"   // column used to compute the partition bounds
val lowerBound = 300L             // lower bound
val upperBound = 500L             // upper bound
val numPartitions = 10            // number of partitions
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword",
                             colName, lowerBound, upperBound, numPartitions, props)

To verify the contents of a partition, use the operator mapPartitionsWithIndex as follows:

import scala.collection.mutable.ListBuffer

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append("partition " + index + ": " + iterator.next())
    }
    buffer.toIterator
}).foreach(println)

The result shows that although the help_keyword table holds only about 600 rows, the data is not spread evenly across the 10 partitions: partition 0 contains 319 rows, because every row whose help_keyword_id is below the lower bound of 300 is assigned to the first partition. Likewise, every row above the upper bound of 500 lands in partition 9, the last partition.

6.2 Writing Data

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url"."JDBC: mysql: / / 127.0.0.1:3306 / mysql")
.option("user"."root").option("password"."root")
.option("dbtable"."emp")
.save()


7. Text

Text files have no advantages in read/write performance and cannot express clear data structures, so they are rarely used. Read/write operations are as follows:

7.1 Reading Text Data

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 Writing Text Data

df.write.text("/tmp/spark/txt/dept")


8. Advanced Data Read/Write Features

8.1 Parallel Read

Multiple Executors can’t read the same file at the same time, but they can read different files at the same time. This means that when you read data from a folder containing multiple files, each of those files will become a partition in the DataFrame and be read by available Executors in parallel.
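
A quick way to see this, sketched below with an illustrative directory, is to read a folder of CSV files and check how many partitions the resulting DataFrame has:

// A directory containing several CSV files
val deptFiles = spark.read.format("csv")
  .option("header", "false")
  .load("/usr/file/csv/")               // illustrative directory path
// Roughly one partition per input file; very small files may be packed together
// and very large files split, depending on Spark's input-split settings
println(deptFiles.rdd.getNumPartitions)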

8.2 Parallel Write

The number of files or data written depends on the number of partitions owned by the DataFrame at the time the data is written. By default, one file is written per data partition.
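
A small sketch of controlling this, reusing the csvFile DataFrame from section 5.2 and an illustrative output path: repartitioning before the write determines how many part files are produced.

// 5 partitions -> 5 output part files (plus a _SUCCESS marker)
csvFile.repartition(5)
  .write.format("csv")
  .mode("overwrite")
  .save("/tmp/spark/csv/multiple")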

8.3 Partition Writing

The concepts of partitioning and bucketing here are the same as for partitioned and bucketed tables in Hive: data is split up and stored according to certain rules. Note that the partitions created by partitionBy are not the same concept as RDD partitions: each partition value is written out as a subdirectory of the output directory, and the corresponding data is stored in that subdirectory.

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

The output directory is split into three subdirectories, one per department number, and the data files are written inside those subdirectories.

8.4 Bucketed Writing

Bucketed writing hashes the data by the specified column into the specified number of buckets. Currently, bucketed output can only be saved as a table, which is effectively a Hive bucketed table.

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

8.5 File Size Management

If a write produces a large number of small files, the metadata overhead becomes significant; neither Spark nor HDFS handles this well, and it is known as the small files problem. At the same time, data files should not be too large either, or queries incur unnecessary performance overhead, so file sizes should be kept within a reasonable range.

As shown above, you can indirectly control file size by controlling the number of output files through the number of partitions. Spark 2.2 introduced a more automated way to control file size: the maxRecordsPerFile option, which caps the number of records written to each file.

// Spark will ensure that each file contains at most 5000 records
df.write.option("maxRecordsPerFile", 5000)
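
A fuller hedged example, with an illustrative output path, combines maxRecordsPerFile with an ordinary save:

// Each output file will contain at most 5000 records
df.write
  .option("maxRecordsPerFile", 5000)
  .format("parquet")
  .mode("overwrite")
  .save("/tmp/spark/parquet/capped")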


9. Optional Configuration Appendix

9.1 CSV Read/Write Optional Configuration

| Read/write | Configuration item | Optional values | Default value | Description |
| --- | --- | --- | --- | --- |
| Both | sep | Any single character | , (comma) | Field separator |
| Both | header | true, false | false | Whether the first line in the file is the column names |
| Read | escape | Any character | \ | Escape character |
| Read | inferSchema | true, false | false | Whether to infer the column types automatically |
| Read | ignoreLeadingWhiteSpace | true, false | false | Whether to skip whitespace before values |
| Both | ignoreTrailingWhiteSpace | true, false | false | Whether to skip whitespace after values |
| Both | nullValue | Any character | "" | The character that represents a null value in the file |
| Both | nanValue | Any character | NaN | The character that represents NaN or a missing value |
| Both | positiveInf | Any character | Inf | Positive infinity |
| Both | negativeInf | Any character | -Inf | Negative infinity |
| Both | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, snappy | none | File compression format |
| Both | dateFormat | Any string accepted by Java's SimpleDateFormat | yyyy-MM-dd | Date format |
| Both | timestampFormat | Any string accepted by Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Timestamp format |
| Read | maxColumns | Any integer | 20480 | Maximum number of columns in a file |
| Read | maxCharsPerColumn | Any integer | 1000000 | Maximum number of characters in a single column value |
| Read | escapeQuotes | true, false | true | Whether quotes inside values should be escaped |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Maximum number of malformed rows Spark will log per partition; malformed rows beyond this number are ignored |
| Write | quoteAll | true, false | false | Whether all values should be enclosed in quotes, rather than only values containing a quote character |
| Read | multiLine | true, false | false | Whether a single complete record is allowed to span multiple lines |
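
A short sketch combining several of these options; the tab-separated file path is hypothetical:

spark.read.format("csv")
  .option("header", "true")
  .option("sep", "\t")                       // tab-separated input
  .option("nullValue", "NULL")               // treat the literal string NULL as null
  .option("ignoreLeadingWhiteSpace", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .load("/usr/file/csv/dept.tsv")            // hypothetical file
  .show()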

9.2 JSON Read/Write Optional Configuration

| Read/write | Configuration item | Optional values | Default value |
| --- | --- | --- | --- |
| Both | compression or codec | none, uncompressed, bzip2, deflate, gzip, lz4, snappy | none |
| Both | dateFormat | Any string accepted by Java's SimpleDateFormat | yyyy-MM-dd |
| Both | timestampFormat | Any string accepted by Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ |
| Read | primitiveAsString | true, false | false |
| Read | allowComments | true, false | false |
| Read | allowUnquotedFieldNames | true, false | false |
| Read | allowSingleQuotes | true, false | true |
| Read | allowNumericLeadingZeros | true, false | false |
| Read | allowBackslashEscapingAnyCharacter | true, false | false |
| Read | columnNameOfCorruptRecord | Any string | Value of spark.sql.columnNameOfCorruptRecord |
| Read | multiLine | true, false | false |
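
A brief sketch using a few of these options against the dept.json sample; the option values are illustrative:

spark.read.format("json")
  .option("allowSingleQuotes", "true")   // accept 'single-quoted' strings
  .option("allowComments", "true")       // tolerate Java/C++-style comments in the file
  .option("multiLine", "true")           // records may span multiple lines
  .load("/usr/file/json/dept.json")
  .show()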

9.3 Database Read/Write Optional Configuration

| Property name | Meaning |
| --- | --- |
| url | Database address |
| dbtable | Table name |
| driver | Database driver |
| partitionColumn, lowerBound, upperBound | The column to partition on and its lower/upper bounds, used together with numPartitions to compute the partition stride |
| numPartitions | Maximum number of partitions that can be used for parallel table reads and writes. If the number of partitions to be written exceeds this limit, call coalesce(numPartitions) before writing to reduce it |
| fetchsize | How many rows to fetch per round trip; applies only to reads |
| batchsize | How many rows to insert per round trip; applies only to writes. The default value is 1000 |
| isolationLevel | Transaction isolation level: NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ or SERIALIZABLE (the standard transaction isolation levels). The default is READ_UNCOMMITTED; applies only to writes |
| createTableOptions | Custom table options used when the table is created on write |
| createTableColumnTypes | Custom column types used when the table is created on write |
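
A hedged sketch applying fetchsize, batchsize and isolationLevel to the MySQL example from section 6; the target table help_keyword_copy is hypothetical:

val jdbcUrl = "jdbc:mysql://127.0.0.1:3306/mysql"

// Read with a larger fetch size
val keywords = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "help_keyword")
  .option("user", "root").option("password", "root")
  .option("fetchsize", "500")                 // rows fetched per round trip
  .load()

// Write in batches of 2000 rows, without transaction overhead
keywords.write.format("jdbc")
  .option("url", jdbcUrl)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "help_keyword_copy")     // hypothetical target table
  .option("user", "root").option("password", "root")
  .option("batchsize", "2000")
  .option("isolationLevel", "NONE")
  .mode("append")
  .save()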

For more database read/write options, see the official documentation: spark.apache.org/docs/latest…

References

  1. Matei Zaharia, Bill Chambers. Spark: The Definitive Guide [M]. 2018-02
  2. spark.apache.org/docs/latest…