The full POM file, with MongoDB support integrated, was shown in the previous article.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.chinamobile.iot.meter</groupId>
    <artifactId>rsms-spark-parent</artifactId>
    <version>1.0</version>
    <packaging>pom</packaging>

    <!-- Declare public properties -->
    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
    </properties>

    <dependencies>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Logging End -->

        <!-- Spark -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark End -->

        <!-- MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <!-- NOTE(review): artifactId was garbled in the source text
                 ("spring - data - directing a"); spring-data-mongodb is the
                 only Spring Data artifact matching version 1.10.17.RELEASE
                 and the MongoDB context — confirm against the original POM. -->
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-mongodb</artifactId>
            <version>1.10.17.RELEASE</version>
        </dependency>
        <!-- MongoDB End -->
    </dependencies>

    <modules>
        <module>rsms-spark-common</module>
        <module>rsms-alarm-task</module>
        <module>rsms-freeze-task</module>
    </modules>
</project>

The `MongoManager` class adds support for transactions:

package com.chinamobile.iot.meter.mongo; import com.chinamobile.iot.meter.config.MongoConfig; import com.mongodb.*; import com.mongodb.client.ClientSession; import com.mongodb.client.MongoDatabase; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import java.util.List; /** * @author DBQ * @date 2019/5/7 */ public class MongoManager {public static Logger Logger = LoggerFactory.getLogger(MongoManager.class); private static MongoClient mongo = null; privateMongoManager() {
    }

    static {
        System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - > > > > > > > > > > > > >");
        initDBPrompties();
        logger.info("init mongodb client end.");
    }

    public static MongoDatabase getDB() {
        returnmongo.getDatabase(MongoConfig.DB); } /** * Initializes the connection pool */ private static voidinitDBPromptiesMongo = new MongoClient(mongoconfig. HOST, mongoconfig.port); mongo = new MongoClient(mongoconfig. HOST, mongoconfig.port); } catch (MongoException me) { } } public static boolean checkEmpty(String collection) { long count = getDB().getCollection(collection).countDocuments();return count == 0;
    }

    public static void saveToMongoWithoutTransaction(List<Document> datas, String collection) {
        Assert.notEmpty(datas, "The set cannot be empty.");
        getDB().getCollection(collection).insertMany(datas);
    }

    public static void saveToMongo(List<Document> datas, String collection) {
        Assert.notEmpty(datas, "The set cannot be empty.");
        TransactionOptions txnOptions = TransactionOptions.builder()
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.MAJORITY)
                .writeConcern(WriteConcern.MAJORITY)
                .build();
        try (ClientSession clientSession = mongo.startSession()) {
            clientSession.startTransaction(txnOptions);
            getDB().getCollection(collection).insertMany(clientSession, datas);
            commitWithRetry(clientSession);
        }
    }

    private static void commitWithRetry(ClientSession clientSession) {
        while (true) {
            try {
                clientSession.commitTransaction();
                logger.info("MongoDB Transaction committed");
                break;
            } catch (MongoException e) {
                // can retry commit
                if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                    logger.error("UnknownTransactionCommitResult, retrying commit operation ...");
                    continue;
                } else {
                    logger.error("Exception during commit ...");
                    throw e;
                }
            }
        }
    }
}

There was a hiccup here. According to MongoDB's website, support for distributed (sharded) transactions was planned for version 4.2, and as of MongoDB 4.0 the first transaction-enabled Java driver version is 3.8.0.

However, after MongoDB was upgraded to 4.2 and the driver downgraded from 3.10 to 3.8, there was still an error saying the driver version does not support sharded transactions. After copying the 3.8 driver jar into the Spark jars directory, the problem was resolved.