A list,
1.1 Multi-data source support
Spark supports the following six core data sources. In addition, the Spark community provides hundreds of data source reading methods, which can meet most application scenarios.
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
Note: All of the following test files can be downloaded from the Resources directory of this repository
1.2 Read Data Format
All read apis follow the following call format:
/ / formatDataFrameReader.format(...) .option("key"."value").schema(...) .load()/ / sample
spark.read.format("csv")
.option("mode"."FAILFAST") // Read mode
.option("inferSchema"."true") // Whether to automatically infer schema
.option("path"."path/to/file(s)") // File path
.schema(someSchema) // Use the predefined schema
.load()
Copy the code
The read mode has the following three options:
Read mode | describe |
---|---|
permissive |
When a corrupted record is encountered, all its fields are set to NULL and all corrupted records are placed in a string column named _corruption T_record |
dropMalformed |
Delete improperly formatted lines |
failFast |
Fail immediately when malformed data is encountered |
1.3 Write Data Format
/ / formatDataFrameWriter.format(...) .option(...) .partitionBy(...) .bucketBy(...) .sortBy(...) .save()/ / sample
dataframe.write.format("csv")
.option("mode"."OVERWRITE") / / write mode
.option("dateFormat"."yyyy-MM-dd") // Date format
.option("path"."path/to/file(s)")
.save()
Copy the code
The write data mode has the following four options:
Scala/Java | describe |
---|---|
SaveMode.ErrorIfExists |
If a file already exists in the given path, an exception is thrown, which is the default mode for writing data |
SaveMode.Append |
Data is written appending |
SaveMode.Overwrite |
Data is written as an overlay |
SaveMode.Ignore |
If the file already exists in the given path, nothing is done |
Second, the CSV
CSV is a common text file format in which each line represents a record and each field in the record is separated by a comma.
2.1 Reading a CSV File
Automatic inference type read Read examples:
spark.read.format("csv")
.option("header"."false") // Whether the first line in the file is the column name
.option("mode"."FAILFAST") // Whether to fail quickly
.option("inferSchema"."true") // Whether to automatically infer schema
.load("/usr/file/csv/dept.csv")
.show()
Copy the code
Using predefined types:
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
// Predefined data format
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode"."FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
Copy the code
2.2 Writing a CSV file
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
Copy the code
You can also specify a specific delimiter:
df.write.format("csv").mode("overwrite").option("sep"."\t").save("/tmp/csv/dept2")
Copy the code
2.3 Optional Configuration
To save space, see section 9.1 at the end of this article for all read and write configuration items.
Third, the JSON
3.1 Reading a JSON File
spark.read.format("json").option("mode"."FAILFAST").load("/usr/file/json/dept.json").show(5)
Copy the code
Note that a single data record spanning multiple lines is not supported by default (as shown below). This can be changed by setting multiLine to true, which defaults to false.
// Single line is supported by default
{"DEPTNO": 10."DNAME": "ACCOUNTING"."LOC": "NEW YORK"}
// Multiple lines are not supported by default
{
"DEPTNO": 10."DNAME": "ACCOUNTING"."LOC": "NEW YORK"
}
Copy the code
3.2 Writing a JSON File
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
Copy the code
3.3 Optional Configuration
To save space, see section 9.2 at the end of this article for all read and write configuration items.
Four, Parquet
Parquet is an open source, column-oriented data store that provides a variety of storage optimizations to allow reading of individual columns rather than entire files, saving storage space and improving read efficiency. It is the default file format of Spark.
4.1 Reading the Parquet file
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
Copy the code
2.2 Writing to the Parquet file
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
Copy the code
2.3 Optional Configuration
The Parquet file has its own storage rules, so it has few optional configuration items. The commonly used ones are as follows:
Read and write operations | Configuration items | An optional value | The default value | describe |
---|---|---|---|---|
Write | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None | Compressed file format |
Read | mergeSchema | true, false | Depending on the configuration itemspark.sql.parquet.mergeSchema |
When true, the Parquet data source consolidates all the schemas collected by the data files, otherwise the Schema is selected from the summary file, or from the random data file if no summary file is available. |
More optional configuration can refer to the official document: spark.apache.org/docs/latest…
Five, the ORC
ORC is a self-describing, type-aware column file format that is optimized for reading and writing large data and is a common file format for big data.
5.1 Reading ORC Files
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
Copy the code
4.2 Writing ORC Files
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
Copy the code
Six, SQL Databases
Spark also supports data reads and writes with traditional relational databases. However, Spark does not provide the database driver by default. To use the Spark database driver, you need to upload the database driver to the jars directory in the installation directory. The following example uses the Mysql database. Before using the Mysql database, you need to upload the mysql-connector-java-x.x.x.jar to the jars directory.
6.1 Reading Data
Help_keyword = help_keyword_id; help_keyword_id = name;
spark.read
.format("jdbc")
.option("driver"."com.mysql.jdbc.Driver") / / driver
.option("url"."JDBC: mysql: / / 127.0.0.1:3306 / mysql") // Database address
.option("dbtable"."help_keyword") / / the name of the table
.option("user"."root").option("password"."root").load().show(10)
Copy the code
Read data from query results:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url"."JDBC: mysql: / / 127.0.0.1:3306 / mysql")
.option("driver"."com.mysql.jdbc.Driver")
.option("user"."root").option("password"."root")
.option("dbtable", pushDownQuery)
.load().show()
/ / output
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
Copy the code
You can also use the following notation for data filtering:
val props = new java.util.Properties
props.setProperty("driver"."com.mysql.jdbc.Driver")
props.setProperty("user"."root")
props.setProperty("password"."root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") // Specify data filtering criteria
spark.read.jdbc("JDBC: mysql: / / 127.0.0.1:3306 / mysql"."help_keyword", predicates, props).show()
/ / output:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
Copy the code
NumPartitions can be used to specify the degree of parallelism for reading data:
option("numPartitions".10)
Copy the code
Here, in addition to specifying partitions, you can also set upper and lower bounds, with any value less than the lower bound assigned to the first partition and any value greater than the upper bound assigned to the last partition.
val colName = "help_keyword_id" // The column used to judge the upper and lower bounds
val lowerBound = 300L / / lower
val upperBound = 500L / / the upper bound
val numPartitions = 10 // Partition overview
val jdbcDf = spark.read.jdbc("JDBC: mysql: / / 127.0.0.1:3306 / mysql"."help_keyword",
colName,lowerBound,upperBound,numPartitions,props)
Copy the code
To verify the contents of a partition, use the operator mapPartitionsWithIndex as follows:
jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "Partition." + iterator.next())
}
buffer.toIterator
}).foreach(println)
Copy the code
The result is as follows: The help_keyword table contains only about 600 entries, which should be distributed evenly across 10 partitions. Instead, partition 0 contains 319 entries, because the lower limit is set so that all entries less than 300 are restricted to the first partition, partition 0. Similarly, all data greater than 500 is allocated to partition 9, the last partition.
6.2 Writing Data
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url"."JDBC: mysql: / / 127.0.0.1:3306 / mysql")
.option("user"."root").option("password"."root")
.option("dbtable"."emp")
.save()
Copy the code
Seven, the Text
Text files have no advantages in read/write performance and cannot express clear data structures, so they are rarely used. Read/write operations are as follows:
7.1 Reading Text Data
spark.read.textFile("/usr/file/txt/dept.txt").show()
Copy the code
7.2 Writing Text Data
df.write.text("/tmp/spark/txt/dept")
Copy the code
Advanced data read and write features
8.1 read in parallel
Multiple Executors can’t read the same file at the same time, but they can read different files at the same time. This means that when you read data from a folder containing multiple files, each of those files will become a partition in the DataFrame and be read by available Executors in parallel.
8.2 parallel write
The number of files or data written depends on the number of partitions owned by the DataFrame at the time the data is written. By default, one file is written per data partition.
8.3 Partition Writing
The concepts of partition and bucket are the same as those of partition tables and bucket tables in Hive. All data are split and stored according to certain rules. Note that partitions specified by partitionBy are not the same concept as partitions in RDD: partitions are represented as subdirectories of the output directory, and data is stored in the corresponding subdirectories.
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
Copy the code
The output is as follows: You can see that the output is divided into three subdirectories according to the department number. The output files are in the subdirectories.
8.3 Write data by bucket
Bucket write hashes data based on the specified number of columns and buckets. Currently, bucket write can only be saved as a table, which is actually a bucket table of Hive.
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
Copy the code
8.5 File Size Management
If the number of small files generated by the write is too high, then there is a lot of metadata overhead. Spark and HDFS cannot handle this problem, which is called a small file problem. At the same time, the data file should not be too large, otherwise there will be unnecessary performance overhead in the query, so the file size should be controlled within a reasonable range.
We have already shown that you can indirectly control file size by controlling the number of generated files through the number of partitions. Spark 2.2 introduces a new way to control file size in a more automated way, the maxRecordsPerFile parameter, which allows you to control file size by controlling the number of records written to the file.
// Spark will ensure that the file contains a maximum of 5000 recordsDf. Write. Option (" maxRecordsPerFile ",5000)
Copy the code
9. Optional Configuration Appendix
9.1 READING and Writing CSV Optional configuration
Read/write operation | Configuration items | An optional value | The default value | describe |
---|---|---|---|---|
Both | seq | Any character | . (comma) |
The separator |
Both | header | true, false | false | Whether the first line in the file is the name of the column. |
Read | escape | Any character | \ | Escape character |
Read | inferSchema | true, false | false | Whether the column type is automatically inferred |
Read | ignoreLeadingWhiteSpace | true, false | false | Whether to skip the space before the value |
Both | ignoreTrailingWhiteSpace | true, false | false | Whether to skip the space after the value |
Both | nullValue | Any character | “” | Declare which character in the file represents a null value |
Both | nanValue | Any character | NaN | Which value is declared to represent NaN or the default value |
Both | positiveInf | Any character | Inf | Is infinite |
Both | negativeInf | Any character | -Inf | Minus infinity |
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none | File compression format |
Both | dateFormat | Anything that can be converted to Java String of SimpleDataFormat |
yyyy-MM-dd | The date format |
Both | timestampFormat | Anything that can be converted to Java String of SimpleDataFormat |
Yyyy – MMdd ‘T’ HH: mm: ss. SSSZZ | Timestamp format |
Read | maxColumns | Arbitrary integer | 20480 | Declares the maximum number of columns in a file |
Read | maxCharsPerColumn | Arbitrary integer | 1000000 | Declares the maximum number of characters in a column. |
Read | escapeQuotes | true, false | true | Whether quotes in lines should be escaped. |
Read | maxMalformedLogPerPartition | Arbitrary integer | 10 | Specifies the maximum number of malformed data allowed in each partition, beyond which malformed data will not be read |
Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, rather than just escaping values with quoted characters. |
Read | multiLine | true, false | false | Whether to allow multiple rows across domains for each complete record |
9.2 JSON Read/Write Optional configuration
Read/write operation | Configuration items | An optional value | The default value |
---|---|---|---|
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none |
Both | dateFormat | Any string that can be converted to Java’s SimpleDataFormat | yyyy-MM-dd |
Both | timestampFormat | Any string that can be converted to Java’s SimpleDataFormat | Yyyy – MMdd ‘T’ HH: mm: ss. SSSZZ |
Read | primitiveAsString | true, false | false |
Read | allowComments | true, false | false |
Read | allowUnquotedFieldNames | true, false | false |
Read | allowSingleQuotes | true, false | true |
Read | allowNumericLeadingZeros | true, false | false |
Read | allowBackslashEscapingAnyCharacter | true, false | false |
Read | columnNameOfCorruptRecord | true, false | Value of spark.sql.column&NameOf |
Read | multiLine | true, false | false |
9.3 Database Read/Write Optional configuration
The attribute name | meaning |
---|---|
url | Database address |
dbtable | The name of the table |
driver | Database driver |
partitionColumn, lowerBound, upperBoun |
Total number of partitions, upper bound, lower bound |
numPartitions | Maximum number of partitions that can be used for table read and write parallelism. If the number of partitions to be written exceeds this limit, you can call coalesce(numPartition) to reset the number of partitions. |
fetchsize | How many rows of data are fetched per round trip. This option only applies to reading data. |
batchsize | How many rows are inserted per round trip? This option only applies to writes. The default value is 1000. |
isolationLevel | Transaction isolation level: Can be NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ or SERIALIZABLE, which is the standard transaction isolation level. The default value is READ_UNCOMMITTED. This option only applies to data reads. |
createTableOptions | Customize configurations for creating tables while writing data |
createTableColumnTypes | Customize column types to create columns when writing data |
Database to read and write more configuration can refer to the official document: spark.apache.org/docs/latest…
The resources
- Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
- Spark.apache.org/docs/latest…