Structured Streaming + Kafka Integration

With reference to the official documentation at spark.apache.org/docs/latest…

Start Kafka

# Download Kafka and ZooKeeper and unzip them.
# ZooKeeper must be running before Kafka starts:
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka:
$ bin/kafka-server-start.sh config/server.properties

# Create a topic named quickstart-events:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

# Start a console producer so messages can be sent to the topic from the shell:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

Write the streaming job and run it in IDEA

Refer to org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount

1. Create a Java Maven project

2. Add dependencies to the POM

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>

<!-- Optional: required only for log output when debugging locally -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.7</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.7</version>
</dependency>
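Note that the _2.12 suffix in the Spark artifact names is the Scala version. spark-sql and spark-sql-kafka-0-10 must agree on both the Scala version and the Spark version (3.1.2 here); mismatched versions typically fail at runtime with class-loading or binary-compatibility errors.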

3. Write a main method and run it:

// Imports needed by the snippet:
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public static void main(String[] args) throws Exception {
    SparkConf config = new SparkConf()
            .setAppName("JavaStructuredKafkaWordCount")
            .setMaster("local[1]");
    SparkContext sparkContext = new SparkContext(config);
    SparkSession spark = new SparkSession(sparkContext);

    // Create a Dataset representing the stream of input lines from Kafka
    Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "127.0.0.1:9092")
            .option("subscribe", "quickstart-events")
            .load()
            .selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING());

    // Split each line into words and keep a running count per word
    Dataset<Row> wordCounts = lines
            .flatMap((FlatMapFunction<String, String>) x ->
                    Arrays.asList(x.split(" ")).iterator(), Encoders.STRING())
            .groupBy("value")
            .count();

    // Start the query, printing the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

    query.awaitTermination();
}
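The example above keeps only the value column. For reference, the Kafka source exposes several more columns per record; a minimal sketch (not part of the original example, same broker and topic assumed) of reading the full schema:

// Load the raw Kafka records without projecting away the metadata columns.
Dataset<Row> raw = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "127.0.0.1:9092")
        .option("subscribe", "quickstart-events")
        .load();

// Kafka source schema: key and value are binary and usually need a CAST;
// topic, partition, offset and timestamp are metadata columns.
Dataset<Row> decoded = raw.selectExpr(
        "CAST(key AS STRING)",
        "CAST(value AS STRING)",
        "topic", "partition", "offset", "timestamp");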

4. In the Kafka console producer started in Step 1, type a few messages:

> hello a
> hello b
> ccc
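Alternatively (not shown in the original post), the same messages can be sent programmatically with the kafka-clients dependency already declared in the POM. A minimal sketch, assuming the broker and topic from Step 1:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class QuickstartProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer and flushes pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("quickstart-events", "hello a"));
            producer.send(new ProducerRecord<>("quickstart-events", "hello b"));
            producer.send(new ProducerRecord<>("quickstart-events", "ccc"));
        }
    }
}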

5. The number of occurrences of each word can be viewed in the IDEA console:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|   hel|    1|
|  owen|    3|
|uewior|    1|
| owen2|    1|
|    wu|    1|
+------+-----+
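Batch 0 is empty because the Kafka source defaults to starting from the latest offsets, so only messages produced after the query starts are counted.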

Package the code to run in a Spark cluster (typically in a production environment)

Add the fat-jar plugin to the POM

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.2-beta-5</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>xxx.xxx.client.SparkApplication</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </execution>
    </executions>
</plugin>
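A common refinement not shown in the original POM: when the jar is destined for a real cluster, the Spark dependencies are usually given <scope>provided</scope> so they are not bundled into the fat jar, since the cluster already supplies them.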

Run mvn package to build the jar…

Download a Spark 3.x release from the official website, decompress it, and run the following from that directory:

./bin/spark-submit \
  --class xxx.xxx.client.SparkApplication \
  --master 'local[2]' \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  /Users/xxx.xxx-latest-SNAPSHOT-jar-with-dependencies.jar \
  1000 >> ~/Desktop/xxxxxxx.temp
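The --packages flag makes spark-submit resolve org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 and its transitive dependencies from Maven repositories at launch time, so the Kafka connector does not need to be bundled into the application jar.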

Then open the file ~/Desktop/xxxxxxx.temp (for example with tail -f) to watch the real-time results.

Introduction



What is the relationship between Structured Streaming and Spark Streaming?

Structured Streaming is a new feature of the 2.x line and will replace Spark Streaming in the future. Structured Streaming is part of Spark SQL.

structured-streaming-programming-guide

Source --> Structured Streaming --> Sink

The built-in sources include the File, Kafka, Socket, and Rate sources.
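As a quick illustration of the Source --> Sink pipeline, the built-in Rate source generates rows of (timestamp, value) at a configurable rate and is handy for testing without Kafka. A minimal sketch, not from the original post, assuming a SparkSession named spark as in the earlier example:

// Rate source: emits `rowsPerSecond` rows per second,
// each with a timestamp and a monotonically increasing long value.
Dataset<Row> rateStream = spark
        .readStream()
        .format("rate")
        .option("rowsPerSecond", "5")
        .load();

// Console sink in append mode: each micro-batch's new rows are printed.
StreamingQuery rateQuery = rateStream.writeStream()
        .outputMode("append")
        .format("console")
        .start();
rateQuery.awaitTermination();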