The more concise, the clearer
Brief description
This article focuses on two aspects of DataX:
- Scheduling process
- Data transfer process
Scheduling refers to the order and priority in which DataX executes tasks. Data transfer refers to how readers and writers interact with each other and how DataX features such as rate control and parallel execution are implemented.
Source code entry points
Scheduling process
The entry point of the scheduling process is the schedule() method of JobContainer.java.
It first reads the number of channels per TaskGroup (default 5) and the number of tasks, then caps the required number of channels at the number of tasks.
int channelsPerTaskGroup = this.configuration.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
int taskNumber = this.configuration.getList(
CoreConstant.DATAX_JOB_CONTENT).size();
this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
JobAssignUtil is then called to allocate appropriate channels to each TaskGroup.
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup); // Fair distribution
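As a rough illustration of what a fair distribution could look like, the sketch below derives a task group count as ceil(needChannelNumber / channelsPerTaskGroup) and deals task ids out round-robin. The names FairAssignSketch, taskGroupCount and assignRoundRobin are hypothetical, and the logic is a simplification under those assumptions, not the actual JobAssignUtil.assignFairly implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: only illustrates the round-robin idea, not DataX's JobAssignUtil.
class FairAssignSketch {

    // Number of task groups needed to host the requested channels (assumed formula).
    static int taskGroupCount(int needChannelNumber, int channelsPerTaskGroup) {
        return (int) Math.ceil(1.0 * needChannelNumber / channelsPerTaskGroup);
    }

    // Deal task ids 0..taskNumber-1 out to the groups one at a time.
    static List<List<Integer>> assignRoundRobin(int taskNumber, int groupCount) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskNumber; taskId++) {
            groups.get(taskId % groupCount).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        int taskNumber = 7;
        // The requested channel count is capped at the task count, as in schedule().
        int needChannelNumber = Math.min(12, taskNumber);
        int groupCount = taskGroupCount(needChannelNumber, 5);
        System.out.println(assignRoundRobin(taskNumber, groupCount));
    }
}
```

Round-robin keeps group sizes within one task of each other, which is the simplest notion of "fair" when nothing is known about the cost of individual tasks.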
AbstractScheduler is then instantiated for scheduling.
private void schedule() {
// ...
scheduler.schedule(taskGroupConfigs);
// ...
}
There are two main things that the scheduling method does:
- Register monitoring information (covered in a separate article below)
- Start all tasks
The code to start the task is
int totalTasks = calculateTaskCount(configurations); // Count the total number of tasks
startAllTaskGroup(configurations); // Start all tasks
ProcessInnerScheduler#startAllTaskGroup
public void startAllTaskGroup(List<Configuration> configurations) {
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
for (Configuration taskGroupConfiguration : configurations) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration); // TaskGroupContainerRunner -> TaskGroupContainer
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}
DataX uses TaskGroupContainerRunner to wrap the Configuration in a TaskGroupContainer. The state attribute of TaskGroupContainerRunner represents the state of its TaskGroupContainer. The runner is then submitted to the thread pool for execution. Next is the code that starts the task.
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
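The runner pattern described above can be sketched in isolation: one Runnable per group, a fixed-size pool with one thread per group, and a state field that records success or failure. RunnerSketch and startAll are hypothetical names, and unlike the DataX code this sketch swallows the exception after recording FAILED and simply waits for the pool to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the runner pattern; names are illustrative, not DataX classes.
class RunnerSketch implements Runnable {
    enum State { RUNNING, SUCCEEDED, FAILED }

    private final int groupId;
    private final Runnable work;
    private volatile State state = State.RUNNING;

    RunnerSketch(int groupId, Runnable work) {
        this.groupId = groupId;
        this.work = work;
    }

    State getState() {
        return state;
    }

    @Override
    public void run() {
        try {
            Thread.currentThread().setName(String.format("taskGroup-%d", groupId));
            work.run();
            state = State.SUCCEEDED;
        } catch (Throwable e) {
            state = State.FAILED; // record the failure so a caller can inspect it later
        }
    }

    // One thread per group; shutdown() rejects new work but lets submitted groups finish.
    static List<RunnerSketch> startAll(List<Runnable> groupWork) {
        ExecutorService pool = Executors.newFixedThreadPool(groupWork.size());
        List<RunnerSketch> runners = new ArrayList<>();
        for (int i = 0; i < groupWork.size(); i++) {
            RunnerSketch runner = new RunnerSketch(i, groupWork.get(i));
            runners.add(runner);
            pool.execute(runner);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // the sketch just waits for completion
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runners;
    }
}
```

Calling shutdown() right after submitting is safe because it only stops new submissions; the already submitted runners keep executing.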
At this point, the scheduling process ends and the data transfer process follows.
Data transfer process
Initialization
The data transfer process is implemented in TaskGroupContainer, so the configuration of monitoring metrics and retry policies is also handled there. DataX uses a producer-consumer model to decouple readers from writers, improve parallelism, and control the rate at which data is processed. In TaskGroupContainer, readers and writers are linked through channels. A channel can be in-memory or persistent; the plug-in does not need to care which. The plug-in writes data to the channel through a RecordSender and reads data from the channel through a RecordReceiver.
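To make the producer-consumer idea concrete, here is a minimal sketch in which a bounded in-memory queue stands in for the channel, a reader thread produces records, and a writer thread consumes them until it sees a terminate sentinel. ChannelSketch, transfer and the sentinel are assumptions made for illustration; the real DataX channel also handles statistics and rate limiting, which are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a bounded queue plays the role of the channel.
class ChannelSketch {
    // Sentinel marking the end of data; compared by identity, never by content.
    private static final String TERMINATE = new String("TERMINATE");

    static List<String> transfer(List<String> source, int capacity) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);
        List<String> written = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                String record;
                while ((record = channel.take()) != TERMINATE) {
                    written.add(record); // stands in for writing to the target store
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(record); // blocks while the channel is full
                }
                channel.put(TERMINATE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        writer.start(); // consumer first, so the producer never fills an unattended channel
        reader.start();
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }
}
```

The bounded capacity is what gives back-pressure: a fast reader blocks in put() until the writer catches up, which is one simple way to keep the two sides loosely coupled.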
The code is too long to reproduce in full. TaskGroupContainer#start is the entry point and represents the start of a TaskGroup. The main steps of this method are:
- Get the parameters and configure the monitor
- Wrap each task configuration in an executable task, execute the task, and register the monitor
- Monitor the task status and retry failed tasks
We focus here on the execution of the task. TaskExecutor is an inner class of TaskGroupContainer.
class TaskExecutor {
private Configuration taskConfig;
private int taskId;
private int attemptCount;
private Channel channel;
private Thread readerThread;
private Thread writerThread;
private ReaderRunner readerRunner;
private WriterRunner writerRunner;
private Communication taskCommunication;
// method area
}
TaskGroupContainer wraps each shard configuration in a TaskExecutor:
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();
The code to start is the following
public void doStart() {
this.writerThread.start(); // ①
// ②
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
// throw exception
}
this.readerThread.start(); // ③
// ④
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
// throw exception
}
}
Code ① and code ③ start two threads, the writer thread and the reader thread respectively. Both threads are created when TaskExecutor is instantiated. First let's look at the Runnable that the writer thread executes. It is built mainly in the generateRunner method.
// ①
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
// ②
newRunner.setJobConf(this.taskConfig
.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
// ③
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.WRITER);
((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
this.channel, pluginCollector));
// ④
newRunner.setTaskPluginCollector(pluginCollector);
Code ① loads the WriterRunner task. Code ② passes in the write task configuration as an argument. Codes ③ and ④ create and attach the task plugin collector, which handles dirty data and job/task communication data. Code ③ also gives the WriterRunner a BufferedRecordExchanger as its RecordReceiver.
Next we look at the read task, which is also built in the generateRunner method.
// ①
newRunner = LoadUtil.loadPluginRunner(pluginType,
this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
// ②
newRunner.setJobConf(this.taskConfig.getConfiguration(
CoreConstant.JOB_READER_PARAMETER));
// ③
pluginCollector = ClassUtil.instantiate(
taskCollectorClass, AbstractTaskPluginCollector.class,
configuration, this.taskCommunication,
PluginType.READER);
// ④
RecordSender recordSender;
if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel, this.taskCommunication, pluginCollector, transformerInfoExecs);
} else {
recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
}
((ReaderRunner) newRunner).setRecordSender(recordSender);
// ⑤
newRunner.setTaskPluginCollector(pluginCollector);
Code ① loads the ReaderRunner task. Code ② passes in the read task configuration as an argument. Codes ③ and ⑤ create and attach the task plugin collector, which handles dirty data and job/task communication data. Code ④ is worth noting: it decides which Exchanger to use. An Exchanger can be understood as the middleman between a plug-in and the channel. BufferedRecordTransformerExchanger is chosen when transformers are configured and applies them to records before they enter the channel. BufferedRecordExchanger transfers records in the DataX format without transformation.
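The choice between the two Exchangers can be illustrated with a small sketch that mirrors the null-or-empty check in the reader's generateRunner code. The Sender interface, the plain and transforming factories, and the use of strings as records are all hypothetical stand-ins, chosen only to show the selection logic.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: Sender and both implementations are illustrative, not DataX classes.
class SenderChoiceSketch {
    interface Sender {
        void send(String record);
    }

    // Plain sender: forwards records to the channel unchanged.
    static Sender plain(List<String> channel) {
        return channel::add;
    }

    // Transforming sender: applies each transformer in order before forwarding.
    static Sender transforming(List<String> channel, List<UnaryOperator<String>> transformers) {
        return record -> {
            String current = record;
            for (UnaryOperator<String> transformer : transformers) {
                current = transformer.apply(current);
            }
            channel.add(current);
        };
    }

    // Use the transforming sender only when at least one transformer is configured.
    static Sender choose(List<String> channel, List<UnaryOperator<String>> transformers) {
        if (transformers != null && transformers.size() > 0) {
            return transforming(channel, transformers);
        }
        return plain(channel);
    }
}
```

Because both variants sit behind the same interface, the reader plug-in calls send() the same way in either case and never needs to know whether transformers are present.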
Task read/write starts (MySQL as an example)
Once the read and write tasks are initialized, the next step is the start method. First, the read task. The read task is defined in ReaderRunner, and for more flexibility, ReaderRunner also uses the plug-in mechanism. To add a new data source, we just extend Reader.Task.
Reader.Task taskReader = (Reader.Task) this.getPlugin();
The key call is the startRead method of Reader.Task, which receives the Exchanger as its RecordSender:
taskReader.startRead(recordSender);
recordSender.terminate();
The startRead method takes the Exchanger as a parameter, and what happens next depends on the concrete data source implementation. Here the implementation class is CommonRdbmsReader.
It first reads QUERY_SQL, the username, the password, and other parameters, then opens a connection:
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);
Then execute QUERY_SQL to retrieve the data
rs = DBUtil.query(conn, querySql, fetchSize);
Then loop through the query results and start writing
while (rs.next()) {
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
}
The transportOneRecord method is where the Exchanger (the RecordSender) is actually used for the interaction:
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs,
ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
TaskPluginCollector taskPluginCollector) {
Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector);
recordSender.sendToWriter(record);
return record;
}
Next comes the write task. The write task is implemented in CommonRdbmsWriter.Task#startWrite. It first gets a connection and then applies the session configuration to that connection:
Connection connection = DBUtil.getConnection(this.dataBaseType,
this.jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
this.dataBaseType, BASIC_MESSAGE);
Then the writer starts writing data. The data to write is obtained from the Exchanger: the writer loops over the records returned by the RecordReceiver and checks that each record has the expected number of columns.
// Use buffer cache, batchSize is to control the number of each send
List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
while ((record = recordReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != this.columnNumber) {
// throw error...
}
}
A record that passes the check is added to the buffer. The buffer is then written to the database in a batch:
doBatchInsert(connection, writeBuffer);
writeBuffer.clear();
bufferBytes = 0;
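The write loop above can be sketched without a database: records are pulled from a receiver until it returns null, each record's column count is checked, and the buffer is flushed whenever it reaches batchSize. BatchWriteSketch, the Supplier used as the receiver, and the list of batches standing in for doBatchInsert are all hypothetical; the byte-size limit tracked by bufferBytes is left out of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of batched writing; the "database" is just a list of batches.
class BatchWriteSketch {

    static List<List<String[]>> write(Supplier<String[]> receiver, int columnNumber, int batchSize) {
        List<List<String[]>> insertedBatches = new ArrayList<>();
        List<String[]> writeBuffer = new ArrayList<>(batchSize);
        String[] record;
        while ((record = receiver.get()) != null) { // null means the reader has finished
            if (record.length != columnNumber) {
                throw new IllegalArgumentException("column count mismatch: " + record.length);
            }
            writeBuffer.add(record);
            if (writeBuffer.size() >= batchSize) {
                insertedBatches.add(new ArrayList<>(writeBuffer)); // stands in for doBatchInsert
                writeBuffer.clear();
            }
        }
        if (!writeBuffer.isEmpty()) { // flush the final partial batch
            insertedBatches.add(new ArrayList<>(writeBuffer));
            writeBuffer.clear();
        }
        return insertedBatches;
    }
}
```

The final flush after the loop matters: without it, any records left in a partially filled buffer when the reader terminates would never be written.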
Exchanger interaction
An Exchanger implements both RecordSender and RecordReceiver. There are two implementation classes: BufferedRecordExchanger and BufferedRecordTransformerExchanger.
RecordSender.java
public interface RecordSender {
public Record createRecord();
public void sendToWriter(Record record);
public void flush();
public void terminate();
public void shutdown();
}
RecordReceiver.java
public interface RecordReceiver {
public Record getFromReader();
public void shutdown();
}
Let's take BufferedRecordExchanger as an example and look at a few of its methods.
Method | Role |
---|---|
sendToWriter | Write data to a channel |
flush | Flush the buffer, that is, write the buffered data to a channel |
terminate | Flush the buffered data into the channel immediately and push a terminate record |
getFromReader | Get data from a channel |
The sendToWriter method is responsible for putting records into a channel. It first checks whether the size of the record exceeds the limit:
if (record.getMemorySize() > this.byteCapacity) {
this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("Single record exceeds size limit, current limit is: %s", this.byteCapacity)));
return;
}
It then checks whether the buffer is full, either by record count or by bytes. If it is full, the buffer is flushed to the channel:
boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
if (isFull) {
flush();
}
The record is then added to the buffer:
this.buffer.add(record);
this.bufferIndex++;
memoryBytes.addAndGet(record.getMemorySize());
The flush method is also simple: it pushes the buffer directly to the channel.
this.channel.pushAll(this.buffer);
this.buffer.clear();
The terminate method is used to flush buffer data directly into a channel
flush();
this.channel.pushTerminate(TerminateRecord.get());
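Putting sendToWriter, flush, and terminate together, the sender side can be sketched as below. BufferedSenderSketch is a hypothetical class: the channel is a plain list, records are strings, string length stands in for getMemorySize(), and a dirty list stands in for the plugin collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a buffering sender; the channel is a plain list.
class BufferedSenderSketch {
    // Sentinel pushed on terminate; compared by identity.
    static final String TERMINATE = new String("TERMINATE");

    private final List<String> channel;
    private final List<String> dirty = new ArrayList<>();
    private final List<String> buffer = new ArrayList<>();
    private final int bufferSize;   // maximum records per batch
    private final int byteCapacity; // maximum "bytes" per batch (string length as a stand-in)
    private int memoryBytes = 0;

    BufferedSenderSketch(List<String> channel, int bufferSize, int byteCapacity) {
        this.channel = channel;
        this.bufferSize = bufferSize;
        this.byteCapacity = byteCapacity;
    }

    void sendToWriter(String record) {
        if (record.length() > byteCapacity) {
            dirty.add(record); // a single oversized record is reported, not sent
            return;
        }
        boolean isFull = buffer.size() >= bufferSize
                || memoryBytes + record.length() > byteCapacity;
        if (isFull) {
            flush();
        }
        buffer.add(record);
        memoryBytes += record.length();
    }

    void flush() {
        channel.addAll(buffer); // hand the whole batch to the channel at once
        buffer.clear();
        memoryBytes = 0;
    }

    void terminate() {
        flush();
        channel.add(TERMINATE); // tells the receiving side that no more data will come
    }

    List<String> getDirty() {
        return dirty;
    }
}
```

Batching on the sender side means the channel is touched once per batch rather than once per record, which reduces contention between the reader and writer threads.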
The getFromReader method is used by the writer to fetch the data sent by the reader. It first checks whether the local buffer has been fully consumed. If so, receive() is called to pull a batch of records from the channel into the writer-side buffer:
boolean isEmpty = (this.bufferIndex >= this.buffer.size());
if (isEmpty) {
receive();
}
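It then reads the record at the current index from the buffer and returns it. A TerminateRecord is returned as null, which tells the writer that the data has ended: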
Record record = this.buffer.get(this.bufferIndex++);
if (record instanceof TerminateRecord) {
record = null;
}
return record;
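The receiving side can be sketched in the same spirit. BufferedReceiverSketch is hypothetical: the channel is a non-blocking Queue of strings, receive() drains up to batchSize records into a local buffer, and the terminate sentinel is turned into null. As a simplification, an empty channel is also treated as the end of data here, whereas a real channel would be expected to block until data arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of a buffering receiver over a non-blocking queue.
class BufferedReceiverSketch {
    // Sentinel marking the end of data; compared by identity.
    static final String TERMINATE = new String("TERMINATE");

    private final Queue<String> channel;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int bufferIndex = 0;

    BufferedReceiverSketch(Queue<String> channel, int batchSize) {
        this.channel = channel;
        this.batchSize = batchSize;
    }

    // Pull up to batchSize records from the channel into the local buffer.
    private void receive() {
        buffer.clear();
        bufferIndex = 0;
        String next;
        while (buffer.size() < batchSize && (next = channel.poll()) != null) {
            buffer.add(next);
        }
    }

    String getFromReader() {
        boolean isEmpty = bufferIndex >= buffer.size();
        if (isEmpty) {
            receive();
        }
        if (bufferIndex >= buffer.size()) {
            return null; // sketch only: an empty channel is treated as finished
        }
        String record = buffer.get(bufferIndex++);
        return record == TERMINATE ? null : record;
    }
}
```

Reading by index from a locally held batch keeps most getFromReader() calls free of any interaction with the shared channel.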
Conclusion
In general, DataX's read and write tasks follow this process. Its flexibility comes from the many data source implementations it provides. In addition, readers and writers are provided as plug-ins, which makes extension and business adaptation easier.