Redis 6 introduces multithreaded IO, so let’s compare it to Netty’s multithreaded model

Analysis:

  • How are the threads initialized?
  • How are clients assigned to threads?
  • How, and on which thread, are read and write events handled?
  • On which thread is command logic executed?

Netty’s multithreaded model

The user code

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
        .handler(new ServerHandler())
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new AuthHandler());
                // ..
            }
        });
    ChannelFuture f = b.bind(8888).sync();
    f.channel().closeFuture().sync();

Initialize the threads (ServerBootstrap.bind())

Initialization sets up a boss thread pool and a worker thread pool, then creates a new server channel to accept connections and registers it with a boss thread. It also adds a ServerBootstrapAcceptor handler to that channel’s pipeline.

  • Executing thread: main thread
  • Timing: thread initialization
  • Entry point: ServerBootstrap.bind()

How are clients assigned to threads?

  • Executing thread: boss thread
  • Timing: when a new connection arrives

Establishing a new connection can be divided into three steps: 1. a new connection is detected; 2. the new connection is registered with a thread in the worker group; 3. a read event is registered for the new connection.

NioEventLoop.run() in the boss thread group continuously polls all of its channels and calls read when a channel is readable or has a connection to accept.

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }

The event then travels along the channel’s pipeline (its chain of responsibility):

  • unsafe.read()
  • -> pipeline.fireChannelRead(byteBuf)
  • -> ServerBootstrapAcceptor.channelRead()
  • -> MultithreadEventLoopGroup.register(child): assigns a thread to the channel; one thread may serve more than one channel

How a thread is chosen

    // DefaultEventExecutorChooserFactory.java
    // Pick a thread by taking the counter modulo the number of threads.
    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    // AbstractChannel.java
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // Bind this thread (event loop) to the channel.
        AbstractChannel.this.eventLoop = eventLoop;
        // Register with the underlying NIO selector.
        register0(promise);
    }

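To make the modulo-based selection concrete, here is a minimal, self-contained sketch of the same arithmetic. It is not Netty code: `RoundRobinChooser`, the string names standing in for event loops, and the `seed` test hook are all hypothetical and exist only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a chooser over an array of event loops. */
final class RoundRobinChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors; // names stand in for EventExecutor instances

    RoundRobinChooser(String... executors) {
        if (executors.length == 0) {
            throw new IllegalArgumentException("need at least one executor");
        }
        this.executors = executors.clone();
    }

    /** Same arithmetic as next() above: counter modulo the number of threads. */
    String next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    /** Illustration-only hook: start the counter at an arbitrary value. */
    void seed(int value) {
        idx.set(value);
    }
}
```

With three executors the sequence is w0, w1, w2, w0, and so on. Because the remainder is taken before `Math.abs`, the index stays in range even after the int counter overflows into negative values, although the strict rotation order is briefly disturbed at the wrap-around point.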
How, and on which thread, are read and write events handled?

Both are handled on the worker thread (NioEventLoop) that the channel is registered with; incoming data is delivered to ChannelInboundHandler.channelRead.

On which thread is command logic executed?

Command logic also runs in ChannelInboundHandler.channelRead, on that same worker thread.

Conclusion: Netty starts with a boss thread pool (usually a single thread) that listens for new connections in NioEventLoop.run(). Through the pipeline, ServerBootstrapAcceptor.channelRead() assigns each new channel to a worker thread (a NioEventLoop). That thread’s run() loop then keeps reading from its channels and processing their commands.
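The following toy model illustrates the conclusion using only the JDK, not Netty itself. Each "event loop" is a single-threaded executor, and a channel stays pinned to the loop it was registered with. The names `MiniEventLoopGroup`, `MiniChannel`, `register` and `read` are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy worker group: one single-threaded executor per "event loop". */
final class MiniEventLoopGroup {
    private final ExecutorService[] loops;
    private final AtomicInteger idx = new AtomicInteger();

    MiniEventLoopGroup(int nThreads) {
        loops = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            final String name = "worker-" + i;
            loops[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });
        }
    }

    /** Called when a connection is accepted: pick a loop round-robin and pin the channel to it. */
    MiniChannel register(String channelId) {
        ExecutorService loop = loops[Math.abs(idx.getAndIncrement() % loops.length)];
        return new MiniChannel(channelId, loop);
    }

    void shutdown() {
        for (ExecutorService loop : loops) {
            loop.shutdownNow();
        }
    }
}

/** Toy channel: every read is executed on the loop chosen at registration time. */
final class MiniChannel {
    private final String id;
    private final ExecutorService loop;

    MiniChannel(String id, ExecutorService loop) {
        this.id = id;
        this.loop = loop;
    }

    /** Simulates channelRead and reports which thread handled the message. */
    String read(String msg) {
        Callable<String> task = () -> Thread.currentThread().getName() + " handled " + id + ":" + msg;
        try {
            return loop.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With two workers, channels a, b and c land on worker-0, worker-1 and worker-0 respectively, and every message for channel a is handled by worker-0. Reading, parsing and executing all happen on that one thread, which is why handlers for a single channel need no locking among themselves.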




Redis multithreaded model

Initialize the threads (initThreadedIO())

  • Executing thread: main thread
  • Timing: thread initialization

If the user has not enabled multithreaded IO (server.io_threads_num == 1), all IO is handled by the main thread alone. If the configured number of threads exceeds the upper limit IO_THREADS_MAX_NUM, the server logs a fatal error and exits.

For each of the io_threads_num threads, Redis creates a task list (listCreate). For every thread except the main thread (id == 0), it additionally:

  1. Initializes the thread’s pending task count to 0
  2. Acquires the thread’s mutex so that the thread stays blocked for now
  3. Creates the thread and maps its tid to the thread ID used inside Redis

    /* Initialize the data structures needed for threaded I/O. */
    void initThreadedIO(void) {
        io_threads_active = 0; /* We start with threads not active. */

        /* Don't spawn any thread if the user selected a single thread:
         * we'll handle I/O directly from the main thread. */
        if (server.io_threads_num == 1) return;

        if (server.io_threads_num > IO_THREADS_MAX_NUM) {
            serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                                 "The maximum number is %d.", IO_THREADS_MAX_NUM);
            exit(1);
        }

        /* Spawn and initialize the I/O threads. */
        for (int i = 0; i < server.io_threads_num; i++) {
            /* Things we do for all the threads including the main thread. */
            io_threads_list[i] = listCreate();
            if (i == 0) continue; /* Thread 0 is the main thread. */

            /* Things we do only for the additional threads. */
            pthread_t tid;
            pthread_mutex_init(&io_threads_mutex[i],NULL);
            io_threads_pending[i] = 0;
            pthread_mutex_lock(&io_threads_mutex[i]); /* The thread stays blocked for now. */
            if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }
            io_threads[i] = tid;
        }
    }

A read event arrives (readQueryFromClient)

  • Executing thread: main thread
  • Timing: when a read event arrives

readQueryFromClient first calls postponeClientRead to check whether the conditions for threaded IO are met. If they are, the client is added to the queue of clients waiting to be read, and its state is flagged as pending read.

    int postponeClientRead(client *c) {
        if (io_threads_active &&            /* the IO threads are active */
            server.io_threads_do_reads &&   /* threaded reads are enabled */
            /* The client is neither a master nor a replica, and is not
             * already waiting to be read. */
            !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        {
            /* Flag the client as waiting to be read. */
            c->flags |= CLIENT_PENDING_READ;
            /* Add the client to the pending-read list. */
            listAddNodeHead(server.clients_pending_read,c);
            return 1;
        } else {
            return 0;
        }
    }

At this point, the server maintains a clients_pending_read client list that contains all pending read events.
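As a rough illustration of that check, here is a small Java sketch of the same bit-flag logic. It is not Redis code. The flag values are made up (the real constants are defined in the Redis sources), and `PendingReadQueue`, `Client` and `postpone` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the "postpone read" decision; all flag values are illustrative assumptions. */
final class PendingReadQueue {
    static final int CLIENT_MASTER = 1 << 0;
    static final int CLIENT_SLAVE = 1 << 1;
    static final int CLIENT_PENDING_READ = 1 << 2;

    static final class Client {
        final String name;
        int flags;

        Client(String name, int flags) {
            this.name = name;
            this.flags = flags;
        }
    }

    final boolean ioThreadsActive;
    final boolean ioThreadsDoReads;
    final Deque<Client> clientsPendingRead = new ArrayDeque<>();

    PendingReadQueue(boolean ioThreadsActive, boolean ioThreadsDoReads) {
        this.ioThreadsActive = ioThreadsActive;
        this.ioThreadsDoReads = ioThreadsDoReads;
    }

    /** Returns true if the read was deferred to the IO threads, false if it must be done inline. */
    boolean postpone(Client c) {
        int excluded = CLIENT_MASTER | CLIENT_SLAVE | CLIENT_PENDING_READ;
        if (ioThreadsActive && ioThreadsDoReads && (c.flags & excluded) == 0) {
            c.flags |= CLIENT_PENDING_READ;  // mark as waiting to be read
            clientsPendingRead.addFirst(c);  // mirrors adding at the head of the list
            return true;
        }
        return false;
    }
}
```

A plain client is queued exactly once: a second call finds the pending-read flag already set and returns false, so the same client never appears twice in the queue.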

How clients are assigned to threads (handleClientsWithPendingReadsUsingThreads)

  • Executing thread: main thread
  • Timing: after the current round of event processing

First, Redis checks how many clients are waiting to be read, via listLength(server.clients_pending_read).

If the length is not 0, a while loop assigns each waiting client to a thread in round-robin order. When there are more waiting clients than threads, a thread receives more than one client.

    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Pick the target thread by taking the remainder. */
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

Then each thread’s pending count (initialized to 0) is set to the number of clients on its list:

    /* All IO threads; index 0 is the main thread. */
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

The main thread waits until no tasks remain:

    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }

Finally, the main thread walks clients_pending_read, executes the commands, and empties the list:

    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            processCommandAndResetClient(c);
        }
        processInputBufferAndReplicate(c);
    }
    listEmpty(server.clients_pending_read);
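The round-robin distribution step can be sketched in a few lines of Java. This is an illustration of the `item_id % io_threads_num` arithmetic only. `ClientDistributor` and `distribute` are hypothetical names, and clients are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy version of the distribution loop: client i goes to list (i % ioThreadsNum). */
final class ClientDistributor {
    static List<List<String>> distribute(List<String> pendingClients, int ioThreadsNum) {
        if (ioThreadsNum < 1) {
            throw new IllegalArgumentException("ioThreadsNum must be at least 1");
        }
        List<List<String>> perThread = new ArrayList<>();
        for (int i = 0; i < ioThreadsNum; i++) {
            perThread.add(new ArrayList<>());
        }
        int itemId = 0;
        for (String client : pendingClients) {
            int targetId = itemId % ioThreadsNum; // same remainder rule as the C loop
            perThread.get(targetId).add(client);
            itemId++;
        }
        return perThread;
    }
}
```

With five pending clients a to e and three threads, the lists come out as [a, d], [b, e] and [c]. Unlike Netty, where a channel stays with one thread for its whole lifetime, this assignment is recomputed in every round, so the same client may be read by a different thread each time.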

How read requests are handled (IOThreadMain)

  • Executing thread: IO thread (child thread)
  • Timing: a while loop that runs from the moment the child thread starts

Once the tasks have been distributed as described above, each thread reads its clients’ data into their read buffers in the usual way, which differs little from the original single-threaded path.

Redis gives each client an input buffer that temporarily stores the commands the client sends; Redis then pulls commands from that buffer and executes them.

In Redis’s threaded IO model, all IO threads perform the same kind of operation at any given time, either all reads or all writes, as controlled by io_threads_op. In each round, a thread processes every client on its list once:

    void *IOThreadMain(void *myid) {
        /* The thread ID, used to find this thread's own task list. */
        long id = (unsigned long)myid;

        while(1) {
            /* The wait here is deliberate: instead of a simple sleep, whose
             * duration is hard to tune and can hurt performance, the thread
             * spins for a bounded number of iterations. */
            for (int j = 0; j < 1000000; j++) {
                if (io_threads_pending[id] != 0) break;
            }

            /* Process the tasks on the list that belongs to this thread ID. */
            listIter li;
            listNode *ln;
            listRewind(io_threads_list[id],&li);
            while((ln = listNext(&li))) {
                client *c = listNodeValue(ln);
                if (io_threads_op == IO_THREADS_OP_WRITE) {
                    writeToClient(c,0);
                } else if (io_threads_op == IO_THREADS_OP_READ) {
                    readQueryFromClient(c->conn);
                } else {
                    serverPanic("io_threads_op value is unknown");
                }
            }
            listEmpty(io_threads_list[id]);
            io_threads_pending[id] = 0;

            if (tio_debug) printf("[%ld] Done\n", id);
        }
    }
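The hand-off between the main thread and the IO threads can be modelled in Java roughly as follows. This is a sketch under stated assumptions, not a port of the Redis code. `MiniThreadedIO` and its methods are hypothetical, `StringBuilder` stands in for a client’s query buffer, `AtomicIntegerArray` replaces the C pending-count array, and having the main thread process slot 0 itself is a choice made for this sketch (the excerpt above does not show that step).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Toy hand-off: per-thread task lists guarded by per-thread pending counters. */
final class MiniThreadedIO {
    private final int n;
    private final List<List<StringBuilder>> lists = new ArrayList<>();
    private final AtomicIntegerArray pending;
    private volatile boolean running = true;

    MiniThreadedIO(int ioThreadsNum) {
        n = ioThreadsNum;
        pending = new AtomicIntegerArray(n);
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        for (int i = 1; i < n; i++) { // slot 0 belongs to the main thread
            final int id = i;
            Thread t = new Thread(() -> ioThreadMain(id), "io-" + id);
            t.setDaemon(true);
            t.start();
        }
    }

    /** IO-thread loop: wait for work, process the own list, then reset the counter. */
    private void ioThreadMain(int id) {
        while (running) {
            if (pending.get(id) == 0) {
                Thread.yield(); // nothing to do yet
                continue;
            }
            for (StringBuilder buf : lists.get(id)) {
                buf.append("read by io-").append(id); // stands in for reading the socket
            }
            lists.get(id).clear();
            pending.set(id, 0); // hands the list back to the main thread
        }
    }

    /** Main-thread side: distribute, publish the counts, handle slot 0, wait for the rest. */
    void handlePending(List<StringBuilder> clients) {
        for (int i = 0; i < clients.size(); i++) {
            lists.get(i % n).add(clients.get(i));
        }
        for (int j = 1; j < n; j++) {
            pending.set(j, lists.get(j).size());
        }
        for (StringBuilder buf : lists.get(0)) {
            buf.append("read by main");
        }
        lists.get(0).clear();
        while (true) {
            int total = 0;
            for (int j = 1; j < n; j++) {
                total += pending.get(j);
            }
            if (total == 0) {
                break;
            }
            Thread.yield();
        }
    }

    void stop() {
        running = false;
    }
}
```

The counter is the only synchronization point. A list is touched by its IO thread only while the counter is non-zero, and by the main thread only while it is zero, so the lists themselves need no lock.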

The call chain readQueryFromClient() -> processInputBuffer(c) -> processCommand() dispatches and processes commands.

On an IO thread, however, readQueryFromClient only fills the client’s input buffer:

    /* Inside readQueryFromClient(): */
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    void processInputBuffer(client *c) {
        while(c->qb_pos < sdslen(c->querybuf)) {
            /* On an IO thread, only set the CLIENT_PENDING_COMMAND flag
             * and let the main thread execute the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
        }
    }
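The deferral mechanism can be illustrated with a small Java model. It is a sketch, not Redis code. `DeferredClient`, the flag values, and the newline-delimited "protocol" are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy client: parsing may happen on an IO thread, execution only on the main thread. */
final class DeferredClient {
    static final int PENDING_READ = 1 << 0;    // illustrative values, not the Redis constants
    static final int PENDING_COMMAND = 1 << 1;

    int flags;
    final StringBuilder queryBuf = new StringBuilder();
    final List<String> executed = new ArrayList<>();
    private String parsed; // command parsed but not yet executed

    /** Parses newline-terminated commands; defers execution while PENDING_READ is set. */
    void processInputBuffer() {
        int nl;
        while ((nl = queryBuf.indexOf("\n")) >= 0) {
            parsed = queryBuf.substring(0, nl);
            queryBuf.delete(0, nl + 1);
            if ((flags & PENDING_READ) != 0) {
                flags |= PENDING_COMMAND; // leave execution to the main thread
                break;
            }
            execute();
        }
    }

    /** Main-thread step, run after all IO threads have finished their reads. */
    void finishOnMainThread() {
        flags &= ~PENDING_READ;
        if ((flags & PENDING_COMMAND) != 0) {
            flags &= ~PENDING_COMMAND;
            execute();
        }
        processInputBuffer(); // drain any further complete commands
    }

    private void execute() {
        executed.add(parsed);
        parsed = null;
    }
}
```

If the buffer holds two commands while the client is flagged as pending read, the first call parses only the first command and stops. The main-thread step then executes it and drains the second one, so commands still run in arrival order on a single thread.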

Each IO thread runs readQueryFromClient, which reads the request into the client’s input buffer. The main thread then executes the commands from those buffers one at a time, and the results are written to the client’s output buffer.

To activate the IO threads, the main thread releases each thread’s mutex and sets the active flag:

    void startThreadedIO(void) {
        if (tio_debug) { printf("S"); fflush(stdout); }
        if (tio_debug) printf("--- STARTING THREADED IO ---\n");
        serverAssert(io_threads_active == 0);
        for (int j = 1; j < server.io_threads_num; j++)
            /* Release the mutex so that the thread can run. */
            pthread_mutex_unlock(&io_threads_mutex[j]);
        /* Mark threaded IO as active. */
        io_threads_active = 1;
    }

When threaded IO is stopped, Redis first handles any clients that are still waiting to be read, then locks each thread’s mutex and clears the active flag:

    void stopThreadedIO(void) {
        /* There may still be clients waiting to be read when we stop:
         * handle them before stopping the threads. */
        handleClientsWithPendingReadsUsingThreads();
        if (tio_debug) { printf("E"); fflush(stdout); }
        if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
            (int) listLength(server.clients_pending_read),
            (int) listLength(server.clients_pending_write));
        serverAssert(io_threads_active == 1);
        for (int j = 1; j < server.io_threads_num; j++)
            /* Take the mutex so that the thread blocks. */
            pthread_mutex_lock(&io_threads_mutex[j]);
        /* Mark threaded IO as inactive. */
        io_threads_active = 0;
    }

Summary: Threaded IO moves two steps onto multiple threads: reading client input into the input buffer, and writing execution results out of the output buffer. At any given moment all IO threads are either all reading or all writing. Command execution itself still happens on a single thread, one command at a time. Redis keeps it this way to keep things simple and avoid locking and contention issues. Because reading and writing the buffers account for a large share of a command’s life cycle, threading just this part of the IO model already provides a significant performance boost.




Differences between Netty and Redis 6:

How are clients assigned to threads?

Netty: When the boss thread detects a connection event, Netty assigns a thread to the new channel. That thread handles the channel’s read and write events, parses its commands, and executes them.

Redis 6: Whenever a read event arrives, the client is placed in a queue of clients waiting to be read. After the current round of event processing, the main thread distributes those clients across the IO threads. Each IO thread reads its clients’ data into the client’s input buffer. The main thread waits for all IO threads to finish and then parses and executes the commands from those buffers.

How, and on which thread, are read and write events handled?

Netty: data is read and written on the child (worker) thread.

Redis 6: data is read and written on the IO (child) threads, and what is read is stored in the client’s buffer.

On which thread is command logic executed?

Netty: command logic runs directly on the child thread.

Redis 6: command logic runs on the main thread. The main thread traverses the queue of clients waiting to be read, parses the commands from each client’s read buffer, and executes them.

Why did Redis choose to use this model?

Article References:

Redis multithreading article sources:

  • www.web-lovers.com/redis-sourc…
  • blog.csdn.net/gupaoxueyua…
  • www.cnblogs.com/madashu/p/1…
  • antirez.com/news/126

Redis event mechanism: www.web-lovers.com/redis-sourc…

Redis official github: github.com/antirez/red…