UpdateStateByKey operation

The updateStateByKey operation allows you to maintain an arbitrary state while continuously updating with new information. To use this, you will have to do two steps. Define state – The state can be any data type. Define status update functionality – Use functions to specify how to update the status with the previous state and specify new values from the input stream. Within each batch, Spark applies status updates to all existing keys, whether or not they have new data in the batch. If the update function returns None, the key-value pair is eliminated. Let’s give an example. Suppose you want to keep a run count of each word you see in your text data stream. Here, the run count is the state, which is an integer. We define the update function as: \

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { Integer newSum = ... // add the new values with the previous running count to get the new count return Optional.of(newSum); }};Copy the code

Pairs (word, 1) in Pairs (word, 1) were counted for each word in wordcount

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
Copy the code

UpdateFunction is called for each word, where newValues has the order of 1 (from (word, 1) pairs) and runningCount has the previous count.

Note that using updateStateByKey requires configuring the checkpoint directory, which is discussed in detail in checkPointing.

2. WordCount

2.1. Set the checkpoint directory

To use the updateStateByKey operator, you must set a checkpoint directory and enable the checkpoint mechanism jssc.checkpoint("hdfs://spark001:9000/wordcount_checkpoint");Copy the code

2.2. Obtain data through socket

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark001", 9999);
Copy the code

2.3, participle, (word, 1)

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); }}); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); }});Copy the code

2.4 |UpdateStateByKey operation

JavaPairDStream<String, updateStateByKey <String, Integer> wordcounts = pairs. UpdateStateByKey (// Optional represents a state that may or may not have existed before Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ private static final long serialVersionUID = 1L; // In fact, this function is called every batch calculation for each word. The first parameter,values, corresponds to the new value of the key in the batch. (hello,1) (hello,1) (Hello,1) then (1,1) is passed in. The second argument represents the state prior to the key Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {// Define a global count Integer newValue = 0; if(state.isPresent()){ newValue = state.get(); } for(Integer value : values){ newValue += value; } return Optional.of(newValue); }});Copy the code

Java complete code

package com.chb.spark.streaming; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.google.common.base.Optional; import scala.Tuple2; public class UpdateStateByKeyWordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]"); JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(5)); To use the updateStateByKey operator, you must set a checkpoint directory and enable the checkpoint mechanism jssc.checkpoint("hdfs://spark001:9000/wordcount_checkpoint"); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark001", 9999); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); }}); JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); }}); JavaPairDStream<String, updateStateByKey <String, Integer> wordcounts = pairs. UpdateStateByKey (// Optional represents a state that may or may not have existed before Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ private static final long serialVersionUID = 1L; // In fact, this function is called every batch calculation for each word. The first parameter,values, corresponds to the new value of the key in the batch. (hello,1) (hello,1) (Hello,1) then (1,1) is passed in. The second argument represents the state prior to the key Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {// Define a global count Integer newValue = 0; if(state.isPresent()){ newValue = state.get(); } for(Integer value : values){ newValue += value; } return Optional.of(newValue); }}); wordcounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); }}Copy the code

The Scala version uses Spark to calculate product attention in real time

Simulated data

9.896811 goodsID - 110: : : : : : : 0:0 goodsID - 38: : 1:8.256732: : 0: : goodsID - 108:1:4: : : 4.9478645:0: : 1 GoodsID - 183: : 1:9.024677: : 0: : goodsID - 166:2:2: : : 9.436835:1: : goodsID - 140:0:3: : : 8.426323:0:0 GoodsID - 129:2: : : : 0.1236406:1: : : 0 goodsID - 171:1: : : 6.1114955:1:2 goodsID - 178: : : : : 5.1479044:0:0 GoodsID - 66: : : : : 6.0941777:1: goodsID - 112:2:3: : : 7.1715264:1: : 0 goodsID - 10: : : : : 7.8836794:1:0 GoodsID - 180: : 1: : : 6.3729606:1:2Copy the code

Scala code

package com.chb.sparkstreaming.goods import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seconds import java.text.NumberFormat /** */ object GoodQuota {def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Goodquota").setMaster("local[*]") val ssc = new StreamingContext (sparkConf, Seconds (5)). SSC sparkContext. SetCheckpointDir (" checkDir ") / / specified port monitoring, Val datas = ssc.socketTextStream("localhost", Val quotaDStream = datas.map(line => {val Arrays = line.split("::") val goodId = Arrays (0) val lookTimes = arrays(1) val duration = arrays(2) val iscollect = arrays(3) val num = arrays(4) That is to say, there is a weight for browsing times, browsing time, purchasing strength and whether to collect the product. Different companies may think different options have different weights. You may think browsing time is more important, while another person thinks browsing times is more important, so we have agreed on this calculation formula in advance. We agree that the weight of browsing times is 0.8, the weight of browsing time is 0.6, and the strength of collection and purchase is 1. Var quota:Double = looktimes.todouble *0.8 + duration.todouble *0.6 + iscollect.todouble + num.todouble Keep three decimal val numberFormat = numberFormat. GetInstance numberFormat. SetMinimumFractionDigits quota = (3) Numberformat.format (quota).todouble (goodId, quota)}) // Update attention // Since streaming data is generated every minute and every second, the calculated attention value is also changing, so the status value needs to be updated. Use updateStateByKey to do this. That's the hard part here. // Perform Transformation level processing on the initial DStream, such as programming higher-order functions such as map and filter, to perform specific data computations. In this example, updateStateByKey is used to update historical status by Batch Interval. In this example, checkPoint is used to save the value of the parent RDD. You can also try using mapWithState to update values after Spark1.6.X. val updateDStream = quotaDStream.updateStateByKey((values:Seq[Double], State :Option[Double]) => {var updateValue = state.getorelse (0.0) for(value<-values){updateValue += value} Transform val sortDStream = updatedStream. transform(RDD => { rdd.map(tuple=>(tuple._2, tuple._1)).sortByKey(false).map(tuple=>(tuple._2, tuple._1)) rdd }) sortDStream.print(5)*/ updateDStream.foreachRDD(rdd => { val sortRDd = rdd.map(tuple=>(tuple._2, tuple._1)).sortByKey(false).map(tuple=>(tuple._2, Take (5) for(tuple<-ts) {println(" item: "+ tuple._1 +" "+ tuple._2)}}) /* updatedStream. foreachRDD(RDD => {val ts = RDD. The tuple. _1). SortByKey (false). The map (tuple = > (a tuple) _2, tuple. _1)), take (5) (a tuple) < - ts {println (" commodity: "+ tuple. _1 +" attention: "+ tuple) _2)}}) * / SSC. Start (SSC). The awaitTermination ()}}Copy the code