In the last article, Big Pig introduced the log storage design. Now that our data has landed in the data center, what about ETL? After all, it’s a production environment. You can’t mess with it. In fact, only a few problems can be solved, there is no need to introduce a large level of components to do, of course, each has its own advantages, this paper mainly from the easy to understand, small, simple, high performance of these three aspects to design the starting point, by the way, also realized a sophisticated Filebeat.

design

The LogHub function scans daily incremental logs and writes them to Hbase

There are a few small challenges to overcome

  1. You need to extract every line of data in the file
  2. It can handle large log files of more than 10GB, and can only occupy a certain amount of memory of the machine, the smaller the better
  3. In the preceding figure, you can see that data marked yellow has been written into Hbase. Data cannot be read repeatedly
  4. Inactive files cannot be scanned, because too many files affect the overall READ I/O performance
  5. The process of reading is to ensure that the incremental data cannot be recorded, because it is necessary to ensure that the mysql write at offset is stable and does not jump

implementation

Big pig according to the production environment on the line a re-analysis of the above functions to achieve.

Isn’t the first one a bit easier? But we have to look at it in combination with the five questions above.

In summary: to achieve a high performance and can restart at any time to continue working logHub ETL program.

This is necessary because the production environment can’t afford to be sloppy, or you’ll get the BOSS

The implementation process

You need a method to read all log files

You will also implement a method to save and read the progress of the file

Because you can’t read a log file into memory for processing, you also need a way to read data line by line based on the index

The last remaining tool is the Hbase connection pooling widget

Several core methods have been written, followed by our main program

def run(logPath: File, defaultOffsetDay: String): Unit = {
    val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
    val offsetDay = Option(if (sdfstr == "") null elseSDFSTR) // Read sets the date folder val noneOffsetFold = one day after the countdown of the read datelogPath .listFiles() .filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString) .sortby (f => localdate.parse (f.gete name).toepochday) // Read all log files in the folder, Val filesPar = noneoffsetfold.flatMap (files(_, file => file.getName.endswith ())".log"))) .map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), File.length ()).filter(tp2 => {// Filter new files, With increment of the log file val fileMd5 = MD5Hash. GetMD5AsHex (tp2. _1. GetAbsolutePath. GetBytes ()) val result = offsets. AsScala. Filter (m = > fileMd5.equals(m._1)) result.isEmpty || tp2._3 > result.head._2 }) .par filesPar.tasksupport = pool val willUpdateOffset  = new util.HashMap[String, Long]() val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    var logTime:String = null filesPar .foreach(tp3 => { val hbaseClient = HbasePool.getTable Lines (tp3._1, tp3._2, tp3._3, tp3. () => { willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3) offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3) }) .foreach(line => { val jsonObject = parse(line) val time = (jsonObject \"time").extract[Long]
            val data = jsonObject \ "data"val dataMap = data.values.asInstanceOf[Map[String, Any]] .filter(_._2 ! = null) .map(x => x._1 -> x._2.toString) val uid = dataMap("uid")
            logTime = time.getLocalDateTime.toString
            val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)

            val row = new Put(Bytes.toBytes(rowkey))
            dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.tobytes (tp2._2))) hBaseclient.mutate (row)}) hBaseclient.flush ()}) // Update index to file writeSeek(willUpdateOffset) / / update the index date to file writeSeekDay (noneOffsetFold. Last. GetName) / /logTime offset is written to mysql for Spark+Hbase to read and calculate.}Copy the code

The program is minimal, there are no useless features in it, and the online production environment should be like this. You can also add the function of sending out emails and so on according to your needs. The actual calculation is only 100 lines of functional code, and it takes up very little memory, less than 100M, very fine.

Portal complete ETL program source code