The face I once saw is gone, who knows where; the peach blossoms still smile in the spring breeze.

DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It implements efficient data synchronization among heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.

To optimize the

The optimization is as follows:

The default startWriter method of HbaseAbstractTask writes one record per put call:

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
    Record record;
    try {
        while ((record = lineReceiver.getFromReader()) != null) {
            Put put;
            try {
                put = convertRecordToPut(record);
            } catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e);
                continue;
            }
            try {
                // One put call per record.
                this.htable.put(put);
            } catch (IllegalArgumentException e) {
                if (e.getMessage().equals("No columns to insert") && nullMode.equals(NullModeType.Skip)) {
                    LOG.info(String.format("Record is empty. If you set nullMode to [SKIP], this record will be ignored. Record [%s]", record.toString()));
                    continue;
                } else {
                    taskPluginCollector.collectDirtyRecord(record, e);
                    continue;
                }
            }
        }
    } catch (IOException e) {
        throw DataXException.asDataXException(Hbase094xWriterErrorCode.PUT_HBASE_ERROR, e);
    } finally {
        Hbase094xHelper.closeTable(this.htable);
    }
}

The HBase table API can also write a whole list of Put objects in a single call, put(List<Put>). The code is modified as follows:

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
    Record record;
    // Pending Puts, flushed every 2000 records or once 200 ms have passed.
    List<Put> putList = new ArrayList<>(2000);
    long begin = System.currentTimeMillis();
    try {
        while ((record = lineReceiver.getFromReader()) != null) {
            Put put;
            try {
                put = convertRecordToPut(record);
            } catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e);
                continue;
            }
            putList.add(put);
            try {
                if (putList.size() % 2000 == 0 || System.currentTimeMillis() - begin > 200) {
                    this.asyncTable.put(putList);
                    putList.clear();
                    begin = System.currentTimeMillis();
                }
            } catch (IllegalArgumentException e) {
                if (e.getMessage().equals("No columns to insert") && nullMode.equals(NullModeType.Skip)) {
                    LOG.info(String.format("Record is empty. If you set nullMode to [SKIP], this record will be ignored. Record [%s]", record.toString()));
                    continue;
                } else {
                    taskPluginCollector.collectDirtyRecord(record, e);
                    continue;
                }
            }
        }
        // Flush the records left over from the last partial batch;
        // without this step they would never be written.
        if (!putList.isEmpty()) {
            this.asyncTable.put(putList);
        }
    } finally {
        Hbase20xHelper.closeConn(future);
    }
}

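Records are now committed in batches of up to 2000 (or once 200 ms have passed since the last commit, whichever comes first) instead of one at a time, which reduces the number of requests sent to HBase.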
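To get a feel for the saving, here is a rough back-of-the-envelope sketch. It assumes each put call costs one request and ignores the 200 ms trigger, so the batched figure is a lower bound. The class name RequestCountSketch and the job size of one million records are made up for illustration.

```java
// Hypothetical sketch: compare request counts with and without batching.
final class RequestCountSketch {

    // One request per record, as in the per-record put loop.
    static long perRecordRequests(long records) {
        return records;
    }

    // One request per full or partial batch: ceil(records / batchSize).
    static long batchedRequests(long records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return (records + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long records = 1_000_000L; // assumed job size, for illustration only
        System.out.println(perRecordRequests(records));     // prints 1000000
        System.out.println(batchedRequests(records, 2000)); // prints 500
    }
}
```

If the reader delivers records slowly, the time-based flush sends smaller batches and the real request count is higher than this estimate.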

Conclusion

If the writer you are using supports batch submission, you can apply the same optimization to it.
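The same idea can be pulled out into a small reusable helper. The following is only a sketch under assumptions: BatchBuffer is a hypothetical class, not part of DataX or HBase, and the sink stands in for whatever batch call your writer offers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers items and hands them to a sink in batches,
// flushing when the batch is full or has waited too long.
final class BatchBuffer<T> {
    private final int maxSize;
    private final long maxWaitMillis;
    private final Consumer<List<T>> sink;
    private final List<T> buffer;
    private long lastFlush = System.currentTimeMillis();

    BatchBuffer(int maxSize, long maxWaitMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxWaitMillis = maxWaitMillis;
        this.sink = sink;
        this.buffer = new ArrayList<>(maxSize);
    }

    // Add one item; flush if the size or time threshold is reached.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxSize
                || System.currentTimeMillis() - lastFlush > maxWaitMillis) {
            flush();
        }
    }

    // Send whatever is buffered. Call this once more after the last item
    // so that a partial batch is not lost.
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer)); // copy, so the sink may keep the list
            buffer.clear();
        }
        lastFlush = System.currentTimeMillis();
    }
}
```

A writer would call add for every converted record and flush once after the read loop ends. The thresholds (2000 records and 200 ms in this article) are worth tuning per data source.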