Preface

Every year during the Tmall Double 11 shopping festival, a huge real-time dashboard shows the current sales figures as they happen. Behind this impressive page sits powerful technical machinery, and the scenario it implements is real-time report analysis.


1. Overview of business requirements

Simulated transaction order data is sent to the distributed message queue Kafka, then consumed and processed in real time for analysis. The business flow is as follows:

Transaction order data is consumed from Kafka in real time, the sales order amount is aggregated across different dimensions, and the final report results are stored in a MySQL database.
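For reference, each simulated order is sent to Kafka as a single JSON message. A representative record (the field values below are made up for illustration; the schema follows the OrderRecord case class defined in the next section) looks roughly like this:

{"orderId":"20211111200000123000001","userId":"300000042","orderTime":"2021-11-11 20:00:00.123","ip":"106.80.12.34","orderMoney":128.05,"orderStatus":0}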

II. Project code

1. Simulate transaction data

Write a program that generates transaction order data in real time, converts each record to a JSON string with the json4s library, and sends it to a Kafka topic. The code is as follows:

// ===================== Order entity class =====================
package cn.itcast.spark.mock

/**
 * Order entity class (case class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          ordering IP address
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord( orderId: String, userId: String, orderTime: String, ip: String, orderMoney: Double, orderStatus: Int )


// ===================== Simulated order data =====================
package cn.itcast.spark.mock

import java.util.Properties

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json

import scala.util.Random

/**
 * Simulates order data and sends it to a Kafka Topic; each message in the topic is a String in JSON format.
 * Data conversion: convert an OrderRecord instance into a JSON-formatted string (the json4s library can be used).
 */
object MockOrderProducer {
    
    def main(args: Array[String]): Unit = {
        
        var producer: KafkaProducer[String, String] = null
        try {
            // 1. Kafka producer client configuration information
            val props = new Properties()
            props.put("bootstrap.servers", "node1.itcast.cn:9092")
            props.put("acks", "1")
            props.put("retries", "3")
            props.put("key.serializer", classOf[StringSerializer].getName)
            props.put("value.serializer", classOf[StringSerializer].getName)
            
            // 2. Create a KafkaProducer object and pass in configuration information
            producer = new KafkaProducer[String, String](props)
            
            // Random number instance object
            val random: Random = new Random()
            // Order status: order open 0, order cancelled 1, order closed 2, order completed 3
            val allStatus = Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
            
            while (true) {
                // Number of orders generated in each simulation loop
                val batchNumber: Int = random.nextInt(1) + 5
                (1 to batchNumber).foreach{number =>
                    val currentTime: Long = System.currentTimeMillis()
                    val orderId: String = s"${getDate(currentTime)}%06d".format(number)
                    val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
                    val orderTime: String = getDate(currentTime, format="yyyy-MM-dd HH:mm:ss.SSS")
                    val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
                    val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
                    // 3. Order record data
                    val orderRecord: OrderRecord = OrderRecord(
                        orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
                    )
                    // Convert to JSON format data
                    val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
                    println(orderJson)
                    // 4. Build the ProducerRecord object
                    val record = new ProducerRecord[String, String]("orderTopic", orderJson)
                    // Send the data: def send(record: ProducerRecord[K, V]): Future[RecordMetadata]
                    producer.send(record)
                }
                Thread.sleep(random.nextInt(100) + 500)
            }
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (null != producer) producer.close()
        }
    }

    /**================= Obtain the current time =================*/
    def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
        val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
        val formatDate: String = fastFormat.format(time)  // Format the date
        formatDate
    }
    
    /**================= Obtain the random IP address =================*/
    def getRandomIp: String = {
        // IP ranges
        val range: Array[(Int, Int)] = Array(
            (607649792, 608174079),     // 36.56.0.0   - 36.63.255.255
            (1038614528, 1039007743),   // 61.232.0.0  - 61.237.255.255
            (1783627776, 1784676351),   // 106.80.0.0  - 106.95.255.255
            (2035023872, 2035154943),   // 121.76.0.0  - 121.77.255.255
            (2078801920, 2079064063),   // 123.232.0.0 - 123.235.255.255
            (-1950089216, -1948778497), // 139.196.0.0 - 139.215.255.255
            (-1425539072, -1425014785), // 171.8.0.0   - 171.15.255.255
            (-1236271104, -1235419137), // 182.80.0.0  - 182.92.255.255
            (-770113536, -768606209),   // 210.25.0.0  - 210.47.255.255
            (-569376768, -564133889)    // 222.16.0.0  - 222.95.255.255
        )
        // Random number: IP address range subscript
        val random = new Random()
        val index = random.nextInt(10)
        val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)

        // Convert an IP address of type Int to IPv4
        number2IpString(ipNumber)
    }
    
    /**================= Convert an Int-type IPv4 address to a String =================*/
    def number2IpString(ip: Int): String = {
        val buffer: Array[Int] = new Array[Int](4)
        buffer(0) = (ip >> 24) & 0xff
        buffer(1) = (ip >> 16) & 0xff
        buffer(2) = (ip >> 8) & 0xff
        buffer(3) = ip & 0xff
        // Return the IPv4 address
        buffer.mkString(".")
    }
}
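Once MockOrderProducer is running and the orderTopic topic exists on the broker, the generated messages can be spot-checked with Kafka's console consumer, for example: kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic (the broker address matches the bootstrap.servers value configured above).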

2. Create a Maven module

Create a Maven module and add the related dependencies as follows:

    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.5</spark.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
        <kafka.version>2.0.0</kafka.version>
        <mysql.version>8.0.19</mysql.version>
    </properties>

    <dependencies>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming + Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop Client dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Kafka Client dependency -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!-- ip2region: IP address to region lookup -->
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- MySQL Client dependency -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- JSON library -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <!-- Maven compile plugins -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The project structure is as follows:
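Based on the packages and file paths referenced in the code, the module layout looks roughly like the following (the module name is illustrative):

spark-report/
├── pom.xml
└── src/main/
    ├── dataset/
    │   └── ip2region.db                    // IP lookup data file used by DbSearcher
    ├── resources/
    └── scala/cn/itcast/spark/
        ├── mock/
        │   ├── OrderRecord.scala           // order entity case class
        │   └── MockOrderProducer.scala     // simulated order producer
        └── report/
            └── RealTimeOrderReport.scala   // Structured Streaming report job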

3. Core code

RealTimeOrderReport.scala

package cn.itcast.spark.report

import java.util.concurrent.TimeUnit

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.expressions.{UserDefinedAggregateFunction, UserDefinedFunction}
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.lionsoul.ip2region.{DataBlock, DbConfig, DbSearcher}

object RealTimeOrderReport {

	def printToConsole(dataFrame: DataFrame) = {
		dataFrame.writeStream
  		.format("console")
  		.outputMode(OutputMode.Update())
  		.option("numRows"."50")
  		.option("truncate"."false")
  		.start()
	}



	def main(args: Array[String]): Unit = {
		//1. Obtain the Spark instance object
		val spark: SparkSession = SparkSession.builder()
			.appName("isDemo")
			.master("local[3]")
			.config("spark.sql.shuffle.partitions"."3")
			.getOrCreate()
		import spark.implicits._

		val dataFrame: DataFrame = spark.readStream
			.format("kafka")
			.option("kafka.bootstrap.servers"."node1.itcast.cn:9092")
			.option("subscribe"."orderTopic")
			.load()
			.selectExpr("CAST (value AS STRING)")
// printToConsole(dataFrame)


		val ip_to_region: UserDefinedFunction = udf((ip: String) => {
			// 1. Create a DbSearcher and specify the location of the ip2region data file
			val dbSearcher = new DbSearcher(new DbConfig(), "src/main/dataset/ip2region.db")
			// 2. Look up the IP address and obtain the matching data block
			val dataBlock: DataBlock = dbSearcher.btreeSearch(ip)
			// 3. Get and parse the province and city
			val region: String = dataBlock.getRegion
			// println(region)  // e.g. China|0|Hainan|Haikou|Education network
			val Array(_, _, province, city, _) = region.split("\\|")
			(province, city)
		})
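		// Note: the UDF above creates a new DbSearcher (re-opening ip2region.db) for every row.
		// A possible optimization (a sketch, assuming the db file is readable at the same path on
		// every executor) is to reuse one searcher per executor JVM via a top-level object, e.g.:
		//
		//   object IpSearcher {
		//     // DbSearcher is not documented as thread-safe; wrap it in a ThreadLocal if tasks run concurrently
		//     lazy val searcher: DbSearcher = new DbSearcher(new DbConfig(), "src/main/dataset/ip2region.db")
		//   }
		//   val ip_to_region_cached: UserDefinedFunction = udf((ip: String) => {
		//     val Array(_, _, province, city, _) = IpSearcher.searcher.btreeSearch(ip).getRegion.split("\\|")
		//     (province, city)
		//   })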
		val frame: DataFrame = dataFrame
			.select(
				get_json_object($"value"."$.ip").as("ip"),
				get_json_object($"value"."$.orderMoney")
					.cast(DataTypes.createDecimalType(10.2))
					.as("money"),
				get_json_object($"value"."$.orderStatus").as("status")
			)
			.filter($"status"= = =0)
			.withColumn("region", ip_to_region($"ip"))
			.select(
				$"region._1".as("province"),
				$"region._2".as("city"),
				$"money"
			)


// printToConsole(frame)


// printToConsole(dframe)
		//SELECT "country" as type, SUM(money) as totalMoney FROM tmp_view
		//SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province
		//SELECT city as type, SUM(money) as totalMoney FROM (SELECT * FROM tmp_view WHERE city IN ("Beijing","Shanghai","Shenzhen","Guangzhou","Hangzhou","Chengdu","Nanjing","Wuhan","Xi'an")) t GROUP BY t.city
		frame.createOrReplaceTempView("tmp_view")
		val f: DataFrame = spark.sql(
			""" |SELECT "countries" as type, SUM(money) as totalMoney FROM tmp_view """.stripMargin)
		val f2: DataFrame = spark.sql(
			""" |SELECT province as type, SUM(money) as totalMoney FROM tmp_view GROUP BY province """.stripMargin)
		val f3: DataFrame = spark.sql(
			""" |SELECT city as type, SUM(money) as totalMoney FROM (SELECT * FROM tmp_view WHERE city in ("The Beijing municipal","Shanghai","shenzhen","guangzhou","hangzhou","chengdu","nanjing","Wuhan city","Xi 'an"))t GROUP BY t.city """.stripMargin)
// printToConsole(f3)
		saveToMySQL(f,"total")
		saveToMySQL(f2,"totalprovince")
		saveToMySQL(f3,"totalcity")


		spark.streams.awaitAnyTermination()
	}
	def saveToMySQL(streamDF: DataFrame, reportType: String): Unit = {
		streamDF.writeStream
  		.outputMode(OutputMode.Complete())
  		.queryName(s"${reportType}")
  		.foreachBatch((batchDF: DataFrame, batchId: Long) => {
				batchDF.coalesce(1)
  				.write.mode(SaveMode.Overwrite)
  				.format("jdbc")
  				.option("url"."jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
  				.option("driver"."com.mysql.cj.jdbc.Driver")
  				.option("user"."root")
  				.option("password"."123456")
  				.option("dbtable",s"db_spark.tb_order${reportType}")
  				.save()
			}
			)
			.option("checkpointLocation", s"datas/spark/structured-ckpt-${System.currentTimeMillis()}")
			.start()
	}
}

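With SaveMode.Overwrite, the JDBC writer drops and recreates each target table on every micro-batch, so MySQL always holds the latest complete aggregation and no manual DDL is required. As a quick sanity check of the sink, a report table can be read back from Spark with the same connection settings; the sketch below assumes the db_spark.tb_order${reportType} naming used in saveToMySQL:

// Sketch: read one report table back to verify the JDBC sink (same settings as saveToMySQL)
val check: DataFrame = spark.read
	.format("jdbc")
	.option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
	.option("driver", "com.mysql.cj.jdbc.Driver")
	.option("user", "root")
	.option("password", "123456")
	.option("dbtable", "db_spark.tb_ordertotalprovince")
	.load()
check.show(truncate = false)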

OrderRecord.scala

package cn.itcast.spark.mock

/**
 * Order entity class (case class)
 * @param orderId     order ID
 * @param userId      user ID
 * @param orderTime   order date and time
 * @param ip          ordering IP address
 * @param orderMoney  order amount
 * @param orderStatus order status
 */
case class OrderRecord( orderId: String, userId: String, orderTime: String, ip: String, orderMoney: Double, orderStatus: Int )


Conclusion

Real-time report analysis is one of the reporting approaches many companies have adopted in recent years, and its most prominent application is the real-time large-screen display: results produced by streaming computation are pushed directly to the front-end application to show how key indicators change in real time.

The most typical case is Taobao's Double 11 event. During the annual shopping festival, apart from the shopping frenzy itself, the most eye-catching sight is the transaction amount constantly ticking upward on the big screen. Across the whole computing chain, from Tmall order placement through data collection, computation, and verification to display on the Double 11 big screen, end-to-end latency is compressed to under 5 seconds, peak throughput reaches 300,000 orders per second, and redundant streaming pipelines ensure nothing goes wrong.

This hands-on Double 11 real-time report analysis is written mainly in SQL rather than the DataFrame DSL, which leaves room for improvement. That wraps up this share on the Tmall Double 11 real-time report; if you found it useful, your likes, comments, and follows are very welcome!