1. JobContainer

1.1 Basic Introduction

The job instance runs inside the JobContainer, which acts as the master of all tasks. It is responsible for initialization, splitting, scheduling, running, resource cleanup, monitoring, and reporting.

1.2 Core member methods

1.3 Core member variables

1.4 Core method source code

1.4.1 The init method

/**
 * Initialize the reader and writer
 */
private void init() {
    this.jobId = this.configuration.getLong(
            CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

    if (this.jobId < 0) {
        LOG.info("Set jobId = 0");
        this.jobId = 0;
        this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                this.jobId);
    }

    Thread.currentThread().setName("job-" + this.jobId);

    JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
            this.getContainerCommunicator());
    // Initialize the Reader first, then the Writer
    this.jobReader = this.initJobReader(jobPluginCollector);
    this.jobWriter = this.initJobWriter(jobPluginCollector);
}
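The jobId handling in init boils down to a defaulting rule plus a thread name. The sketch below is a hypothetical stand-in: it assumes a plain `Map<String, Object>` in place of DataX's `Configuration` and a literal key string in place of `CoreConstant.DATAX_CORE_CONTAINER_JOB_ID`, and it mirrors only the defaulting and thread-naming logic shown above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the jobId defaulting done in JobContainer.init().
// A Map stands in for DataX's Configuration; the key string is an assumption.
class JobIdInit {
    static final String JOB_ID_KEY = "core.container.job.id";

    // Returns the effective jobId and writes it back when it had to be defaulted
    static long resolveJobId(Map<String, Object> conf) {
        Object raw = conf.get(JOB_ID_KEY);
        long jobId = (raw instanceof Number) ? ((Number) raw).longValue() : -1L;
        if (jobId < 0) {
            // Same rule as init(): a missing or negative id becomes 0
            jobId = 0;
            conf.put(JOB_ID_KEY, jobId);
        }
        return jobId;
    }

    // init() names the current thread "job-<jobId>" so log lines can be traced to the job
    static String threadNameFor(long jobId) {
        return "job-" + jobId;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        long jobId = resolveJobId(conf);
        Thread.currentThread().setName(threadNameFor(jobId));
        System.out.println(Thread.currentThread().getName()); // prints "job-0"
    }
}
```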

1.4.2 The split method

/**
 * Performs the finest-grained split of the reader and writer. Note that the writer's split must
 * produce the same number of pieces as the reader's split to satisfy the 1:1 channel model, which
 * is why the reader and writer configurations can be merged together. It also aims to keep task
 * ordering from causing a long tail on the read/write side.
 */
private int split() {
    this.adjustChannelNumber();

    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }

    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    int taskNumber = readerTaskConfigs.size();
    List<Configuration> writerTaskConfigs = this
            .doWriterSplit(taskNumber);

    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

    LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
    /**
     * The input is the parameter lists of the reader and writer; the output is the list of elements under content
     */
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);

    LOG.debug("contentConfig configuration: " + JSON.toJSONString(contentConfig));

    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

    return contentConfig.size();
}
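The 1:1 channel model comes down to pairing the i-th reader slice with the i-th writer slice. Below is a hypothetical sketch of that pairing, assuming plain strings in place of the per-task Configuration objects and a made-up `TaskPair` holder; the transformer list that the real merge step also receives is ignored here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the 1:1 pairing performed when merging reader and writer task configs.
// Strings stand in for the per-task reader/writer Configuration objects.
class SplitMerge {
    static final class TaskPair {
        final int taskId;
        final String reader;
        final String writer;

        TaskPair(int taskId, String reader, String writer) {
            this.taskId = taskId;
            this.reader = reader;
            this.writer = writer;
        }
    }

    static List<TaskPair> merge(List<String> readerTasks, List<String> writerTasks) {
        // The writer split must produce exactly as many slices as the reader split
        if (readerTasks.size() != writerTasks.size()) {
            throw new IllegalArgumentException("reader split " + readerTasks.size()
                    + " != writer split " + writerTasks.size());
        }
        List<TaskPair> content = new ArrayList<>();
        for (int i = 0; i < readerTasks.size(); i++) {
            // Task i reads slice i and writes slice i: one reader and one writer per channel
            content.add(new TaskPair(i, readerTasks.get(i), writerTasks.get(i)));
        }
        return content;
    }

    public static void main(String[] args) {
        List<String> readers = Arrays.asList("id<100", "100<=id<200", "id>=200");
        // As in split(), the writer is split using the reader's task count
        List<String> writers = Arrays.asList("w0", "w1", "w2");
        for (TaskPair p : merge(readers, writers)) {
            System.out.println(p.taskId + ": " + p.reader + " -> " + p.writer);
        }
    }
}
```

Failing fast on a count mismatch keeps a bad writer split from silently dropping or duplicating data.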

1.4.3 The schedule method: setting up the scheduler and starting it

/**
 * The first job of schedule is to combine the results of the previous step's reader and writer
 * split into concrete taskGroupContainers.
 */
private void schedule() {
    /**
     * Speeds here are expressed in B/s
     */
    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();

    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
    PerfTrace.getInstance().setChannelNumber(needChannelNumber);

    // Work out from the configuration which tasks each taskGroup will run
    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
            this.needChannelNumber, channelsPerTaskGroup);

    LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

    ExecuteMode executeMode = null;
    AbstractScheduler scheduler;
    try {
        executeMode = ExecuteMode.STANDALONE;
        scheduler = initStandaloneScheduler(this.configuration);

        // Set executeMode on every taskGroup
        for (Configuration taskGroupConfig : taskGroupConfigs) {
            taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
        }

        if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
            if (this.jobId <= 0) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
                        "In [local | distribute] mode, jobId must be set and its value must be > 0.");
            }
        }

        LOG.info("Running by {} Mode.", executeMode);

        this.startTransferTimeStamp = System.currentTimeMillis();

        scheduler.schedule(taskGroupConfigs);

        this.endTransferTimeStamp = System.currentTimeMillis();
    } catch (Exception e) {
        LOG.error("Scheduler in mode [{}] failed.", executeMode);
        this.endTransferTimeStamp = System.currentTimeMillis();
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }

    /**
     * Check how the tasks were executed
     */
    this.checkLimit();
}
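schedule() caps the channel count at the number of tasks and then asks JobAssignUtil.assignFairly to group the tasks. The sketch below reproduces only that arithmetic, under stated assumptions. It takes the number of TaskGroups to be ceil(channels / channelsPerTaskGroup) and deals tasks to groups round-robin. The class and its methods are hypothetical, and round-robin is a simplification that is not necessarily how assignFairly distributes tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the channel/TaskGroup arithmetic in schedule().
// Round-robin assignment is a simplification, not DataX's assignFairly algorithm.
class TaskGroupPlanner {
    // schedule() never uses more channels than there are tasks
    static int effectiveChannels(int needChannelNumber, int taskNumber) {
        return Math.min(needChannelNumber, taskNumber);
    }

    // Assumed: one TaskGroup per channelsPerTaskGroup channels, rounded up
    static int taskGroupCount(int channels, int channelsPerTaskGroup) {
        return (channels + channelsPerTaskGroup - 1) / channelsPerTaskGroup;
    }

    // Deal task ids 0..taskNumber-1 to the groups in turn (groups must be > 0)
    static List<List<Integer>> assignRoundRobin(int taskNumber, int groups) {
        List<List<Integer>> result = new ArrayList<>();
        for (int g = 0; g < groups; g++) {
            result.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskNumber; taskId++) {
            result.get(taskId % groups).add(taskId);
        }
        return result;
    }

    public static void main(String[] args) {
        int taskNumber = 12;
        int channels = effectiveChannels(20, taskNumber); // capped at 12
        int groups = taskGroupCount(channels, 5);         // 5 channels per group (schedule()'s default) -> 3 groups
        System.out.println(assignRoundRobin(taskNumber, groups));
    }
}
```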

1.5. Inheritance

Both JobContainer and TaskGroupContainer extend AbstractContainer.

2. ProcessInnerScheduler and StandAloneScheduler

2.1 Basic Introduction

After JobContainer has run its init and split methods, the scheduler takes over and schedules and executes the tasks. At this stage it has to assign tasks, launch them, monitor them, and handle their state changes as they run.

2.2 Core member methods and variables

2.3 Core method source code

startAllTaskGroup starts all TaskGroups, using a fixed-size thread pool with one thread per TaskGroup so that they are managed in one place.

public void startAllTaskGroup(List<Configuration> configurations) {
    this.taskGroupContainerExecutorService = Executors
            .newFixedThreadPool(configurations.size());

    for (Configuration taskGroupConfiguration : configurations) {
        TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
        this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
    }

    this.taskGroupContainerExecutorService.shutdown();
}

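Creating the runner for a TaskGroup: newTaskGroupContainerRunner wraps a new TaskGroupContainer in a TaskGroupContainerRunner, which a pool thread then executes.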

private TaskGroupContainerRunner newTaskGroupContainerRunner(
        Configuration configuration) {
    TaskGroupContainer taskGroupContainer = new TaskGroupContainer(configuration);

    return new TaskGroupContainerRunner(taskGroupContainer);
}
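startAllTaskGroup follows the standard fixed-thread-pool pattern: one thread per TaskGroup, submit every runner, then call shutdown() so the pool accepts no new work but lets the submitted runners finish. Below is a minimal sketch of that pattern with plain JDK classes. `FakeTaskGroup` and `newRunner` are hypothetical stand-ins for TaskGroupContainer and TaskGroupContainerRunner. The awaitTermination call is added only so the demo can wait for completion; the original method does not wait.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the startAllTaskGroup pattern, using only JDK classes.
class TaskGroupPoolDemo {
    // Stand-in for TaskGroupContainer: start() would normally run the task-scheduling loop
    static final class FakeTaskGroup {
        private final int id;
        private final AtomicInteger finished;

        FakeTaskGroup(int id, AtomicInteger finished) {
            this.id = id;
            this.finished = finished;
        }

        void start() {
            finished.incrementAndGet();
        }
    }

    // Stand-in for TaskGroupContainerRunner: the Runnable a pool thread executes
    static Runnable newRunner(FakeTaskGroup group) {
        return () -> {
            Thread.currentThread().setName("taskGroup-" + group.id);
            group.start();
        };
    }

    static int runAll(int groupCount) {
        AtomicInteger finished = new AtomicInteger();
        // One thread per TaskGroup, as with newFixedThreadPool(configurations.size())
        ExecutorService pool = Executors.newFixedThreadPool(groupCount);
        for (int i = 0; i < groupCount; i++) {
            pool.execute(newRunner(new FakeTaskGroup(i, finished)));
        }
        // Stop accepting new work; runners that were already submitted still run to completion
        pool.shutdown();
        try {
            // Demo only: wait so the result can be inspected
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(3) + " task groups finished");
    }
}
```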

2.4 Inheritance relationship

3. TaskGroupContainerRunner

3.1 Basic Introduction

TaskGroupContainerRunner is the Runnable executed by a TaskGroup's thread; it holds the TaskGroupContainer that the thread runs.

3.2 Member methods and variables

4. TaskGroupContainer

4.1 Basic Introduction

The TaskGroupContainerRunner thread calls TaskGroupContainer's start method to start the TaskGroupContainer. TaskGroupContainer manages the tasks of one TaskGroup: it runs them, monitors them, and manages them centrally.

Starting a TaskGroupContainer involves two parts. First, it initializes the task state: taskConfigMap (each taskId mapped to its Configuration), taskQueue (the tasks waiting to run), taskFailedExecutorMap (each taskId mapped to the executor of its last failed attempt), and runTasks (the tasks currently running). Second, it enters a loop that checks the execution status of each task:

  1. For every finished task, check whether it failed. If it did, add it to taskFailedExecutorMap and check whether the task supports failover and still has retries left; if so, put it back into the waiting queue. If it succeeded, record its running time and remove it from the bookkeeping maps.
  2. If a task failed with no retries left, or was killed, report the status to the container and throw an exception.
  3. While tasks are waiting and the number of running tasks is below the channel limit, build a TaskExecutor for the next waiting task, start it, add it to runTasks, and remove it from the waiting queue.
  4. If no tasks are waiting, all running tasks are done, and the collected state is SUCCEEDED, report the TaskGroup status and exit the loop.
  5. If the reporting interval has elapsed since the last report, report the current status globally.
  6. After the loop exits, report the final status once more.
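The loop described above can be reduced to a single-threaded simulation that keeps the same bookkeeping: a waiting queue, a list of running tasks capped at channelNumber, and a per-task attempt count checked against the retry limit with the same rule the start() code in this section uses (attemptCount < taskMaxRetryTimes). This is a hypothetical sketch. Tasks are simulated by a fixed number of failures rather than real TaskExecutors, and every started task finishes by the next iteration. Retry intervals, sleeping, and reporting (step 5) are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of the TaskGroupContainer.start() loop.
// Task id i fails on its first failuresBeforeSuccess.get(i) attempts, then succeeds.
class TaskLoopSimulation {
    static List<String> run(int taskCount, int channelNumber, int maxRetryTimes,
                            Map<Integer, Integer> failuresBeforeSuccess) {
        if (channelNumber < 1) {
            throw new IllegalArgumentException("channelNumber must be >= 1");
        }
        List<String> log = new ArrayList<>();
        Deque<Integer> taskQueue = new ArrayDeque<>();    // tasks waiting to run
        List<Integer> runTasks = new ArrayList<>();        // tasks started in the last iteration
        Map<Integer, Integer> attempts = new HashMap<>(); // taskId -> attempts so far
        for (int id = 0; id < taskCount; id++) {
            taskQueue.add(id);
        }

        while (true) {
            // Step 1: every task started last iteration has finished; inspect each outcome
            List<Integer> finished = new ArrayList<>(runTasks);
            runTasks.clear();
            for (int id : finished) {
                int attempt = attempts.get(id);
                boolean failed = attempt <= failuresBeforeSuccess.getOrDefault(id, 0);
                if (!failed) {
                    log.add("task " + id + " succeeded on attempt " + attempt);
                } else if (attempt < maxRetryTimes) {
                    // Failover: the task goes back to the waiting queue
                    log.add("task " + id + " failed on attempt " + attempt + ", requeued");
                    taskQueue.add(id);
                } else {
                    // Step 2: no retries left, so the whole task group fails
                    throw new IllegalStateException("task " + id + " failed after " + attempt + " attempts");
                }
            }
            // Step 3: start waiting tasks while free channels remain
            while (!taskQueue.isEmpty() && runTasks.size() < channelNumber) {
                int id = taskQueue.poll();
                attempts.merge(id, 1, Integer::sum);
                runTasks.add(id);
            }
            // Step 4: nothing waiting and nothing running means the task group succeeded
            if (taskQueue.isEmpty() && runTasks.isEmpty()) {
                return log;
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> failures = new HashMap<>();
        failures.put(1, 1); // task 1 fails once, then succeeds on its retry
        run(3, 2, 2, failures).forEach(System.out::println);
    }
}
```

With taskMaxRetryTimes at 1, no task is ever retried: the first failure ends the task group.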

4.2 Core member methods

4.3 Core member variables

4.4 Inheritance Relationship

TaskGroupContainer extends the same parent class as JobContainer, namely AbstractContainer.

4.5 Core method source code

TaskGroupContainer.start() repeatedly checks the execution status of each task and schedules and manages the tasks accordingly.

public void start() {
    try {
        /**
         * Interval between status checks; kept short so tasks are dispatched to their channels promptly
         */
        int sleepIntervalInMillSec = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
        /**
         * Interval between status reports; somewhat longer to avoid excessive reporting
         */
        long reportIntervalInMillSec = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
                10000);
        /**
         * Report performance statistics every 2 minutes
         */

        // Get the number of channels
        int channelNumber = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);

        int taskMaxRetryTimes = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);

        long taskRetryIntervalInMsec = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);

        long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
        
        List<Configuration> taskConfigs = this.configuration
                .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

        if(LOG.isDebugEnabled()) {
            LOG.debug("taskGroup[{}]'s task configs[{}]", this.taskGroupId,
                    JSON.toJSONString(taskConfigs));
        }
        
        int taskCountInThisTaskGroup = taskConfigs.size();
        LOG.info(String.format(
                "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
                this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
        
        this.containerCommunicator.registerCommunication(taskConfigs);

        Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs); // taskId -> task configuration
        List<Configuration> taskQueue = buildRemainTasks(taskConfigs); // tasks waiting to run
        Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>(); // taskId -> last failed executor
        List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); // tasks currently running
        Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); // task start times

        long lastReportTimeStamp = 0;
        Communication lastTaskGroupContainerCommunication = new Communication();

        while (true) {
            // 1. Check the state of each task
            boolean failedOrKilled = false;
            Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
            for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                Integer taskId = entry.getKey();
                Communication taskCommunication = entry.getValue();
                if (!taskCommunication.isFinished()) {
                    continue;
                }
                TaskExecutor taskExecutor = removeTask(runTasks, taskId);

                // The task was removed from runTasks above, so remove it from the monitor as well
                taskMonitor.removeTask(taskId);

                // Failed: check whether the task supports failover and has retries left
                if (taskCommunication.getState() == State.FAILED) {
                    taskFailedExecutorMap.put(taskId, taskExecutor);
                    if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
                        taskExecutor.shutdown(); // shut down the old executor
                        containerCommunicator.resetCommunication(taskId); // reset the task's state
                        Configuration taskConfig = taskConfigMap.get(taskId);
                        taskQueue.add(taskConfig); // put the task back into the waiting list
                    } else {
                        failedOrKilled = true;
                        break;
                    }
                } else if (taskCommunication.getState() == State.KILLED) {
                    failedOrKilled = true;
                    break;
                } else if (taskCommunication.getState() == State.SUCCEEDED) {
                    Long taskStartTime = taskStartTimeMap.get(taskId);
                    if (taskStartTime != null) {
                        Long usedTime = System.currentTimeMillis() - taskStartTime;
                        LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                this.taskGroupId, taskId, usedTime);
                        // usedTime * 1000 * 1000 converts ms to the ns used by PerfRecord. This is only a simple
                        // registration for printing the longest-running tasks, hence the dedicated static method
                        PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL, taskStartTime, usedTime * 1000L * 1000L);
                        taskStartTimeMap.remove(taskId);
                        taskConfigMap.remove(taskId);
                    }
                }
            }
           
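            // 2. If the overall state of this taskGroup's taskExecutors is failed, report the error
            if (failedOrKilled) {
                // ... some code omitted here
            }

            // 3. Tasks are waiting to run, and the number of running tasks is below the channel limit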
            Iterator<Configuration> iterator = taskQueue.iterator();
            while(iterator.hasNext() && runTasks.size() < channelNumber){
                Configuration taskConfig = iterator.next();
                Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                int attemptCount = 1;
                TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                if(lastExecutor!=null){
                    attemptCount = lastExecutor.getAttemptCount() + 1;
                    long now = System.currentTimeMillis();
                    long failedTime = lastExecutor.getTimeStamp();
                    if (now - failedTime < taskRetryIntervalInMsec) { // retry interval not reached yet; keep it in the queue
                        continue;
                    }
                    if (!lastExecutor.isShutdown()) { // the previously failed attempt has not terminated yet
                        if (now - failedTime > taskMaxWaitInMsec) {
                            markCommunicationFailed(taskId);
                            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                            throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover wait timed out");
                        } else {
                            lastExecutor.shutdown(); // try to shut it down again
                            continue;
                        }
                    }else{
                        LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                this.taskGroupId, taskId, lastExecutor.getAttemptCount());
                    }
                }
                Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                taskStartTimeMap.put(taskId, System.currentTimeMillis());
                taskExecutor.doStart();

                iterator.remove();
                runTasks.add(taskExecutor);

                // The task was added to runTasks above, so register it with the monitor
                taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));

                taskFailedExecutorMap.remove(taskId);
                LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                        this.taskGroupId, taskId, attemptCount);
            }

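            // 4. The task queue is empty, all executors have finished, and the collected state is SUCCEEDED -> success
            if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                // Report once more even on success; otherwise, when tasks finish very quickly, the collected information would be inaccurate
                // ... some code omitted
            }

            // 5. If the reporting interval has elapsed, report immediately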
            long now = System.currentTimeMillis();
            if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                        lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

                lastReportTimeStamp = now;

                // taskMonitor checks every running task once per reportIntervalInMillSec
                for(TaskExecutor taskExecutor:runTasks){
                    taskMonitor.report(taskExecutor.getTaskId(),this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                }

            }

            Thread.sleep(sleepIntervalInMillSec);
        }

        // 6. Report one final time
        reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);


    } catch (Throwable e) {
        // ... some code omitted
    } finally {
        // ... some code omitted
    }
}

5. AbstractContainer

5.1 Basic Introduction

AbstractContainer is the abstract parent class of JobContainer and TaskGroupContainer; it holds the container's global Configuration.
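The split of responsibilities can be pictured as a small class hierarchy. This is a hypothetical sketch (all `Sketch*` names are made up): a `Map` stands in for DataX's Configuration, and `start()` returns a description string purely so the example can be checked. It does not reproduce the real classes.

```java
import java.util.Map;

// Hypothetical sketch: a shared parent holding the container-wide configuration.
abstract class SketchAbstractContainer {
    protected final Map<String, Object> configuration;

    protected SketchAbstractContainer(Map<String, Object> configuration) {
        // Validate once in the parent instead of in every subclass
        if (configuration == null) {
            throw new IllegalArgumentException("configuration must not be null");
        }
        this.configuration = configuration;
    }

    Map<String, Object> getConfiguration() {
        return configuration;
    }

    // Common entry point; returns a description only for demonstration purposes
    abstract String start();
}

class SketchJobContainer extends SketchAbstractContainer {
    SketchJobContainer(Map<String, Object> conf) {
        super(conf);
    }

    @Override
    String start() {
        return "job: init -> split -> schedule";
    }
}

class SketchTaskGroupContainer extends SketchAbstractContainer {
    SketchTaskGroupContainer(Map<String, Object> conf) {
        super(conf);
    }

    @Override
    String start() {
        return "taskGroup: run and monitor tasks";
    }
}
```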

5.2 Core member methods

6. TaskExecutor

6.1 Basic Introduction

TaskExecutor is an inner class of TaskGroupContainer that manages and starts a single task. It starts the reader and writer threads and defines intermediate objects, such as the Channel, that are used to transfer data and monitor progress.

6.2 Introduction to core member methods and variables

6.3 Core method source code

Startup method:

public void doStart() {
    this.writerThread.start();

    // If the reader has not started yet, the writer cannot have finished
    if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR,
                this.taskCommunication.getThrowable());
    }

    this.readerThread.start();

    // The reader may finish very quickly here
    if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
        // The reader may crash right after starting; in that case an exception must be thrown immediately
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR,
                this.taskCommunication.getThrowable());
    }
}

Plug-in initialization: the writer plug-in is initialized in much the same way as the reader plug-in; the code is long and is omitted here.
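Starting the writer before the reader in doStart works because the writer simply blocks until the reader hands it data. Below is a runnable sketch of that hand-off. It assumes a JDK ArrayBlockingQueue as the channel and a private sentinel object as the end-of-data marker. The class is hypothetical, and the isAlive/FAILED checks that doStart performs are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the reader/writer hand-off that TaskExecutor.doStart() sets in motion.
class ReaderWriterDemo {
    private static final Object END = new Object(); // end-of-data marker

    static List<Object> transfer(List<?> rows) {
        BlockingQueue<Object> channel = new ArrayBlockingQueue<>(2); // small bounded buffer
        List<Object> written = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                // take() blocks until the reader has produced something
                for (Object r = channel.take(); r != END; r = channel.take()) {
                    written.add(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        Thread reader = new Thread(() -> {
            try {
                for (Object row : rows) {
                    channel.put(row); // blocks while the buffer is full
                }
                channel.put(END);     // tell the writer that no more data is coming
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        writer.start(); // same order as doStart(): the writer first, then the reader
        reader.start();
        try {
            reader.join();
            writer.join(); // join() also makes the writer's list safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the transfer", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(transfer(java.util.Arrays.asList("row1", "row2", "row3")));
    }
}
```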

7. Record

7.1 Basic Introduction

Record is the basic unit of data transferred between the reader and writer threads, and represents one record (row) of the data source. Its two main implementations are DefaultRecord and TerminateRecord.
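Conceptually, a Record is an ordered list of column values, and TerminateRecord is a data-free marker meaning the reader has finished. Below is a hypothetical sketch of that shape. The `Sketch*` types, their three methods, and the use of plain `Object` values instead of DataX's typed Column classes are all simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Record abstraction; plain Objects stand in for DataX Column types.
interface SketchRecord {
    void addColumn(Object column);

    Object getColumn(int index);

    int getColumnNumber();
}

// One row of data travelling from the reader thread to the writer thread
final class SketchDefaultRecord implements SketchRecord {
    private final List<Object> columns = new ArrayList<>();

    @Override public void addColumn(Object column) { columns.add(column); }

    @Override public Object getColumn(int index) { return columns.get(index); }

    @Override public int getColumnNumber() { return columns.size(); }

    @Override public String toString() { return columns.toString(); }
}

// Singleton marker meaning "the reader has finished"; it carries no data
final class SketchTerminateRecord implements SketchRecord {
    static final SketchTerminateRecord INSTANCE = new SketchTerminateRecord();

    private SketchTerminateRecord() { }

    @Override public void addColumn(Object column) {
        throw new UnsupportedOperationException("terminate record carries no columns");
    }

    @Override public Object getColumn(int index) {
        throw new UnsupportedOperationException("terminate record carries no columns");
    }

    @Override public int getColumnNumber() { return 0; }
}
```

Using a singleton for the marker lets the consumer test for it with a cheap identity check instead of inspecting record contents.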

7.2 Core member methods

7.3 Inheritance Relationship

8. Remaining data structures

8.1 Channel

Channel is the class through which data is transferred and buffered between the reader and the writer of a task.
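At its core, a channel of this kind is a bounded, thread-safe buffer. push blocks when it is full and pull blocks when it is empty, which naturally throttles a fast reader against a slow writer. Below is a minimal sketch that assumes a JDK ArrayBlockingQueue as the buffer. The class and its push/pull/pullAll names are hypothetical. DataX's own channel also does more, for example tracking statistics and limiting speed, which is modeled here only by a simple counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an in-memory channel between a reader and a writer.
final class SketchMemoryChannel<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong pushed = new AtomicLong(); // simple throughput counter

    SketchMemoryChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by the reader thread; waits while the buffer is full
    void push(T record) throws InterruptedException {
        queue.put(record);
        pushed.incrementAndGet();
    }

    // Called by the writer thread; waits while the buffer is empty
    T pull() throws InterruptedException {
        return queue.take();
    }

    // Batch variant: moves up to max buffered items without blocking
    List<T> pullAll(int max) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, max);
        return batch;
    }

    long pushedCount() {
        return pushed.get();
    }

    int size() {
        return queue.size();
    }
}
```

The capacity is the key tuning knob: a small buffer bounds memory use, while a larger one smooths out bursts between the two threads.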

8.2 MysqlWriter and MysqlReader

MysqlReader and MysqlWriter are responsible for reading from and writing to MySQL. They provide an init method for initialization, a split method that splits the job into tasks in a plug-in-specific way, and an inner Task class whose startRead and startWrite methods do the actual work (startRead fetches data from the database via JDBC and stores it in Records). They extend the abstract classes Reader and Writer respectively.

Their methods are called from the reader and writer threads, as discussed in the plug-in development guide.
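For a relational reader, a plug-in-specific split often means cutting the table into disjoint key ranges, one per task, each becoming the WHERE condition of that task's query. Below is a hypothetical sketch of the idea. It assumes a numeric key whose min and max are already known, for example from a prior `SELECT MIN(...), MAX(...)`. It illustrates the technique and is not DataX's actual splitting code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split [min, max] of a numeric key into adjacent, non-overlapping ranges.
// Assumes max - min + 1 does not overflow a long.
class KeyRangeSplitter {
    static List<String> split(String column, long min, long max, int parts) {
        if (parts <= 0 || min > max) {
            throw new IllegalArgumentException("need parts > 0 and min <= max");
        }
        long span = max - min + 1;           // number of key values covered
        int n = (int) Math.min(parts, span); // never more ranges than key values
        List<String> conditions = new ArrayList<>();
        long lower = min;
        for (int i = 0; i < n; i++) {
            // Spread the remainder over the first ranges so sizes differ by at most 1
            long size = span / n + (i < span % n ? 1 : 0);
            long upper = lower + size;       // exclusive upper bound
            conditions.add(column + " >= " + lower + " AND " + column + " < " + upper);
            lower = upper;
        }
        return conditions;
    }

    public static void main(String[] args) {
        // Each condition would become the WHERE clause of one reader task's query
        split("id", 1, 10, 3).forEach(System.out::println);
    }
}
```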

8.3 Configuration

The Configuration class stores all of the information in the JSON configuration file and provides lossless storage of multi-level JSON configuration. Its core member methods are used to modify and retrieve data.

The core member variable is mainly root, which holds the entire JSON file after it has been parsed into an Object.
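Because root is just the parsed JSON tree (nested maps and lists), reading a value means walking a path such as `job.content[0].reader.name` through that tree. Below is a minimal sketch of such a lookup over `Map`/`List` objects. The path style follows the one DataX uses for its configuration keys, but the `get` method here is hypothetical and skips JSON parsing, escaping, and type conversion.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of path lookup over a parsed JSON tree (Maps for objects, Lists for arrays).
final class PathLookup {
    static Object get(Object root, String path) {
        Object current = root;
        for (String segment : path.split("\\.")) {
            // A segment looks like "name" or "name[index]" (a single index only in this sketch)
            String key = segment;
            Integer index = null;
            int bracket = segment.indexOf('[');
            if (bracket >= 0) {
                key = segment.substring(0, bracket);
                index = Integer.parseInt(segment.substring(bracket + 1, segment.length() - 1));
            }
            if (!key.isEmpty()) {
                if (!(current instanceof Map)) {
                    return null; // the path goes through something that is not a JSON object
                }
                current = ((Map<?, ?>) current).get(key);
            }
            if (index != null) {
                if (!(current instanceof List) || index < 0 || index >= ((List<?>) current).size()) {
                    return null; // not an array, or index out of range
                }
                current = ((List<?>) current).get(index);
            }
        }
        return current;
    }
}
```

Returning null for a missing path, rather than throwing, lets callers fall back to defaults, which is how the getInt/getLong calls with default values in the code above behave.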

9. Summary

This concludes the introduction to DataX's core data structures. Many utility classes are not covered here, such as DataX's custom class loader and the classes that run the reader and writer threads; interested readers can consult the source code.