preface

Redis (a) how to implement fixed size cache?

Java from zero handwriting implementation of Redis (c) Redis expire principle

Java from zero handwriting implementation redis (three) how to restart memory data is not lost?

Java from zero handwriting implementation redis (four) add listener

Java from zero handwriting redis (five) another way to implement expiration strategy

Java from zero handwriting implementation redis (three) how to restart memory data is not lost? In the RDB mode similar to Redis.

Redis aof basis

Redis AOF persistence details

Some personal understanding of AOF

Why AOF?

The AOF mode performs particularly well, how well?

Those of you who have used Kafka know that kafka also uses the sequential write feature.

Sequential write adds file content, avoiding the random write problem of file IO, and the performance is basically comparable to memory.

AOF has better real-time performance, which is relative to RDB mode.

We used to use the RDB mode to persist all cached content. This is a time-consuming action, which is usually persisted every few minutes.

AOF mode is mainly for modifying the contents of the instructions, and then add all the instructions in order to the file. That way, the real time is much better, can be second level, even second level.

Throughput of AOF

The AOF pattern can persist for every operation, but this leads to a significant throughput drop.

The most common way to improve throughput is to batch, and this is similar in Kafka. For example, we can persist for 1s and store all operations within 1s into buffer.

This is actually a trade-off problem, the art of balancing real-time and throughput.

In actual business, the error of 1s is generally acceptable, so this is also a relatively accepted way in the industry.

AOF asynchrony + multithreading

All operations in Kafka are actually asynchronous + callback.

Asynchrony + multithreading can really improve the performance of operations.

Of course, until Redis 6, it was actually single-threaded. So why is the performance still so good?

In fact, multithreading also has a cost, that is, the thread context switch is time-consuming, maintain the safety of concurrency, also need to lock, thus reducing performance.

So here we have to consider whether the benefits of asynchrony are proportional to the time spent.

AOF trading

Our AOF and RDB models, after all, are based on the operating system’s file system to do persistence.

For developers, this may be achieved by calling an API, but the actual action of persistent dropping is not necessarily completed in one step.

File systems also use buffers to improve throughput. It’s suddenly a little bit of a Matryoshka.

But good design is always the same, such as cache from the CPU design L1/L2 and so on, the idea is the same.

Ali’s many open source technologies will further optimize the operating system, which we will study in depth later.

The defect of AOF

Avenue is missing one, no silver bullet.

AOF is good, and RDB comparison also has a defect, that is the instruction

Java implementation

interface

The interface must be consistent with RDB

/** * Persistent cache interface *@author binbin.hou
 * @since 0.0.7
 * @param <K> key
 * @param <V> value
 */
public interface ICachePersist<K.V> {

    /** * Persistent cache information *@paramCache cache *@since0.0.7 * /
    void persist(final ICache<K, V> cache);

}
Copy the code

Annotations to define

In order to keep consistent with time statistics, refresh and other features, the actions of the operation class are added to the file (append to file) based on the annotation attribute, rather than fixed in the code, for later expansion and adjustment.

/** * cache interceptor *@author binbin.hou
 * @since0.0.5 * /
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {

    /** * Specifies whether to append to file. The default value is false. * Including delete, add, expiration and other operations. *@returnWhether *@since0.0.10 * /
    boolean aof(a) default false;

}
Copy the code

We added the aOF attribute to the original @cacheInterceptor annotation, which specifies whether to enable aOF mode for an operation.

Method to specify the AOF schema

We specify this attribute on methods that will change the data:

Overdue operation

Similar to Spring’s transaction interceptor, we invoke expireAt using a proxy class.

The EXPIRE method does not need to add aOF intercepts.

/** * Set the expiration time *@param key         key
 * @paramTimeInMills Expires after milliseconds *@return this
 */
@Override
@CacheInterceptor
public ICache<K, V> expire(K key, long timeInMills) {
    long expireTime = System.currentTimeMillis() + timeInMills;
    // Use proxy invocation
    Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this);
    return cachePoxy.expireAt(key, expireTime);
}

/** * Specifies expiration information *@param key key
 * @paramTimeInMills timestamp *@return this
 */
@Override
@CacheInterceptor(aof = true)
public ICache<K, V> expireAt(K key, long timeInMills) {
    this.expire.expire(key, timeInMills);
    return this;
}
Copy the code

Changes to the operating

@Override
@CacheInterceptor(aof = true)
public V put(K key, V value) {
    //1.1 Attempt to dislodge
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);
    boolean evictResult = evict.evict(context);
    if(evictResult) {
        // Execute the elimination listener
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : this.removeListeners) { listener.listen(removeListenerContext); }}//2. Determine the information after removal
    if(isSizeLimit()) {
        throw new CacheRuntimeException("Current queue is full, data addition failed!");
    }
    //3. Perform add
    return map.put(key, value);
}

@Override
@CacheInterceptor(aof = true)
public V remove(Object key) {
    return map.remove(key);
}

@Override
@CacheInterceptor(aof = true)
public void putAll(Map<? extends K, ? extends V> m) {
    map.putAll(m);
}

@Override
@CacheInterceptor(refresh = true, aof = true)
public void clear(a) {
    map.clear();
}
Copy the code

AOF persistence interception implementation

Persist object definitions

/** * AOF persistence details *@author binbin.hou
 * @since0.0.10 * /
public class PersistAofEntry {

    /** * Parameter information *@since0.0.10 * /
    private Object[] params;

    /** * method name *@since0.0.10 * /
    private String methodName;

    //getter & setter &toString
}
Copy the code

All we need here is the method name and the parameter object.

Temporarily implement some simple can be.

Persistent interceptors

We define an interceptor that puts the operation information into the buffer list of CachePersistAof when the persistent class defined in the cache is CachePersistAof.

public class CacheInterceptorAof<K.V> implements ICacheInterceptor<K.V> {

    private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);

    @Override
    public void before(ICacheInterceptorContext<K,V> context) {}@Override
    public void after(ICacheInterceptorContext<K,V> context) {
        // Persist classes
        ICache<K,V> cache = context.cache();
        ICachePersist<K,V> persist = cache.persist();

        if(persist instanceof CachePersistAof) {
            CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;

            String methodName = context.method().getName();
            PersistAofEntry aofEntry = PersistAofEntry.newInstance();
            aofEntry.setMethodName(methodName);
            aofEntry.setParams(context.params());

            String json = JSON.toJSONString(aofEntry);

            // Direct persistence
            log.debug("AOF starts append file contents: {}", json);
            cachePersistAof.append(json);
            log.debug("AOF completes append file contents: {}", json); }}}Copy the code

Interceptor call

When the annotation attribute of AOF is true, the interceptor is called.

To avoid waste, this is only called if the persistent class is in AOF mode.

/ / 3. AOF additional
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {
    if(before) {
        persistInterceptors.before(interceptorContext);
    } else{ persistInterceptors.after(interceptorContext); }}Copy the code

AOF persistent implementation

The AOF schema here is just a different schema from the previous RDB persistent class, which is actually the same interface.

interface

Here we define the time of different persistence classes uniformly, so that RDB and AOF can be triggered at different time intervals.

public interface ICachePersist<K.V> {

    /** * Persistent cache information *@paramCache cache *@since0.0.7 * /
    void persist(final ICache<K, V> cache);

    /** * Delay time *@returnDelay *@since0.0.10 * /
    long delay(a);

    /** * Time interval *@returnInterval *@since0.0.10 * /
    long period(a);

    /** * time unit *@returnTime unit *@since0.0.10 * /
    TimeUnit timeUnit(a);
}
Copy the code

Persistent class implementation

Implement a list of buffers that can be added sequentially with each interceptor.

Persistence is also easy to implement. After appending to a file, you can simply clear the buffer list.

/** * Cache persistence -AOF persistence mode *@author binbin.hou
 * @since0.0.10 * /
public class CachePersistAof<K.V> extends CachePersistAdaptor<K.V> {

    private static final Log log = LogFactory.getLog(CachePersistAof.class);

    /** * Cache list *@since0.0.10 * /
    private final List<String> bufferList = new ArrayList<>();

    /** * Data persistence path *@since0.0.10 * /
    private final String dbPath;

    public CachePersistAof(String dbPath) {
        this.dbPath = dbPath;
    }

    /** * persistent * key length key+value * the first space, get the length of the key, and then intercept *@paramCache cache * /
    @Override
    public void persist(ICache<K, V> cache) {
        log.info("Start AOF persistence to file");
        1. Create a file
        if(! FileUtil.exists(dbPath)) { FileUtil.createFile(dbPath); }// 2. Persistent appending to files
        FileUtil.append(dbPath, bufferList);

        // 3. Clear the buffer list
        bufferList.clear();
        log.info("Complete AOF persistence to file");
    }

    @Override
    public long delay(a) {
        return 1;
    }

    @Override
    public long period(a) {
        return 1;
    }

    @Override
    public TimeUnit timeUnit(a) {
        return TimeUnit.SECONDS;
    }

    /** * Add the file contents to the buffer list *@paramJson Json information *@since0.0.10 * /
    public void append(final String json) {
        if(StringUtil.isNotEmpty(json)) { bufferList.add(json); }}}Copy the code

Persistent testing

The test code

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .persist(CachePersists.<String, String>aof("1.aof"))
        .build();
cache.put("1"."1");
cache.expire("1".10);
cache.remove("2");
TimeUnit.SECONDS.sleep(1);
Copy the code

The test log

Expire actually calls expireAt.

[the DEBUG] [the 2020-10-02 12:20:41. 979] [the main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF began to add the file content: {" methodName ":" put ", "params" : [" 1 ", "1"]} [DEBUG] [the 2020-10-02 12:20:41. 980] [the main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF complete additional content of the file: {" methodName ":" put ", "params" : [" 1 ", "1"]} [DEBUG] [the 2020-10-02 12:20:41. 982] [the main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF began to add the file content: {"methodName":"expireAt","params":["1",1601612441990]} [DEBUG] [2020-10-02 12:20:41.982] [main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF complete additional content of the file: {"methodName":"expireAt","params":["1",1601612441990]} [DEBUG] [2020-10-02 12:20:41.984] [main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF began to add the file content: {" methodName ":" remove ", "params:"/" 2 "} [DEBUG] [the 2020-10-02 12:20:41. 984] [the main] [C.G.H.C.C.S.I.A.C acheInterceptorAof. After] - AOF complete additional content of the file: {" methodName ":" remove ", "params:"/" 2 "} [DEBUG] [the 2020-10-02 12:20:42. 088] [] - thread pool - 1-1 [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: Expire [INFO] [the 2020-10-02 12:20:42. 789] [the pool - 2 - thread - 1] [C.G.H.C.C.S.P.I nnerCachePersist. Run] - began to persistent cache information [INFO] [the 2020-10-02 12:20:42. 789] [the pool - 2 - thread - 1] [C.G.H.C.C.S.P.C achePersistAof. Persist] - start AOF persisted to the document [INFO] [in the 2020-10-02 s 12:20:42. 798] [the pool - 2 - thread - 1] [C.G.H.C.C.S.P.C achePersistAof. Persist] - complete AOF persisted to the document [INFO] [the 2020-10-02 12:20:42. 799] [the pool - 2 - thread - 1] [C.G.H.C.C.S.P.I nnerCachePersist. Run] - complete the persistent cache informationCopy the code

The file content

1. The contents of AOF files are as follows

{"methodName":"put","params":["1","1"]}
{"methodName":"expireAt","params":["1",1601612441990]}
{"methodName":"remove","params":["2"]}
Copy the code

Each operation is simply stored in a file.

AOF load implementation

loading

Similar to the loading mode of RDB, the loading mode of AOF is similar.

We need to restore the contents of the previous cache based on the contents of the file.

Implementation idea: traversal file content, reflection call the original method.

Code implementation

Parse the file

@Override
public void load(ICache<K, V> cache) {
    List<String> lines = FileUtil.readAllLines(dbPath);
    log.info("[load] start processing path: {}", dbPath);
    if(CollectionUtil.isEmpty(lines)) {
        log.info("[load] path: {} file content is empty, return directly", dbPath);
        return;
    }

    for(String line : lines) {
        if(StringUtil.isEmpty(line)) {
            continue;
        }
        / / execution
        // Simple types are ok, complex deserializations will fail
        PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);
        final String methodName = entry.getMethodName();
        final Object[] objects = entry.getParams();
        final Method method = METHOD_MAP.get(methodName);
        // reflection callsReflectMethodUtil.invoke(cache, method, objects); }}Copy the code

Preloading of method maps

Method reflection is fixed, so to improve performance, let’s do some preprocessing.

/** * method cache ** is simple for now, and can be determined directly by methods. There is no need to introduce parameter types to add complexity. *@since0.0.10 * /
private static final Map<String, Method> METHOD_MAP = new HashMap<>();
static {
    Method[] methods = Cache.class.getMethods();
    for(Method method : methods){
        CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);
        if(cacheInterceptor ! =null) {
            / / for the time being
            if(cacheInterceptor.aof()) { String methodName = method.getName(); METHOD_MAP.put(methodName, method); }}}}Copy the code

test

The file content

  • default.aof
{"methodName":"put","params":["1","1"]}
Copy the code

test

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .load(CacheLoads.<String, String>aof("default.aof"))
        .build();

Assert.assertEquals(1, cache.size());
System.out.println(cache.keySet());
Copy the code

Load the default.aof file directly into the cache.

summary

Redis file persistence is actually richer.

RDB and AOF modes can be used together.

The size of files in AOF mode can be very large. Redis periodically compresses commands to solve this problem.

You can think of aOF as a flow table of operations, and all we really care about is the final state, no matter how many steps we go through, all we care about is the final value.

The article mainly tells about the idea, the realization part because of the space limitation, did not post all.

Open source address: github.com/houbb/cache

If you found this article helpful, please feel free to like it

Your encouragement, is my biggest motivation ~