Background

In this project we process data with Spark Structured Streaming. The processing flow is: message middleware (source) -> Spark Structured Streaming (work) -> AFS (sink). Between the source and the processing stage, messages are serialized in protobuf format, and Spark Structured Streaming receives the data as Array[Byte], so we need to deserialize the Array[Byte] with protobuf to get the final, complete String. PS: At first I was lazy and tried to convert the data to a String with new String(Array[Byte]). As expected, I could only read part of the data; most of the rest was garbled or blank, so I could not get the contents of the fields I wanted. In the end, deserializing with protobuf is the way to go.
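To make that PS concrete, here is a minimal sketch of the two approaches, assuming bytes is one Array[Byte] payload from the middleware and Test.Log is the class generated later in this post:

// bytes: Array[Byte], one message payload from the middleware
// Naive approach: protobuf's wire format is binary, so this only yields
// readable text for the raw string fields; the rest is garbled or blank.
val naive: String = new String(bytes)

// Correct approach: deserialize with the generated class, then read the
// field you actually want.
val log = Test.Log.parseFrom(bytes)
val actionJson: String = log.getActionJson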

Sad history

Without further ado, the process is as follows:

Things to prepare

  • A message middleware from which you can retrieve data in protobuf format;
  • A Spark Structured Streaming runtime environment; I used a Hadoop environment;
  • The .proto file corresponding to the protobuf data in the message middleware;
  • A local protobuf compiler so that you can run protoc commands;

Preparing protobuf and the local environment

For protobuf basics such as .proto syntax, see the official protobuf site: developers.google.cn/protocol-bu… or search for other people's blog posts. My test.proto file looks like this:

syntax = "proto2";

// Language enumeration; more to be added later
enum LanguageTypes {
	CH = 0;	// Chinese
	ENGLISH = 1;	// English
	// Further additions go below, following the standard
}

// Log character encoding enumeration
enum CodeType {
	CODE_TYPE_UNKNOWN = 0;	// Unknown type
	UTF8 = 1;	//utf8
	GBK = 2;	//gbk
}

message Log {
	// Field IDs below 130 are reserved and must not be used; user-defined fields start at 130
	optional int64 connection_code = 130;	// User connection number
	optional string action_json = 131;	// User behavior data
	required string send_time = 132;	// Request sending time
}
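Before wiring this into Spark, you can sanity-check the generated class with a quick round trip (a minimal sketch: the outer class name Test matches the Spark code later in this post, the builder API is what the protobuf-java 2.x code generator produces, and the field values here are made up):

// Build a Log with the generated builder API (send_time is required,
// so it must be set before build())
val log = Test.Log.newBuilder()
	.setConnectionCode(12345L)	// made-up connection number
	.setActionJson("{\"action\":\"click\"}")	// made-up behavior data
	.setSendTime("2018-01-01 00:00:00")	// made-up send time
	.build()

// Serialize to the same Array[Byte] form the middleware delivers...
val bytes: Array[Byte] = log.toByteArray

// ...and parse it back, which is exactly what the Spark job will do
val parsed = Test.Log.parseFrom(bytes)
assert(parsed.getActionJson == log.getActionJson)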

Install the protoc compiler on your own machine. There are plenty of installation tutorials for Windows and Mac; just search for one, download, and install. Because of my program's runtime environment, I installed version 2.4.1 on my computer. The download path is: github.com/protocolbuf… My system is a Mac, so when the installation completed I entered this command in the terminal:

protoc --version

The result is:

libprotoc 2.4.1

It worked.

Compiling protobuf to Java classes

Enter the command on the local terminal:

protoc -I=<directory containing the .proto file> --java_out=<absolute path of the output directory for the generated Java class> <absolute path to the .proto file>
# for example (illustrative paths): protoc -I=./proto --java_out=./src/main/java ./proto/test.proto

The target Java class is generated in the path specified by the --java_out argument. I generated Test.java from test.proto; move the generated Test.java class to the appropriate directory in your code.

Maven dependency

My project is a Scala project, so to use the Java class above I need to add the protobuf-java dependency to the Maven project. My POM dependency is as follows:

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>2.4.1</version>
</dependency>

Maven dependency pitfalls

  • The Maven dependency version must match the version of the protoc compiler used to generate the Java classes locally; otherwise the Java classes generated from the proto file will be unusable.
  • Our Hadoop environment depends on protobuf 2.5.0, but the spark-core dependency we pull in uses protobuf 2.4.1, and 2.4.1 and 2.5.0 are not compatible.
    • You can run mvn dependency:tree in the project path to see which other packages and versions Maven itself pulls in;
    • Initially I installed the latest version, 3.5.1, but when I compiled and ran, I got an error message:
java.lang.NoSuchMethodError

Yet locally I could jump straight to the corresponding method in the IDE, so the error was caused by a jar conflict. I therefore dropped the protoc version to 2.5.0, but then got another error message:

java.lang.VerifyError: class com.XX.XX.Test$Log overrides final method.

This is because even though Hadoop uses 2.5.0, Spark still calls 2.4.1 at runtime, so I dropped the version all the way to 2.4.1. After dropping to 2.4.1, I also adjusted a few other dependency versions that were incompatible with 2.4.1, and the call finally succeeded!

The Spark code

// requires: import spark.implicits._ (for .as[...], the map encoder, and .toDF)
val inputStream = spark.readStream
	......
	.load
	.as[Array[Byte]]	// each row is the raw protobuf payload
	.map(row => {
		// deserialize with the Test.Log class generated from test.proto
		val log = Test.Log.parseFrom(row)
		val action_json = log.getActionJson	// pull out the field we want
		action_json
	})
	.toDF("value")
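For completeness, the flow at the top of the post ends at an AFS sink. The source for that part isn't shown here, but since inputStream ends up as a single string column named value, a minimal sketch using Spark's built-in text file sink would look something like this (the paths are placeholders, and the actual AFS connector/format may differ):

// The text sink requires exactly one string column, which "value" is
val query = inputStream.writeStream
	.format("text")
	.option("path", "/path/to/afs/output")	// placeholder output path
	.option("checkpointLocation", "/path/to/checkpoints")	// placeholder
	.start()

query.awaitTermination()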