
Requirement background:

In the previous article, “Out of the box | Implementing local message queue based on BDB”, we found that when a large amount of data is enqueued there are performance problems: memory grows extremely fast and latency is relatively high. This post therefore optimizes the design: instead of implementing the local message queue with an AbstractQueue, we implement it through queue-head and queue-tail pointers, and compare the performance and usage against other local-disk stores of the same type.

Berkeley DB:

Berkeley DB is Oracle's open-source, single-machine, file-based key-value database.

LevelDB:

LevelDB is an open-source key-value storage library from Google, with high write performance and good sequential read performance.

Implementing a local queue based on BDB cursor traversal:

A BDB database can be traversed in key order with a cursor, but a queue built that way is not as good as the head/tail-pointer queue that the LevelDB implementation below uses.
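To make that concrete, here is a minimal sketch (my own illustration, not part of the original implementation) of what a cursor-based pop against BDB JE could look like; it assumes an already-opened Database such as the one created by the BdbOperator class below, and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

/**
 * Sketch only: dequeue the first record of a BDB JE database via a cursor.
 * Keys come back in BDB's byte order, so numeric keys would need zero-padding;
 * every pop also has to open and close a cursor, which the pointer approach avoids.
 */
public class BdbCursorPopSketch {

    public String popFirst(Database database) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        Cursor cursor = database.openCursor(null, null);
        try {
            if (cursor.getFirst(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
                cursor.delete();   // remove the record the cursor currently points at
                return new String(data.getData(), StandardCharsets.UTF_8);
            }
            return null;           // queue is empty
        } finally {
            cursor.close();
        }
    }
}
```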

Code implementation:
```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;

/**
 * @author Taoze
 * @version 1.0
 * @date 4/9/21 15:31
 */
@Slf4j
public class BdbOperator {

    /**
     * Environment object, the BDB environment.
     * BDB JE allows only one writing process but multiple read-only processes;
     * when the writing process updates data, a reading process cannot see the change
     * until the environment is closed and reopened, so one environment should be used
     * by a single process. Note that closing the environment is time-consuming, so
     * avoid it unless it is really necessary.
     */
    private Environment myDbEnvironment = null;

    private Database myDatabase = null;

    /**
     * Initializes the BDB environment and the database.
     * setAllowCreate  whether the environment/database may be created if absent (true = yes)
     * setCacheSize    cache size in bytes, e.g. setCacheSize(1000000)
     * setReadOnly     true = read-only transactional access
     * setLocking      whether the environment uses locking
     * More parameters can be set with setConfigParam, for example:
     *   envConfig.setConfigParam("je.log.fileMax", "20000000");  // max log file size 20M, default 10M
     *   je.log.bufferSize  log buffer size, default 1048576 (1M)
     *   je.lock.timeout    lock timeout
     *
     * @param dbEnvFilePath
     * @param databaseName
     */
    public BdbOperator(String dbEnvFilePath, String databaseName) {
        try {
            // Initialize the data storage root folder
            File f = new File(dbEnvFilePath);
            if (!f.exists()) {
                f.mkdirs();
            }
            // Initialize the database configuration; create the database if it does not exist
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            // Create the environment automatically if it does not exist
            EnvironmentConfig envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            myDbEnvironment = new Environment(f, envConfig);
            // Open the database (created automatically if absent); the first parameter is the transaction
            myDatabase = myDbEnvironment.openDatabase(null, databaseName, dbConfig);
        } catch (Exception e) {
            log.warn("BdbOperator init BDB environment exception", e);
        }
    }

    /**
     * Stores the given key/value pair in BDB.
     *
     * @param key
     * @param value
     * @param isSync whether to flush to disk immediately
     * @return
     */
    public Boolean put(String key, String value, Boolean isSync) {
        try {
            // Wrap both key and value into DatabaseEntry
            DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            DatabaseEntry theData = new DatabaseEntry(value.getBytes(StandardCharsets.UTF_8));
            // Write to the database
            myDatabase.put(null, theKey, theData);
            if (isSync) {
                // Flush data to disk
                this.sync();
            }
            return true;
        } catch (Exception e) {
            log.warn("BdbOperator put failed", e);
        }
        return false;
    }

    public Boolean delete(String key) {
        DatabaseEntry theKey;
        try {
            theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            myDatabase.delete(null, theKey);
            return true;
        } catch (Exception e) {
            log.warn("BdbOperator delete failed", e);
        }
        return false;
    }

    /**
     * Reads the value stored under the given key.
     */
    public String getValue(String key) {
        try {
            // Wrap the key into a DatabaseEntry
            DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
            DatabaseEntry theData = new DatabaseEntry();
            myDatabase.get(null, theKey, theData, LockMode.DEFAULT);
            if (theData.getData() == null) {
                return null;
            }
            return new String(theData.getData(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            log.warn("BdbOperator getValue failed", e);
        }
        return null;
    }

    /**
     * Traverses all records with a cursor and returns their values in key order.
     */
    public List<String> selectAll() {
        Cursor cursor = myDatabase.openCursor(null, null);
        DatabaseEntry theKey = new DatabaseEntry();
        DatabaseEntry theData = new DatabaseEntry();
        ArrayList<String> list = new ArrayList<>();
        while (cursor.getNext(theKey, theData, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
            list.add(new String(theData.getData()));
        }
        cursor.close();
        return list;
    }

    public Database getMyDatabase() {
        return myDatabase;
    }

    /**
     * Flushes the environment to disk.
     *
     * @return
     */
    public Boolean sync() {
        if (myDbEnvironment != null) {
            try {
                myDbEnvironment.sync();
            } catch (Exception e) {
                log.warn("BdbOperator sync failure", e);
            }
            return true;
        }
        return false;
    }

    /**
     * Closes the database and the environment.
     */
    public Boolean close() {
        try {
            if (myDatabase != null) {
                myDatabase.close();
            }
            if (myDbEnvironment != null) {
                myDbEnvironment.sync();
                myDbEnvironment.cleanLog();
                myDbEnvironment.close();
            }
            return true;
        } catch (DatabaseException e) {
            log.warn("BdbOperator close failed", e);
        }
        return false;
    }
}
```
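A short usage sketch of the class above; the environment path "/tmp/bdb-queue", the database name "demoDb", and the demo class itself are placeholders for illustration only.

```java
public class BdbOperatorDemo {
    public static void main(String[] args) {
        // "/tmp/bdb-queue" and "demoDb" are placeholder values
        BdbOperator operator = new BdbOperator("/tmp/bdb-queue", "demoDb");
        operator.put("msg-1", "hello", false);            // buffered write
        operator.put("msg-2", "world", true);             // write and flush to disk
        System.out.println(operator.getValue("msg-1"));   // -> hello
        System.out.println(operator.selectAll());         // all values, in key (byte) order
        operator.delete("msg-1");
        operator.close();
    }
}
```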

LevelDB-based local queue implementation:

Principle:

Maintain two values in memory: head (the queue head index) and tail (the queue tail index), and persist both values into LevelDB at the same time.

initLevelDB: runs when the container is loaded and reads back the persisted queue head and tail values.

```java
@PostConstruct
public void initLevelDB() throws IOException {
    if (StringUtils.isNotBlank(dbPath)) {
        DBFactory factory = new Iq80DBFactory();
        Options options = new Options();
        options.createIfMissing(true);
        db = factory.open(new File(dbPath), options);
        String headString = get(HEAD_KEY);
        if (headString != null) {
            head = Long.parseLong(headString);
        }
        String tailString = get(TAIL_KEY);
        if (tailString != null) {
            tail = Long.parseLong(tailString);
        }
        // Everything was consumed before shutdown: reset the queue cursors at startup
        if (head == tail && head != 0) {
            head = 0;
            tail = 0;
            put(HEAD_KEY, String.valueOf(head));
            put(TAIL_KEY, String.valueOf(tail));
        } else if (head > tail) {
            head = tail;
        }
    }
}
```

Push: increment tail, then write (tail, value) into LevelDB and update TAIL_KEY at the same time.

```java
public synchronized void push(String value) {
    if (db == null) {
        return;
    }
    if (tail == Long.MAX_VALUE) {
        log.error("The local cache queue has exceeded the system bottleneck, please restart the service; msg={}", value);
        return;
    }
    tail += 1;
    put(TAIL_KEY, String.valueOf(tail));
    put(String.valueOf(tail), value);
}
```

Pop: read the value stored at head + 1 (after initialization head equals tail, and each push advances tail by 1). If that slot is empty, binary-search for the smallest index that still holds a value. Delete the popped entry and update HEAD_KEY to the new head.

```java
public synchronized String pop() {
    if (db == null) {
        return null;
    }
    String find = String.valueOf(head + 1);
    String value = get(find);
    if (value == null) {
        // head + 1 has no value; search for the smallest index that does
        Long hasValueHead = findHasValueHead(head + 1, tail);
        if (hasValueHead != null) {
            head = hasValueHead;
            // Read the value before deleting it so the caller actually receives it
            String headValue = get(String.valueOf(head));
            delete(String.valueOf(head));
            put(HEAD_KEY, String.valueOf(head));
            return headValue;
        }
        return null;
    }
    delete(find);
    head += 1;
    put(HEAD_KEY, String.valueOf(head));
    return value;
}
```

Binary search for the smallest index that still holds a value:

```java
/**
 * @param head
 * @param tail
 * @return the smallest index in [head, tail] that still holds a value, or null
 */
private Long findHasValueHead(long head, long tail) {
    if (head > tail) {
        return null;
    }
    long mid = (head + tail) / 2;
    if (get(String.valueOf(mid)) == null) {
        return findHasValueHead(mid + 1, tail);
    } else {
        if (head == mid) {
            return head;
        }
        return findHasValueHead(head, mid);
    }
}
```

Full code implementation:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBFactory;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

@Service
@Slf4j
public class LevelDb {

    @Value("${levelDB.folder:}")
    private String dbPath;

    private DB db = null;

    private static final String HEAD_KEY = "head_key";
    private long head = 0;

    private static final String TAIL_KEY = "tail_key";
    private long tail = 0;

    @PostConstruct
    public void initLevelDB() throws IOException {
        if (StringUtils.isNotBlank(dbPath)) {
            DBFactory factory = new Iq80DBFactory();
            Options options = new Options();
            options.createIfMissing(true);
            db = factory.open(new File(dbPath), options);
            String headString = get(HEAD_KEY);
            if (headString != null) {
                head = Long.parseLong(headString);
            }
            String tailString = get(TAIL_KEY);
            if (tailString != null) {
                tail = Long.parseLong(tailString);
            }
            // Everything was consumed before shutdown: reset the queue cursors at startup
            if (head == tail && head != 0) {
                head = 0;
                tail = 0;
                put(HEAD_KEY, String.valueOf(head));
                put(TAIL_KEY, String.valueOf(tail));
            } else if (head > tail) {
                head = tail;
            }
        }
    }

    /**
     * Enqueue: advance tail and store (tail, value).
     */
    public synchronized void push(String value) {
        if (db == null) {
            return;
        }
        if (tail == Long.MAX_VALUE) {
            log.error("The local cache queue has exceeded the system bottleneck, please restart the service; msg={}", value);
            return;
        }
        tail += 1;
        put(TAIL_KEY, String.valueOf(tail));
        put(String.valueOf(tail), value);
    }

    /**
     * Dequeue: read and delete the entry at head + 1.
     */
    public synchronized String pop() {
        if (db == null) {
            return null;
        }
        String find = String.valueOf(head + 1);
        String value = get(find);
        if (value == null) {
            // head + 1 has no value; search for the smallest index that does
            Long hasValueHead = findHasValueHead(head + 1, tail);
            if (hasValueHead != null) {
                head = hasValueHead;
                // Read the value before deleting it so the caller actually receives it
                String headValue = get(String.valueOf(head));
                delete(String.valueOf(head));
                put(HEAD_KEY, String.valueOf(head));
                return headValue;
            }
            return null;
        }
        delete(find);
        head += 1;
        put(HEAD_KEY, String.valueOf(head));
        return value;
    }

    /**
     * Reads the value stored under the given key.
     *
     * @param key
     * @return
     */
    public String get(String key) {
        if (db == null) {
            return null;
        }
        byte[] bytes = db.get(Iq80DBFactory.bytes(key));
        return Iq80DBFactory.asString(bytes);
    }

    /**
     * Iterates over at most max entries of the database.
     *
     * @param max
     * @return
     */
    public LinkedHashMap<String, String> iteratorDb(int max) {
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        if (db == null) {
            return linkedHashMap;
        }
        DBIterator iterator = db.iterator();
        int num = 0;
        while (iterator.hasNext()) {
            if (num++ > max) {
                break;
            }
            Map.Entry<byte[], byte[]> next = iterator.next();
            String key = Iq80DBFactory.asString(next.getKey());
            String value = Iq80DBFactory.asString(next.getValue());
            linkedHashMap.put(key, value);
        }
        return linkedHashMap;
    }

    public long getHead() {
        return head;
    }

    public long getTail() {
        return tail;
    }

    /**
     * @return the current queue length
     */
    public long getLength() {
        return tail - head;
    }

    /**
     * Writes a key/value pair.
     */
    private void put(String key, String value) {
        db.put(Iq80DBFactory.bytes(key), Iq80DBFactory.bytes(value));
    }

    /**
     * Deletes the given key.
     *
     * @param key
     */
    public void delete(String key) {
        db.delete(Iq80DBFactory.bytes(key));
    }

    /**
     * Binary search for the smallest index in [head, tail] that still holds a value.
     *
     * @param head
     * @param tail
     * @return
     */
    private Long findHasValueHead(long head, long tail) {
        if (head > tail) {
            return null;
        }
        long mid = (head + tail) / 2;
        if (get(String.valueOf(mid)) == null) {
            return findHasValueHead(mid + 1, tail);
        } else {
            if (head == mid) {
                return head;
            }
            return findHasValueHead(head, mid);
        }
    }
}
```
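A usage sketch of the queue above from another Spring bean; QueueConsumer and the message strings are hypothetical names for illustration, and the levelDB.folder property must point to a writable directory.

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class QueueConsumer {

    @Autowired
    private LevelDb levelDb;   // the LevelDb queue bean defined above

    public void demo() {
        levelDb.push("msg-1");
        levelDb.push("msg-2");
        log.info("length={}", levelDb.getLength());   // -> 2
        log.info("first={}", levelDb.pop());          // -> msg-1 (FIFO order)
        log.info("second={}", levelDb.pop());         // -> msg-2
        log.info("empty={}", levelDb.pop());          // -> null once the queue is drained
    }
}
```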

Performance comparison results:

Comparison of push performance:

BDB write time (ms): single item: 3, 500 items: 548, 5,000 items: 2,820

LevelDB write time (ms): single item: 3, 500 items: 36, 5,000 items: 129

LevelDB throughput test:

Throughput:

- 1 item: push 2ms, pop 2ms
- 100 items: push 10ms, pop 11ms
- 1,000 items: push 33ms, pop 138ms
- 10,000 items: push 375ms, pop 1,213ms
- 100,000 items: push 2,707ms, pop 7,293ms
- 1,000,000 items: push 33,601ms, pop 159,013ms
- 10,000,000 items: push 494,457ms

Maximum capacity: 10,000,000+ items

Concurrency test: 200 threads, each pushing 1,000 messages, resulted in java.net.SocketTimeoutException: Read timed out.
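The original timing harness is not shown; below is a minimal sketch (my own, with illustrative counts and payloads) of how this kind of push/pop timing could be reproduced against the LevelDb bean.

```java
// Illustrative timing helpers only; count and payload are placeholders.
public final class LevelDbBenchSketch {

    public static long timePush(LevelDb queue, int count) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            queue.push("payload-" + i);
        }
        return System.currentTimeMillis() - start;   // elapsed ms for `count` pushes
    }

    public static long timePop(LevelDb queue, int count) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            queue.pop();
        }
        return System.currentTimeMillis() - start;   // elapsed ms for `count` pops
    }
}
```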