Each “like” on the page sends the id of the liked post to Kafka. Spark Streaming then reads the data from Kafka, counts the likes, and writes the totals back to MySQL.
The full example code has been uploaded to GitHub: github.com/neatlife/my…
Getting the example project
The project can be created at https://start.spring.io
Add the web starter and the Kafka starter. Spark and Spark Streaming for Kafka have to be added manually, since no corresponding starters exist.
Then add the Kafka and Spark Streaming client dependencies in pom.xml:
<!-- Kafka and Spark dependencies -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
Send a “like” postId to Kafka
For Kafka installation, see: juejin.cn/post/684490…
Create a Kafka topic, such as mylikes:
kafka-topics --create --topic mylikes --replication-factor 1 --partitions 1 --zookeeper zoo1:2181
Add a like endpoint; the core code is as follows:
@RequestMapping("/send-like")
public String sendLike(@RequestParam(value = "post_id", required = true) Integer postId) {
producer.send(postId);
return "test1";
}
The core Kafka send code is as follows:
public void send(Integer postId) {
ProducerRecord producerRecord = new ProducerRecord(topic, postId.toString(), "1");
this.kafkaTemplate.send(producerRecord);
System.out.println("Sent sample postId [" + postId + "] to " + topic);
}
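The kafkaTemplate used in the send method has to come from somewhere; with the Kafka starter, Spring Boot can auto-configure it from application.properties. A minimal explicit sketch in Java config, in case you prefer that (the class name and broker address here are assumptions, not taken from the original project):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Points at the same broker that Spark reads from below (assumed address).
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}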
Note that the key and value sent to Kafka are strings, while the post id and the like count are integers, so Spark has to convert them back (shown below).
Read data from Kafka in Spark and count the likes
Create a Spark client that reads data from Kafka
SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster("local[*]")
        .set("spark.default.parallelism", "15")
        .set("spark.streaming.concurrentJobs", "5")
        .set("spark.executor.memory", "1G")
        .set("spark.cores.max", "3")
        .set("spark.local.dir", "/tmp/mySparkLikes")
        .set("spark.streaming.kafka.maxRatePerPartition", "5");
Set<String> topics = Collections.singleton(topic);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "127.0.0.1:9092");
JavaStreamingContext jsc = new JavaStreamingContext(
new JavaSparkContext(conf),
Durations.seconds(3));
jsc.checkpoint("checkpoint");
Create a Kafka data stream
// Get the data stream
final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
jsc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topics
);
System.out.println("stream started!");
stream.print();
stream.print() is an output operation; Spark Streaming requires at least one output operation on a DStream before the job will run, and print() also dumps a sample of each batch to the console for debugging.
Convert postId and likes in Kafka to integers
JavaPairDStream<Integer, Integer> countDStream = stream
        .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<Integer, Integer>>() {
            @Override
            public JavaPairRDD<Integer, Integer> call(JavaPairRDD<String, String> stringStringJavaPairRDD) throws Exception {
                return stringStringJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, String>, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                        return new Tuple2<>(new Integer(stringStringTuple2._1), new Integer(stringStringTuple2._2));
                    }
                });
            }
        })
        .reduceByKey(Integer::sum);
Generate the SQL statement that updates the like count:
countDStream.foreachRDD(v -> {
v.foreach(record -> {
String sql = String.format("UPDATE `post` SET likes = likes + %s WHERE id=%d", record._2, record._1);
System.out.println(sql);
});
log.info("Batch data stream processed: {}", v);
});
Start the streaming computation:
jsc.start();
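Note that jsc.start() returns immediately and the streaming job keeps running on background threads; here the Spring Boot process keeps the JVM alive on its own. In a standalone driver program you would normally block the main thread, roughly as follows:

jsc.start();
// Block until the streaming context is stopped or fails;
// only needed when nothing else keeps the JVM alive.
jsc.awaitTermination();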
Add an endpoint to invoke the code above:
@RequestMapping("/launch")
public String launch() {
sparkLikeService.launch();
return "test2";
}
Visit /launch to start the streaming engine, then hit the /send-like endpoint a few times to generate like events, and watch the console for the generated SQL statements.
Once you can see the SQL reflecting the received likes, you can persist them to the database with JPA, as sketched below.
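For example, a minimal Spring Data JPA sketch; the Post entity and the repository below are hypothetical names assuming a post table with id and likes columns, and are not part of the original project:

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical repository over the `post` table.
public interface PostRepository extends JpaRepository<Post, Integer> {

    // Same effect as the UPDATE statement printed above.
    @Modifying
    @Transactional
    @Query("UPDATE Post p SET p.likes = p.likes + :count WHERE p.id = :id")
    int incrementLikes(@Param("id") Integer id, @Param("count") Integer count);
}

A simple way to wire this in is to call v.collect() inside foreachRDD and apply the updates on the driver, which avoids serializing Spring beans into Spark tasks.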
Local debugging
The Spark job can be debugged locally, provided the following conditions are met:
- Spark runs locally, embedded in the application
- The master address the job connects to is local[*] (see the configuration sketch below)
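One way to keep the same code debuggable locally and deployable elsewhere is to make the master address configurable; a minimal sketch (the property name spark.master.url is an assumption for illustration):

// Fall back to embedded local mode when no master address is configured.
String master = System.getProperty("spark.master.url", "local[*]");
SparkConf conf = new SparkConf()
        .setAppName("mySparkLikes")
        .setMaster(master);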