
The author has written a number of articles about spring-data-redis and lettuce:

  • This new way of monitoring the Redis connection pool is really nice; let me add a little seasoning
  • spring-data-redis connection leak: I was dumbfounded
  • spring-data-redis: dynamically switching data sources
  • spring-data-redis at millions of QPS: connections fail under too much pressure, I was dumbfounded

When spring-data-redis is used together with Lettuce, a packet capture shows that Pipeline does not actually take effect. How should it be configured so that it does?

First of all, the articles above analyzed the basic principles of spring-data-redis + Lettuce. In this environment, the connections used internally by RedisTemplate are:

  • asyncSharedConn: may be null. If connection sharing is enabled (it is by default), this field is not null. It is the Redis connection shared by all LettuceConnection instances, so every LettuceConnection actually uses the same underlying connection. It is used to execute simple commands; because the client is built on Netty and Redis processes commands on a single thread, sharing one connection is still fast. If connection sharing is disabled, this field is null and asyncDedicatedConn is used to execute commands.
  • asyncDedicatedConn: the private (dedicated) connection. It must be used to execute Redis commands whenever you need to keep session state, run transactions, run Pipeline commands, or pin a fixed connection.
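
The rule for choosing between the two connections can be modeled roughly as follows. This is a simplified sketch based only on the description above, not the actual LettuceConnection code; the class `ConnectionChoiceSketch`, the `Conn` enum, and the `choose` method are hypothetical names used for illustration.

```java
class ConnectionChoiceSketch {
    enum Conn { SHARED, DEDICATED }

    /**
     * Hypothetical model of the selection rule described in the text.
     * sharingEnabled: connection sharing is on, i.e. asyncSharedConn is not null
     * pipelined / inTransaction: the caller needs a stateful, private connection
     */
    static Conn choose(boolean sharingEnabled, boolean pipelined, boolean inTransaction) {
        if (pipelined || inTransaction) {
            // Stateful work must not run on the connection shared by everyone
            return Conn.DEDICATED;
        }
        // Simple commands use the shared connection when one exists
        return sharingEnabled ? Conn.SHARED : Conn.DEDICATED;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, false, false));  // simple command, sharing enabled
        System.out.println(choose(true, true, false));   // pipelined command
        System.out.println(choose(false, false, false)); // sharing disabled
    }
}
```

The point of the model is that pipelining is a per-connection state, which is why it cannot safely run on a connection that other callers are using at the same time.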

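For execute(RedisCallback), simple commands run on the shared connection when connection sharing is enabled; otherwise asyncDedicatedConn is used.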

For executePipelined(RedisCallback), the private asyncDedicatedConn connection is used, provided the method is used correctly. So what counts as correct use?

Redis calls inside the callback must go through the connection passed to the callback. Calling redisTemplate directly inside the callback means Pipeline does not take effect.

Pipeline takes effect:

    List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
            // Commands are issued on the connection handed to the callback
            connection.get("test".getBytes());
            connection.get("test2".getBytes());
            return null;
        }
    });

Pipeline does not take effect:

    List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
        @Override
        public Object doInRedis(RedisConnection connection) throws DataAccessException {
            // Commands bypass the callback's connection, so they are not pipelined
            redisTemplate.opsForValue().get("test");
            redisTemplate.opsForValue().get("test2");
            return null;
        }
    });

This ensures that Pipeline is used correctly at the API level, but with the default configuration the underlying layer still does not execute the commands as a pipeline.

Redis Pipeline and its counterpart in Lettuce: AutoFlushCommands

Redis Pipeline is Redis's batching mechanism. It assembles a group of Redis commands, sends them to Redis in a single transmission, and returns the results as a set. This greatly reduces the RTT cost of sending commands one at a time, which includes the time the Redis client and the Redis server spend switching into system calls to send and receive data, as well as the network transfer time.

Suppose the commands were originally sent one at a time, like this:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

With Pipeline, the commands are sent like this:

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

As you can see, the client concatenates all the commands and buffers them locally, sends them to the server in one go, and the server then returns the responses to all of the commands together.
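
The effect on round trips can be illustrated with a toy model. The sketch below is not a Redis client: `FakeServer` is a hypothetical in-memory stand-in that only understands INCR, uses the simplified reply notation from the example above rather than the real wire protocol, and counts each `exchange` call as one network round trip.

```java
import java.util.Arrays;
import java.util.List;

class PipelineRoundTripSketch {
    /** Toy in-memory stand-in for a Redis server; it only understands INCR. */
    static class FakeServer {
        private long counter = 0;
        int roundTrips = 0;

        /** One call models one request/response exchange over the network. */
        String exchange(String payload) {
            roundTrips++;
            StringBuilder reply = new StringBuilder();
            for (String command : payload.split("\r\n")) {
                if (command.startsWith("INCR ")) {
                    reply.append(++counter).append("\r\n");
                }
            }
            return reply.toString();
        }
    }

    /** Without pipelining: one exchange per command. */
    static String sendOneByOne(FakeServer server, List<String> commands) {
        StringBuilder replies = new StringBuilder();
        for (String command : commands) {
            replies.append(server.exchange(command + "\r\n"));
        }
        return replies.toString();
    }

    /** With pipelining: concatenate the commands locally, then do a single exchange. */
    static String sendPipelined(FakeServer server, List<String> commands) {
        StringBuilder payload = new StringBuilder();
        for (String command : commands) {
            payload.append(command).append("\r\n");
        }
        return server.exchange(payload.toString());
    }

    public static void main(String[] args) {
        List<String> commands = Arrays.asList("INCR X", "INCR X", "INCR X", "INCR X");
        FakeServer plain = new FakeServer();
        sendOneByOne(plain, commands);
        FakeServer pipelined = new FakeServer();
        sendPipelined(pipelined, commands);
        System.out.println("round trips without pipeline: " + plain.roundTrips);
        System.out.println("round trips with pipeline: " + pipelined.roundTrips);
    }
}
```

Both variants produce the same replies; only the number of exchanges differs, which is where the RTT saving comes from.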

A Lettuce connection has an AutoFlushCommands setting, which controls whether commands executed on the connection are sent to the server immediately. The default is true, meaning each command is sent to the server as soon as it is issued. If it is set to false, all commands are buffered and only sent to the server when flushCommands is called manually, which achieves the Pipeline effect.
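
The behavior of this switch can be sketched with a toy connection. This is an illustrative model only, not Lettuce code: `ToyConnection` and `dispatch` are hypothetical names, while `setAutoFlushCommands` and `flushCommands` are named after the Lettuce methods they imitate. Each entry in `writes` stands for one network write.

```java
import java.util.ArrayList;
import java.util.List;

class AutoFlushSketch {
    /** Toy connection: records each simulated network write as one list of commands. */
    static class ToyConnection {
        private boolean autoFlush = true; // Lettuce's default for AutoFlushCommands is true
        private final List<String> buffer = new ArrayList<>();
        final List<List<String>> writes = new ArrayList<>();

        void setAutoFlushCommands(boolean autoFlush) {
            this.autoFlush = autoFlush;
        }

        /** Queues a command; with auto-flush on it is written out immediately. */
        void dispatch(String command) {
            buffer.add(command);
            if (autoFlush) {
                flushCommands();
            }
        }

        /** Writes every buffered command in a single simulated network write. */
        void flushCommands() {
            if (!buffer.isEmpty()) {
                writes.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        ToyConnection conn = new ToyConnection();
        conn.dispatch("GET test");        // auto-flush on: written immediately
        conn.setAutoFlushCommands(false);
        conn.dispatch("GET test");        // buffered
        conn.dispatch("GET test2");       // buffered
        conn.flushCommands();             // both buffered commands go out together
        System.out.println(conn.writes);
    }
}
```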

Configuring spring-data-redis + Lettuce to use Pipeline

spring-data-redis has supported configuring Pipeline behavior for Lettuce since version 2.3.0; see:

  • DATAREDIS-1011 – Allow configuration of Lettuce pipelining flush behavior
  • Github.com/spring-proj…

We can configure it like this:

    @Bean
    public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                // After the LettuceConnectionFactory bean is initialized,
                // set its PipeliningFlushPolicy to flushOnClose
                if (bean instanceof LettuceConnectionFactory) {
                    LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                    lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
                }
                return bean;
            }
        };
    }

Looking at the source of the flush policy, PipeliningFlushPolicy, shows what flushOnClose means:

    public interface PipeliningFlushPolicy {

        // Every command is sent to the Redis server immediately
        static PipeliningFlushPolicy flushEachCommand() {
            return FlushEachCommand.INSTANCE;
        }

        // Commands are sent to Redis when the pipeline is closed
        static PipeliningFlushPolicy flushOnClose() {
            return FlushOnClose.INSTANCE;
        }

        // Commands are sent to Redis after every bufferSize commands,
        // and any remainder is also sent when the pipeline is closed
        static PipeliningFlushPolicy buffered(int bufferSize) {
            return () -> new BufferedFlushing(bufferSize);
        }

        // Creates the flush state used by one pipeline
        PipeliningFlushState newPipeline();
    }

All three classes also implement the PipeliningFlushState interface:

    public interface PipeliningFlushState {

        // executePipelined calls connection.openPipeline() at the start, which calls this method
        void onOpen(StatefulConnection<?, ?> connection);

        // Called for every command executed inside executePipelined
        void onCommand(StatefulConnection<?, ?> connection);

        // executePipelined calls connection.closePipeline() at the end, which calls this method
        void onClose(StatefulConnection<?, ?> connection);
    }

The default policy, flushEachCommand, sends every command directly to the Redis server, so its callbacks do nothing:

    private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
        INSTANCE;

        @Override
        public PipeliningFlushState newPipeline() {
            return INSTANCE;
        }

        @Override
        public void onOpen(StatefulConnection<?, ?> connection) {}

        @Override
        public void onCommand(StatefulConnection<?, ?> connection) {}

        @Override
        public void onClose(StatefulConnection<?, ?> connection) {}
    }

For flushOnClose:

    private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
        INSTANCE;

        @Override
        public PipeliningFlushState newPipeline() {
            return INSTANCE;
        }

        @Override
        public void onOpen(StatefulConnection<?, ?> connection) {
            // First set AutoFlushCommands to false so that commands are not sent to Redis immediately
            connection.setAutoFlushCommands(false);
        }

        @Override
        public void onCommand(StatefulConnection<?, ?> connection) {}

        @Override
        public void onClose(StatefulConnection<?, ?> connection) {
            // Send all buffered commands when the pipeline is closed
            connection.flushCommands();
            // Restore the default setting so that later use is unaffected
            // if this connection is returned to the connection pool
            connection.setAutoFlushCommands(true);
        }
    }

For buffered:

    private static class BufferedFlushing implements PipeliningFlushState {

        private final AtomicLong commands = new AtomicLong();
        private final int flushAfter;

        public BufferedFlushing(int flushAfter) {
            this.flushAfter = flushAfter;
        }

        @Override
        public void onOpen(StatefulConnection<?, ?> connection) {
            // First set AutoFlushCommands to false so that commands are not sent to Redis immediately
            connection.setAutoFlushCommands(false);
        }

        @Override
        public void onCommand(StatefulConnection<?, ?> connection) {
            // Once the number of commands reaches a multiple of flushAfter, send them to Redis
            if (commands.incrementAndGet() % flushAfter == 0) {
                connection.flushCommands();
            }
        }

        @Override
        public void onClose(StatefulConnection<?, ?> connection) {
            // Send all remaining commands when the pipeline is closed
            connection.flushCommands();
            // Restore the default setting so that later use is unaffected
            // if this connection is returned to the connection pool
            connection.setAutoFlushCommands(true);
        }
    }
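
To get a feel for how often the buffered policy flushes, the callback sequence can be replayed against a counter. The sketch below is a toy re-implementation under stated assumptions, not the spring-data-redis classes: `CountingConnection`, `FlushState`, and `run` are hypothetical names, and the connection only counts calls to `flushCommands()` instead of talking to Redis.

```java
class FlushPolicySketch {
    /** Toy connection that only counts how many times flushCommands() is called. */
    static class CountingConnection {
        int flushes = 0;
        boolean autoFlush = true;

        void setAutoFlushCommands(boolean value) { autoFlush = value; }

        void flushCommands() { flushes++; }
    }

    /** Hypothetical stand-in for PipeliningFlushState, typed against the toy connection. */
    interface FlushState {
        void onOpen(CountingConnection c);
        void onCommand(CountingConnection c);
        void onClose(CountingConnection c);
    }

    /** Mirrors the buffered logic shown above: flush every flushAfter commands, then on close. */
    static FlushState buffered(int flushAfter) {
        return new FlushState() {
            private long commands = 0;

            public void onOpen(CountingConnection c) { c.setAutoFlushCommands(false); }

            public void onCommand(CountingConnection c) {
                if (++commands % flushAfter == 0) {
                    c.flushCommands();
                }
            }

            public void onClose(CountingConnection c) {
                c.flushCommands();
                c.setAutoFlushCommands(true);
            }
        };
    }

    /** Replays one pipeline of n commands and returns the number of explicit flushes. */
    static int run(FlushState state, int n) {
        CountingConnection c = new CountingConnection();
        state.onOpen(c);
        for (int i = 0; i < n; i++) {
            state.onCommand(c);
        }
        state.onClose(c);
        return c.flushes;
    }

    public static void main(String[] args) {
        // 10 commands with a buffer of 3: flushes after commands 3, 6, 9, plus one on close
        System.out.println(run(buffered(3), 10));
    }
}
```

With a buffer size larger than the number of commands in the pipeline, the only flush is the one on close, so the policy then behaves like flushOnClose.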
