Abstract: An “IOException: totalValueCount == 0” error was reported when a table was read in a Spark SQL ETL task, even though the table had been written without any exception.

IOException: totalValueCount == 0

1. Problem description

When Spark SQL is used to run an ETL task, reading one of the tables fails with “IOException: totalValueCount == 0”. However, the table was written without any error.

2. Preliminary analysis

This table is itself the result of joining two other tables. Analysis showed that the join result is heavily skewed and that the skew key is NULL. After the join, each task writes its output to one file, so the task that handles the NULL key writes a huge number of NULL values into a single file, about 2.2 billion in total.

2.2 billion is a suspicious number: it is just above the maximum int value of 2,147,483,647. The initial suspicion was therefore that Parquet has a problem when more than Int.MaxValue values are written to a single file.
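
As a rough illustration, the following sketch shows how such a file can be produced. The table names t1 and t2, the column names, and the output path are hypothetical; the point is only that all rows whose join key is NULL are shuffled to the same task, which then writes every NULL value into one Parquet file.

import org.apache.spark.sql.SparkSession

// Sketch of the write side (hypothetical table names, columns and path):
// a join whose key column is mostly NULL concentrates all NULL-keyed rows
// in a single task, which writes billions of NULL values into one file.
object NullSkewWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("null-skew-write-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // t1.key is NULL for the vast majority of rows (the skew key).
    val joined = spark.sql(
      """
        |SELECT t1.key, t2.value
        |FROM t1
        |LEFT JOIN t2 ON t1.key = t2.key
      """.stripMargin)

    // All NULL-keyed rows land in the same shuffle partition, so one task
    // writes all of them (and their NULL values) into a single Parquet file.
    joined.write.parquet("/tmp/null_skew_output")
  }
}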

[Note] This article focuses only on the read error caused by a large number of null values being written to the same file. Whether so many NULL values are reasonable for this column is beyond the scope of this article.

3. Deep dive into Parquet (version 1.8.3)

Entry: Spark (Spark 2.3) -> Parquet

The Parquet call entry is in Spark, so we start digging through the call stack from the Spark side.

InsertIntoHadoopFsRelationCommand.run() / SaveAsHiveFile.saveAsHiveFile() -> FileFormatWriter.write()

There are several steps:

1. Before the job starts, create the outputWriterFactory: ParquetFileFormat.prepareWrite(). This sets up the configuration related to writing Parquet files; the main item is setting the WriteSupport class: ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]), where ParquetWriteSupport is a class defined by Spark itself (see the sketch after this list).

2. executeTask() -> writeTask.execute(): first create an OutputWriter (ParquetOutputWriter) through outputWriterFactory.newInstance().

3. For each record, call ParquetOutputWriter.write(InternalRow) in turn to write it into the Parquet file.

4. Before the task ends, call ParquetOutputWriter.close() to release the resources.
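
For reference, the heart of step 1 is roughly the following configuration. This is a simplified sketch, not Spark's actual prepareWrite() implementation; note that ParquetWriteSupport is a Spark-internal class, so depending on the Spark version this may only compile from within the Spark source tree.

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport

// Simplified sketch of the key line of prepareWrite(): register Spark's own
// WriteSupport so that InternalRow records are converted into Parquet's
// record model before being written.
object ParquetWriteSetupSketch {
  def configure(job: Job): Unit = {
    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
  }
}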

3.1 The write process

In ParquetOutputWriter, a RecordWriter (ParquetRecordWriter) is constructed through ParquetOutputFormat.getRecordWriter(). It contains:

1. The WriteSupport set in prepareWrite(): converts a Spark record and writes it into the Parquet structure.

2. ParquetFileWriter: Writes files

ParquetRecordWriter actually delegates the write operation to an internalWriter (InternalParquetRecordWriter), which is constructed with the WriteSupport and the ParquetFileWriter.

Now let’s sort out the general flow so far: SingleDirectoryWriteTask/DynamicPartitionWriteTask.execute -> ParquetOutputWriter.write -> ParquetRecordWriter.write -> InternalParquetRecordWriter.write

Next, InternalParquetRecordWriter.write() does three things:

1. writeSupport.write(), i.e. ParquetWriteSupport.write(), which itself has three steps:

1. MessageColumnIO.MessageColumnIORecordConsumer.startMessage;

2. ParquetWriteSupport.writeFields: writes the value of each column of the row, except null values;

3. MessageColumnIO.MessageColumnIORecordConsumer.endMessage: writes a null value for the fields that were skipped in the second step.

The null write goes through ColumnWriterV1.writeNull -> accountForValueWritten: 1) increment the counter valueCount (an int); 2) check whether the page is full and writePage is needed (checkpoint 1).

2. Increment the counter recordCount (a long).

3. Check whether the block size requires flushRowGroupToStore (checkpoint 2).

Since the values written are all null, memSize is 0 at both checkpoint 1 and checkpoint 2, so neither the page nor the row group is flushed. The result is that null values keep accumulating in the same page. The ColumnWriterV1 counter valueCount is an int, and it overflows to a negative value once Int.MaxValue is exceeded.
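
The overflow itself is ordinary int arithmetic. A minimal illustration (this is not Parquet's actual code, only a demonstration of the wrap-around described above):

object ValueCountOverflowDemo {
  def main(args: Array[String]): Unit = {
    // The counter in ColumnWriterV1 is an int; incrementing it past
    // Int.MaxValue wraps around to a negative value.
    var valueCount: Int = Int.MaxValue   // 2,147,483,647
    valueCount += 1
    println(valueCount)                  // prints -2147483648

    // As described below, the flush at close() only writes a page when the
    // counter is positive, so a negative valueCount means no page is ever
    // written and totalValueCount stays 0.
  }
}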

Therefore, only when the close() method is called (when the task ends) will flushRowGroupToStore be executed:

ParquetOutputWriter.close -> ParquetRecordWriter.close -> InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnWriteStoreV1.flush -> for each column: ColumnWriterV1.flush

Since valueCount has overflowed to a negative value, no page is written here either.

Because writePage has not been called, totalValueCount here is always 0.

ColumnWriterV1.writePage -> ColumnChunkPageWriter.writePage -> totalValueCount is accumulated

At the end of the write, InternalParquetRecordWriter.close -> flushRowGroupToStore -> ColumnChunkPageWriteStore.flushToFileWriter -> for each column: ColumnChunkPageWriter.writeToFileWriter:

1. ParquetFileWriter.startColumn: totalValueCount is assigned to currentChunkValueCount.

2. ParquetFileWriter.writeDataPages

3. ParquetFileWriter.endColumn: currentChunkValueCount (which is 0) and other metadata are used to construct a ColumnChunkMetaData, and this information is eventually written into the file (a sketch for inspecting this metadata follows).
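
The written footer can be inspected directly to confirm that the affected chunk carries valueCount = 0. The sketch below uses ParquetFileReader.readFooter, an API form that exists in 1.8.x but is marked deprecated in later releases; the file path is passed on the command line.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Print the per-column-chunk valueCount recorded in a Parquet file footer.
// A chunk written through the overflow path described above shows up with
// valueCount == 0.
object InspectFooterSketch {
  def main(args: Array[String]): Unit = {
    val footer = ParquetFileReader.readFooter(new Configuration(), new Path(args(0)))
    for (block <- footer.getBlocks.asScala; col <- block.getColumns.asScala) {
      println(s"${col.getPath} valueCount=${col.getValueCount}")
    }
  }
}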

3.2 The read process

Again, we use Spark as the entry point to trace the call stack.

Initialization phase: ParquetFileFormat.buildReaderWithPartitionValues -> VectorizedParquetRecordReader.initialize -> ParquetFileReader.readFooter -> ParquetMetadataConverter.readParquetMetadata -> fromParquetMetadata -> ColumnChunkMetaData.get, which contains valueCount (0).

When reading: VectorizedParquetRecordReader.nextBatch -> checkEndOfRowGroup: 1) ParquetFileReader.readNextRowGroup -> for each chunk, currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages())

Since getValueCount is 0, pagesInChunk is empty.

2) A ColumnChunkPageReader is then constructed from this page list. Because the page list is empty, its totalValueCount is 0, which causes the error when the VectorizedColumnReader is constructed.
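
Any Spark job that scans the affected column hits this path. A minimal sketch, reusing the hypothetical output path from the earlier write sketch:

import org.apache.spark.sql.SparkSession

object ReadFailureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-failure-sketch")
      .getOrCreate()

    // Scanning the corrupted file forces VectorizedParquetRecordReader to
    // build a VectorizedColumnReader for the chunk whose metadata recorded
    // valueCount = 0, and the read fails with
    // "IOException: totalValueCount == 0".
    spark.read.parquet("/tmp/null_skew_output").show()
  }
}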

4. Solution: Parquet Upgrade (Version 1.11.1)

In the new version, ParquetWriteSupport.write -> MessageColumnIO.MessageColumnIORecordConsumer.endMessage -> ColumnWriteStoreV1 (ColumnWriteStoreBase).endRecord:

endRecord now has a "maximum number of records per page" property (default 20,000) and the corresponding check: once the limit is exceeded, writePage is triggered, so the valueCount of ColumnWriterV1 can no longer overflow (it is reset to zero after each writePage).

By comparison, in the old version 1.8.3, ColumnWriteStoreV1.endRecord is an empty function.
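
If the limit needs to be tuned after the upgrade, it can be set through the Hadoop configuration that Spark passes to Parquet. The property name parquet.page.row.count.limit and its default of 20,000 are stated here to the best of my knowledge and should be verified against the Parquet version actually deployed; the output path and schema below are made up for the example.

import org.apache.spark.sql.SparkSession

object PageRowCountLimitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("page-row-count-limit-sketch")
      .getOrCreate()

    // Parquet reads its settings from the Hadoop configuration, so the
    // per-page record limit can be set (or simply left at its default)
    // before writing.
    spark.sparkContext.hadoopConfiguration
      .set("parquet.page.row.count.limit", "20000")

    // Even a column that is entirely NULL is now flushed into pages every
    // 20,000 records instead of accumulating in a single page.
    spark.range(0, 1000000L)
      .selectExpr("id", "CAST(NULL AS STRING) AS value")
      .write.mode("overwrite").parquet("/tmp/page_limit_demo")
  }
}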

Attached: a small trick in Parquet

In Parquet, when a long value falls within a certain range, an int is used to store it, as follows:

First, check whether the value can be stored in an int:

If it can, an IntColumnChunkMetaData is constructed instead of a LongColumnChunkMetaData, and the conversion is done at construction time:

When the value is used, it is converted back: IntColumnChunkMetaData.getValueCount -> intToPositiveLong():

A normal int covers the range -2^31 to 2^31 - 1. Since metadata values such as valueCount are non-negative, a plain int could only represent 0 to 2^31 - 1. With this trick, numbers in the range 0 to 2^32 - 1 can be represented, which doubles the usable range.
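
The pattern behind the trick is plain unsigned 32-bit arithmetic. The sketch below borrows the method names mentioned above, but the bodies are only an illustration of the idea, not a copy of the Parquet source:

// Store a non-negative long (up to 2^32 - 1) in an int and read it back.
object UnsignedIntTrickSketch {
  // A long in [0, 2^32 - 1] fits: keep only the low 32 bits.
  def positiveLongToInt(value: Long): Int = {
    require(value >= 0L && value <= 0xFFFFFFFFL, s"value out of range: $value")
    value.toInt
  }

  // Reinterpret the stored int as an unsigned 32-bit number.
  def intToPositiveLong(stored: Int): Long = stored.toLong & 0xFFFFFFFFL

  def main(args: Array[String]): Unit = {
    val valueCount = 3000000000L                 // larger than Int.MaxValue
    val stored = positiveLongToInt(valueCount)   // wraps to a negative int
    println(stored)                              // -1294967296
    println(intToPositiveLong(stored))           // 3000000000 again
  }
}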

Appendix: test case code that can be used to reproduce the issue (it depends on some Spark classes and can be run inside the Spark project).

Test case code.txt
