The full POM file for MongoDB support was integrated in the previous article
<? xml version="1.0" encoding="UTF-8"? > <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> < modelVersion > 4.0.0 < / modelVersion > < groupId > com. Chinamobile. Iot. Meter < / groupId > < artifactId > RSMS - spark - enables the parent < / artifactId > < version > 1.0 < / version > < packaging > pom < / packaging > <! -- Declare public properties --> <properties> <spark.version>2.1.0</spark.version> <scala.version>2.11.8</scala.version> <log4 j. version > 1.2.17 < /log4j.version> <slf4j.version>1.7.22</slf4j.version> </properties> <dependencies> <! -- Logging --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version> </dependency> <! -- Logging End --> <! Spark --> <dependency> <groupId>org.mongodb. Spark </groupId> <artifactId>mongo-spark-connector_2.11</artifactId> < version > 2.1.5 < / version > < / dependency > < the dependency > < groupId > org. Apache. Spark < / groupId > The < artifactId > spark - core_2. 11 < / artifactId > < version >${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version> </dependency> <! --Spark END--> <! -- MongoDB --> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> < version > 3.8.0 < / version > < / dependency > < the dependency > < groupId > org. Springframework. Data < / groupId > < artifactId > spring - data - directing a < / artifactId > < version > 1.10.17. RELEASE < / version > < / dependency > <! --MongoDB END --> </dependencies> <modules> <module>rsms-spark-common</module> <module>rsms-alarm-task</module> <module>rsms-freeze-task</module> </modules> </project>Copy the code
MongoManager adds support for transactions
package com.chinamobile.iot.meter.mongo; import com.chinamobile.iot.meter.config.MongoConfig; import com.mongodb.*; import com.mongodb.client.ClientSession; import com.mongodb.client.MongoDatabase; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import java.util.List; /** * @author DBQ * @date 2019/5/7 */ public class MongoManager {public static Logger Logger = LoggerFactory.getLogger(MongoManager.class); private static MongoClient mongo = null; privateMongoManager() {
}
static {
System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - > > > > > > > > > > > > >");
initDBPrompties();
logger.info("init mongodb client end.");
}
public static MongoDatabase getDB() {
returnmongo.getDatabase(MongoConfig.DB); } /** * Initializes the connection pool */ private static voidinitDBPromptiesMongo = new MongoClient(mongoconfig. HOST, mongoconfig.port); mongo = new MongoClient(mongoconfig. HOST, mongoconfig.port); } catch (MongoException me) { } } public static boolean checkEmpty(String collection) { long count = getDB().getCollection(collection).countDocuments();return count == 0;
}
public static void saveToMongoWithoutTransaction(List<Document> datas, String collection) {
Assert.notEmpty(datas, "The set cannot be empty.");
getDB().getCollection(collection).insertMany(datas);
}
public static void saveToMongo(List<Document> datas, String collection) {
Assert.notEmpty(datas, "The set cannot be empty.");
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = mongo.startSession()) {
clientSession.startTransaction(txnOptions);
getDB().getCollection(collection).insertMany(clientSession, datas);
commitWithRetry(clientSession);
}
}
private static void commitWithRetry(ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
logger.info("MongoDB Transaction committed");
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
logger.error("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
logger.error("Exception during commit ...");
throw e;
}
}
}
}
}
Copy the code