How could a real-time analysis and statistics system handling tens of billions of events not have PV and UV, those two classic, Super Mario-grade indicators? Five hundred years ago they were already the ancestors of all metrics... cough, sorry, back to the main text. In the last article the big pig introduced the design and implementation of the small high-performance ETL program. By now our data has landed in HBase and the log time has been written to MySQL, so everything is in place. Next we are going to build out the indicators, starting with these two classics.
Program flow
Let’s first take a look at the calculation process of the whole program, please see the big picture:
- Starting the calculation is our Driver entry point.
- Before each round of calculation, check whether the Redis listener has received a program-exit notification; if it has, the program ends, otherwise it proceeds (a minimal sketch of this listener follows the list).
- First query the average progress time point of the ETL LogHub logs from our previous article.
- The switch decides whether the gap between the LogHub time and the indicator time we calculated last time is big enough; it is generally defined as at least 3 minutes, because the LogHub time fluctuates a little.
- If it is not, sleep for 30 seconds; you can tune the sleep interval yourself.
- If it is, calculate the indicators for the range from the end time of the last indicator calculation to (LogHub time minus the 3-minute log fluctuation).
- After computing and upserting the indicator results and updating the indicator calculation time, go back to point 2.
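The article never shows the Redis listener behind point 2, so here is a minimal sketch of what it could look like, assuming Jedis and the computeListenerMessage channel that the stop command at the end of this article publishes to; the object name, host and port are placeholder assumptions, not the project's actual wiring:

import redis.clients.jedis.{Jedis, JedisPubSub}

object ExitListener {
  // Flag checked by the driver loop (volatile so the subscriber thread's write is visible).
  @volatile var appRunning = true

  def start(): Unit = new Thread(() => {
    val jedis = new Jedis("localhost", 6379) // assumption: host/port would come from config
    // Block on the subscription and flip the flag when a "stop" message arrives.
    jedis.subscribe(new JedisPubSub {
      override def onMessage(channel: String, message: String): Unit =
        if (message == "stop") appRunning = false
    }, "computeListenerMessage")
  }).start()
}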
Program implementation
Start at the DriverMain entrance
// Listen for the Redis exit message
while (appRunning) {
  val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
  // LogHub progress offset (minus the 3-minute fluctuation)
  val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
  // Offset of the last indicator calculation
  val indicatorTime = dbClient.query("indicator").toLocalDateTime
  // Time difference between the two offsets
  val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes
  val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
  // If the gap is big enough, run the indicator calculation; otherwise sleep
  if (betweenTimeMinutes >= 1) {
    app.run(spark, indicatorTime, loghubTime)
    // Record the new indicator offset
    dbClient.upsert(
      Map("offsetName" -> "indicator"),
      Update(sets = Map("time" -> loghubTime.toString)),
      "offset"
    )
  } else {
    TimeUnit.SECONDS.sleep(30)
  }
}
From the comments, the overall idea is quite clear.
Now let's move on and see what interesting things the run method does:
conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
conf.set("TableInputFormat.SCAN_ROW_START", start)
conf.set("TableInputFormat.SCAN_ROW_START", end)
val logDS = sc.newAPIHadoopRDD(
    conf,
    classOf[TableInputFormat2],
    classOf[ImmutableBytesWritable],
    classOf[Result]
  )
  .map(tp2 => HbaseUtil.resultToMap(tp2._2))
  .map(map => {
    LogCase(
      // nested case class holding the log time
      dt = DT(
        map.get("time").toLocalDateTimeStr(),
map.get("time").toLocalDate().toString
),
`type` = map.get("type"),
aid = map.get("aid"),
uid = map.get("uid"),
tid = map.get("tid"),
ip = map.get("ip")
)
}).toDS()
logDS.cache()
logDS.createTempView("log")

new PV().run()
new UV().run()
start and end are the time range of the logs to be queried.
The HBase data in that time range is converted into a log table in Spark SQL.
This log table can then be used in both the PV and UV calculations.
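How start and end are actually built is not shown in this article (the rowkey layout comes from the ETL article), but assuming they are simply the two offsets from the driver loop formatted with the yyyyMMddHHmmssSSS pattern seen above, it would look roughly like this:

import java.time.format.DateTimeFormatter

// Assumption: the scan boundaries are just the formatted indicator/LogHub offsets from the driver loop.
val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
val start  = indicatorTime.format(format) // e.g. "20190326120000000"
val end    = loghubTime.format(format)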
Let’s take a look at what’s going on in these two classic indicators:
spark.sql(
""" |SELECT | aid, | dt.date, | COUNT(1) as pv |FROM | log |GROUP BY | aid, | dt.date """.stripMargin)
.rdd
.foreachPartition(rows => {
val props = PropsUtils.properties("db")
val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
rows.foreach(row => {
dbClient.upsert(
Map(
"time" -> row.getAs[String]("date"),
"aid" -> row.getAs[String]("aid")
),
Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
"common_report"
)
})
dbClient.close()
})
Wow, after running it once: big brother, isn't this written a bit too simply?
It is a normal PV aggregation, plus a foreachPartition at the end that upserts each row of the aggregated result into our common_report indicator table.
GROUP BY follows the dimensions to aggregate on; the query above counts the daily PV of each article.
The unique key of common_report is time + aid, and pv defaults to 0.
The DDL of the table looks like this:
create table common_report
(
id bigint auto_increment primary key,
aid bigint not null,
pv int default 0 null,
uv int default 0 null,
time date not null,
constraint common_report_aid_time_uindex unique (aid, time)
);
Looks fine so far.
dbClient.upsert implements upsert semantics; it roughly amounts to the following MySQL statement:
INSERT INTO common_report (time, aid, pv)
VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1;
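For reference, here is a minimal sketch of what an upsert helper like this might look like underneath; it is an illustration under assumptions, not the project's actual DBJdbc code. It assumes a plain java.sql.Connection and the Map / Update(incs, sets) shapes used in this article:

import java.sql.Connection

// Hypothetical shapes matching the calls in this article.
case class Update(incs: Map[String, String] = Map.empty, sets: Map[String, String] = Map.empty)

def upsert(keys: Map[String, String], update: Update, table: String)(implicit conn: Connection): Unit = {
  val cols   = (keys ++ update.incs ++ update.sets).toSeq             // (column, value) pairs
  val incSet = update.incs.keys.map(c => s"$c = $c + VALUES($c)")     // counters are accumulated
  val setSet = update.sets.keys.map(c => s"$c = VALUES($c)")          // plain columns are overwritten
  val sql =
    s"""INSERT INTO $table (${cols.map(_._1).mkString(", ")})
       |VALUES (${cols.map(_ => "?").mkString(", ")})
       |ON DUPLICATE KEY UPDATE ${(incSet ++ setSet).mkString(", ")}""".stripMargin
  val ps = conn.prepareStatement(sql)
  cols.map(_._2).zipWithIndex.foreach { case (v, i) => ps.setString(i + 1, v) }
  ps.executeUpdate()
  ps.close()
}

Called with the (time, aid) key map and Update(incs = Map("pv" -> "1")), this would generate SQL equivalent to the statement above.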
Big pig, how is UV implemented then? A user who has already been counted once today must not be counted again.
That's simple. You could use Redis to deduplicate, but since we are already using HBase, why not put it to work? Let's take a look at how UV is implemented:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
import spark.implicits._
logDS
.mapPartitions(partitionT => {
val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
partitionT
.grouped(Consts.BATCH_MAPPARTITIONS)
.flatMap { tList =>
tList
.zip(hbaseClient.incrments(tList.map(md5)))
.map(tp2 => {
val log = tp2._1
log.copy(ext = EXT(tp2._2))
})
}
}).createTempView("uvTable")
spark.sql(
""" |SELECT | aid, | dt.date, | COUNT(1) as uv |FROM | uvTable |WHERE | ext.render = 1 |GROUP BY | aid, | dt.date """.stripMargin)
.rdd
.foreachPartition(rows => {
val props = PropsUtils.properties("db")
val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
rows.foreach(row => {
dbClient.upsert(
Map(
"time" -> row.getAs[String]("date"),
"aid" -> row.getAs[String]("aid")
),
Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
"common_report"
)
})
dbClient.close()
})
The spark.sql part is almost identical to the PV one; the only difference is the extra ext.render = 1 condition.
CACHE_TABLE is an HBase intermediate table used to store the users' UV markers. It is created with a TTL, for example 2 or 3 days:
create 'CACHE_FOR_TEST', {NAME => 'info', TTL => '3 DAYS', CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy', 'KeyPrefixRegionSplitPolicy.prefix_length' => '2'}, COMPRESSION => 'SNAPPY'}, SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
Anything else to explain?
Don't panic, don't panic, the big pig will explain it slowly:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
This pulls the log table registered earlier back out as a typed Dataset, so it can be used here without being passed around as a parameter.
The mapPartitions that follows is the interesting part:
partitionT
.grouped(1000)
.flatMap { tList =>
tList
.zip(hbaseClient.incrments(tList.map(md5)))
.map(tp2 => {
val log = tp2._1
log.copy(ext = EXT(tp2._2))
})
}
What it actually does is transform the data of each partition: for every record we perform one increment against HBase, and the returned value becomes render, i.e. how many times that user's key has been incremented today.
What's the point of that? I could just GET from HBase and check whether the user already exists; if not, it's their first visit today, then PUT a marker for that user into HBase. So easy.
That is actually how we did it at the beginning. Later we found it better to keep the business logic together in SQL, which is easier to maintain, and increment brings a big benefit: it is atomic, so it can safely be modified by multiple threads at once.
Besides, you may have noticed that GET and PUT are two separate requests, so there is no atomicity guarantee between them. Out of tens of millions of records a few go missing, and you have no idea how hard I had to search for them.
Only render = 1 marks the first time a user shows up today, which is exactly what UV should count; render = 2, 3 and so on are repeat visits within the same day and are filtered out by the WHERE condition.
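As a small illustration of the dedup semantics (hypothetical return values, reusing the md5 key function and hbaseClient from the snippet above):

// The same user reading the same article on the same day always produces the same key.
val key = md5(log)

hbaseClient.incrments(Seq(key)) // first batch today -> e.g. Seq(1L), render = 1, counted as UV
hbaseClient.incrments(Seq(key)) // any later batch   -> e.g. Seq(2L), render = 2, filtered out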
Let's see what the incrments method does:
def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
if (incs.isEmpty) {
Seq[Long]()
} else {
require(incs.head.length == 32, "pk require 32 length")
val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
val results = new Array[Object](convertIncs.length)
table.batch(convertIncs.asJava, results)
results.array.indices.map(
ind =>
Bytes.toLong(
results(ind)
.asInstanceOf[Result]
.getValue(
Bytes.toBytes(family),
Bytes.toBytes(incs(ind).takeRight(24))
)
)
)
}
}
This method implements batched increments. In our tests in the production environment, batching performs hundreds of times better than issuing the increments one by one, which is why the conversion happens inside mapPartitions: that is where the data can be gathered into batches. foreachPartition is likewise a per-partition (batch) operation, whereas foreach and map handle a single record at a time and cannot be used for this; we also rely on foreachPartition when writing the report back to MySQL.
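As a quick usage illustration of why the batching matters (hypothetical call shapes, using the incrments signature above):

// One HBase round trip for a whole batch of keys:
val counts: Seq[Long] = hbaseClient.incrments(keys) // keys: Seq[String] of 32-char MD5s

// ...versus one round trip per key if you incremented them one by one:
val slowCounts: Seq[Long] = keys.map(k => hbaseClient.incrments(Seq(k)).head)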
Without realizing it, the big pig has already written quite a long article.
To shut down the computation program, just publish a stop message to Redis:
RedisUtil().getResource.publish("computeListenerMessage", "stop")
No more pasting code, or the article will look like it is padded out with code.
Bonus: the complete project source code.