1. How to process configuration files and table data in the production environment
Configuration files and configuration tables are usually stored online in a relational database such as MySQL, or a backend RD simply hands you a file. Their data volume is much smaller than the big tables of the offline data warehouse, so the common practice is to broadcast these small tables or small files. The example below uses a broadcast table to solve the IP-address mapping problem.
Data link: pan.baidu.com/s/1FmFxSrPI… Extraction code: Hell
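As a quick illustration of the broadcast pattern before diving into the case (a minimal, self-contained sketch with a hypothetical hard-coded lookup table; it is not the case code below):

import org.apache.spark.sql.SparkSession

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastSketch").getOrCreate()
    val sc = spark.sparkContext

    // A small configuration table, e.g. loaded from MySQL or handed over as a file.
    // Hard-coded here purely for illustration.
    val cityCodes = Map("001" -> "Beijing", "002" -> "Shanghai", "003" -> "Wenzhou")
    val brCityCodes = sc.broadcast(cityCodes)

    // A large fact dataset: (cityCode, visits)
    val facts = sc.parallelize(Seq(("001", 10), ("003", 5), ("002", 7), ("001", 3)))

    // Map-side lookup against the broadcast value: no shuffle join with the small table is needed
    facts.map { case (code, cnt) => (brCityCodes.value.getOrElse(code, "UNKNOWN"), cnt) }
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    spark.stop()
  }
}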
2. Log Analysis Case 1
2.1 Data Description
http.log: logs generated when users visit the website. Each line contains a timestamp, IP address, access URL, access data, and browser information.
ip.dat: IP range data that records the location corresponding to each IP range; there are roughly 110,000 ranges in total.
The files are located at data/http.log and data/ip.dat.
Link: pan.baidu.com/s/1FmFxSrPI… Extraction code: Hell
Requirement: convert each IP address in http.log to its location (for example, 122.228.96.111 becomes Wenzhou) and count the total number of visits for each city.
2.2 Implementation Ideas and Code
There are three key points: the only field of http.log that is needed is the IP address, so following the principle of reading as little data as possible, only that column is extracted; the IP mapping table is small, so it is broadcast; and because the mapping file is sorted by IP range, each access IP is converted to a Long and matched against its range with a binary search, which yields the address.
package com.hoult.work

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Data sources:
 *   1. IP access log (data/http.log)
 *   2. IP-to-location mapping table (data/ip.dat)
 * The mapping table is broadcast, and IP addresses are converted to Long for comparison.
 */
object FindIp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Only the IP field (the second "|"-separated column) of http.log is needed
    val ipLogsRDD = sc.textFile("data/http.log")
      .map(_.split("\\|")(1))

    val ipInfoRDD = sc.textFile("data/ip.dat").map {
      case line: String =>
        val strSplit: Array[String] = line.split("\\|")
        Ip(strSplit(0), strSplit(1), strSplit(7))
    }

    // Broadcast the mapping table as (startIp, endIp, address) triples with IPs as Longs
    val brIPInfo = sc.broadcast(
      ipInfoRDD.map(x => (ip2Long(x.startIp), ip2Long(x.endIp), x.address)).collect()
    )

    // Map each access IP to its address via the broadcast table, then count visits per city
    ipLogsRDD
      .map(x => {
        val index = binarySearch(brIPInfo.value, ip2Long(x))
        if (index != -1)
          brIPInfo.value(index)._3
        else
          "NULL"
      }).map(x => (x, 1))
      .reduceByKey(_ + _)
      .map(x => s"city: ${x._1}, page views: ${x._2}")
      .saveAsTextFile("data/work/output_ips")
  }

  // Convert an IP string to a Long
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Binary search for the IP range that contains the given IP
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }
}

case class Ip(startIp: String, endIp: String, address: String)
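A quick sanity check on the conversion (an illustrative snippet that is not part of the original code, runnable in a Scala REPL with the object above on the classpath):

// Each octet shifts the accumulated value left by 8 bits:
// 122.228.96.111 -> (((122 * 256 + 228) * 256 + 96) * 256) + 111 = 2061787247
assert(FindIp.ip2Long("122.228.96.111") == 2061787247L)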
The screenshot is as follows:
3. Log Analysis Case 2
3.1 Data Description
Log format: IP, hit ratio (Hit/Miss), response time, request time, request method, request URL, request protocol, status code, response size, referer, user agent
Log file location: data/cdn.txt
Data case:
Task:
1. Count the number of unique IP addresses
2. Count the number of unique IP addresses for each video (a video is identified by *.mp4 appearing in the log line; it represents a video file)
3. Calculate the traffic for each hour of the day
The first two tasks are straightforward; for the unique IPs per video, aggregateByKey can be used instead of groupByKey to improve performance, as illustrated in the sketch below.
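A minimal comparison of the two approaches (an illustrative sketch on made-up (video, ip) pairs, assuming sc is the SparkContext from the jobs in this article; it is not part of the case code):

// (video, ip) pairs; the goal is the number of distinct IPs per video.
val pairs = sc.parallelize(Seq(("1.mp4", "1.1.1.1"), ("1.mp4", "1.1.1.1"), ("2.mp4", "2.2.2.2")))

// groupByKey shuffles every (video, ip) record and deduplicates only afterwards.
val viaGroupByKey = pairs.groupByKey().mapValues(_.toSet.size)

// aggregateByKey builds a Set per partition first, so duplicates are dropped
// before the shuffle and much less data crosses the network.
val viaAggregateByKey = pairs
  .aggregateByKey(Set[String]())(
    (set, ip) => set + ip,   // merge one value into the partition-local set
    (s1, s2)  => s1 ++ s2    // merge the sets of different partitions
  )
  .mapValues(_.size)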
3.2 Implementation Code
package com.hoult.work

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Based on the cdn.txt log:
 * 1. Count the number of unique IP addresses
 * 2. Count the number of unique IP addresses per video (a video is identified by *.mp4 in the log line)
 * 3. Calculate the traffic for each hour of the day
 */
object LogAnaylse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val cdnRDD = sc.textFile("data/cdn.txt")

    // 1. Unique IPs
    // aloneIPs(cdnRDD.repartition(1))
    // 2. Unique IPs per video
    // videoIPs(cdnRDD.repartition(1))
    // 3. Traffic per hour
    hourPoor(cdnRDD.repartition(1))
  }

  /** Number of unique IP addresses */
  def aloneIPs(cdnRDD: RDD[String]) = {
    // Matches an IP address
    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r
    val ipnums = cdnRDD
      .flatMap(x => IPPattern.findFirstIn(x))
      .map(y => (y, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)
    ipnums.saveAsTextFile("data/cdn/aloneIPs")
  }

  /** Number of unique IP addresses per video */
  def videoIPs(cdnRDD: RDD[String]) = {
    // Matches the HTTP response code and the response size
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r
    // [15/Feb/2017:11:17:13 +0800] -- extract "2017" and the hour for per-hour statistics
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r
    val videoPattern = "([0-9]+).mp4".r

    val res = cdnRDD
      .filter(x => x.matches(".*([0-9]+)\\.mp4.*"))
      .map(x => (videoPattern.findFirstIn(x).toString, IPPattern.findFirstIn(x).toString))
      .aggregateByKey(List[String]())(
        (lst, str) => lst :+ str,
        (lst1, lst2) => lst1 ++ lst2
      )
      .mapValues(_.distinct)
      .sortBy(_._2.size, false)
    res.saveAsTextFile("data/cdn/videoIPs")
  }

  /** Traffic for each hour of the day */
  def hourPoor(cdnRDD: RDD[String]) = {
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    cdnRDD
      .filter(x => isMatch(httpSizePattern, x))
      .filter(x => isMatch(timePattern, x))
      .map(x => getTimeAndSize(x))
      .groupByKey()
      .map(x => (x._1, x._2.sum))
      .sortByKey()
      .map(x => x._1 + " CDN traffic = " + x._2 / (1024 * 1024 * 1024) + "G")
      .saveAsTextFile("data/cdn/hourPoor")
  }
}
Screenshot of running results:
4. Advertising exposure analysis cases
Assume that each line in the click log (click.log) and the exposure log (imp.log) is formatted as follows:
// Click log
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33
// Exposure log
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34
Use Spark Core to calculate the number of exposures and the number of clicks for each adid. The idea is simple, so the code can be used directly.
Code:
package com.hoult.work

import org.apache.spark.sql.SparkSession

object AddLog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val clickRDD = sc.textFile("data/click.log")
    val impRDD = sc.textFile("data/imp.log")

    // (adid, number of clicks)
    val clickRes = clickRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // (adid, number of exposures)
    val impRes = impRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // Join the two results and save them (writing to HDFS would also work)
    clickRes.fullOuterJoin(impRes)
      .map(x => x._1 + "," + x._2._1.getOrElse(0) + "," + x._2._2.getOrElse(0))
      .repartition(1)
      // .saveAsTextFile("hdfs://linux121:9000/data/")
      .saveAsTextFile("data/add_log")

    sc.stop()
  }
}
Analysis: the job shuffles twice. The fullOuterJoin can be replaced with union + reduceByKey, reducing the shuffles to one; a sketch of that variant follows.
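A minimal sketch of the union + reduceByKey variant (assuming the same clickRDD, impRDD, and adid parsing as above; the output path and the adid,clicks,exposures format are illustrative assumptions):

// Extract the adid exactly as in AddLog above.
def parseAdid(line: String): String = {
  val arr = line.split("\\s+")
  arr(3).substring(arr(3).lastIndexOf("=") + 1)
}

clickRDD.map(line => (parseAdid(line), (1, 0)))            // a click contributes (1 click, 0 exposures)
  .union(impRDD.map(line => (parseAdid(line), (0, 1))))    // an exposure contributes (0 clicks, 1 exposure)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))       // the single shuffle sums both counters at once
  .map { case (adid, (clicks, imps)) => s"$adid,$clicks,$imps" }
  .repartition(1)
  .saveAsTextFile("data/add_log_union")                    // hypothetical output path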
5. Use Spark SQL to complete the conversion
Given a table with three columns (id, startdate, enddate) and the following data:
1 2019-03-04 2020-02-03
2 2020-04-05 2020-08-04
3 2019-10-09 2020-06-11
Write Spark SQL (both an SQL and a DSL version are required) to transform the data above into:
2019-03-04 2019-10-09
2019-10-09 2020-02-03
2020-02-03 2020-04-05
2020-04-05 2020-06-11
2020-06-11 2020-08-04
2020-08-04 2020-08-04
The first column is the sorted union of the startdate and enddate columns, and the second column is the next value of the first column, which can be produced with the lead window function.
The code is as follows:
package com.hoult.work

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataExchange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSort")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    // The original data
    val tab = List(
      (1, "2019-03-04", "2020-02-03"),
      (2, "2020-04-05", "2020-08-04"),
      (3, "2019-10-09", "2020-06-11")
    )
    val df: DataFrame = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    // Union of the two date columns
    val dateset: DataFrame = df.select("startdate").union(df.select("enddate"))
    dateset.createOrReplaceTempView("t")

    // lead() fetches the next date; nvl makes the last row's enddate equal to its startdate
    val result: DataFrame = spark.sql(
      """
        |select tmp.startdate,
        |       nvl(lead(tmp.startdate) over(partition by col order by tmp.startdate), startdate) enddate
        |from (select "1" col, startdate from t) tmp
        |""".stripMargin)
    result.show()
  }
}
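The task also asks for a DSL version. A sketch using the DataFrame API, reusing the dateset DataFrame built above (the dummy "col" partition column and the output column names mirror the SQL version and are otherwise assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lead, lit}

// Same logic as the SQL: for each date, enddate is the next date, or itself for the last row
val w = Window.partitionBy("col").orderBy("startdate")
val dslResult = dateset
  .withColumn("col", lit("1"))   // dummy partition column, as in the SQL version
  .withColumn("enddate", coalesce(lead(col("startdate"), 1).over(w), col("startdate")))
  .select("startdate", "enddate")
dslResult.show()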
Running results:
Check my profile for more.