Mp.weixin.qq.com/s/iUNpjRRYs…

Article outline:

  • Why do we need thread pools

  • Condition variable combined with mutex + task queue

  • eventfd + epoll design

  • eventfd + epoll + multi-queue design

  • Lock-free design

  • How large should the thread pool be


Why do we need thread pools

We use multithreading in these situations:

  • Blocking calls (blocking IO calls, waiting for resources)

  • Time-consuming calculations (reading and writing files, complex calculations)

  • High-density tasks (high-concurrency, low-latency network I/O requests)

What problems do we run into if we create temporary threads for the situations above?

  • Too many threads may be created, wasting system resources and the time spent creating and destroying them.

  • Thread creation is slow, so the result of the task is delayed.

  • Thread destruction is slow, which may affect other processes' use of resources.

So the idea is: create multiple threads in advance, keep them in a pool, do not destroy them, and hand a task to a thread in the pool whenever one arrives. That is a thread pool. OK, the questions now are: who creates the task (the producer), and how is it handed to a thread in the pool (the consumer)? The answer depends on the following aspects:

1) How do producers communicate with consumers? 2) How are tasks stored? 3) How are producers and consumers synchronized?

All of the following code designs assume the single-producer, multi-consumer pattern.

Condition variable combined with mutex + task queue

The queue is designed as follows:

typedef struct queue_task {
    void* (*run)(void*);
    void* argv;
} task_t;

typedef struct queue {
    int     head;
    int     tail;
    int     size;
    int     capcity;
    task_t* tasks;
} queue_t;

typedef struct async_queue {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             waiting_threads;
    queue_t*        queue;
    int             quit;     /* 0: keep running, 1: quit */
    /* Debug variable */
    long long       tasked;   /* number of completed tasks */
} async_queue_t;

The code for fetching a task is as follows:

task_t* async_cond_queue_pop_head(async_queue_t* q, int timeout)
{
    task_t* task = NULL;
    struct timeval  now;
    struct timespec outtime;

    pthread_mutex_lock(&(q->mutex));
    if (queue_is_empty(q->queue))
    {
        q->waiting_threads++;
        while (queue_is_empty(q->queue) && (q->quit == 0))
        {
            gettimeofday(&now, NULL);
            if (now.tv_usec + timeout > 1000)
            {
                outtime.tv_sec  = now.tv_sec + 1;
                outtime.tv_nsec = ((now.tv_usec + timeout) % 1000) * 1000;
            }
            else
            {
                outtime.tv_sec  = now.tv_sec;
                outtime.tv_nsec = (now.tv_usec + timeout) * 1000;
            }
            pthread_cond_timedwait(&(q->cond), &(q->mutex), &outtime);
        }
        q->waiting_threads--;
    }
    task = queue_pop_head(q->queue);

    /* Debug code */
    if (task)
    {
        q->tasked++;
        static long long precision = 10;
        if ((q->tasked % precision) == 0)
        {
            time_t current_stm = get_current_timestamp();
            precision *= 10;
        }
    }
    pthread_mutex_unlock(&(q->mutex));
    return task;
}
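The producer side is not shown above; a minimal sketch of what it might look like, assuming the same queue_is_full/queue_push_tail helpers used in the repository, is:

int async_cond_queue_push_tail(async_queue_t* q, task_t* task)
{
    int pushed = 0;
    pthread_mutex_lock(&(q->mutex));
    if (!queue_is_full(q->queue))
    {
        queue_push_tail(q->queue, task);
        pushed = 1;
        /* Wake up one waiting consumer, if any. */
        if (q->waiting_threads > 0)
            pthread_cond_signal(&(q->cond));
    }
    pthread_mutex_unlock(&(q->mutex));
    return pushed;
}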

See: https://github.com/zhiyong0804/f-threadpool/blob/master/async_cond_queue.c


Shortcomings:

  1. Because the mutex causes threads to be suspended and woken up, it is not particularly efficient on I/O-intensive servers (verified by testing);

  2. A condition variable must be used together with a mutex, which is cumbersome;

  3. Unlike eventfd, a condition variable cannot be driven by I/O events;

  4. A pipe also combines well with I/O multiplexing, but a pipe uses one more file descriptor than eventfd, and the kernel has to maintain a buffer for the pipe that eventfd does not need, so eventfd is more efficient than a pipe.
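For context, eventfd is simply a 64-bit counter maintained by the kernel behind a file descriptor; a minimal standalone illustration of its read/write semantics (not part of the thread pool code):

#include <stdint.h>
#include <stdio.h>
#include <sys/eventfd.h>
#include <unistd.h>

int main(void)
{
    int efd = eventfd(0, EFD_CLOEXEC);   /* counter starts at 0 */
    uint64_t one = 1, got = 0;

    write(efd, &one, sizeof(one));       /* counter += 1 */
    write(efd, &one, sizeof(one));       /* counter += 1, now 2 */

    read(efd, &got, sizeof(got));        /* returns 2 and resets the counter to 0 */
    printf("counter was %llu\n", (unsigned long long)got);

    close(efd);
    return 0;
}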


eventfd + epoll

Queue design:

typedef struct async_queue {
    queue_t*  queue;
    int       quit;      /* 0: keep running, 1: quit */
    int       efd;       /* event fd */
    int       epollfd;   /* epoll fd */
    /* Debug variable */
    long long tasked;    /* number of completed tasks */
} async_queue_t;

Inserting a task:

BOOL async_eventfd_queue_push_tail(async_queue_t* q, task_t* task)
{
    unsigned long long i = 0xffffffff;
    if (!queue_is_full(q->queue))
    {
        queue_push_tail(q->queue, task);

        struct epoll_event ev;
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd == -1)
            printf("eventfd create: %s", strerror(errno));

        ev.events  = EPOLLIN; // | EPOLLLT;
        ev.data.fd = efd;
        if (epoll_ctl(q->epollfd, EPOLL_CTL_ADD, efd, &ev) == -1)
        {
            return FALSE;
        }
        write(efd, &i, sizeof(i));
        return TRUE;
    }
    return FALSE;
}


Fetching a task:

task_t* async_eventfd_queue_pop_head(async_queue_t* q, int timeout)
{
    unsigned long long i = 0;
    struct epoll_event events[MAX_EVENTS];

    int nfds = epoll_wait(q->epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1)
    {
        return NULL;
    }
    else
    {
        read(events[0].data.fd, &i, sizeof(i));
        close(events[0].data.fd); // NOTE: need to close here

        task_t* task = queue_pop_head(q->queue);

        /* Debug code */
        if (task)
        {
            q->tasked++;
            static long long precision = 10;
            if ((q->tasked % precision) == 0)
            {
                time_t current_stm = get_current_timestamp();
                printf("%lld tasks cost : %ld\n", precision, (long)(current_stm - start_stm));
                precision *= 10;
            }
        }
        return task;
    }
    return NULL;
}


Since each write to the eventfd wakes up only one thread blocked in epoll_wait, it is guaranteed that only one thread fetches a task at any given time. For the full code, see: https://github.com/zhiyong0804/f-threadpool/blob/master/async_eventfd_queue.c
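Whichever queue implementation is used, a worker thread in the pool is essentially a loop that pops a task and runs it. A minimal sketch follows; only async_eventfd_queue_pop_head comes from the code above, the rest is illustrative:

static void* worker_thread(void* argv)
{
    async_queue_t* q = (async_queue_t*)argv;
    while (!q->quit)
    {
        /* Blocks in epoll_wait until a task is available. */
        task_t* task = async_eventfd_queue_pop_head(q, -1);
        if (task)
            task->run(task->argv);
    }
    return NULL;
}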

Disadvantage: in both designs above, all threads share the same queue, so the producer and the consumer threads must synchronize their access to it. This can be expressed with the following diagram:


Eventfd + epoll + multi-queue design

The design idea is as follows:

For the code, see: https://github.com/zhiyong0804/StreamingServer. Oh my, it is a huge project, so where is the thread pool source? It's a long story: this code comes from EasyDarwin, but for some reason the EasyDarwin source is no longer shared (only a donation QR code remains), so I made some modifications to their source, resubmitted it, and named it StreamingServer. Internally it uses condition variables for synchronization, but the multi-queue idea can still be followed; we plan to use eventfd to implement timeouts later.
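The multi-queue idea itself can be sketched quite simply: each worker thread owns a private queue (with its own eventfd), and the single producer dispatches tasks round-robin, so consumers never contend for the same queue. All names below are illustrative assumptions, not code from StreamingServer:

typedef struct thread_pool {
    int             nthreads;
    async_queue_t** queues;   /* queues[i] is consumed by worker thread i only */
    unsigned int    next;     /* round-robin cursor, touched by the producer only */
} thread_pool_t;

BOOL thread_pool_dispatch(thread_pool_t* pool, task_t* task)
{
    /* Single producer: pick the next worker's private queue. */
    unsigned int idx = pool->next++ % pool->nthreads;
    return async_eventfd_queue_push_tail(pool->queues[idx], task);
}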

Does such a design remind us of the picture below?

Before, all the roads met at an intersection (the shared resource), and the cars could only be coordinated with traffic lights (synchronization); now the shared resource is thrown out altogether and replaced with an overpass. Cool, right?

Concurrent programming is hard, as can be seen from these articles:

  • Concurrent programming 11 questions in English: http://www.it610.com/article/4462577.htm

  • Concurrent programming 11 questions in Chinese: https://blog.csdn.net/mergerly/article/details/39028861

I'm not that smart either, but when I came across ZeroMQ two years ago, I was struck by Pieter Hintjens' observation that "true concurrency is not sharing resources".

So, here is the fourth design:

Lock-free

When we added queues in the third approach, one queue per thread, we actually turned each queue into a single-producer/single-consumer queue: the write pointer (tail) is used only by the producer, and the read pointer (head) is used only by the consumer, so in fact nothing is shared. (The size member of queue_t is still shared, of course; I am refactoring to get rid of it.)
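Such a single-producer/single-consumer queue can be sketched as a lock-free ring using C11 atomics, where only the producer writes tail and only the consumer writes head; the names below are illustrative:

#include <stdatomic.h>
#include <stdbool.h>

#define RING_CAP 1024                       /* must be a power of two */

typedef struct spsc_ring {
    _Atomic unsigned int head;              /* written by the consumer only */
    _Atomic unsigned int tail;              /* written by the producer only */
    task_t*              slots[RING_CAP];
} spsc_ring_t;

bool spsc_push(spsc_ring_t* r, task_t* t)   /* called by the producer only */
{
    unsigned int tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    unsigned int head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAP)
        return false;                       /* ring is full */
    r->slots[tail % RING_CAP] = t;
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

task_t* spsc_pop(spsc_ring_t* r)            /* called by the consumer only */
{
    unsigned int head = atomic_load_explicit(&r->head, memory_order_relaxed);
    unsigned int tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return NULL;                        /* ring is empty */
    task_t* t = r->slots[head % RING_CAP];
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return t;
}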

OK, so how does the consumer thread “wait” and “fetch” tasks in this design?

In fact, in all three of the previous designs the consumer thread passively waits for a notification and then picks up the task. Here we can instead design a "polling" scheme: each thread constantly checks whether there are tasks in its own queue, calling sched_yield in between so that other threads get a chance to be scheduled.
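A polling consumer built on such a private ring might look like the following sketch (spsc_ring_t and spsc_pop refer to the illustrative ring above):

#include <sched.h>

static void* polling_worker(void* argv)
{
    spsc_ring_t* ring = (spsc_ring_t*)argv;   /* this thread's private queue */
    for (;;)
    {
        task_t* task = spsc_pop(ring);
        if (task)
            task->run(task->argv);
        else
            sched_yield();                    /* no task: let other threads run */
    }
    return NULL;
}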

What size thread pool design is appropriate?

CPU-intensive: thread count = N + 1; I/O-intensive: thread count = 2*N + 1 (where N is the number of CPU cores).
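Taking N as the number of CPU cores, this rule of thumb can be computed at startup, for example:

#include <unistd.h>

/* Rule-of-thumb pool sizes from the formulas above. */
long thread_pool_size(int io_bound)
{
    long n = sysconf(_SC_NPROCESSORS_ONLN);   /* N = number of online cores */
    return io_bound ? 2 * n + 1 : n + 1;
}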

Of course, this is not absolute; the thread pool in MariaDB, for example, can adjust this size dynamically.