1 Program entry class Engine
The entry class for task execution is Engine:
public static void main(String[] args) throws Exception {
    int exitCode = 0;
    try {
        // Call the internal entry method
        Engine.entry(args);
    } catch (Throwable e) {
        // ... other code omitted
    }
    System.exit(exitCode);
}
The entry method inside Engine provides the following functions:
- Parse the command line parameters -mode, -jobid, and -job to obtain the execution mode, the job id, and the path of the job configuration file, respectively.
- Parse the JSON job file edited by the user and package it into a Configuration object (how the parsing works is covered later), then create a new Engine instance and call its start method.
public static void entry(final String[] args) throws Throwable {
    // Define the command line arguments -mode, -jobid, -job
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args); // Parse the command line arguments

    String jobPath = cl.getOptionValue("job");
    // If the user does not specify a jobid explicitly, datax.py passes a default value of -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    // From the job configuration path, ConfigParser gathers all Job, Plugin, and Core information
    // and returns it as a Configuration
    Configuration configuration = ConfigParser.parse(jobPath); // Configuration carries all configuration information throughout the datax program
    // ... other code omitted

    Engine engine = new Engine();
    engine.start(configuration); // Start the engine
}
How ConfigParser.parse works is omitted for now; treat it as a black box that can be used directly.
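For reference, a minimal job file of the kind ConfigParser.parse consumes might look like the following. This is a sketch based on DataX's standard mysqlreader/mysqlwriter job template; all values are placeholders.

{
  "job": {
    "setting": { "speed": { "channel": 3 } },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root", "password": "***",
          "column": ["id", "name"], "splitPk": "id",
          "connection": [{ "table": ["table1"], "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/db"] }]
        }
      },
      "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "***" } }
    }]
  }
}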
Engine’s start method does the following:
- First, bind some more information onto the configuration, such as the ColumnCast type-conversion settings.
- Initialize the PluginLoader to retrieve various plug-in configurations (in preparation for subsequent hot loading).
- Create JobContainer and start it. JobContainer will be the running container for a data synchronization job.
public void start(Configuration allConf) {
    // Bind the column cast (type conversion) information
    ColumnCast.bind(allConf);

    /**
     * Initialize PluginLoader so the various plug-in configurations can be obtained
     */
    LoadUtil.bind(allConf);

    // Check the job model (job/task) first
    boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
            .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
    // JobContainer will set and adjust this value after schedule
    int channelNumber = 0;
    AbstractContainer container;
    long instanceId;
    int taskGroupId = -1;
    // Basically always job mode
    if (isJob) {
        allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
        // Initialize JobContainer, passing in the global configuration
        container = new JobContainer(allConf);
        instanceId = allConf.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
    } else {
        // ... omitted; this branch is rarely used
    }

    // PerfTrace is enabled by default
    boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
    boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
    // DataX shell tasks in standalone mode are not reported
    if (instanceId == -1) {
        perfReportEnable = false;
    }

    // The author notes this block is unclear; priority is taken from the SKYNET_PRIORITY environment variable
    int priority = 0;
    try {
        priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
    } catch (NumberFormatException e) {
        LOG.warn("prioriy set to 0, because NumberFormatException, the value is: " + System.getProperty("PROIORY"));
    }

    // Extract the job-related configuration from the overall configuration
    Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
    // Initialize PerfTrace
    PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
    perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
    // Start the JobContainer; this takes us into JobContainer
    container.start();
}
2 JobContainer
The Job instance runs inside JobContainer. It is the master of all tasks, responsible for initialization, splitting, scheduling, running, recycling, monitoring, and reporting, but it performs no actual data synchronization itself.
The start method
- In start(), JobContainer drives the whole job lifecycle, including init, prepare, split, and schedule
- The init method is responsible for initializing and loading the reader and writer
- The prepare method does some preparatory work
- The split method splits the job into multiple tasks based on the configured concurrency parameters
- The schedule method performs the actual task scheduling and running.
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            // ... omitted, rarely needed
        } else {
            // Clone a copy of the configuration, because changes will be made to it
            userConf = configuration.clone();
            // Pre-processing
            this.preHandle();
            // Initialize the reader and writer plug-ins
            this.init();
            // Plug-in preparation work; some plug-ins need nothing here, e.g. mysqlreader
            this.prepare();
            // Split into tasks to prepare for concurrent execution
            this.totalStage = this.split();
            // Schedule and run the tasks
            this.schedule();
            // Job post-processing
            this.post();
            // Post-processing handler
            this.postHandle();
            // Trigger registered hooks
            this.invokeHooks();
        }
    } catch (Throwable e) {
        // ... other code omitted
    } finally {
        // ... other code omitted
    }
}
The init method initializes the reader and writer plug-ins: it loads the specified plug-ins through a class loader and assigns the relevant parts of the configuration file to internal variables of the reader and writer plug-ins for later use. During this step the tables, columns, and so on in the configuration file are checked. After initialization, the reader and writer variables in the container hold the concrete plug-in instances.
private void init() {
    // Get the jobId from the configuration
    this.jobId = this.configuration.getLong(
            CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
    if (this.jobId < 0) {
        LOG.info("Set jobId = 0");
        this.jobId = 0;
        // Write the jobId back into the configuration
        this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                this.jobId);
    }
    Thread.currentThread().setName("job-" + this.jobId);

    // Collector used to gather plug-in runtime information
    JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
            this.getContainerCommunicator());
    // The reader must be initialized first, since the writer depends on the reader
    this.jobReader = this.initJobReader(jobPluginCollector);
    this.jobWriter = this.initJobWriter(jobPluginCollector);
}
The initJobReader method mainly uses a URLClassLoader to load the plug-in classes, which are found under the plug-in's own directory. Once loaded, the plug-in's internal init method is called for plug-in-specific initialization.
private Reader.Job initJobReader(JobPluginCollector jobPluginCollector) {
    // Get the reader plug-in name
    this.readerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_CONTENT_READER_NAME); // job.content[0].reader.name
    // Swap in the classloader that holds the plug-in's lib packages, based on the plug-in name
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName));
    // Create the reader job object
    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
            PluginType.READER, this.readerPluginName);
    // Set the reader's own job config
    jobReader.setPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
    // Set the peer (writer) job config on the reader
    jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
    jobReader.setJobPluginCollector(jobPluginCollector);
    jobReader.init(); // After the concrete plug-in is loaded, run its own init
    // Restore the original classloader
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return jobReader;
}
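The swap-and-restore pattern above can be sketched in plain Java as follows. This is a simplified illustration, not DataX's actual ClassLoaderSwapper/LoadUtil code; jarUrls and pluginClassName are hypothetical parameters.

import java.net.URL;
import java.net.URLClassLoader;

// Simplified sketch: load a plug-in class from its own jar directory so its
// dependencies stay isolated from other plug-ins, then restore the original classloader.
static Object loadPlugin(URL[] jarUrls, String pluginClassName) throws Exception {
    ClassLoader original = Thread.currentThread().getContextClassLoader();
    URLClassLoader pluginLoader = new URLClassLoader(jarUrls, original);
    Thread.currentThread().setContextClassLoader(pluginLoader);
    try {
        // The plug-in class (and everything it references) resolves against its own jars first
        Class<?> clazz = pluginLoader.loadClass(pluginClassName);
        return clazz.getConstructor().newInstance();
    } finally {
        // Restore, so the next plug-in is loaded against its own jars instead
        Thread.currentThread().setContextClassLoader(original);
    }
}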
Take the initialization of the MysqlReader plug-in as an example:
@Override
public void init() {
    this.originalConfig = super.getPluginJobConf();

    Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
    if (userConfigedFetchSize != null) {
        LOG.warn("There is no need to configure fetchSize for mysqlreader; it will ignore this configuration. Remove the fetchSize configuration if you don't want to see this warning again.");
    }
    this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

    this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
    this.commonRdbmsReaderJob.init(this.originalConfig);
}
The prepare method is omitted here. It is not required by all plug-ins and is empty in many plug-ins.
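For illustration, MysqlReader's own prepare is a no-op like the sketch below; by convention some plug-ins (writers in particular) use this phase for pre-run work such as executing configured preSql statements (that usage is an assumption based on common DataX plug-in behavior, not shown here).

@Override
public void prepare() {
    // Intentionally empty: mysqlreader needs no preparation work.
    // Other plug-ins may override prepare() for pre-run work, e.g. running preSql. (assumed usage)
}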
JobContainer Task splitting method: split
After init and prepare, the most important step before task execution is task partitioning.
- Split means splitting the reader and writer according to needChannelNumber; each reader and writer plug-in has its own split method.
- The jobReader already initialized in JobContainer splits the Configuration based on the overall configuration and its own conditions; the Configuration held by JobContainer contains all information about the data to be synchronized.
- Splitting returns a List of Configurations, each representing one portion of the data described by the original overall configuration. The list is written back into the overall configuration to support subsequent calls.
- Note that the reader must be split first, because the writer is split according to the number of reader shards.
private int split() {
    this.adjustChannelNumber();
    // Ensure at least one channel
    if (this.needChannelNumber <= 0) {
        this.needChannelNumber = 1;
    }
    // Split the reader plug-in; returns a list of reader configurations, one per shard,
    // each serving one subsequent task
    List<Configuration> readerTaskConfigs = this
            .doReaderSplit(this.needChannelNumber);
    // Number of reader shards
    int taskNumber = readerTaskConfigs.size();
    // Split the writer plug-in according to the number of reader shards;
    // returns a list of writer configurations, one per shard
    List<Configuration> writerTaskConfigs = this
            .doWriterSplit(taskNumber);

    // Get the job.content[0].transformer configuration
    List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
    LOG.debug("transformer configuration: " + JSON.toJSONString(transformerList));
    // Merge the reader task configs, writer task configs, and transformer configs
    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
    // Write the merged list back into the overall configuration for subsequent use
    this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
    return contentConfig.size();
}
MysqlReader is used as an example:
@Override
public List<Configuration> split(int adviceNumber) {
    return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
}
MysqlReader actually delegates to the split method of commonRdbmsReaderJob, which is common framework code for relational databases. See the code comments below for details.
public static List<Configuration> doSplit(
        Configuration originalSliceConfig, int adviceNumber) {
    boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
    int eachTableShouldSplittedNumber = -1;
    if (isTableMode) {
        // adviceNumber is the number of channels, i.e. the DataX concurrency
        // eachTableShouldSplittedNumber is the number of shards a single table should be
        // split into, rounded up; it is no longer strictly proportional to adviceNumber
        eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
    }
    // The column information from the configuration file
    String column = originalSliceConfig.getString(Key.COLUMN);
    // The where clause from the configuration file
    String where = originalSliceConfig.getString(Key.WHERE, null);
    // All connections from the configuration file
    List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);

    List<Configuration> splittedConfigs = new ArrayList<Configuration>();
    // Iterate over all connections
    for (int i = 0, len = conns.size(); i < len; i++) {
        Configuration sliceConfig = originalSliceConfig.clone();
        // The configuration of the current connection
        Configuration connConf = Configuration.from(conns.get(i).toString());
        String jdbcUrl = connConf.getString(Key.JDBC_URL);
        sliceConfig.set(Key.JDBC_URL, jdbcUrl);
        // Extract ip/port from the jdbcUrl as a resource-usage mark, so the core can do a meaningful shuffle
        sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
        sliceConfig.remove(Constant.CONN_MARK);

        Configuration tempSlice;
        // Table mode is configured
        if (isTableMode) {
            // Already expanded and processed earlier, can be used directly
            List<String> tables = connConf.getList(Key.TABLE, String.class);
            Validate.isTrue(null != tables && !tables.isEmpty(),
                    "You read the database table configuration error.");
            String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);
            // The final number of shards does not necessarily equal eachTableShouldSplittedNumber
            boolean needSplitTable = eachTableShouldSplittedNumber > 1
                    && StringUtils.isNotBlank(splitPk);
            if (needSplitTable) {
                if (tables.size() == 1) {
                    // Originally a single table was split by primary key into num = num * 2 + 1 shards
                    //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1; // the +1 caused a long tail
                    // Consider another factor? (the splitPk-is-null slice: ignore that long tail)
                    //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
                    // To avoid importing small Hive files, the default factor is 5;
                    // it can be changed via splitFactor.
                    // The final task count is ceil(channel / tableNum) * splitFactor
                    Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
                    eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
                }
                // Try to split each table into eachTableShouldSplittedNumber shards
                for (String table : tables) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.TABLE, table);
                    List<Configuration> splittedSlices = SingleTableSplitUtil
                            .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
                    splittedConfigs.addAll(splittedSlices);
                }
            } else {
                // splitPk not configured (or only one shard needed): no single-table
                // splitting, one shard per table
                for (String table : tables) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.TABLE, table);
                    String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
                    tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
                    splittedConfigs.add(tempSlice);
                }
            }
        } else {
            // querySql mode is configured; this case is simple:
            // as many SQL statements as configured, that many shards
            List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);
            // TODO: check whether multiple statements are configured?
            for (String querySql : sqls) {
                tempSlice = sliceConfig.clone();
                tempSlice.set(Key.QUERY_SQL, querySql);
                splittedConfigs.add(tempSlice);
            }
        }
    }
    return splittedConfigs;
}
Working down through the code and its comments, it is not hard to see that the split method decides whether single-table splitting is needed: when the required concurrency is high and the splitPk (split primary key) parameter is configured, a single table is split further, the shard count having already been calculated above; otherwise as many tables as are configured, that many concurrent tasks are opened. Below is the source of the single-table split. It combines the primary key, table name, column names, and where condition into one SQL statement, then divides the primary key range and appends each range to the SQL's where clause, splitting it into multiple SQLs. Each SQL is written into its own Configuration, and the resulting list is the basis for each task that is split off.
public static List<Configuration> splitSingleTable(
        Configuration configuration, int adviceNum) {
    List<Configuration> pluginParams = new ArrayList<Configuration>();
    List<String> rangeList;
    String splitPkName = configuration.getString(Key.SPLIT_PK);
    String column = configuration.getString(Key.COLUMN);
    String table = configuration.getString(Key.TABLE);
    String where = configuration.getString(Key.WHERE, null);
    boolean hasWhere = StringUtils.isNotBlank(where);

    if (DATABASE_TYPE == DataBaseType.Oracle) {
        rangeList = genSplitSqlForOracle(splitPkName, table, where,
                configuration, adviceNum);
    } else {
        Pair<Object, Object> minMaxPK = getPkRange(configuration);
        if (null == minMaxPK) {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                    "Splitting the table by the split primary key failed. DataX supports only a single split primary key of integer or string type. Try another split primary key or contact your DBA.");
        }

        configuration.set(Key.QUERY_SQL, buildQuerySql(column, table, where));
        if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) {
            // The start/end of the range is null: no splitting possible
            pluginParams.add(configuration);
            return pluginParams;
        }

        boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
                .getString(Constant.PK_TYPE));
        boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
                .getString(Constant.PK_TYPE));

        // Split the primary key range [min, max] into adviceNum sub-ranges
        if (isStringType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    String.valueOf(minMaxPK.getLeft()),
                    String.valueOf(minMaxPK.getRight()), adviceNum,
                    splitPkName, "'", DATABASE_TYPE);
        } else if (isLongType) {
            rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                    new BigInteger(minMaxPK.getLeft().toString()),
                    new BigInteger(minMaxPK.getRight().toString()),
                    adviceNum, splitPkName);
        } else {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                    "The splitPk type you configured is not supported. DataX supports only a single split primary key of integer or string type. Try another split primary key or contact your DBA.");
        }
    }

    String tempQuerySql;
    // Collect all the generated SQL statements
    List<String> allQuerySql = new ArrayList<String>();

    if (null != rangeList && !rangeList.isEmpty()) {
        for (String range : rangeList) {
            Configuration tempConfig = configuration.clone();
            // Append the primary-key range to the where condition, forming a new SQL per shard
            tempQuerySql = buildQuerySql(column, table, where)
                    + (hasWhere ? " and " : " where ") + range;
            allQuerySql.add(tempQuerySql);
            tempConfig.set(Key.QUERY_SQL, tempQuerySql);
            pluginParams.add(tempConfig);
        }
    } else {
        Configuration tempConfig = configuration.clone();
        tempQuerySql = buildQuerySql(column, table, where)
                + (hasWhere ? " and " : " where ")
                + String.format(" %s IS NOT NULL", splitPkName);
        // Add to the SQL collection
        allQuerySql.add(tempQuerySql);
        tempConfig.set(Key.QUERY_SQL, tempQuerySql);
        pluginParams.add(tempConfig);
    }

    // One extra shard picks up the rows whose split key is NULL
    Configuration tempConfig = configuration.clone();
    tempQuerySql = buildQuerySql(column, table, where)
            + (hasWhere ? " and " : " where ")
            + String.format(" %s IS NULL", splitPkName);
    // Add to the SQL collection
    allQuerySql.add(tempQuerySql);
    tempConfig.set(Key.QUERY_SQL, tempQuerySql);
    pluginParams.add(tempConfig);

    return pluginParams;
}
Conclusion:
- Table mode without splitPk: the number of tasks equals the number of tables. For example, if two tables (table1 and table2) are configured, at least two tasks are started, one per table.
- Table mode with splitPk: works together with channel. Shards per table = ceil(channel / number of tables). When that number is greater than 1 and splitPk is configured, the table is split further; for a single table the final task count = shards x splitFactor (default 5) + 1 (see the worked example below). The configured splitPk is woven into the querySql of each task Configuration: if id is configured, conditions such as id > 1 and id < 5 are appended to achieve the split.
- QuerySql mode: as many querySql statements as are configured, that many task configurations are generated.
- The writer has only table mode. With a single table, the writer configuration is cloned so the writer's task count matches the reader's; with multiple tables, the number of writer tables must equal the reader's task count.
- Note: this splitting policy is mysql's and applies to databases that support SQL statements; it does not represent the splitting policy of every data source.
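A worked example of the table-mode arithmetic, following the doSplit/splitSingleTable code above and assuming the default splitFactor of 5: with channel = 4 and a single table with splitPk = id, eachTableShouldSplittedNumber = ceil(4 / 1) x 5 = 20, so splitSingleTable produces 20 range shards (each querySql gaining a condition of the form id >= 1 and id < 100 from RdbmsRangeSplitWrap) plus one extra id IS NULL shard, 21 tasks in total. With channel = 4 and four tables, each table gets ceil(4 / 4) = 1 shard, needSplitTable is false, and the result is simply 4 tasks, one per table.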
3 JobContainer Task scheduling method: schedule
The schedule method first calculates how many task groups are needed, based on the channel count and the number of channels allowed per task group:
// Number of channels each task group runs (default 5)
int channelsPerTaskGroup = this.configuration.getInt(
        CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
// Total number of tasks
int taskNumber = this.configuration.getList(
        CoreConstant.DATAX_JOB_CONTENT).size();
// The channel count never needs to exceed the task count
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
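From these two numbers the task group count is then derived inside JobAssignUtil.assignFairly (called below) as ceil(needChannelNumber / channelsPerTaskGroup): for instance, needChannelNumber = 7 with the default channelsPerTaskGroup = 5 gives ceil(7 / 5) = 2 task groups.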
Then, schedule reads the configuration to determine which tasks each task group should run. Once the counts are determined, the tasks are distributed evenly across the task groups, a scheduler is created, and the tasks are executed.
// Assign tasks fairly to specific task groups
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
        this.needChannelNumber, channelsPerTaskGroup);
LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
    // Create the scheduler (standalone mode)
    executeMode = ExecuteMode.STANDALONE;
    scheduler = initStandaloneScheduler(this.configuration);
    // Set the executeMode on every task group configuration
    for (Configuration taskGroupConfig : taskGroupConfigs) {
        taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
    }
    // ... other code omitted
    // Start the tasks
    scheduler.schedule(taskGroupConfigs);
} catch (Exception e) {
    // ... other code omitted
}
After setting a few parameters and performing failure checks, scheduler.schedule calls the startAllTaskGroup method of its AbstractScheduler parent class to start all the task groups.
public void startAllTaskGroup(List<Configuration> configurations) {
    // Start a thread pool sized to the number of task groups
    this.taskGroupContainerExecutorService = Executors
            .newFixedThreadPool(configurations.size());

    for (Configuration taskGroupConfiguration : configurations) {
        // Create a TaskGroupContainerRunner for the task group
        TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
        // Submit it so the task group starts running
        this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
    }

    this.taskGroupContainerExecutorService.shutdown();
}
When each thread starts, it launches its TaskGroupContainer, which runs all the tasks in that task group:
@Override
public void run() {
    try {
        // Set the thread name
        Thread.currentThread().setName(
                String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
        // Start the TaskGroupContainer
        this.taskGroupContainer.start();
        this.state = State.SUCCEEDED;
    } catch (Throwable e) {
        this.state = State.FAILED;
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }
}
4 TaskGroupContainer
Then the TaskGroupContainer starts. Its start method has two parts:
- Initialize the task execution state: the taskId-to-Configuration map, the queue of tasks waiting to run, the taskFailedExecutorMap, and the running-task list runTasks
- Enter a loop that polls the execution state of every task:
- Check whether any task has failed. A failed task is added to taskFailedExecutorMap; if it supports rerun and failover, it is put back into the waiting queue. If a task succeeded, mark it successful and remove it from the status-polling map
- If an unrecoverable failure is found, report the state to the container and throw an exception
- Check the running list's length; if there is spare channel capacity, build a TaskExecutor for a waiting task, add it to the running list, and remove it from the waiting queue
- Check the waiting queue and the state of all tasks; if every task has run successfully, report the task group's state and exit the loop
- Check whether the report interval has elapsed; if so, report the current state globally
- After all tasks succeed, report the final state globally.
See the source code notes below.
public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(TaskGroupContainer.class);

    // The jobId of the job this task group belongs to
    private long jobId;

    // The id of this task group
    private int taskGroupId;

    // The channel implementation class to use
    private String channelClazz;

    // The task collector class to use
    private String taskCollectorClass;

    private TaskMonitor taskMonitor = TaskMonitor.getInstance();

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);

        initCommunicator(configuration); // Initialize the communicator

        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        // Task group id: core.container.taskGroup.id
        this.taskGroupId = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
        // Channel (pipe) implementation class: core.transport.channel.class
        this.channelClazz = this.configuration.getString(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
        // Task collector class: core.statistics.collector.plugin.taskClass
        this.taskCollectorClass = this.configuration.getString(
                CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
    }

    // ...

    @Override
    public void start() {
        try {
            /**
             * Status check interval; kept short so tasks can be dispatched to idle
             * channels promptly: core.container.taskGroup.sleepInterval
             */
            int sleepIntervalInMillSec = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
            /**
             * Status report interval; kept longer to avoid a flood of reports:
             * core.container.taskGroup.reportInterval
             */
            long reportIntervalInMillSec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
                    10000);

            // Number of channels in this task group: core.container.taskGroup.channel
            int channelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
            // Max retry count: core.container.task.failOver.maxRetryTimes (default 1)
            int taskMaxRetryTimes = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
            // Retry interval: core.container.task.failOver.retryIntervalInMsec
            long taskRetryIntervalInMsec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
            // core.container.task.failOver.maxWaitInMsec
            long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);

            // All task configurations of the current task group
            List<Configuration> taskConfigs = this.configuration
                    .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

            int taskCountInThisTaskGroup = taskConfigs.size();
            LOG.info(String.format(
                    "taskGroupId=[%d] start [%d] channels for [%d] tasks.",
                    this.taskGroupId, channelNumber, taskCountInThisTaskGroup));

            // The task group registers its communicator
            this.containerCommunicator.registerCommunication(taskConfigs);

            // taskId -> task configuration
            Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs);
            // Queue of tasks waiting to run
            List<Configuration> taskQueue = buildRemainTasks(taskConfigs);
            // taskId -> the last failed executor instance
            Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>();
            // Tasks currently running
            List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber);
            // taskId -> task start time
            Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>();

            long lastReportTimeStamp = 0;
            Communication lastTaskGroupContainerCommunication = new Communication();

            // Enter the polling loop
            while (true) {
                // 1. Check the state of each task
                boolean failedOrKilled = false;
                // taskId -> communication object, used to collect task runtime information
                Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();
                for (Map.Entry<Integer, Communication> entry : communicationMap.entrySet()) {
                    Integer taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) {
                        continue; // The current task has not finished yet
                    }
                    // Remove the finished task from the running set
                    TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                    // It has been removed from runTasks, so remove it from the monitor too
                    taskMonitor.removeTask(taskId);

                    // On failure, check whether the task supports failover and has retries left
                    if (taskCommunication.getState() == State.FAILED) {
                        taskFailedExecutorMap.put(taskId, taskExecutor);
                        if (taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes) {
                            taskExecutor.shutdown(); // Shut down the old executor
                            containerCommunicator.resetCommunication(taskId); // Reset the task state
                            Configuration taskConfig = taskConfigMap.get(taskId);
                            taskQueue.add(taskConfig); // Put it back into the waiting queue
                        } else {
                            failedOrKilled = true;
                            break;
                        }
                    } else if (taskCommunication.getState() == State.KILLED) {
                        failedOrKilled = true;
                        break;
                    } else if (taskCommunication.getState() == State.SUCCEEDED) {
                        Long taskStartTime = taskStartTimeMap.get(taskId);
                        if (taskStartTime != null) {
                            Long usedTime = System.currentTimeMillis() - taskStartTime;
                            LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                    this.taskGroupId, taskId, usedTime);
                            // usedTime*1000*1000 converts ms to the ns used by PerfRecord; this is a
                            // simple register used to print the longest-running tasks, hence the static method
                            PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,
                                    taskStartTime, usedTime * 1000L * 1000L);
                            taskStartTimeMap.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }

                // 2. If any task in the task group failed or was killed, report and raise an error
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    throw DataXException.asDataXException(
                            FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }

                // 3. There are still waiting tasks and the running count is below the channel limit
                Iterator<Configuration> iterator = taskQueue.iterator();
                while (iterator.hasNext() && runTasks.size() < channelNumber) {
                    Configuration taskConfig = iterator.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    int attemptCount = 1;
                    TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                    if (lastExecutor != null) {
                        attemptCount = lastExecutor.getAttemptCount() + 1;
                        long now = System.currentTimeMillis();
                        long failedTime = lastExecutor.getTimeStamp();
                        if (now - failedTime < taskRetryIntervalInMsec) {
                            // The retry interval has not elapsed yet
                            continue;
                        }
                        if (!lastExecutor.isShutdown()) {
                            // The previously failed task has still not finished shutting down
                            if (now - failedTime > taskMaxWaitInMsec) {
                                markCommunicationFailed(taskId);
                                reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                                throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "Task failover wait timed out");
                            } else {
                                lastExecutor.shutdown(); // Try to shut it down again
                                continue;
                            }
                        } else {
                            LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                    this.taskGroupId, taskId, lastExecutor.getAttemptCount());
                        }
                    }
                    Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                    taskExecutor.doStart();

                    iterator.remove();
                    runTasks.add(taskExecutor); // Add it to the running set
                    // The task has been added to runTasks, so register it with the monitor
                    taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                    // Remove the taskId from the failed map
                    taskFailedExecutorMap.remove(taskId);
                    LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started",
                            this.taskGroupId, taskId, attemptCount);
                }

                // 4. The waiting queue is empty, all executors are done, and the collected state is SUCCEEDED --> success
                if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                    // Report once more on success; otherwise the collected statistics may be
                    // inaccurate when tasks finish very quickly
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    LOG.info("taskGroup[{}] completed it's tasks.", this.taskGroupId);
                    break;
                }

                // 5. If the report interval has elapsed, report immediately
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    lastReportTimeStamp = now;
                    // The taskMonitor checks the running tasks once per reportIntervalInMillSec
                    for (TaskExecutor taskExecutor : runTasks) {
                        taskMonitor.report(taskExecutor.getTaskId(), this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                    }
                }

                Thread.sleep(sleepIntervalInMillSec);
            }

            // 6. One final report
            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

        } catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            if (!PerfTrace.getInstance().isJob()) {
                // Finally, print the average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
            }
        }
    }
}
5 TaskExecutor
TaskExecutor is an inner class of TaskGroupContainer that manages the execution of a single task.
- It initializes some state: the read and write threads, the channel (pipe) that buffers the data being read, and the transformer parameters.
- After initialization, the read and write threads are started and a single task (one slice of the data synchronization job) begins to run.
- The ReaderRunner reads rows from the database via JDBC, wraps each one as a Record, and pushes it into the Channel; when reading finishes, it writes a TerminateRecord marker at the end.
- The WriterRunner keeps pulling Records out of the Channel until it meets the TerminateRecord marker, by which point all the data has been read and written into the target database.
class TaskExecutor {
    private Configuration taskConfig; // Configuration of the current task

    private Channel channel; // The channel (pipe) used to buffer the data being read

    private Thread readerThread; // Reader thread
    private Thread writerThread; // Writer thread

    private ReaderRunner readerRunner;
    private WriterRunner writerRunner;

    /**
     * taskCommunication is used in several places:
     * 1. the channel
     * 2. readerRunner and writerRunner
     * 3. the reader's and writer's taskPluginCollector
     */
    private Communication taskCommunication;

    public TaskExecutor(Configuration taskConf, int attemptCount) {
        // Get the taskExecutor's configuration
        this.taskConfig = taskConf;
        // ...

        /**
         * The taskCommunication obtained by taskId is passed on to readerRunner and
         * writerRunner, and also to the channel for statistics
         */
        this.taskCommunication = containerCommunicator
                .getCommunication(taskId);

        // Instantiate the channel that buffers the data being read
        this.channel = ClassUtil.instantiate(channelClazz,
                Channel.class, configuration);
        this.channel.setCommunication(this.taskCommunication);

        /**
         * Get the transformer parameters
         */
        List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);

        /**
         * Generate the writerThread
         */
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        this.writerThread = new Thread(writerRunner,
                String.format("%d-%d-%d-writer",
                        jobId, taskGroupId, this.taskId));
        // Setting the thread's contextClassLoader lets the plug-in load with its own
        // classloader, separate from the main program's
        this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.taskConfig.getString(
                        CoreConstant.JOB_WRITER_NAME)));

        /**
         * Generate the readerThread
         */
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                String.format("%d-%d-%d-reader",
                        jobId, taskGroupId, this.taskId));
        // Likewise, the reader thread gets the reader plug-in's own classloader
        this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.taskConfig.getString(
                        CoreConstant.JOB_READER_NAME)));
    }

    public void doStart() {
        this.writerThread.start();

        // The reader has not started yet, so the writer cannot possibly have finished;
        // if the writer thread is already dead here, something failed
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }

        this.readerThread.start();

        // The reader may finish very quickly
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // The reader may have died right after starting; in that case an exception
            // must be thrown immediately
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
}
ReaderRunner (similar to WriterRunner)
- ReaderRunner is created by TaskExecutor's generateRunner.
- It mainly drives the Task inner class of the corresponding plug-in, calling the plug-in's init, prepare, and startRead methods to start reading data from the database.
public void run() {
    assert null != this.recordSender;

    Reader.Task taskReader = (Reader.Task) this.getPlugin();

    // Count waitWriterTime; ends in the finally block
    PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
    try {
        channelWaitWrite.start();

        LOG.debug("task reader starts to do init ...");
        PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
        initPerfRecord.start();
        taskReader.init();
        initPerfRecord.end();

        LOG.debug("task reader starts to do prepare ...");
        PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
        preparePerfRecord.start();
        taskReader.prepare();
        preparePerfRecord.end();

        LOG.debug("task reader starts to read ...");
        PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
        dataPerfRecord.start();
        taskReader.startRead(recordSender);
        recordSender.terminate();

        dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
        dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
        dataPerfRecord.end();

        LOG.debug("task reader starts to do post ...");
        PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
        postPerfRecord.start();
        taskReader.post();
        postPerfRecord.end();
        // automatic flush
        // super.markSuccess(); // must not mark success here; that is done by the writerRunner
    } catch (Throwable e) {
        LOG.error("Reader runner Received Exceptions:", e);
        super.markFail(e);
    } finally {
        LOG.debug("task reader starts to do destroy ...");
        PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
        desPerfRecord.start();
        super.destroy();
        desPerfRecord.end();

        channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));

        long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
        if (transformerUsedTime > 0) {
            PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
            transformerRecord.start();
            transformerRecord.end(transformerUsedTime);
        }
    }
}
Taking mysql as an example again: MysqlReader reads data through JDBC and, via recordSender, forwards each row as a Record through the Channel to the corresponding writer. The code is as follows:
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
                      TaskPluginCollector taskPluginCollector, int fetchSize) {
    String querySql = readerSliceConfig.getString("querySql");
    String table = readerSliceConfig.getString("table");
    PerfTrace.getInstance().addTaskDetails(this.taskId, table + "," + this.basicMsg);
    LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, this.basicMsg);
    PerfRecord queryPerfRecord = new PerfRecord(this.taskGroupId, this.taskId, PHASE.SQL_QUERY);
    queryPerfRecord.start();
    Connection conn = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, this.username, this.password);
    DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, this.basicMsg);
    ResultSet rs = null;
    try {
        rs = DBUtil.query(conn, querySql, fetchSize);
        queryPerfRecord.end();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnNumber = metaData.getColumnCount();
        PerfRecord allResultPerfRecord = new PerfRecord(this.taskGroupId, this.taskId, PHASE.RESULT_NEXT_ALL);
        allResultPerfRecord.start();
        long rsNextUsedTime = 0L;

        for (long lastTime = System.nanoTime(); rs.next(); lastTime = System.nanoTime()) {
            rsNextUsedTime += System.nanoTime() - lastTime;
            // Hand the row over to the channel as a Record via recordSender
            this.transportOneRecord(recordSender, rs, metaData, columnNumber, this.mandatoryEncoding, taskPluginCollector);
        }

        allResultPerfRecord.end(rsNextUsedTime);
        LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, this.basicMsg);
    } catch (Exception e) {
        throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, this.username);
    } finally {
        DBUtil.closeDBResources((Statement) null, conn);
    }
}
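transportOneRecord is where a JDBC row becomes a DataX Record. Below is a simplified sketch of its shape; the real CommonRdbmsReader code handles many more column types, encodings, and error paths, so treat this as an outline under those assumptions.

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;

// Simplified sketch: convert one ResultSet row into a Record and push it into the channel.
Record transportOneRecord(RecordSender recordSender, ResultSet rs,
                          ResultSetMetaData metaData, int columnNumber) throws Exception {
    Record record = recordSender.createRecord();
    for (int i = 1; i <= columnNumber; i++) {
        // Wrap each JDBC column value in a matching DataX Column type
        switch (metaData.getColumnType(i)) {
            case java.sql.Types.INTEGER:
            case java.sql.Types.BIGINT:
                record.addColumn(new LongColumn(rs.getLong(i)));
                break;
            case java.sql.Types.CHAR:
            case java.sql.Types.VARCHAR:
                record.addColumn(new StringColumn(rs.getString(i)));
                break;
            default: // fall back to the string representation
                record.addColumn(new StringColumn(rs.getString(i)));
        }
    }
    recordSender.sendToWriter(record); // push the record into the channel
    return record;
}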
On the other side, the writer keeps retrieving Records from the Channel and writes them into the target database.
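The consumption loop on the writer side has roughly the following shape; this sketch is modeled on DataX's common RDBMS writer, and batchSize, writeBuffer, and doBatchInsert are assumed names rather than the exact source.

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;

// Simplified sketch: drain records from the channel and flush them to the target database in batches.
public void startWrite(RecordReceiver recordReceiver, Connection connection, int batchSize) throws SQLException {
    List<Record> writeBuffer = new ArrayList<Record>(batchSize);
    Record record;
    // getFromReader() returns null once the TerminateRecord marker is reached
    while ((record = recordReceiver.getFromReader()) != null) {
        writeBuffer.add(record);
        if (writeBuffer.size() >= batchSize) {
            doBatchInsert(connection, writeBuffer); // assumed helper: one batched INSERT
            writeBuffer.clear();
        }
    }
    // Flush whatever remains after the terminate marker
    if (!writeBuffer.isEmpty()) {
        doBatchInsert(connection, writeBuffer);
        writeBuffer.clear();
    }
}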
6 Data structure during data transmission
This will be discussed in a separate article.