The second article in 2021

Brief description

This article focuses on Datax log auditing capabilities.

Logs are indispensable for running applications and troubleshooting defects. Logging matters even more for a data transfer tool such as Datax, which has to squeeze the most out of machine performance, transfer data in real time, and record dirty data.

Design interpretation

The Datax log covers, among other things, transfer speed, Reader/Writer performance, process, CPU, JVM, and GC information. These dimensions range from coarse to fine: from CPU down to process, and from a whole job down to ever more granular tasks.

So in fact, to cover all of this, Datax does log monitoring at two different levels:

  1. CPU, JVM, GC, etc
  2. Task level

The task level is further split into:

  1. Job: the whole job
  2. TaskGroup: a group of tasks
  3. Task: a single task

A single Task is measured at a finer granularity still:

  1. Time spent in init for Reader/Writer
  2. Time spent in prepare for Reader/Writer
  3. Time Reader/Writer spends processing data
  4. Time Reader/Writer spends on other events
  5. Timing of custom events, which usually comes from custom development

Individual Tasks are further broken down into transfer rates, aggregated metrics, and averages.

As I see it, Datax has two levels of design and, correspondingly, two levels of code:

  1. AbstractContainerCommunicator is responsible for collecting information about the whole run. Its subclass TGContainerCommunicator collects TaskGroup-level information from taskGroupContainer, and its subclass JobContainerCommunicator reports the information gathered from the taskGroupContainers at the Job level.
  2. PerfTrace is the equivalent of a link tracer for a single JVM, and PerfRecord is the data held in PerfTrace.

From the code design, it is easy to see that PerfTrace records timing inside a running JVM in real time, while Communicator acts as the reporting channel to the outside and can output log and monitoring data in real time or at regular intervals.
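The split between collecting and reporting can be sketched roughly as follows. This is only an illustration of the idea, not Datax's code: `Snapshot`, `Collector`, `Reporter`, `SketchCommunicator`, and `CommunicatorSketchDemo` are hypothetical names, and the two counters are made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not the real Datax classes.
final class Snapshot {
    final long records;
    final long bytes;

    Snapshot(long records, long bytes) {
        this.records = records;
        this.bytes = bytes;
    }
}

// Gathers the current statistics from wherever they live.
interface Collector {
    Snapshot collect();
}

// Sends statistics to the outside (a log, a monitoring system, ...).
interface Reporter {
    void report(Snapshot snapshot);
}

// The communicator only delegates: one side collects, the other reports.
final class SketchCommunicator {
    private final Collector collector;
    private final Reporter reporter;

    SketchCommunicator(Collector collector, Reporter reporter) {
        this.collector = collector;
        this.reporter = reporter;
    }

    Snapshot collect() {
        return collector.collect();
    }

    void report(Snapshot snapshot) {
        reporter.report(snapshot);
    }
}

final class CommunicatorSketchDemo {
    // Runs one collect-then-report cycle and returns what the reporter received.
    static List<String> runOnce() {
        List<String> sink = new ArrayList<>();
        SketchCommunicator communicator = new SketchCommunicator(
                () -> new Snapshot(100, 2048),
                s -> sink.add("records=" + s.records + ", bytes=" + s.bytes));
        communicator.report(communicator.collect());
        return sink;
    }
}
```

Keeping the collector and reporter behind separate interfaces is what lets the Job and the TaskGroup plug in different implementations while sharing the same communicator shape.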

Source code walkthrough

AbstractContainerCommunicator

Let's look at the properties of AbstractContainerCommunicator first:

    private Configuration configuration; // ①
    private AbstractCollector collector; // ②
    private AbstractReporter reporter;   // ③

    private Long jobId;     // ④

    private VMInfo vmInfo = VMInfo.getVmInfo();    // ⑤

Code ① is the task configuration; code ② is the information collector; code ③ is the information reporter; code ④ is the job ID; code ⑤ is the JVM- and process-level monitoring.

Initialization

Communicator initialization covers both the Job and the TaskGroup. On the Job side, the entry point is where JobContainer starts scheduling: it instantiates the Scheduler and sets up the Communicator.

    private void schedule() {
        scheduler = initStandaloneScheduler(this.configuration);
    }
    
    private AbstractScheduler initStandaloneScheduler(Configuration configuration) {
        AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);
        super.setContainerCommunicator(containerCommunicator);

        return new StandAloneScheduler(containerCommunicator);
    }

StandAloneJobContainerCommunicator is the communicator for a single standalone JobContainer. When it is instantiated, it also sets up the information collector and reporter.

    public StandAloneJobContainerCommunicator(Configuration configuration) {
        super(configuration);
        super.setCollector(new ProcessInnerCollector(configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID)));
        super.setReporter(new ProcessInnerReporter());
    }

TaskGroup initialization happens when TaskGroupContainer is constructed: it instantiates a StandaloneTGContainerCommunicator and assigns it to its member property.

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);

        initCommunicator(configuration);

        // ...
    }
    
    private void initCommunicator(Configuration configuration) {
        super.setContainerCommunicator(new StandaloneTGContainerCommunicator(configuration));
    }

Datax therefore uses different communicators for the Job and the TaskGroup. Next we look at how each is used for monitoring in practice.

Registration

Once Communicator and Scheduler are instantiated, they are ready to use. The first step is to register the configurations with the Communicator.

    this.containerCommunicator.registerCommunication(configurations);

Registration calls the AbstractCollector#registerTGCommunication method:

    public void registerTGCommunication(List<Configuration> taskGroupConfigurationList) {
        for (Configuration config : taskGroupConfigurationList) {
            int taskGroupId = config.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
            LocalTGCommunicationManager.registerTaskGroupCommunication(taskGroupId, new Communication());
        }
    }

The logic is to loop over the configurations, read each taskGroupId, and register one Communication per TaskGroup. LocalTGCommunicationManager is responsible for storing them.
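The registration step above can be pictured as a map keyed by taskGroupId, from which a Job-level view is later derived by summing over all entries. The sketch below is a simplified illustration under that assumption; `GroupStats` and `GroupStatsRegistry` are hypothetical names, not the real Communication or LocalTGCommunicationManager classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the statistics held per TaskGroup.
final class GroupStats {
    final AtomicLong records = new AtomicLong();
}

// Hypothetical registry keyed by taskGroupId; not the real Datax manager class.
final class GroupStatsRegistry {
    private static final Map<Integer, GroupStats> STATS = new ConcurrentHashMap<>();

    private GroupStatsRegistry() {
    }

    // One statistics holder is registered per task group.
    static void register(int taskGroupId, GroupStats stats) {
        STATS.put(taskGroupId, stats);
    }

    static GroupStats get(int taskGroupId) {
        return STATS.get(taskGroupId);
    }

    // Job-level view: the sum over every registered task group.
    static long totalRecords() {
        long total = 0;
        for (GroupStats stats : STATS.values()) {
            total += stats.records.get();
        }
        return total;
    }

    // Removes all entries so the registry can be reused.
    static void clear() {
        STATS.clear();
    }
}
```

A concurrent map is used here because, in this sketch, several task groups are assumed to update their own entries while the Job side reads all of them.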

Usage flow

The usage flow is how Communicator is used in the actual framework code.

JobContainerCommunicator

The monitoring layer of the Job is in the AbstractScheduler#schedule method. In the scheduling process, monitoring involves the following steps:

  1. Collect Job running information
  2. Get information about the performance of each TaskGroup and report back

The first step collects the running status of the Job by calling the collect method:

    Communication nowJobContainerCommunication = this.containerCommunicator.collect();

The actual call chain is:

    StandAloneJobContainerCommunicator#collect
    ProcessInnerCollector#collectFromTaskGroup
    LocalTGCommunicationManager#getJobCommunication

Then retrieve the performance information of each TaskGroup and report it back.

    this.containerCommunicator.report(reportCommunication);

The actual call chain is:

    StandAloneJobContainerCommunicator#report
    AbstractReporter#reportJobCommunication

TaskGroupContainerCommunicator

The monitoring layer of a TaskGroup is located in the TaskGroupContainer#start method. In the process, monitoring involves the following steps:

  1. Register the Tasks that belong to the same TaskGroup with the TGCommunicator
  2. Get the mapping between Task and Communication from the TGCommunicator, then loop over it, check each state, and update the statistics

The first step is to register the TaskGroup's Tasks.

    this.containerCommunicator.registerCommunication(taskConfigs);

This call goes through the Communicator's registerCommunication method, which in turn calls AbstractCollector#registerTaskCommunication:

    public void registerTaskCommunication(List<Configuration> taskConfigurationList) {
        for (Configuration taskConfig : taskConfigurationList) {
            int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
            this.taskCommunicationMap.put(taskId, new Communication());
        }
    }

The second step gets the mapping between Task and Communication from the TGCommunicator and loops over it, checking each state. The real code is long, so it is simplified here.

    while (true) {
        // ①
        Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
        // ②
        for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
            Integer taskId = entry.getKey();
            Communication taskCommunication = entry.getValue();
            if (taskCommunication.getState() == State.FAILED) {            // ③
                containerCommunicator.resetCommunication(taskId);
            } else if (taskCommunication.getState() == State.KILLED) {     // ④
                failedOrKilled = true;
                break;
            } else if (taskCommunication.getState() == State.SUCCEEDED) {  // ⑤
                // ...
                PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, taskStartTime, usedTime * 1000L * 1000L);
                // ...
            }
        }
        // ⑥
        if (failedOrKilled) {
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
        }

        // ⑦
        while (iterator.hasNext() && runTasks.size() < channelNumber) {
            // ...
            TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
            if (lastExecutor != null) {
                // ...
                if (!lastExecutor.isShutdown()) {
                    if (now - failedTime > taskMaxWaitInMsec) {
                        markCommunicationFailed(taskId);
                        reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                        // throw Exception
                    } else {
                    }
                }
            }
            // ...
            TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
            taskExecutor.doStart();
            // ...
            taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
        }
        // ⑧
        if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
        }

        // ⑨
        long now = System.currentTimeMillis();
        if (now - lastReportTimeStamp > reportIntervalInMillSec) {
            lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                    lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

            for (TaskExecutor taskExecutor : runTasks) {
                taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
            }
        }
    }

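The markers ① to ⑨ in the code correspond to the following steps:

  1. Get the Communication of every registered Task.
  2. Loop over the Communications and handle each one according to its state.
  3. If the state is FAILED, check whether the retry condition is met and reset the Communication so the Task can be retried.
  4. If the state is KILLED, set failedOrKilled and break out of the loop.
  5. If the state is SUCCEEDED, record the Task's total running time with PerfRecord.
  6. If failedOrKilled is true, report the TaskGroup's Communication.
  7. Start the pending Tasks. For a Task that failed before, first check whether its last executor has shut down; if it is still not shut down after the maximum wait time, mark the Task as failed and report.
  8. When the task queue is empty, all Tasks are done, and the collected state is SUCCEEDED, report the final statistics.
  9. At regular intervals, report the TaskGroup statistics and pass the Communication of each running Task to TaskMonitor.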
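The periodic check at ⑨ is a simple time-based throttle: the loop spins continuously, but a report is only sent once the configured interval has passed. A minimal sketch of that idea follows; `ReportThrottle` is a hypothetical helper, and updating the last-report timestamp inside the check is an assumption of this sketch rather than something shown in the simplified code above.

```java
// Hypothetical helper illustrating interval-based reporting; not a Datax class.
final class ReportThrottle {
    private final long intervalMillis;
    private long lastReportMillis;

    ReportThrottle(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastReportMillis = startMillis;
    }

    // Returns true when more than intervalMillis has passed since the last
    // report, and remembers the current time as the new last-report time.
    boolean shouldReport(long nowMillis) {
        if (nowMillis - lastReportMillis > intervalMillis) {
            lastReportMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```

In a real loop the argument would typically be `System.currentTimeMillis()`; passing the time in explicitly just makes the behavior easy to check.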

This code is how the TaskGroup monitors the health of its Tasks. The method that appears most often is reportTaskGroupCommunication, which is essentially a method for updating the statistics:

    private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount) {
        // ①
        Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
        nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
        // ②
        Communication reportCommunication = CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication,
                lastTaskGroupContainerCommunication, taskCount);
        // ③
        this.containerCommunicator.report(reportCommunication);
        return reportCommunication;
    }

Code ① collects the latest running state; code ② combines the latest data with the previous data; code ③ reports the result, which is then returned.
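One way to picture combining the latest data with the previous data is to difference two cumulative snapshots over the elapsed time to get a speed. The sketch below assumes exactly that and nothing more; `CounterSnapshot` and `SpeedSketch` are hypothetical names, and the real CommunicationTool may compute additional values or handle edge cases differently.

```java
// Hypothetical snapshot of cumulative counters at a point in time.
final class CounterSnapshot {
    final long timestampMillis;
    final long totalRecords;
    final long totalBytes;

    CounterSnapshot(long timestampMillis, long totalRecords, long totalBytes) {
        this.timestampMillis = timestampMillis;
        this.totalRecords = totalRecords;
        this.totalBytes = totalBytes;
    }
}

// Hypothetical helper; a sketch of differencing two snapshots, not Datax code.
final class SpeedSketch {
    private SpeedSketch() {
    }

    // Records per second between the two snapshots; 0 when no time has elapsed.
    static long recordSpeed(CounterSnapshot last, CounterSnapshot now) {
        long elapsedMillis = now.timestampMillis - last.timestampMillis;
        if (elapsedMillis <= 0) {
            return 0;
        }
        return (now.totalRecords - last.totalRecords) * 1000L / elapsedMillis;
    }

    // Bytes per second between the two snapshots; 0 when no time has elapsed.
    static long byteSpeed(CounterSnapshot last, CounterSnapshot now) {
        long elapsedMillis = now.timestampMillis - last.timestampMillis;
        if (elapsedMillis <= 0) {
            return 0;
        }
        return (now.totalBytes - last.totalBytes) * 1000L / elapsedMillis;
    }
}
```

For example, going from 100 records to 500 records over 2 seconds gives 200 records per second. Returning the combined result as the next "last" value, as reportTaskGroupCommunication does, is what lets each report build on the previous one.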

PS: CommunicationTool is Datax's statistics utility; if you are interested, it is worth reading.

PerfTrace and PerfRecord

The sections above covered container-level log monitoring, but there is also monitoring at the transfer-event level. What does that mean? It monitors each stage of the Reader and Writer in terms of time, speed, counts, and so on.

In Datax, PerfTrace and PerfRecord implement this. PerfTrace is comparable to the trace statistics used for link tracing in microservices. It records a Job (local mode) or a TaskGroup (distributed mode); since each of these runs in its own JVM, a JVM needs only one PerfTrace.

So what data can PerfRecord actually record? Below is a more detailed list of the statistical dimensions the framework currently supports.

    public enum PHASE {

        // Total running time of the task. The first 10 are framework statistics; the rest are plugin-specific statistics
        TASK_TOTAL(0),

        // Reader init, prepare, data, post, and destroy
        READ_TASK_INIT(1),
        READ_TASK_PREPARE(2),
        READ_TASK_DATA(3),
        READ_TASK_POST(4),
        READ_TASK_DESTROY(5),

        // Writer init, prepare, data, post, and destroy
        WRITE_TASK_INIT(6),
        WRITE_TASK_PREPARE(7),
        WRITE_TASK_DATA(8),
        WRITE_TASK_POST(9),
        WRITE_TASK_DESTROY(10),

        // SQL_QUERY: the SQL query phase, part of the reader-specific statistics
        SQL_QUERY(100),

        // All data read from the SQL result
        RESULT_NEXT_ALL(101),
        // ...
    }

So how are PerfTrace and PerfRecord used in real code? Take ReaderRunner as an example. When ReaderRunner runs, it needs to time the init, start, end, and destroy stages of Reader.Task, so it uses PerfRecord for the measurement.

First, when PerfRecord#start is called, it registers itself into PerfTrace so that it can be traced further.

    public void start() {
        if (PerfTrace.getInstance().isEnable()) {
            this.startTime = new Date();
            this.startTimeInNs = System.nanoTime();
            this.action = ACTION.start;
            // Register in PerfTrace
            PerfTrace.getInstance().tracePerfRecord(this);
            perf.info(toString());
        }
    }

Then the actual business logic runs, for example timing the init method:

    PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
    initPerfRecord.start();
    taskReader.init();
    initPerfRecord.end();

At the code level, we only need to pass in the TaskGroupId, the TaskId, and the phase to time an execution. Here is another example:

    PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
    dataPerfRecord.start();
    taskReader.startRead(recordSender);
    dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
    dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
    dataPerfRecord.end();

Calling addCount and addSize is all it takes to complete the statistics. You may also have noticed that CommunicationTool is the same statistics tool mentioned above. Indeed, ReaderRunner itself holds a Communication property that accumulates the corresponding data.
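The start, count, and end pattern used in these examples can be sketched with a small timer that registers itself in a per-JVM singleton when it starts. This is a simplified illustration only: `TraceSketch` and `PhaseTimer` are hypothetical names standing in for the roles of PerfTrace and PerfRecord, and they leave out the enable switch, logging, and summary logic of the real classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical per-JVM trace holder; a simplified stand-in, not the real PerfTrace.
final class TraceSketch {
    private static final TraceSketch INSTANCE = new TraceSketch();
    private final List<PhaseTimer> timers = new CopyOnWriteArrayList<>();

    private TraceSketch() {
    }

    // One instance per JVM, shared by every timer.
    static TraceSketch getInstance() {
        return INSTANCE;
    }

    void track(PhaseTimer timer) {
        timers.add(timer);
    }

    List<PhaseTimer> timers() {
        return timers;
    }
}

// Hypothetical phase timer; a simplified stand-in, not the real PerfRecord.
final class PhaseTimer {
    final int taskGroupId;
    final int taskId;
    final String phase;
    private long startNs;
    private long elapsedNs = -1;
    private long count;

    PhaseTimer(int taskGroupId, int taskId, String phase) {
        this.taskGroupId = taskGroupId;
        this.taskId = taskId;
        this.phase = phase;
    }

    // Starts the clock and registers this timer in the per-JVM trace.
    void start() {
        startNs = System.nanoTime();
        TraceSketch.getInstance().track(this);
    }

    void addCount(long n) {
        count += n;
    }

    // Stops the clock and stores the elapsed time in nanoseconds.
    void end() {
        elapsedNs = System.nanoTime() - startNs;
    }

    long elapsedNs() {
        return elapsedNs;
    }

    long count() {
        return count;
    }
}
```

Usage mirrors the examples above: create a timer for a phase, call `start()`, run the work, optionally call `addCount(...)`, then call `end()`.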

At the end of the task, the collected PerfRecords are aggregated. For both the Job and the TaskGroup, the statistics are computed and printed when PerfTrace#summarizeNoException is called in a finally block.

First, if the number of PerfRecords is greater than 0, it will be summarized and printed

    if (totalEndReport.size() > 0) {
        sumPerf4EndPrint(totalEndReport);
    }

Then, for each phase of the TaskGroup, the average elapsed time, the number of tasks, the maximum elapsed time, and the slowest task are computed in a loop:

    for (PHASE phase : keys) {
        SumPerfRecord4Print sumPerfRecord = perfRecordMaps4print.get(phase);
        if (sumPerfRecord == null) {
            continue;
        }
        long averageTime = sumPerfRecord.getAverageTime();
        long maxTime = sumPerfRecord.getMaxTime();
        int maxTaskId = sumPerfRecord.maxTaskId;
        int maxTaskGroupId = sumPerfRecord.getMaxTaskGroupId();
        info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %-100s\n",
                phase, unitTime(averageTime), sumPerfRecord.totalCount, unitTime(maxTime), jobId + "-" + maxTaskGroupId + "-" + maxTaskId, taskDetails.get(maxTaskId)));
    }

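The per-phase aggregation printed above (average time, number of tasks, maximum time, and the task that took longest) can be sketched as follows. `PhaseSummary` and `PhaseSummarySketch` are hypothetical names used only for illustration; the framework's own summary class is not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-phase summary; not the framework's own summary class.
final class PhaseSummary {
    private long totalNs;
    private int count;
    private long maxNs = -1;
    private int maxTaskId = -1;

    // Adds one task's elapsed time and tracks the slowest task seen so far.
    void add(int taskId, long elapsedNs) {
        totalNs += elapsedNs;
        count++;
        if (elapsedNs > maxNs) {
            maxNs = elapsedNs;
            maxTaskId = taskId;
        }
    }

    long averageNs() {
        return count == 0 ? 0 : totalNs / count;
    }

    long maxNs() {
        return maxNs;
    }

    int maxTaskId() {
        return maxTaskId;
    }

    int count() {
        return count;
    }
}

// Groups summaries by phase name, keeping the order in which phases first appear.
final class PhaseSummarySketch {
    private final Map<String, PhaseSummary> byPhase = new LinkedHashMap<>();

    void record(String phase, int taskId, long elapsedNs) {
        byPhase.computeIfAbsent(phase, k -> new PhaseSummary()).add(taskId, elapsedNs);
    }

    PhaseSummary get(String phase) {
        return byPhase.get(phase);
    }
}
```

Tracking the slowest task ID alongside the maximum time is what makes it possible to print which task was the bottleneck for a phase.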

Finally, the average number of records, average bytes, speed, maximum bytes and other data of task execution are calculated

    long averageRecords = countSumPerf.getAverageRecords();
    long averageBytes = countSumPerf.getAverageBytes();
    long maxRecord = countSumPerf.getMaxRecord();
    long maxByte = countSumPerf.getMaxByte();
    int maxTaskId4Records = countSumPerf.getMaxTaskId4Records();
    int maxTGID4Records = countSumPerf.getMaxTGID4Records();

    info.append("\n\n 2. record average count and max count task info :\n\n");
    info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %18s | %-100s\n", "PHASE", "AVERAGE RECORDS", "AVERAGE BYTES", "MAX RECORDS", "MAX RECORD`S BYTES", "MAX TASK ID", "MAX TASK INFO"));
    if (maxTaskId4Records > -1) {
        info.append(String.format("%-20s | %18s | %18s | %18s | %18s | %18s | %-100s\n"
                , PHASE.READ_TASK_DATA, averageRecords, unitSize(averageBytes), maxRecord, unitSize(maxByte), jobId + "-" + maxTGID4Records + "-" + maxTaskId4Records, taskDetails.get(maxTaskId4Records)));

    }

Closing remarks

This article has only scratched the surface of Datax's log monitoring capabilities; there is room for further customization and ideas around monitoring log data.