I. Experiment description

Experimental data are generated by Java code; each record consists of the following space-separated fields:

Timestamp  Province  City  User ID  Ad ID
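For example, a generated record might look like this (values illustrative, in the range the generator below produces):

1617857099744 Jiangxi Nanchang U3 Ad_17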

The experiment fulfills three requirements:

(1) Implement a real-time dynamic blacklist mechanism: block users who click an advertisement more than 100 times a day, and store blacklisted user IDs in MySQL

(2) Compute, in real time, the total daily click traffic of each advertisement in each province and city, and store it in MySQL (the MySQL table schemas are sketched right after this list)

(3) Calculate the total ad clicks of the last minute every 10 seconds and display them in an HTML page
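The original post does not include the MySQL DDL. A minimal setup sketch in Java, with table schemas inferred from the column names and upsert statements used in the code below (the key choices are assumptions; the "on duplicate key update" statements in BlackListBoltB and ClickCountBoltB require uid in black_list and (day, province, city, aid) in province_city_advertise to be unique):

package cn.edu.neu.advertise;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Hypothetical one-off schema setup, not part of the original code;
 * schemas are inferred from the columns the bolts read and write.
 */
public class SchemaInit {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://master:3306/user_advertise?useUnicode=true&characterEncoding=utf-8",
                "root", "Hive@2020");
             Statement st = conn.createStatement()) {
            st.execute("create table if not exists user_advertise(" +
                    "day varchar(10), uid varchar(16), aid varchar(16), count int default 0, " +
                    "primary key(day, uid, aid))");
            st.execute("create table if not exists black_list(uid varchar(16) primary key)");
            st.execute("create table if not exists province_city_advertise(" +
                    "day varchar(10), province varchar(32), city varchar(32), aid varchar(16), " +
                    "count int default 0, primary key(day, province, city, aid))");
        }
    }
}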

II. Experiment code

  • Mock data generation
package cn.edu.neu.advertise;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * @author 32098
 */
public class KafkaDataMocker {
    public static List<String> mockData() throws InterruptedException {
        List<String> list = new ArrayList<>();

        List<String> provinceList = Arrays.asList("Jiangxi", "Liaoning", "Zhejiang", "Guangdong", "Hunan", "Hubei", "Jilin", "Heilongjiang", "Fujian");
        List<String> cityList = Arrays.asList("Nanchang", "Shenyang", "Hangzhou", "Guangzhou", "Changsha", "Wuhan", "Changchun", "Harbin", "Xiamen");

        int len = provinceList.size();
        Random r = new Random();
        for(int i=0; i<r.nextInt(100); i++){
            Thread.sleep(10);
            int idx = r.nextInt(len);
            String province = provinceList.get(idx);
            String city = cityList.get(idx);
            String uid = "U" + r.nextInt(10);
            String aid = "Ad_" + r.nextInt(20);
            String record = System.currentTimeMillis() + " " + province + " " + city + " " + uid + " " + aid;
            list.add(record);
        }
        return list;
    }

    public static void main(String[] args) throws InterruptedException {
        HashMap<String, Object> pros = new HashMap<>(3);
        pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092");
        pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(pros);
        int ite = 0;
        while (true){
            ite = ite + 1;
            System.out.println("# # # # # # # # # # # # # # # #"+ ite +"# # # # # # # # # # # # # # # #");
            List<String> records = mockData();
            records.forEach(
                    elem -> {
                        System.out.println(elem);
                        ProducerRecord<String, String> record = new ProducerRecord<>("advertise-user".null, elem); producer.send(record); }); Thread.sleep(1000); }}}/* Start Kafka server: kafka-server-start.sh config/server.properties kafka-console-consumer.sh --bootstrap-server master:9092 --topic storm-topic --from-beginning */

  • MysqlUtil
package cn.edu.neu.advertise;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * @author 32098
 */
public class MysqlUtil {
    public static Connection getConnection(){
        String url = "jdbc:mysql://master:3306/user_advertise?useUnicode=true&characterEncoding=utf-8";
        String user = "root";
        String password = "Hive@2020";
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        return null;
    }
}
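A design note, not from the original post: getConnection() opens a new connection on every call, and the bolts below call it once per tuple. A sketch of holding one connection per bolt task instead (a fragment of a bolt, using the same imports as the bolts below):

    // Illustrative fragment: one JDBC connection per bolt task instead of per tuple.
    private transient Connection conn;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.conn = MysqlUtil.getConnection(); // reused by every execute() call
    }

    @Override
    public void cleanup() {
        try {
            if (conn != null) {
                conn.close(); // best-effort close on shutdown
            }
        } catch (SQLException ignored) {
        }
    }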
  • Real-time dynamic blacklist mechanism
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author 32098
 * Implement the real-time dynamic blacklist mechanism: block users who click
 * an advertisement more than 100 times a day; blacklisted user IDs are stored in MySQL.
 */
public class BlackListBoltA extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            // You must ack, otherwise messages in Kafka will be consumed repeatedly
            collector.ack(input);
            System.out.println("Received from kafka: "+ value);
            String[] strs = value.split(" ");
            collector.emit(new Values(strs[0], strs[3], strs[4]));
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time_stamp", "uid", "aid"));
    }
}
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * @author 32098
 * Implement the real-time dynamic blacklist mechanism: block users who click
 * an advertisement more than 100 times a day; blacklisted user IDs are stored in MySQL.
 */
public class BlackListBoltB extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String ts = tuple.getStringByField("time_stamp");
        String uid = tuple.getStringByField("uid");
        String aid = tuple.getStringByField("aid");
        String value = "1";
        collector.ack(tuple);

        try {
            Connection conn = MysqlUtil.getConnection();
            assert conn != null;
            PreparedStatement ps = conn.prepareStatement("select uid from black_list where uid=?");
            ps.setString(1, uid);
            ResultSet rs = ps.executeQuery();
            if (!rs.next()) {
                String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(Long.parseLong(ts.trim())));
                ps = conn.prepareStatement(
                        "select * from user_advertise where day=? and uid=? and aid=?"
                );
                ps.setString(1, day);
                ps.setString(2, uid);
                ps.setString(3, aid);
                rs = ps.executeQuery();
                if(rs.next()){
                    PreparedStatement psA = conn.prepareStatement(
                            "update user_advertise set count = count + ? where day=? and uid=? and aid=?"
                    );
                    // psA.setInt(1, 1);
                    psA.setString(1, value);
                    psA.setString(2, day);
                    psA.setString(3, uid);
                    psA.setString(4, aid);
                    psA.executeUpdate();
                    psA.close();
                }else{
                    PreparedStatement psB = conn.prepareStatement("insert into user_advertise(day,uid,aid,count) values (?, ?, ?, ?)");
                    psB.setString(1, day);
                    psB.setString(2, uid);
                    psB.setString(3, aid);
                    psB.setString(4, value);
                    psB.executeUpdate();
                    psB.close();
                }
                // NOTE: requirement (1) specifies more than 100 clicks per day; the
                // original code uses a lower threshold of 60, which the mock data
                // (only 10 user IDs) reaches much faster
                ps = conn.prepareStatement(
                        "select * from user_advertise where day=? and uid=? and aid=? and count>60"
                );
                ps.setString(1, day);
                ps.setString(2, uid);
                ps.setString(3, aid);
                rs = ps.executeQuery();
                if(rs.next()){
                    PreparedStatement psC = conn.prepareStatement("insert into black_list(uid) value(?) on duplicate key update uid=?");
                    psC.setString(1, uid);
                    psC.setString(2, uid);
                    psC.executeUpdate();
                    psC.close();
                }
                ps.close();
            }
            conn.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
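A side note, not from the original: with parallelism 2 and a fieldsGrouping that includes the (per-tuple) timestamp, clicks of the same user can be processed by different tasks, so the select-then-update/insert sequence above can interleave. A hypothetical atomic alternative, mirroring the upsert pattern ClickCountBoltB uses below (requires a unique key on (day, uid, aid)):

    // Hypothetical replacement for the select + update/insert sequence above,
    // not the original author's code; one atomic upsert per click.
    private static void recordClick(Connection conn, String day, String uid, String aid) throws SQLException {
        PreparedStatement ps = conn.prepareStatement(
                "insert into user_advertise(day, uid, aid, count) values (?, ?, ?, 1) " +
                "on duplicate key update count = count + 1");
        ps.setString(1, day);
        ps.setString(2, uid);
        ps.setString(3, aid);
        ps.executeUpdate();
        ps.close();
    }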
package cn.edu.neu.advertise;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author 32098
 */
public class BlackListApp {
    private static final String BOOTSTRAP_SERVERS = "master:9092";
    private static final String TOPIC_NAME = "advertise-user";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout".new KafkaSpout<>(getKafkaSpoutConfig()), 1);
        builder.setBolt("boltA".new BlackListBoltA(), 2).setNumTasks(2).shuffleGrouping("kafka_spout");
        builder.setBolt("boltB".new BlackListBoltB(), 2).setNumTasks(2).fieldsGrouping("boltA".new Fields("time_stamp"."uid"."aid"));

        // If the argument "cluster" is passed, submit to the Storm cluster; otherwise run in a local environment
        if (args.length > 0 && "cluster".equals(args[0])) {
            try {
                StormSubmitter.submitTopology("Cluster-BlackListApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Local-BlackListApp", new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(BlackListApp.BOOTSTRAP_SERVERS, BlackListApp.TOPIC_NAME)
                // Except for the group ID, the following configurations are optional. The group ID must be specified; otherwise, an InvalidGroupIdException will be thrown
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                // Define a retry policy
                .setRetry(getRetryService())
                // The time interval for the scheduled submission of offsets. The default is 15s
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    /**
     * Define a retry policy
     * @return KafkaSpoutRetryService
     */
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}
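Run note, not in the original post: started with no arguments the topology runs in the in-process LocalCluster; passing the argument cluster submits it to a running Storm cluster, e.g. storm jar advertise.jar cn.edu.neu.advertise.BlackListApp cluster (the jar name is hypothetical). The same applies to the two topologies below.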
  • Real-time statistics of the total daily ad click traffic per province and city
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author 32098
 * Real-time statistics of the total click traffic of each advertisement
 * in each province and city every day, stored in MySQL.
 */
public class ClickCountBoltA extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            // You must ack, otherwise messages in Kafka will be consumed repeatedly
            collector.ack(input);
            System.out.println("Received from kafka: "+ value);
            String[] strs = value.split(" ");
            collector.emit(new Values(strs[0], strs[1], strs[2], strs[4]));
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time_stamp", "province", "city", "aid"));
    }
}
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * @author 32098
 */
public class ClickCountBoltB extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String ts = tuple.getStringByField("time_stamp");
        String province = tuple.getStringByField("province");
        String city = tuple.getStringByField("city");
        String aid = tuple.getStringByField("aid");
        String value = "1";
        collector.ack(tuple);

        String day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(Long.parseLong(ts.trim())));

        try {
            Connection conn = MysqlUtil.getConnection();
            assert conn != null;
            PreparedStatement ps = conn.prepareStatement(
                    "insert into province_city_advertise(day,province,city,aid,count) values (? ,? ,? ,? ,?) on duplicate key update count=count+?"
            );
            ps.setString(1, day);
            ps.setString(2, province);
            ps.setString(3, city);
            ps.setString(4, aid);
            ps.setString(5, value);
            ps.setString(6, value);
            ps.executeUpdate();
            ps.close();
            conn.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
package cn.edu.neu.advertise;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author 32098
 */
public class ClickCountApp {
    private static final String BOOTSTRAP_SERVERS = "master:9092";
    private static final String TOPIC_NAME = "advertise-user";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout".new KafkaSpout<>(getKafkaSpoutConfig()), 1);
        builder.setBolt("boltA".new ClickCountBoltA()).shuffleGrouping("kafka_spout");
        builder.setBolt("boltB".new ClickCountBoltB()).fieldsGrouping("boltA".new Fields("time_stamp"."province"."city"."aid"));

        // If the argument "cluster" is passed, submit to the Storm cluster; otherwise run in a local environment
        if (args.length > 0 && "cluster".equals(args[0])) {
            try {
                StormSubmitter.submitTopology("Cluster-ClickCountApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Local-ClickCountApp", new Config(), builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(ClickCountApp.BOOTSTRAP_SERVERS, ClickCountApp.TOPIC_NAME)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    /**
     * Define a retry policy
     * @return KafkaSpoutRetryService
     */
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}
  • Total ad clicks in the last minute, calculated every 10 seconds and displayed in HTML
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author 32098
 * Total clicks of ads in the last minute, calculated every 10 seconds.
 */
public class ClickNearlyMinuteBoltA extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            // You must ack, otherwise messages in Kafka will be consumed repeatedly
            collector.ack(input);
            System.out.println("Received from kafka: "+ value);
            String[] strs = value.split(" ");
            collector.emit(new Values(strs[0], strs[4]));
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time_stamp", "aid"));
    }
}
package cn.edu.neu.advertise;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author 32098
 * Total clicks of ads in the last minute, calculated every 10 seconds.
 */
public class ClickNearlyMinuteBoltB extends BaseWindowedBolt {
    private OutputCollector collector;
    private String projectRoot;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.projectRoot = System.getProperty("user.dir");
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        Map<String, Integer> hmsClick = new TreeMap<>();

        for (Tuple tuple: tupleWindow.get()){

            String ts = tuple.getStringByField("time_stamp");
            String aid = tuple.getStringByField("aid");

            long timeStamp = Long.parseLong(ts.trim());
            String time = new SimpleDateFormat("HH:mm:ss").format(new Date(timeStamp));
            String[] hms = time.split(":");
            int s = (Integer.parseInt(hms[2]) / 10 + 1) * 10;
            int m = Integer.parseInt(hms[1]);
            int h = Integer.parseInt(hms[0]);
            // Carry the rounded-up seconds over to the minute and hour as needed.
            // The original omitted resetting m to 0 on minute overflow; fixed here.
            if (s == 60) {
                m = m + 1;
                s = 0;
                if (m == 60) {
                    m = 0;
                    h = h + 1;
                    if (h == 24) {
                        h = 0;
                    }
                }
            }
            String hStr, mStr, sStr;
            if(h < 10){
                hStr = "0" + h;
            }else{
                hStr = String.valueOf(h);
            }
            if(m < 10){
                mStr = "0" + m;
            }else{
                mStr = String.valueOf(m);
            }
            if(s == 0){
                sStr = "00";
            }else{
                sStr = String.valueOf(s);
            }

            String hms_ = hStr+":"+mStr+":"+sStr;
            if(hmsClick.containsKey(hms_)){
                hmsClick.put(hms_, hmsClick.get(hms_)+1);
            }else{
                hmsClick.put(hms_, 1);
            }
        }
        String file = projectRoot + "/src/main/java/cn/edu/neu/advertise/advertise_click_nearly_minute.json";
        try {
            PrintWriter out = new PrintWriter(new FileWriter(new File(file), false));
            StringBuffer jsonStr = new StringBuffer("[");
            hmsClick.forEach(
                    (xtime, yclick) -> {
                        String jsonElem = "{\"xtime\":\"" + xtime + "\",\"yclick\":\"" + yclick + "\"},";
                        System.out.println(jsonElem);
                        jsonStr.append(jsonElem);
                    });
            if (jsonStr.length() > 1) {
                // drop the trailing comma after the last element
                jsonStr.deleteCharAt(jsonStr.length() - 1);
            }
            jsonStr.append("]");
            out.println(jsonStr.toString());
            out.flush();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

ClickNearlyMinuteBoltB inherits from BaseWindowedBolt to implement a sliding window; the window parameters are set when the bolt is added to the topology, as sketched below.
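For reference, a minimal illustration of the two withWindow arguments used in the topology below:

    // Window length 60 s, sliding interval 10 s: each execute(TupleWindow) call
    // receives the tuples of the last minute, and the bolt fires every 10 seconds.
    BaseWindowedBolt windowedBolt = new ClickNearlyMinuteBoltB()
            .withWindow(new BaseWindowedBolt.Duration(60, TimeUnit.SECONDS),  // window length
                        new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS)); // sliding interval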

package cn.edu.neu.advertise;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;

import java.util.concurrent.TimeUnit;

/**
 * @author 32098
 */
public class ClickNearlyMinuteApp {
    private static final String BOOTSTRAP_SERVERS = "master:9092";
    private static final String TOPIC_NAME = "advertise-user";

    public static void main(String[] args) {

        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout".new KafkaSpout<>(getKafkaSpoutConfig()), 1);
        builder.setBolt("boltA".new ClickNearlyMinuteBoltA()).shuffleGrouping("kafka_spout");
        // window
        builder.setBolt("boltB".new ClickNearlyMinuteBoltB().withWindow(new Duration(60, TimeUnit.SECONDS), new Duration(10, TimeUnit.SECONDS)),1).setNumTasks(1).fieldsGrouping("boltA".new Fields("time_stamp"."aid"));

        Config config = new Config();
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 70000);
        // If the argument "cluster" is passed, submit to the Storm cluster; otherwise run in a local environment
        if (args.length > 0 && "cluster".equals(args[0])) {
            try {
                StormSubmitter.submitTopology("Cluster-ClickNearlyMinuteApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Local-ClickNearlyMinuteApp", config, builder.createTopology());
        }
    }

    private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig() {
        return KafkaSpoutConfig.builder(ClickNearlyMinuteApp.BOOTSTRAP_SERVERS, ClickNearlyMinuteApp.TOPIC_NAME)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                .setRetry(getRetryService())
                .setOffsetCommitPeriodMs(10_000)
                .build();
    }

    /**
     * Define a retry policy
     * @return KafkaSpoutRetryService
     */
    private static KafkaSpoutRetryService getRetryService() {
        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
    }
}

Without raising the message timeout, submitting the topology fails with java.lang.IllegalArgumentException: Window duration (length + sliding interval) value 70000 is more than topology.message.timeout.secs value 30000. The window (60 s + 10 s = 70,000 ms) exceeds the default 30-second timeout (30,000 ms), hence the config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 70000); call above (any value of at least 70 seconds satisfies the check).

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Total ad clicks in the last minute, calculated every 10 seconds</title>
<!-- <script src="echarts.js"></script> -->
<!-- <script type="text/javascript" src="jquery-1.9.0.min.js"></script> -->
    <script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
    <script type="text/javascript" src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js"></script>
</head>
<body>

<div id="display" style="height: 450px; width:800px"></div>
<script>
    var myChart = echarts.init(document.getElementById("display"));
    // Redraw the chart every 5 seconds from the JSON written by ClickNearlyMinuteBoltB
    setInterval(function () {
        $.getJSON("advertise_click_nearly_minute.json", function (data) {
            var x = [];
            var y = [];
            $.each(data, function (i, obj) {
                x.push(obj.xtime);
                y.push(obj.yclick);
            });
            var option = {
                xAxis: {
                    type: "category",
                    data: x
                },
                yAxis: {
                    type: "value"
                },
                series: [{
                    data: y,
                    type: "line"
                }]
            };
            myChart.setOption(option);
        });
    }, 5000);
</script>

</body>
</html>

[{"xtime":"22:28:20"."yclick":"92"},
{"xtime":"22:28:30"."yclick":"110"},
{"xtime":"22:28:40"."yclick":"105"},
{"xtime":"22:28:50"."yclick":"99"},
{"xtime":"22:29:00"."yclick":"88"},
{"xtime":"22:29:10"."yclick":"60"}]Copy the code

III. Experimental results

  • Requirement (1)

  • Requirement (2)

  • Requirement (3)

[Demo video (Bilibili): player.bilibili.com/player.html…]