Background
In our project, data is processed with Spark Structured Streaming. The processing flow is: message middleware (source) -> Spark Structured Streaming (work) -> AFS (sink). In the source -> work step, messages are stored in protobuf format, and Spark Structured Streaming receives the data as Array[Byte], so we need to deserialize the Array[Byte] data with protobuf to get the final, complete String. PS: At first I was lazy and tried to convert the data to a String with new String(Array[Byte]). As expected, I could only see part of the data; most of the rest was garbled or blank, so I could not get the contents of the fields I wanted. In the end, deserializing with protobuf is the way that works.
Sad history
Without further ado, the process is as follows:
Things to prepare
- A messaging middleware from which data in protobuf format can be retrieved;
- A Spark Structured Streaming runtime environment (I used a Hadoop environment);
- A .proto file corresponding to the protobuf data in the message middleware;
- A local protobuf compiler that can run the protoc command.
Preparations for protobuf and local environment
For protobuf basics such as the .proto syntax, see the official protobuf site: developers.google.cn/protocol-bu… or search for other people's blog posts. My test.proto file looks like this:
syntax = "proto2";
// Enumeration of languages, continue to add later
enum LanguageTypes {
CH = 0; / / Chinese
ENGLISH = 1; / / English
// Below is a list of supplementary standards
}
// Log character encoding enumeration
enum CodeType {
CODE_TYPE_UNKNOWN = 0; // So far type
UTF8 = 1; //utf8
GBK = 2; //gbk
}
message Log {
// The above fields whose IDS are smaller than 129 are reserved and cannot be added. The ids of the user-owned fields start from 130
optional int64 connection_code = 130; // User connection number
optional string action_json = 131; // User behavior data
required string send_time = 132; // Request sending time
}
Install the protoc compiler on your own machine; installation tutorials for Windows and Mac are easy to find, so just download it and install it. To match my program's runtime environment, I installed version 2.4.1 on my machine. The download path is as follows: github.com/protocolbuf… My system is a Mac, so once the installation was complete I entered this command in the terminal:
protoc --version
The result is
libprotoc 2.4.1
It worked.
Compiling protobuf into Java classes
Enter the command on the local terminal:
protoc -I=<directory containing the .proto file> --java_out=<absolute path of the output directory for the Java class> <absolute path of the .proto file>
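For concreteness, here is a hypothetical invocation; the paths are my own placeholders, not from the original setup. With protobuf 2.x, the outer class name defaults to the camel-cased file name (test.proto -> Test.java) unless the .proto file sets option java_outer_classname.

# Sketch with placeholder paths: generate Test.java into a Maven source tree
protoc -I=/path/to/proto --java_out=/path/to/project/src/main/java /path/to/proto/test.proto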
The target Java class is generated in the path specified by the --java_out argument. I generated the Test.java class from test.proto, then moved the generated Test.java class into the appropriate directory in my code.
Maven dependency
My project is a Scala project, so to use the Java class above I need to add the protobuf-java dependency to the Maven project. My POM dependency is as follows:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.1</version>
</dependency>
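With the dependency in place, a quick round-trip can confirm the generated class works before wiring it into Spark. This is my own minimal Scala sketch, not from the original project; it assumes the generated outer class is named Test (matching test.proto above) and uses protobuf-java's standard builder API:

// Minimal sanity check: build a Log, serialize it, and parse it back
object ProtoRoundTrip {
  def main(args: Array[String]): Unit = {
    val log = Test.Log.newBuilder()
      .setConnectionCode(12345L)                 // sample values, not real data
      .setActionJson("""{"action":"click"}""")
      .setSendTime("2020-01-01 00:00:00")
      .build()

    // The binary wire format the middleware would carry
    val bytes: Array[Byte] = log.toByteArray

    // new String(bytes) would garble this; parseFrom decodes it properly
    val parsed = Test.Log.parseFrom(bytes)
    println(parsed.getActionJson) // {"action":"click"}
  }
}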
Maven dependency pitfalls
- The Maven dependency version must match the version of the protoc compiler that generated the Java classes locally; otherwise the Java classes generated from the proto file will not work.
- Our Hadoop environment depends on protobuf 2.5.0, but the spark-core dependency we pull in uses protobuf 2.4.1, and 2.4.1 and 2.5.0 are not compatible.
- You can run the following command in the project path to see which other packages and versions Maven itself depends on (see the POM sketch after this list for the usual way to resolve a conflict):
mvn dependency:tree
- Initially I installed the latest version, 3.5.1, but when the compiled job ran I received an error message:
java.lang.NoSuchMethodError
However, I could still jump to the corresponding method locally in the IDE, so the error was caused by a package conflict. I therefore lowered the protoc version to 2.5.0, but then got this error message:
java.lang.VerifyError: class com.XX.XX.Test$Log overrides final method.
This is because even though Hadoop uses 2.5.0, Spark still calls 2.4.1 at runtime, so I lowered the version to 2.4.1. After dropping to 2.4.1, I also had to change a few other dependency versions that were not compatible with 2.4.1. After that, the call succeeded!
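For reference, the usual Maven way out of this kind of conflict is to pin the protobuf version your runtime actually loads and exclude the transitive copy from whichever dependency drags in the wrong one. The snippet below is my own hedged sketch, not the original project's POM; the spark-core coordinates and ${spark.version} are placeholders to adapt:

<!-- Sketch: pin protobuf-java to the version the runtime actually loads -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.4.1</version>
</dependency>

<!-- Sketch: stop a dependency from dragging in a conflicting protobuf copy -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>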
The Spark code
val inputStream = spark.readStream
  ......              // source format and options omitted
  .load
  .as[Array[Byte]]    // each record arrives as raw protobuf bytes
  .map(row => {
    // Deserialize the protobuf bytes with the generated class
    val log = Test.Log.parseFrom(row)
    val action_json = log.getActionJson
    action_json
  })
  .toDF("value")
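One caveat worth adding: in a long-running stream, a single corrupt payload makes parseFrom throw and fails the whole micro-batch. A hedged variant of the map step (my own addition, not from the original code) wraps the parse in scala.util.Try and drops records that fail to decode:

import scala.util.Try
import spark.implicits._  // String encoder for flatMap

// Sketch: `byteStream` is my placeholder name for the Dataset[Array[Byte]]
// produced by readStream ... .load.as[Array[Byte]] above.
// Undecodable records are skipped instead of failing the micro-batch.
val safeStream = byteStream
  .flatMap(bytes => Try(Test.Log.parseFrom(bytes).getActionJson).toOption)
  .toDF("value")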