Structured Streaming + Kafka Integration
Based on the official documentation at spark.apache.org/docs/latest…
Start Kafka
# Download Kafka (which bundles ZooKeeper) and unzip it.
# ZooKeeper must be started before Kafka:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka:
$ bin/kafka-server-start.sh config/server.properties
# Create a topic named quickstart-events:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
# Start a console producer in a shell so it can send messages to the topic:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
Write the stream-processing code and run it in IDEA
Refer to org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount in the Spark examples.
1. Create a Java Maven project
2. Add the dependencies to the POM
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.2</version>
</dependency>
<!-- The _2.12 suffix is the Scala binary version; all Spark artifacts must use the same one -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<!-- Optional: only needed for log output when debugging locally -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.7</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.7</version>
</dependency>
3. Write a main method and run it…
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public static void main(String[] args) throws Exception {
    SparkConf config = new SparkConf()
            .setAppName("JavaStructuredKafkaWordCount")
            .setMaster("local[1]");
    SparkContext sparkContext = new SparkContext(config);
    SparkSession spark = new SparkSession(sparkContext);

    // Create a Dataset representing the stream of input lines from Kafka
    Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "127.0.0.1:9092")
            .option("subscribe", "quickstart-events")
            .load()
            .selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING());

    // Generate the running word count
    Dataset<org.apache.spark.sql.Row> wordCounts = lines.flatMap(
            (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
            Encoders.STRING())
            .groupBy("value")
            .count();

    // Start the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

    query.awaitTermination();
}
4. Send some messages from the Kafka producer started in Step 1 (a programmatic alternative is sketched after the sample input below)…
> hello a
> hello b
> ccc
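Alternatively, messages can be sent programmatically using the kafka-clients dependency added in Step 2. A minimal sketch (the class name and messages are illustrative, not part of the original setup):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuickstartProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("quickstart-events", "hello a"));
            producer.send(new ProducerRecord<>("quickstart-events", "hello b"));
            producer.send(new ProducerRecord<>("quickstart-events", "ccc"));
        }
    }
}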
5. You can then view the running count of each word in the IDEA console…
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|   hel|    1|
|  owen|    3|
|uewior|    1|
| owen2|    1|
|    wu|    1|
+------+-----+
Package the code to run in a Spark cluster (typically in a production environment)
Add fat-JAR plug-in to POM
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.2-beta-5</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>xxx.xxx.client.SparkApplication</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </execution>
    </executions>
</plugin>
Run mvn package to build the fat JAR…
Download Spark 3.x from the official Spark website, decompress it, and run the following command in that directory
./bin/spark-submit \
  --class xxx.xxx.client.SparkApplication \
  --master 'local[2]' \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  /Users/xxx.xxx-latest-SNAPSHOT-jar-with-dependencies.jar \
  1000 >> ~/Desktop/xxxxxxx.temp
Then tail the file ~/Desktop/xxxxxxx.temp to watch the real-time results arrive. (Note that the Kafka connector is still passed via --packages even though the JAR bundles its dependencies; one common reason is that fat-JAR plugins can drop the META-INF/services registration the Kafka source relies on when merging JARs, so resolving it at submit time is the safer option.)
Introduction
What is the relationship between Structured Streaming and Spark Streaming?
Structured Streaming is a new feature introduced in Spark 2.x that is positioned to replace Spark Streaming going forward. As shown above, Structured Streaming is part of Spark SQL.
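Because it is built on Spark SQL, streaming code reuses the same Dataset/DataFrame API as batch code. A minimal sketch of the parallel (the `spark` session and the /data/events path are assumptions for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Batch: read a static directory of JSON files once.
Dataset<Row> batchDf = spark.read().format("json").load("/data/events");

// Streaming: the same format()/load() calls via readStream(); new files in the
// directory are picked up incrementally. File sources need an explicit schema.
Dataset<Row> streamDf = spark.readStream()
        .format("json")
        .schema(batchDf.schema())  // reuse the schema inferred by the batch read
        .load("/data/events");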
structured-streaming-programming-guide
Source ——> Structured Streaming ——> Sink
The built-in sources include the File, Kafka, Socket, and Rate sources; a Rate-source sketch follows below.
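For a quick local test without Kafka, the Rate source generates rows on a schedule. A minimal sketch (the rate is illustrative, and it assumes the `spark` session and an enclosing method that throws Exception, as in the main method above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// The rate source emits rows with (timestamp, value) columns.
Dataset<Row> rateStream = spark.readStream()
        .format("rate")
        .option("rowsPerSecond", 10)  // illustrative throughput
        .load();

// Print each micro-batch to the console; append mode suffices here
// because there is no aggregation.
rateStream.writeStream()
        .outputMode("append")
        .format("console")
        .start()
        .awaitTermination();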