In the last article, Big Pig introduced the log storage design. Now that the data has landed in the data center, what about ETL? This is a production environment, so we cannot afford to be careless. In truth there are only a handful of problems to solve, and there is no need to bring in heavyweight components to do it. Every tool has its strengths, but this article takes three design goals as its starting point: easy to understand, small and simple, and high performance. Along the way, it also ends up implementing a lean little Filebeat of its own.
Design
The LogHub program scans each day's incremental logs and writes them to Hbase.
There are a few small challenges to overcome:
- Every line of data in every file must be extracted.
- It must cope with large log files of more than 10 GB while using only a bounded, and preferably small, amount of the machine's memory.
- As the figure above shows, the data marked in yellow has already been written to Hbase, and data must never be read twice (see the sketch after this list).
- Inactive files must not be scanned, because touching too many files drags down overall read I/O performance.
- While reading, the offset of the incremental data must be recorded reliably, because the offset written to MySQL has to be stable and must not jump around.
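To make the "no repeated reads, stable offsets" points concrete, here is a minimal conceptual sketch. The names are illustrative, not taken from the article's source: a file only needs to be read again when its current length exceeds the offset recorded on the previous run, and only that tail is new.

```scala
// Conceptual sketch only: how a saved offset decides whether (and how much of)
// a file still needs to be read. Names here are illustrative, not the article's.
case class FileProgress(path: String, savedOffset: Long)

// Nothing new was appended if the file is no longer than the recorded offset
def needsReading(p: FileProgress, currentLength: Long): Boolean =
  currentLength > p.savedOffset

// Only the byte range [savedOffset, currentLength) is unread
def unreadRange(p: FileProgress, currentLength: Long): (Long, Long) =
  (p.savedOffset, currentLength)
```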
Implementation
Big Pig will now re-analyze and implement the features above exactly as they run in the online production environment.
The first one looks easy, doesn't it? But it has to be tackled together with the five challenges listed above.
In short: the goal is a high-performance LogHub ETL program that can be restarted at any time and resume where it left off.
This is necessary because a production environment leaves no room for sloppiness; otherwise the BOSS will come after you.
The implementation process
First, you need a method that lists all the log files under a folder.
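A minimal sketch of what such a helper could look like, with the name and signature chosen to match the `files(folder, predicate)` call in the main program below; the body is an assumption, not the article's exact source.

```scala
import java.io.File

// Recursively collect every file under dir that matches the given predicate.
def files(dir: File, accept: File => Boolean): Seq[File] = {
  val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
  children.flatMap {
    case d if d.isDirectory => files(d, accept) // descend into sub-folders
    case f if accept(f)     => Seq(f)           // keep files that match, e.g. *.log
    case _                  => Seq.empty
  }
}

// Example: all *.log files under one day's folder
// val dayLogs = files(new File("/data/logs/2019-01-01"), _.getName.endsWith(".log"))
```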
You also need a method to save and load each file's read progress, i.e. its offset.
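Here is one possible shape for this bookkeeping, using the `seeks()`, `writeSeek()` and `writeSeekDay()` names that appear in the main program below. The file names and the `md5(path),offset` line format are assumptions, not necessarily what the article's full source does.

```scala
import java.io.{File, FileWriter}
import java.util
import scala.collection.JavaConverters._
import scala.io.Source
import org.apache.hadoop.hbase.util.MD5Hash

// Assumed storage: one line per file in the form "md5(absolutePath),offset",
// plus a one-line file holding the last date folder that was processed.
val seekFile    = new File("logs.seek")
val seekDayFile = new File("logs.day.seek")

// Load the saved progress: MD5 of the file path -> last committed offset
def seeks(): util.HashMap[String, Long] = {
  val map = new util.HashMap[String, Long]()
  if (seekFile.exists()) {
    Source.fromFile(seekFile).getLines().foreach { line =>
      val Array(md5, offset) = line.split(",")
      map.put(md5, offset.toLong) // a later record for the same file overrides an earlier one
    }
  }
  map
}

// Append the offsets committed in this run (keys are absolute paths, as in the main program)
def writeSeek(pathOffsets: util.Map[String, Long]): Unit = {
  val writer = new FileWriter(seekFile, true)
  pathOffsets.asScala.foreach { case (path, offset) =>
    writer.write(s"${MD5Hash.getMD5AsHex(path.getBytes())},$offset\n")
  }
  writer.close()
}

// Remember the latest date folder so the next run can skip older, inactive folders
def writeSeekDay(day: String): Unit = {
  val writer = new FileWriter(seekDayFile, false)
  writer.write(day)
  writer.close()
}
```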
Because a log file cannot simply be loaded into memory for processing, you also need a way to read it line by line starting from the recorded offset.
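A rough sketch of such a reader follows; the signature is guessed from the `lines(file, start, end, callback)` call in the main program. It seeks to the saved offset, streams complete lines up to the length captured at scan time, and fires the callback only once that range has been consumed, so the offset is recorded only for data that was actually read.

```scala
import java.io.{File, RandomAccessFile}

// Sketch of offset-based line reading; the body is an assumption, not the article's source.
def lines(file: File, start: Long, end: Long, onComplete: () => Unit): Iterator[String] = {
  val raf = new RandomAccessFile(file, "r")
  raf.seek(start) // skip everything committed on earlier runs
  new Iterator[String] {
    private def read(): String =
      if (raf.getFilePointer >= end) { raf.close(); onComplete(); null }
      else {
        // readLine decodes bytes as ISO-8859-1; re-encode if the logs contain multi-byte characters
        val line = raf.readLine()
        if (line == null) { raf.close(); onComplete(); null } else line
      }
    private var nextLine: String = read()
    def hasNext: Boolean = nextLine != null
    def next(): String = { val current = nextLine; nextLine = read(); current }
  }
}
```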
The last remaining piece is a small Hbase connection-pool utility.
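A rough sketch of one way `HbasePool.getTable` could be built: a single shared `Connection` (HBase connections are heavyweight and thread-safe) handing out `BufferedMutator` instances, so the `mutate()`/`flush()` calls in the main program line up. The table name and ZooKeeper quorum are placeholders, and the article's full source may manage connections differently.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory}

object HbasePool {
  private lazy val conf: Configuration = {
    val c = HBaseConfiguration.create()
    c.set("hbase.zookeeper.quorum", "zk1,zk2,zk3") // placeholder quorum
    c
  }
  // One shared, thread-safe connection per JVM
  private lazy val connection: Connection = ConnectionFactory.createConnection(conf)

  // Each caller gets its own buffered writer; callers flush it when a file is done
  def getTable: BufferedMutator =
    connection.getBufferedMutator(TableName.valueOf("logs")) // placeholder table name
}
```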
With these core methods in place, here is our main program:
def run(logPath: File, defaultOffsetDay: String): Unit = {
  val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
  val offsetDay = Option(if (sdfstr == "") null else sdfstr)
  // Read the date folders, starting from one day before the recorded date
  val noneOffsetFold = logPath
    .listFiles()
    .filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
    .sortBy(f => LocalDate.parse(f.getName).toEpochDay)
  // Read all log files in those folders, pairing each file with its saved offset and current length
  val filesPar = noneOffsetFold
    .flatMap(files(_, file => file.getName.endsWith(".log")))
    .map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0L), file.length()))
    .filter(tp3 => {
      // Keep only new files and files that have grown since the last run
      val fileMd5 = MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes())
      val result = offsets.asScala.filter(m => fileMd5.equals(m._1)) // offsets: in-memory offset map (a field in the full source)
      result.isEmpty || tp3._3 > result.head._2
    })
    .par
  filesPar.tasksupport = pool // pool: a TaskSupport defined in the full source

  val willUpdateOffset = new util.HashMap[String, Long]()
  val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
  var logTime: String = null

  filesPar.foreach(tp3 => {
    val hbaseClient = HbasePool.getTable
    lines(tp3._1, tp3._2, tp3._3, () => {
      // Callback from lines(): record the new offset for this file
      willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
      offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
    }).foreach(line => {
      val jsonObject = parse(line)
      val time = (jsonObject \ "time").extract[Long]
      val data = jsonObject \ "data"
      val dataMap = data.values.asInstanceOf[Map[String, Any]]
        .filter(_._2 != null)
        .map(x => x._1 -> x._2.toString)
      val uid = dataMap("uid")
      logTime = time.getLocalDateTime.toString // getLocalDateTime: presumably an implicit helper defined in the full source
      val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)
      val row = new Put(Bytes.toBytes(rowkey))
      dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
      hbaseClient.mutate(row)
    })
    hbaseClient.flush()
  })

  // Write the updated per-file offsets back to the seek file
  writeSeek(willUpdateOffset)
  // Write the latest processed date folder back to the day-seek file
  writeSeekDay(noneOffsetFold.last.getName)
  // The logTime offset is written to MySQL for Spark + Hbase jobs to read and compute from
}
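For reference, a hypothetical invocation might look like the following; the log root path and the fallback start date are placeholders.

```scala
// Hypothetical entry point: if no progress has been recorded yet,
// scanning starts from the given default day.
run(new File("/data/logs"), "2019-01-01")
```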
The program is minimal, with no useless features, which is exactly how an online production environment should be. You can add extras such as email notifications as needed. The actual logic is only about 100 lines of functional code, it occupies very little memory, less than 100 MB, and it stays lean.
Portal: complete ETL program source code