Redisson is a Redis client built on Netty. Netty is asynchronous, yet the upper layer offers synchronous calls that return the result directly. On top of that, the Redis protocol cannot be changed to suit Redisson: requests and responses carry no request ID. So how does Redisson turn a synchronous call into an asynchronous one, and when the asynchronous result comes back, how does it match that result to the right request?

One of my senior colleagues raised this question, and I was just as curious about how Redisson does it. Working through it might also give me a fresh understanding of synchronous versus asynchronous calls. With this question in mind, I went to read Redisson's source code.

The senior colleague's blog: www.cnblogs.com/grey-wolf/p…

Normally, this is what we see in the Redis client:

redis> HSET test1 1
(integer) 2

The corresponding Redisson code is as follows:

RMap<Integer, Integer> map = client.getMap("test1");

// Asynchronous call: register a callback on the returned future
RFuture<Integer> future = map.getAsync(1);
future.onComplete((number, throwable) -> {
    if (number == null) {
        System.out.println(throwable);
    }
    System.out.println(number);
});

// Execute synchronously
map.get(1);

Let's look at the asynchronous path first. Setting a breakpoint while reading the source gives the following call stack:

The read path

The call stack is a bit long, but it contains CommandDecoder, so it must start from receiving a message. The map.getAsync(1) we just called must have sent a request to the Redis server, so the message being decoded is most likely the response to that request.

CommandDecoder:

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    // Fetch the QueueCommand from the channel's attribute map.
    // Someone must have stored it there earlier.
    QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();

    if (state() == null) {
        state(new State());
    }
    // ...
    decode(ctx, in, data);
    // ...
}

protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
        Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
    // ...
    if (code == '$') {
        ByteBuf buf = readBytes(in);
        Object result = null;
        if (buf != null) {
            Decoder<Object> decoder = selectDecoder(data, parts);
            result = decoder.decode(buf, state());
        }
        handleResult(data, parts, result, false);
    }
    // ...
}

private void handleResult(CommandData<Object, Object> data, List<Object> parts,
        Object result, boolean skipConvertor) {
    // ...
    completeResponse(data, result);
    // ...
}

protected void completeResponse(CommandData<Object, Object> data, Object result) {
    if (data != null) {
        data.getPromise().trySuccess(result);
    }
}

DefaultPromise:

private void notifyListeners0(DefaultFutureListeners listeners) {
    GenericFutureListener<?>[] a = listeners.listeners();
    int size = listeners.size();
    for (int i = 0; i < size; ++i) {
        notifyListener0(this, a[i]);
    }
}

Results:

Processing the result (calling back into our own method):

QueueCommand value:

We can see that this command is the one we just sent to the Redis server.

public class CommandData<T, R> implements QueueCommand {
    final RPromise<R> promise;
    final RedisCommand<T> command;
    final Object[] params;
    final Codec codec;
    final MultiDecoder<Object> messageDecoder;
}

DefaultPromise's main fields:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
    private static final int MAX_LISTENER_STACK_DEPTH =
            Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    private static final Object SUCCESS = new Object();
    private static final Object UNCANCELLABLE = new Object();
    private static final DefaultPromise.CauseHolder CANCELLATION_CAUSE_HOLDER =
            new DefaultPromise.CauseHolder(ThrowableUtil.unknownStackTrace(
                    new CancellationException(), DefaultPromise.class, "cancel(...)"));
    private static final StackTraceElement[] CANCELLATION_STACK;

    private volatile Object result;
    private final EventExecutor executor;
    // Listeners to notify when the promise completes
    private Object listeners;
    // Number of threads currently blocked waiting on this promise
    private short waiters;
    private boolean notifyingListeners;
}

Let's summarize the read path:

  1. When a message is received, the decoder tries to fetch the CommandData from the channel
  2. If one is present, it is used to parse the message
  3. Once the result is parsed, all listeners on the CommandData's promise are notified (which calls the methods we defined), and the listeners are removed when notification completes

Now we need to find out where CommandData is put into the channel and when the listener is registered.
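
The three steps above can be sketched with nothing but the JDK. This is a simplified model, not Redisson's code: `ReadPathSketch`, `Pending`, and `onReply` are hypothetical names, a `CompletableFuture` stands in for the promise, and an `AtomicReference` stands in for the channel attribute. The bulk-string decoder assumes ASCII payloads, so character counts equal byte counts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ReadPathSketch {
    /** Hypothetical stand-in for CommandData: one request plus the promise for its reply. */
    static final class Pending {
        final String command;
        final CompletableFuture<String> promise = new CompletableFuture<>();

        Pending(String command) {
            this.command = command;
        }
    }

    /** Stand-in for the channel attribute stored under CURRENT_COMMAND. */
    static final AtomicReference<Pending> CURRENT_COMMAND = new AtomicReference<>();

    /** Decodes a RESP bulk string of the form "$<length>\r\n<data>\r\n" (ASCII only in this sketch). */
    static String decodeBulkString(String frame) {
        if (frame.charAt(0) != '$') {
            throw new IllegalArgumentException("not a bulk string: " + frame);
        }
        int lineEnd = frame.indexOf("\r\n");
        int length = Integer.parseInt(frame.substring(1, lineEnd));
        if (length < 0) {
            return null; // "$-1\r\n" is the null bulk string
        }
        int start = lineEnd + 2;
        return frame.substring(start, start + length);
    }

    /** Steps 1 to 3: look up the current command, decode the reply, complete its promise. */
    static void onReply(String frame) {
        Pending data = CURRENT_COMMAND.get();
        if (data != null) {
            // Completing the future runs the callbacks registered on it.
            data.promise.complete(decodeBulkString(frame));
        }
    }
}
```

The reply itself carries no identifier. The only link between reply and request is whatever object is sitting in the "current command" slot when the bytes arrive.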

The write path

When is CommandData put into the channel?

The CURRENT_COMMAND key is set in only one place:

CommandsQueue:

public class CommandsQueue extends ChannelDuplexHandler {
    static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");
    private final Queue<QueueCommandHolder> queue = new ConcurrentLinkedQueue<>();
    // ... (the listener field used below is elided in this excerpt)

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof QueueCommand) {
            QueueCommand data = (QueueCommand) msg;
            QueueCommandHolder holder = queue.peek();
            if (holder != null && holder.getCommand() == data) {
                super.write(ctx, msg, promise);
            } else {
                // Put the current command into the queue, then try to send
                queue.add(new QueueCommandHolder(data, promise));
                sendData(ctx.channel());
            }
        } else {
            super.write(ctx, msg, promise);
        }
    }

    private void sendData(Channel ch) {
        QueueCommandHolder command = queue.peek();
        if (command != null && command.trySend()) {
            QueueCommand data = command.getCommand();
            List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
            // Pub/sub operations get special handling
            if (!pubSubOps.isEmpty()) {
                for (CommandData<Object, Object> cd : pubSubOps) {
                    for (Object channel : cd.getParams()) {
                        ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
                    }
                }
            } else {
                // First write, or a single plain command:
                // store the current command on the channel under the CURRENT_COMMAND key
                ch.attr(CURRENT_COMMAND).set(data);
            }
            command.getChannelPromise().addListener(listener);
            ch.writeAndFlush(data, command.getChannelPromise());
        }
    }
}

Note: ChannelDuplexHandler is a bidirectional handler, so both inbound and outbound messages pass through it. That is why CommandsQueue can store the current command on the channel inside write().

When is our listener registered (RedissonPromise)?

Look again at the demo code:

future.onComplete((number, throwable) -> {
    if (number == null) {
        System.out.println(throwable);
    }
    System.out.println(number);
});

RedissonPromise:

@Override
public void onComplete(BiConsumer<? super T, ? super Throwable> action) {
    promise.addListener(f -> {
        if (!f.isSuccess()) {
            action.accept(null, f.cause());
            return;
        }
        action.accept((T) f.getNow(), null);
    });
}
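
To make the adapter pattern in onComplete() concrete, here is a minimal single-threaded sketch. `MiniPromise` is a hypothetical class, not Netty's DefaultPromise or Redisson's RedissonPromise, and it has no thread safety. It only shows how a (value, error) callback is wrapped into a plain listener, and how listeners are dropped once they have been notified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical miniature promise for illustration only (single-threaded, not thread-safe). */
final class MiniPromise<T> {
    private final List<Consumer<MiniPromise<T>>> listeners = new ArrayList<>();
    private boolean done;
    private T value;
    private Throwable cause;

    boolean isSuccess() { return done && cause == null; }
    T getNow() { return value; }
    Throwable cause() { return cause; }

    /** Registers a raw listener; if the promise is already done, it is invoked right away. */
    void addListener(Consumer<MiniPromise<T>> listener) {
        if (done) {
            listener.accept(this);
        } else {
            listeners.add(listener);
        }
    }

    /** Same shape as the onComplete() above: adapts a (value, error) callback to a listener. */
    void onComplete(BiConsumer<? super T, ? super Throwable> action) {
        addListener(f -> {
            if (!f.isSuccess()) {
                action.accept(null, f.cause());
                return;
            }
            action.accept(f.getNow(), null);
        });
    }

    void trySuccess(T result) { complete(result, null); }
    void tryFailure(Throwable error) { complete(null, error); }

    private void complete(T result, Throwable error) {
        if (done) {
            return; // only the first completion wins
        }
        done = true;
        value = result;
        cause = error;
        List<Consumer<MiniPromise<T>>> toNotify = new ArrayList<>(listeners);
        listeners.clear(); // listeners are removed once they have been notified
        for (Consumer<MiniPromise<T>> listener : toNotify) {
            listener.accept(this);
        }
    }
}
```

Registering the callback and completing the promise can happen in either order. The callback still runs exactly once.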

Let's summarize the write path:

  1. CommandsQueue intercepts write() calls
  2. If the message is a command, it is added to the command queue and stored on the channel as the current command

Combining the read and write paths

  1. CommandsQueue intercepts write() calls
  2. If the message is a command, it is added to the command queue and stored on the channel as the current command
  3. When a message is received, the decoder tries to fetch the CommandData from the channel
  4. If one is present, it is used to parse the message
  5. Once the result is parsed, all listeners on the CommandData's promise are notified (which calls the methods we defined), and the listeners are removed when notification completes
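
Put together, the five steps amount to matching replies to requests by position rather than by ID. The sketch below models that with plain JDK types. `RoundTripSketch`, `Pending`, `sendAsync`, and `onReply` are hypothetical names. It also assumes that the next queued command is written only after the reply to the current one has been handled, which the excerpts above suggest (`queue.peek()` plus `trySend()`) but do not show.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class RoundTripSketch {
    /** Hypothetical stand-in for CommandData: the request plus the promise for its reply. */
    static final class Pending {
        final String request;
        final CompletableFuture<String> promise = new CompletableFuture<>();

        Pending(String request) {
            this.request = request;
        }
    }

    private final Queue<Pending> queue = new ArrayDeque<>(); // plays the role of the command queue
    private Pending current;                                 // plays the role of CURRENT_COMMAND
    final Queue<String> wire = new ArrayDeque<>();           // requests actually "sent" to the server

    /** Steps 1 and 2: enqueue the command; only the head of the queue goes on the wire. */
    CompletableFuture<String> sendAsync(String request) {
        Pending pending = new Pending(request);
        queue.add(pending);
        sendHeadIfIdle();
        return pending.promise;
    }

    private void sendHeadIfIdle() {
        if (current == null && !queue.isEmpty()) {
            current = queue.peek();
            wire.add(current.request);
        }
    }

    /** Steps 3 to 5: the reply belongs to the current command; complete it, then send the next. */
    void onReply(String reply) {
        Pending finished = current;
        if (finished == null) {
            return; // nothing in flight, so there is nothing to match the reply to
        }
        queue.poll();
        current = null;
        sendHeadIfIdle();
        finished.promise.complete(reply);
    }
}
```

Because at most one command per connection is awaiting a reply in this model, the reply that arrives next can only belong to that command, so no request ID is needed.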

Questions

What if we suddenly lose the connection to Redis (disconnect and reconnect)? The current queue is emptied so that stale commands do not pollute the new connection.

CommandsQueue.channelInactive(), called when the connection is closed:

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    while (true) {
        QueueCommandHolder command = queue.poll();
        if (command == null) {
            break;
        }

        command.getChannelPromise().tryFailure(
                new WriteRedisConnectionException("Channel has been closed! Can't write command: "
                        + LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel()));
    }

    super.channelInactive(ctx);
}
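
The drain-and-fail loop can be tried out in isolation. In this sketch, `DisconnectSketch` and `enqueue` are hypothetical names, and a single `CompletableFuture` stands in for both the write promise and the command's result promise, which are separate objects in the real code. `IllegalStateException` replaces `WriteRedisConnectionException` to keep the example free of dependencies.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class DisconnectSketch {
    private final Queue<CompletableFuture<String>> queue = new ArrayDeque<>();

    /** Queues a command and returns the future its caller would be waiting on. */
    CompletableFuture<String> enqueue() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        queue.add(promise);
        return promise;
    }

    /** Same loop shape as channelInactive(): drain the queue and fail every entry. */
    int channelInactive() {
        int failed = 0;
        while (true) {
            CompletableFuture<String> promise = queue.poll();
            if (promise == null) {
                break;
            }
            if (promise.completeExceptionally(new IllegalStateException("Channel has been closed!"))) {
                failed++;
            }
        }
        return failed;
    }
}
```

Failing every queued entry means no caller stays blocked forever, and a reply arriving on the new connection cannot be matched to a command written on the old one.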

How does the synchronous call work?

RMap<Integer, Integer> map = client.getMap("test1");
// Execute synchronously
map.get(1);

It's simple: the calling thread blocks on the RFuture, re-checking in a loop after each wake-up, until the future is done.

@Override
public <V> V get(RFuture<V> future) {
    // ...
    future.await();
    // ...
}

public Promise<V> await() throws InterruptedException {
    // Already complete: return immediately
    if (this.isDone()) {
        return this;
    } else if (Thread.interrupted()) {
        throw new InterruptedException(this.toString());
    } else {
        this.checkDeadLock();
        synchronized (this) {
            // Keep waiting as long as the promise is not done
            while (!this.isDone()) {
                this.incWaiters();
                try {
                    // Block until notified
                    this.wait();
                } finally {
                    this.decWaiters();
                }
            }
            return this;
        }
    }
}

// At most 32767 threads may wait on the same promise at once;
// this limits the number of waiters, not the number of loop iterations
private void incWaiters() {
    if (this.waiters == 32767) {
        throw new IllegalStateException("Too many waiters: " + this);
    } else {
        ++this.waiters;
    }
}
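
The same wait/notify pattern can be reproduced in a few lines. `BlockingPromiseSketch` and its `demo()` helper are hypothetical and only imitate the shape of the code above. One thread plays the I/O thread that completes the promise, while the caller blocks in await().

```java
final class BlockingPromiseSketch<V> {
    private V result;
    private boolean done;
    private short waiters;

    /** Called by the "I/O thread" when the reply arrives; wakes up every blocked caller. */
    synchronized void trySuccess(V value) {
        if (done) {
            return;
        }
        result = value;
        done = true;
        if (waiters > 0) {
            notifyAll();
        }
    }

    /** Blocks the calling thread until the promise is done, then returns the result. */
    synchronized V await() throws InterruptedException {
        // The loop re-checks the condition after every wake-up, including spurious ones.
        while (!done) {
            if (waiters == Short.MAX_VALUE) {
                throw new IllegalStateException("Too many waiters: " + this);
            }
            ++waiters;
            try {
                wait(); // releases the monitor while blocked
            } finally {
                --waiters;
            }
        }
        return result;
    }

    /** Synchronous call built on the asynchronous promise. */
    static int demo() {
        BlockingPromiseSketch<Integer> promise = new BlockingPromiseSketch<>();
        Thread ioThread = new Thread(() -> promise.trySuccess(2));
        ioThread.start();
        try {
            return promise.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

It does not matter whether trySuccess() runs before or after await() is entered. In the first case the loop body is skipped, and in the second the waiter is woken by notifyAll().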