The Transform operation

The transform operation allows an arbitrary RDD-to-RDD function to be applied to a DStream. It can be used to run any RDD operation that is not exposed in the DStream API. For example, joining each batch in a data stream with another dataset is not directly exposed in the DStream API, but is easy to do with transform. This opens up powerful possibilities: real-time data cleaning, for instance, can be done by joining the input stream with precomputed spam information (which can itself be generated with Spark) and filtering on the result.
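Conceptually, transform lifts a batch-to-batch function so that it runs on the RDD behind every micro-batch. The following plain-Java sketch (no Spark; the class and method names are illustrative, not Spark API) shows the shape of the operation, modeling a DStream as a list of batches:

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch of the transform idea: apply an arbitrary
// batch-to-batch function to every batch of the stream.
class TransformSketch {
    // Stand-in for a DStream: a list of batches, each batch a list of records
    static List<List<String>> transform(List<List<String>> stream,
                                        Function<List<String>, List<String>> f) {
        return stream.stream().map(f).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> stream = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("b", "c"));
        // Any batch-level operation can be plugged in,
        // e.g. filtering against a static "spam" set
        Set<String> spam = new HashSet<>(Arrays.asList("b"));
        List<List<String>> cleaned = transform(stream, batch ->
                batch.stream().filter(x -> !spam.contains(x)).collect(Collectors.toList()));
        System.out.println(cleaned); // [[a], [c]]
    }
}
```

In Spark the function you pass is `RDD => RDD`, so anything in the RDD API, including joins against static RDDs, is available inside it.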

1. Case: filtering out ad-fraud users

1.1. Simulate a blacklist

1.1.1. Simulated users click advertisements on the website, but some of them commit click fraud. The click streams of such users should be filtered out, so those users are added to a blacklist.

```java
// Blacklist: (user, Boolean); true means the user is blacklisted
final List<Tuple2<String, Boolean>> blackList = new ArrayList<Tuple2<String, Boolean>>();
blackList.add(new Tuple2<String, Boolean>("lisi", true));
```

1.1.2. Turn the blacklist into an RDD.

```java
// Blacklist RDD: (user, Boolean)
final JavaPairRDD<String, Boolean> blackRDD = jssc.sparkContext().parallelizePairs(blackList);
```

1.2. Get the simulated click log from a specified port; each line has the format "date user".

```java
JavaReceiverInputDStream<String> adsClickLogDStream = jssc.socketTextStream("192.168.1.224", 9999);
```

1.3. Format the data in the data stream

Each log line has the format "date user". To make the subsequent join with the blacklist RDD possible, each line is transformed into the pair (user, log).
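The reformatting itself is plain string splitting, independent of Spark. A small stand-alone sketch (the helper name is illustrative):

```java
// "date user" => (user, wholeLine): the same string handling used
// inside the mapToPair step of this example
class ToPairSketch {
    static String[] toPair(String log) {
        String user = log.split(" ")[1];   // second field is the user
        return new String[] { user, log }; // (user, log)
    }

    public static void main(String[] args) {
        String[] pair = toPair("2016-10-01 lisi");
        System.out.println(pair[0] + " -> " + pair[1]); // lisi -> 2016-10-01 lisi
    }
}
```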

```java
// log: "date user" => (user, log)
// To join the stream's RDDs with the blacklist RDD, reformat each record as (user, log)
JavaPairDStream<String, String> userAdsClickLogDStream = adsClickLogDStream.mapToPair(
        new PairFunction<String, String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, String> call(String log) throws Exception {
                // Convert the log format: "date user" becomes (user, log)
                return new Tuple2<String, String>(log.split(" ")[1], log);
            }
        });
```

1.4. Filter out the logs of blacklisted users. This is where the transform operation is used.

```java
// Apply a transform to each batch RDD: join it with the blacklist RDD
JavaDStream<String> validAdsClickLogDStream = userAdsClickLogDStream.transform(
        new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public JavaRDD<String> call(JavaPairRDD<String, String> userAdsClickLogRDD) throws Exception {
                // Left outer join each batch RDD with the blacklist RDD.
                // The first String is the user, the second the log line; the Boolean is
                // Optional because not every user appears in the blacklist.
                // Result: (user, (log, Optional<Boolean>))
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD =
                        userAdsClickLogRDD.leftOuterJoin(blackRDD);

                // Keep only records whose user is NOT flagged in the blacklist
                JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
                        joinedRDD.filter(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                                // (user, (log, Optional<Boolean>)): drop the record
                                // if the flag is present and true
                                if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                                    return false;
                                } else {
                                    return true;
                                }
                            }
                        });

                // Keep only the original log line
                JavaRDD<String> validAdsClickLogRDD = filteredRDD.map(
                        new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                                return tuple._2._1;
                            }
                        });
                return validAdsClickLogRDD;
            }
        });
```

1.4.1. Inside the transform operation, each batch RDD is joined with the blacklist RDD.

```java
// Left outer join the batch RDD with the blacklist RDD.
// Not every user is in the blacklist, so a plain join would drop normal users;
// use leftOuterJoin, which makes the right side Optional.
// Result: (user, (log, Optional<Boolean>))
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> joinedRDD =
        userAdsClickLogRDD.leftOuterJoin(blackRDD);
```
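The leftOuterJoin semantics are the crux here: every left record survives, and the blacklist flag arrives as an Optional. A plain-Java illustration of just those semantics (illustrative names, simplified to a key list joined against a map, assuming unique right-side keys):

```java
import java.util.*;

// Plain-Java illustration of leftOuterJoin semantics:
// every left key is kept, and the right value becomes an Optional
class LeftOuterJoinSketch {
    static List<Map.Entry<String, Optional<Boolean>>> leftOuterJoin(
            List<String> leftKeys, Map<String, Boolean> right) {
        List<Map.Entry<String, Optional<Boolean>>> out = new ArrayList<>();
        for (String k : leftKeys) {
            // Key present on the right => Optional with its value; absent => Optional.empty()
            out.add(new AbstractMap.SimpleEntry<>(k, Optional.ofNullable(right.get(k))));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Boolean> black = new HashMap<>();
        black.put("lisi", true);
        System.out.println(leftOuterJoin(Arrays.asList("lisi", "tom"), black));
        // [lisi=Optional[true], tom=Optional.empty]
    }
}
```

Blacklisted "lisi" comes back with `Optional[true]`, while the normal user "tom" gets `Optional.empty` rather than being dropped, which is exactly why the subsequent filter must check `isPresent()` before reading the flag.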

1.4.2. Filter the result of joining each batch RDD with the blacklist.

```java
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> filteredRDD =
        joinedRDD.filter(new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, Boolean>() {
            /*
             * public interface Function<T1, R> extends Serializable {
             *     R call(T1 v1) throws Exception;
             * }
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                // (user, (log, Optional<Boolean>)): drop the record
                // if the flag is present and true
                if (tuple._2._2.isPresent() && tuple._2._2.get()) {
                    return false;
                } else {
                    return true;
                }
            }
        });
```

1.4.3. Only the normal, non-blacklisted users remain. Use map to convert the records back into plain click-log lines.

```java
JavaRDD<String> validAdsClickLogRDD = filteredRDD.map(
        new Function<Tuple2<String, Tuple2<String, Optional<Boolean>>>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                return tuple._2._1;
            }
        });
return validAdsClickLogRDD;
```
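End to end, the per-batch logic of steps 1.4.1 to 1.4.3 can be checked on plain collections without Spark. This sketch (illustrative names; it models a batch as a user-to-log map, so duplicate users per batch are collapsed) performs the join, drops flagged users, and keeps only the log line:

```java
import java.util.*;

// Per-batch blacklist filtering on plain collections:
// left-outer join against the blacklist, drop flagged users, keep the log line
class BlacklistFilterSketch {
    static List<String> filterBatch(Map<String, String> userToLog, Map<String, Boolean> black) {
        List<String> valid = new ArrayList<>();
        for (Map.Entry<String, String> e : userToLog.entrySet()) {
            Boolean flagged = black.get(e.getKey());  // the Optional side of the join
            if (!(flagged != null && flagged)) {      // same test as the filter above
                valid.add(e.getValue());              // keep only the original log line
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("lisi", "2016-10-01 lisi");
        batch.put("tom", "2016-10-01 tom");
        Map<String, Boolean> black = new HashMap<>();
        black.put("lisi", true);
        System.out.println(filterBatch(batch, black)); // [2016-10-01 tom]
    }
}
```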

1.5. Start the streaming context.

```java
// The valid click logs could later be written to a message-queue middleware
// such as Kafka, as input data for an ad billing service
validAdsClickLogDStream.print();

jssc.start();
jssc.awaitTermination();
jssc.close();
```

2. The Scala version

```scala
package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("BlackListFilter").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Blacklist: (user, Boolean), true means the user is blacklisted
    val blackList = Array(("Jack", true), ("Rose", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList)

    val st = ssc.socketTextStream("192.168.179.5", 8888)
    // "date user" => (user, line)
    val users = st.map { line => (line.split(" ")(1), line) }

    val validRddDS = users.transform { ld =>
      // leftOuterJoin: (k, v) join (k, w) => (k, (v, Option[w]))
      val lJoinRdd = ld.leftOuterJoin(blackListRDD)
      // Drop records whose blacklist flag is Some(true)
      val fRdd = lJoinRdd.filter { tuple =>
        println(tuple)
        !tuple._2._2.getOrElse(false)
      }
      // Keep only the original log line
      val validRdd = fRdd.map(tuple => tuple._2._1)
      validRdd
    }

    validRddDS.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```