C language to implement thread pool
Very streamlined Linux thread pool implementation (I) – using mutex and condition variables
Introduction: Thread pool is a form of multi-threading processing, which is mostly used in high-concurrency servers. It can make reasonable and effective use of thread resources on high-concurrency servers.
In Unix network programming, threads and processes are used to handle the various sub-tasks of a server. The usual sequence is: receive a message ==> classify the message ==> create a thread ==> pass the message to the new thread ==> detach the thread ==> execute the task in the thread ==> exit when the task finishes. For most small LAN applications this approach is sufficient. But when the communication scope expands to a wide area network or a large local area network, the server faces a large number of frequent requests. In that case, creating and destroying a thread for every request becomes a luxury, especially on embedded servers, where memory has to be used carefully. This is why thread pool technology came into being: a thread pool lets a thread be reused many times, and the message handled inside a reused thread can be different each time, which eliminates the overhead of creating and destroying one thread per request.
Structure explanation:
A thread pool is an abstract concept; internally it consists of a task queue, a group of worker threads, and a manager thread.
We will implement the basic thread pool shown in the diagram above and explain it in sections: 1. Thread pool structure 2. Thread array 3. Task queue 4. Manager thread 5. Example of using the thread pool interface
Overall structure of thread pool
The logical structure of the thread pool is explained here. The structure threadpool_t holds the thread pool state information, the task queue information, and the mutexes used for multithreaded operation. The task structure contains a function pointer, which can refer to many different task functions, and a void * argument that is passed to the task function. Note: to use the pool, put your message-classification handler into the task's (*function), then place the task on the task queue and notify an idle thread.
Thread pool status information: describes the basic state of the current thread pool, such as whether it is shut down, the minimum number of threads, the maximum number of threads, the number of live threads, the number of busy threads, the number of threads to be destroyed, and so on. Task queue information: describes the basic state of the current task queue, such as the maximum number of tasks, the "queue is not full" condition variable, the "queue is not empty" condition variable, and so on. Multi-thread mutex: ensures that only one thread at a time can fetch a task from the task queue and modify the task queue information and the thread pool information. Function pointer: when a message is packaged into a task, the handler for that class of message is placed in (*function). void * argument: used to pass the information that the message-processing function needs.
/* A queued task: a callback and the argument that is passed to it */
typedef struct {
    void *(*function)(void *);
    void *arg;
} threadpool_task_t;

/* Thread pool management structure */
struct threadpool_t {
    pthread_mutex_t lock;            /* lock the entire structure */
    pthread_mutex_t thread_counter;  /* lock for the busy-thread count */
    pthread_cond_t queue_not_full;   /* condition variable: task queue is not full */
    pthread_cond_t queue_not_empty;  /* condition variable: task queue is not empty */
    pthread_t *threads;              /* array holding the worker thread tids */
    pthread_t admin_tid;             /* manager thread tid */
    threadpool_task_t *task_queue;   /* task queue */

    /* thread pool information */
    int min_thr_num;                 /* minimum number of threads in the pool */
    int max_thr_num;                 /* maximum number of threads in the pool */
    int live_thr_num;                /* number of threads alive in the pool */
    int busy_thr_num;                /* number of busy (working) threads */
    int wait_exit_thr_num;           /* number of threads to destroy */

    /* task queue information */
    int queue_front;                 /* queue head */
    int queue_rear;                  /* queue tail */
    int queue_size;                  /* number of tasks currently queued */
    int queue_max_size;              /* maximum number of tasks the queue can hold */

    /* thread pool status */
    int shutdown;                    /* true to close */
};
*/** threadpool_t * threadpool_create(int min_thr_num, int max_thr_num, Int queue_max_size) {/* Minimum number of threads maximum number of threads maximum number of tasks */ int I; threadpool_t *pool = NULL; If ((pool=(threadpool_t *)malloc(sizeof(threadpool_t)) == NULL) {printf("malloc threadpool false; \n"); break; } / / pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->busy_thr_num = 0; pool->live_thr_num = min_thr_num; pool->wait_exit_thr_num = 0; pool->queue_front = 0; pool->queue_rear = 0; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->shutdown = false; Threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); if (pool->threads == NULL) { printf("malloc threads false; \n"); break; } memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num); /* Pool ->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size); if (pool->task_queue == NULL) { printf("malloc task queue false; \n"); break; } /* Initialize mutex and condition variables */ if (pthread_mutex_init(&(pool->lock), NULL)! = 0 || pthread_mutex_init(&(pool->thread_counter), NULL) ! =0 || pthread_cond_init(&(pool->queue_not_empty), NULL) ! =0 || pthread_cond_init(&(pool->queue_not_full), NULL) ! =0) { printf("init lock or cond false; \n"); break; } /* Start min_thr_num */ for (I =0; i<min_thr_num; */ pthread_create(&(pool->threads[I]), NULL, threadpool_thread, (void *)pool); printf("start thread 0x%x... \n", (unsigned int)pool->threads[i]); */ pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool); return pool; } while(0); /* Release pool space */ threadpool_free(pool); return NULL; }Copy the code
Thread array
The thread array is a block of storage, allocated when the thread pool is initialized, that holds a group of thread IDs (tids); logically it forms a pool of pre-created threads. The slots in this space hold: threads that are working, threads waiting for work (idle threads), threads waiting to be destroyed, and thread slots that are declared but not yet initialized.
Void * threadpool_thread(void *threadpool) {threadpool_t = (threadpool_t *)threadpool; threadpool_task_t task; while (true) { pthread_mutex_lock(&(pool->lock)); /* While ((pool->queue_size == 0) && (! pool->shutdown)) { printf("thread 0x%x is waiting \n", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); If (pool->wait_exit_thr_num > 0) {pool->wait_exit_thr_num--; /* Check whether the number of threads in the thread pool is greater than the minimum number of threads. No longer exists */ if (pool->live_thr_num > pool->min_thr_num) {printf(" Thread 0x%x is no longer part of ", (unsigned int)pthread_self()); pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); Pthread_mutex_unlock (&(pool->lock)); pthread_mutex_unlock(&(pool->lock)); printf("thread 0x%x is exiting \n", (unsigned int)pthread_self()); pthread_exit(NULL); Task. function = pool->task_queue[pool->queue_front].function; Task. arg = pool->task_queue[pool->queue_front].arg; pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; Pool ->queue_size--; // Notifications can add new tasks pthread_cond_broadcast(&(pool->queue_not_full)); Pthread_mutex_unlock (&(pool->lock)); Printf ("thread 0x%x start working \n", (unsigned int)pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); // pool->busy_thr_num++; pthread_mutex_unlock(&(pool->thread_counter)); (*(task.function))(task.arg); Printf ("thread 0x%x end working \n", (unsigned int)pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num--; pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL); }Copy the code
Task queue
The task queue exists in a manner similar to the thread array: its space is allocated at thread pool initialization according to the maximum number of tasks passed in. When a request arrives at the server front end, the message is classified and packaged into a task, the task is put on the task queue, and an idle thread is notified to fetch it. The difference is that the task queue has a definite order (first in, first out), whereas the threads in the thread array compete for the mutex to take a task.
/*
 * Add a task to the pool's task queue.
 *
 * Blocks while the queue is full. `arg` is passed to `function` unchanged;
 * the pool never frees it, so it must stay valid until the task has run.
 *
 * Returns 0 on success, -1 if pool or function is NULL or the pool is
 * shutting down.
 */
int
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
    if (pool == NULL || function == NULL)
    {
        return -1;
    }

    pthread_mutex_lock(&(pool->lock));

    /* Queue full: wait until a worker frees a slot or the pool shuts down */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
    {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (pool->shutdown)
    {
        pthread_mutex_unlock(&(pool->lock));
        return -1;
    }

    /* Store the task at the tail of the circular queue. The slot's old
     * contents are simply overwritten: the previous arg belongs to its
     * task (which may still be running), not to the pool. */
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
    pool->queue_size++;

    /* The queue is no longer empty: wake one waiting worker */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
4. The manager thread
As the manager of the thread pool, this thread's main duties are: checking the liveness and working status of the threads in the pool, and dynamically adding or removing threads according to the server's current request load, so that the number of threads in the pool stays at a reasonable and efficient balance. In the end it is just a single thread that wakes up periodically and adds or removes threads according to our balancing algorithm.
/* admin thread */ void * admin_thread(void *threadpool) {int I; threadpool_t *pool = (threadpool_t *)threadpool; while (! pool->shutdown) { printf("admin -----------------\n"); sleep(DEFAULT_TIME); Pthread_mutex_lock (&(pool->lock)); Int queue_size = pool->queue_size; /* Number of tasks */ int live_thr_num = pool->live_thr_num; Pthread_mutex_unlock (&(pool->lock)); Pthread_mutex_lock (&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; /* Number of busy threads */ pthread_mutex_unlock(&(pool->thread_counter)); printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num); /* The actual number of tasks to create a new thread is greater than the minimum number of tasks waiting, If (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num) {printf("admin ") add-----------\n"); pthread_mutex_lock(&(pool->lock)); int add=0; /* Add DEFAULT_THREAD_NUM threads */ for (I =0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM && pool->live_thr_num < pool->max_thr_num; i++) { if (pool->threads[i] == 0 || ! is_thread_alive(pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool); add++; pool->live_thr_num++; printf("new thread -----------------------\n"); } } pthread_mutex_unlock(&(pool->lock)); } /* The busy thread x2 is smaller than the live thread, If ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num) {// printf("admin busy ") --%d--%d----\n", busy_thr_num, live_thr_num); Pthread_mutex_lock (&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_NUM; pthread_mutex_unlock(&(pool->lock)); for (i=0; i<DEFAULT_THREAD_NUM; I++) {// notify the thread that is idle, suicide pthread_cond_signal(&(pool->queue_not_empty)); printf("admin cler --\n"); } } } return NULL; /* Whether the thread is alive */ int is_thread_alive(pthread_t tid) {int kill_rc = pthread_kill(tid, 0); If (kill_rc == ESRCH) // Thread does not exist {return false; } return true; }Copy the code
5. Releasing the thread pool
/*
 * Release the memory and synchronisation objects owned by `pool`.
 * The caller must make sure no thread is still using the pool.
 * Returns 0 on success, -1 if pool is NULL.
 */
int
threadpool_free(threadpool_t *pool)
{
    int sync_ready;

    if (pool == NULL)
    {
        return -1;
    }

    /* threadpool_create() initialises the mutexes and condition variables
     * only after both arrays were allocated, so treat them as live only
     * when both pointers are set. */
    sync_ready = (pool->threads != NULL && pool->task_queue != NULL);

    free(pool->task_queue);
    free(pool->threads);

    if (sync_ready)
    {
        /* Destroy unlocked: destroying a locked mutex is undefined */
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }

    free(pool);
    return 0;
}
/*
 * Shut the pool down: stop the manager thread, wake and join every worker,
 * then release the pool with threadpool_free().
 * Tasks still in the queue are not executed.
 * Returns 0 on success, -1 if pool is NULL.
 */
int
threadpool_destroy(threadpool_t *pool)
{
    int i;

    if (pool == NULL)
    {
        return -1;
    }

    /* Set the flag under the lock so a worker that is about to wait
     * cannot miss it */
    pthread_mutex_lock(&(pool->lock));
    pool->shutdown = true;
    pthread_mutex_unlock(&(pool->lock));

    /* Stop the manager first so no new workers appear while we join */
    pthread_join(pool->admin_tid, NULL);

    /* Wake idle workers and any producer blocked on a full queue */
    pthread_mutex_lock(&(pool->lock));
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_cond_broadcast(&(pool->queue_not_full));
    pthread_mutex_unlock(&(pool->lock));

    /* Live workers are not necessarily in the first live_thr_num slots,
     * so join every slot that holds a tid (0 marks an unused slot) */
    for (i = 0; i < pool->max_thr_num; i++)
    {
        if (pool->threads[i] != 0)
        {
            pthread_join(pool->threads[i], NULL);
        }
    }

    threadpool_free(pool);
    return 0;
}
6. Using the interface
Threadpool_t * THP = threadpool_create(10, 100, 100); printf("threadpool init ... . \n"); */ threadpool_add_task(THP, do_work, (void *)p); / /... . /* destroy */ threadpool_destroy(THP);Copy the code
Thank you for reading this article and hope your questions are answered
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
The complete source code is as follows:
threadpool.h
#ifndef THREADPOOL_H_
#define THREADPOOL_H_

/* Opaque pool type; the structure itself is defined in threadpool.c */
typedef struct threadpool_t threadpool_t;

/* Create a thread pool */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);

/* Release the resources owned by a pool */
int threadpool_free(threadpool_t *pool);

/* Shut a pool down and release it */
int threadpool_destroy(threadpool_t *pool);

/* Manager thread entry point */
void *admin_thread(void *threadpool);

/* Worker thread entry point */
void *threadpool_thread(void *threadpool);

/* Add a task to the pool's task queue */
int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg);

#endif /* THREADPOOL_H_ */
threadpool.c
#include "threadpool.h"

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define DEFAULT_TIME 1 /* seconds the manager thread sleeps between checks */
#define MIN_WAIT_TASK_NUM 10 /* queued-task count at which the manager adds threads */
#define DEFAULT_THREAD_NUM 10 /* number of threads created or retired per adjustment */
#define true 1
#define false 0
/* A queued task: a callback plus the argument passed to it */
typedef struct {
void *(*function)(void *); /* task entry point, called with arg */
void *arg; /* argument handed to function when the task runs */
} threadpool_task_t;
/* Thread pool management structure */
struct threadpool_t{
pthread_mutex_t lock; /* protects the pool state (task queue and thread counts) */
pthread_mutex_t thread_counter; /* protects busy_thr_num */
pthread_cond_t queue_not_full; /* condition variable: task queue is not full */
pthread_cond_t queue_not_empty; /* condition variable: task queue is not empty */
pthread_t *threads; /* array of worker thread tids, max_thr_num entries */
pthread_t admin_tid; /* manager thread tid */
threadpool_task_t *task_queue; /* task queue (circular buffer) */
/* Thread pool information */
int min_thr_num; /* minimum number of threads in the pool */
int max_thr_num; /* maximum number of threads in the pool */
int live_thr_num; /* number of threads alive in the pool */
int busy_thr_num; /* number of threads currently running a task */
int wait_exit_thr_num; /* number of threads asked to exit */
/* Task queue information */
int queue_front; /* index of the queue head */
int queue_rear; /* index of the queue tail */
int queue_size;
/* number of tasks currently queued */
int queue_max_size; /* maximum number of tasks the queue can hold */
/* Status */
int shutdown; /* true once the pool is shutting down */
};
/* Function prototypes */
/* Create a thread pool */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
/* Release the memory and synchronisation objects owned by a pool */
int threadpool_free(threadpool_t *pool);
/* Shut a pool down, join its threads and release it */
int threadpool_destroy(threadpool_t *pool);
/* Manager thread entry point */
void *admin_thread(void *threadpool);
/* Report whether a thread still exists */
int is_thread_alive(pthread_t tid);
/* Worker thread entry point */
void *threadpool_thread(void *threadpool);
/* Add a task to the pool's task queue */
int threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg);
/* Implementation */
/*
 * Initialise the pool's two mutexes and two condition variables.
 *
 * Returns 0 on success. On failure returns -1 after destroying every
 * object that had already been initialised, so the caller has nothing
 * to undo.
 */
static int
threadpool_sync_init(threadpool_t *pool)
{
    if (pthread_mutex_init(&(pool->lock), NULL) != 0)
    {
        return -1;
    }
    if (pthread_mutex_init(&(pool->thread_counter), NULL) != 0)
    {
        goto undo_lock;
    }
    if (pthread_cond_init(&(pool->queue_not_empty), NULL) != 0)
    {
        goto undo_counter;
    }
    if (pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    {
        goto undo_not_empty;
    }
    return 0;

undo_not_empty:
    pthread_cond_destroy(&(pool->queue_not_empty));
undo_counter:
    pthread_mutex_destroy(&(pool->thread_counter));
undo_lock:
    pthread_mutex_destroy(&(pool->lock));
    return -1;
}

/* Destroy the four objects set up by threadpool_sync_init(). */
static void
threadpool_sync_destroy(threadpool_t *pool)
{
    pthread_cond_destroy(&(pool->queue_not_full));
    pthread_cond_destroy(&(pool->queue_not_empty));
    pthread_mutex_destroy(&(pool->thread_counter));
    pthread_mutex_destroy(&(pool->lock));
}

/*
 * Stop and join the first `started` workers after a failed start-up.
 * Workers test the shutdown flag under pool->lock and exit when it is set.
 */
static void
threadpool_abort_workers(threadpool_t *pool, int started)
{
    int i;

    pthread_mutex_lock(&(pool->lock));
    pool->shutdown = true;
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));

    for (i = 0; i < started; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }
}

/*
 * Create a thread pool.
 *
 * min_thr_num    - number of worker threads started immediately (>= 0)
 * max_thr_num    - size of the worker tid array, i.e. the upper bound on
 *                  workers (> 0 and >= min_thr_num)
 * queue_max_size - capacity of the task queue (> 0)
 *
 * Returns the new pool, or NULL if an argument is out of range, an
 * allocation fails, a mutex/condition variable cannot be initialised, or a
 * thread cannot be started. Nothing is left allocated or running when NULL
 * is returned.
 */
threadpool_t *
threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
    int started = 0;    /* workers successfully created so far */
    int sync_ready = 0; /* non-zero once the mutexes/condvars exist */
    threadpool_t *pool = NULL;

    /* queue_max_size is used as a modulus and both sizes as allocation
     * counts, so reject values that cannot work */
    if (min_thr_num < 0 || max_thr_num <= 0 || min_thr_num > max_thr_num
        || queue_max_size <= 0)
    {
        printf("invalid thread pool parameters;\n");
        return NULL;
    }

    do
    {
        /* calloc: threads/task_queue read as NULL until they are assigned */
        pool = calloc(1, sizeof(*pool));
        if (pool == NULL)
        {
            printf("malloc threadpool false; \n");
            break;
        }

        /* Initialise the bookkeeping fields */
        pool->min_thr_num = min_thr_num;
        pool->max_thr_num = max_thr_num;
        pool->busy_thr_num = 0;
        pool->live_thr_num = min_thr_num;
        pool->wait_exit_thr_num = 0;
        pool->queue_front = 0;
        pool->queue_rear = 0;
        pool->queue_size = 0;
        pool->queue_max_size = queue_max_size;
        pool->shutdown = false;

        /* Zeroed tid array sized for the maximum number of threads;
         * a zero entry marks an unused slot */
        pool->threads = calloc((size_t)max_thr_num, sizeof(*pool->threads));
        if (pool->threads == NULL)
        {
            printf("malloc threads false;\n");
            break;
        }

        /* Zeroed task queue, so no slot starts out holding garbage */
        pool->task_queue = calloc((size_t)queue_max_size, sizeof(*pool->task_queue));
        if (pool->task_queue == NULL)
        {
            printf("malloc task queue false;\n");
            break;
        }

        /* Mutexes and condition variables */
        if (threadpool_sync_init(pool) != 0)
        {
            printf("init lock or cond false;\n");
            break;
        }
        sync_ready = 1;

        /* Start min_thr_num workers; each receives the pool pointer */
        while (started < min_thr_num
               && pthread_create(&(pool->threads[started]), NULL,
                                 threadpool_thread, (void *)pool) == 0)
        {
            printf("start thread 0x%x... \n", (unsigned int)pool->threads[started]);
            started++;
        }

        /* Start the manager only when every worker is running */
        if (started == min_thr_num
            && pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool) == 0)
        {
            return pool;
        }

        printf("create thread false;\n");
        threadpool_abort_workers(pool, started);
    } while (0);

    /* Failure: undo exactly what was set up */
    if (pool != NULL)
    {
        if (sync_ready)
        {
            threadpool_sync_destroy(pool);
        }
        free(pool->task_queue);
        free(pool->threads);
        free(pool);
    }
    return NULL;
}
/*
 * Release the memory and synchronisation objects owned by `pool`.
 * The caller must make sure no thread is still using the pool.
 * Returns 0 on success, -1 if pool is NULL.
 */
int
threadpool_free(threadpool_t *pool)
{
    int sync_ready;

    if (pool == NULL)
    {
        return -1;
    }

    /* threadpool_create() initialises the mutexes and condition variables
     * only after both arrays were allocated, so treat them as live only
     * when both pointers are set. */
    sync_ready = (pool->threads != NULL && pool->task_queue != NULL);

    free(pool->task_queue);
    free(pool->threads);

    if (sync_ready)
    {
        /* Destroy unlocked: destroying a locked mutex is undefined */
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }

    free(pool);
    return 0;
}
/*
 * Shut the pool down: stop the manager thread, wake and join every worker,
 * then release the pool with threadpool_free().
 * Tasks still in the queue are not executed. No other thread may call into
 * the pool once this function has been entered.
 * Returns 0 on success, -1 if pool is NULL.
 */
int
threadpool_destroy(threadpool_t *pool)
{
    int i;

    if (pool == NULL)
    {
        return -1;
    }

    /* Set the flag under the lock: a worker checks it while holding the
     * lock before it waits, so it either sees the flag or is already
     * waiting when the broadcast below is sent */
    pthread_mutex_lock(&(pool->lock));
    pool->shutdown = true;
    pthread_mutex_unlock(&(pool->lock));

    /* Stop the manager first so no new workers appear while we join */
    pthread_join(pool->admin_tid, NULL);

    /* Wake idle workers and any producer blocked on a full queue */
    pthread_mutex_lock(&(pool->lock));
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_cond_broadcast(&(pool->queue_not_full));
    pthread_mutex_unlock(&(pool->lock));

    /* Live workers are not necessarily in the first live_thr_num slots
     * (the manager fills whichever slot is free), so join every slot that
     * holds a tid; 0 marks an unused slot */
    for (i = 0; i < pool->max_thr_num; i++)
    {
        if (pool->threads[i] != 0)
        {
            pthread_join(pool->threads[i], NULL);
        }
    }

    threadpool_free(pool);
    return 0;
}
/*
 * Manager thread entry point. `threadpool` is the threadpool_t to manage.
 *
 * Every DEFAULT_TIME seconds it samples the queue length and the live/busy
 * thread counts, then either starts up to DEFAULT_THREAD_NUM workers or
 * asks DEFAULT_THREAD_NUM idle workers to exit.
 * Returns NULL once the pool's shutdown flag is set.
 *
 * NOTE(review): workers that exit on request are neither joined nor
 * detached, and their slot is probed with is_thread_alive() after the
 * thread has ended; confirm this is acceptable on the target platform.
 */
void *
admin_thread(void *threadpool)
{
    int i;
    threadpool_t *pool = (threadpool_t *)threadpool;

    for (;;)
    {
        /* shutdown is shared state: read it under the pool lock */
        pthread_mutex_lock(&(pool->lock));
        int stop = pool->shutdown;
        pthread_mutex_unlock(&(pool->lock));
        if (stop)
        {
            break;
        }

        printf("admin -----------------\n");
        sleep(DEFAULT_TIME); /* manage only once per interval */

        /* Snapshot the queue length and live count */
        pthread_mutex_lock(&(pool->lock));
        stop = pool->shutdown;
        int queue_size = pool->queue_size;
        int live_thr_num = pool->live_thr_num;
        pthread_mutex_unlock(&(pool->lock));
        if (stop)
        {
            break; /* shut down while we slept: do not start new workers */
        }

        /* busy_thr_num has its own lock */
        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thr_num = pool->busy_thr_num;
        pthread_mutex_unlock(&(pool->thread_counter));

        printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);

        /* Grow: enough tasks are waiting and the pool is below its maximum */
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num)
        {
            printf("admin add-----------\n");
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            /* Add up to DEFAULT_THREAD_NUM threads, using free or dead slots */
            for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_NUM
                 && pool->live_thr_num < pool->max_thr_num; i++)
            {
                if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
                {
                    pthread_t tid;
                    /* Count the thread, and overwrite the slot, only if
                     * it was really created */
                    if (pthread_create(&tid, NULL, threadpool_thread, (void *)pool) != 0)
                    {
                        printf("admin create thread false;\n");
                        break;
                    }
                    pool->threads[i] = tid;
                    add++;
                    pool->live_thr_num++;
                    printf("new thread -----------------------\n");
                }
            }
            pthread_mutex_unlock(&(pool->lock));
        }

        /* Shrink: fewer than half the live threads are busy and the pool
         * is above its minimum */
        if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num)
        {
            /* Ask DEFAULT_THREAD_NUM threads to exit */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
            pthread_mutex_unlock(&(pool->lock));

            for (i = 0; i < DEFAULT_THREAD_NUM; i++)
            {
                /* Wake an idle thread; it exits on seeing wait_exit_thr_num */
                pthread_cond_signal(&(pool->queue_not_empty));
                printf("admin cler --\n");
            }
        }
    }

    return NULL;
}
/*
 * Report whether thread `tid` still exists.
 * Probes the thread with signal 0, which delivers nothing. Returns false
 * only when pthread_kill() answers ESRCH, true for any other result.
 */
int
is_thread_alive(pthread_t tid)
{
    return (pthread_kill(tid, 0) == ESRCH) ? false : true;
}
/*
 * Worker thread entry point. `threadpool` is the threadpool_t the worker
 * serves.
 *
 * The worker repeatedly takes the task at the head of the queue and runs
 * it outside the pool lock. It never returns normally; it ends via
 * pthread_exit() when the pool shuts down or when the manager has asked
 * surplus workers to exit (wait_exit_thr_num > 0 while the pool is above
 * min_thr_num).
 */
void *
threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    while (true)
    {
        pthread_mutex_lock(&(pool->lock));

        /* No task: block on queue_not_empty; a task or shutdown ends the wait */
        while ((pool->queue_size == 0) && (!pool->shutdown))
        {
            printf("thread 0x%x is waiting \n", (unsigned int)pthread_self());
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

            /* The manager may have asked some idle threads to exit */
            if (pool->wait_exit_thr_num > 0)
            {
                pool->wait_exit_thr_num--;
                /* Only exit while the pool is above its minimum size */
                if (pool->live_thr_num > pool->min_thr_num)
                {
                    printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
                    pool->live_thr_num--;
                    /* If this wake-up was meant for a queued task, pass it
                     * on so the task does not wait for the next signal */
                    if (pool->queue_size > 0)
                    {
                        pthread_cond_signal(&(pool->queue_not_empty));
                    }
                    pthread_mutex_unlock(&(pool->lock));
                    pthread_exit(NULL);
                }
            }
        }

        /* Pool is shutting down: leave without taking a task */
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&(pool->lock));
            printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
            pthread_exit(NULL);
        }

        /* Dequeue the task at the head of the circular queue */
        task.function = pool->task_queue[pool->queue_front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;
        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
        pool->queue_size--;

        /* A slot is free again: wake producers waiting to add tasks */
        pthread_cond_broadcast(&(pool->queue_not_full));
        pthread_mutex_unlock(&(pool->lock));

        /* Run the task outside the pool lock; busy_thr_num has its own lock */
        printf("thread 0x%x start working \n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num++;
        pthread_mutex_unlock(&(pool->thread_counter));

        (*(task.function))(task.arg);

        printf("thread 0x%x end working \n", (unsigned int)pthread_self());
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thr_num--;
        pthread_mutex_unlock(&(pool->thread_counter));
    }

    return NULL; /* not reached: the loop only ends via pthread_exit() */
}
/*
 * Add a task to the pool's task queue.
 *
 * pool     - pool to receive the task
 * function - task entry point, called by a worker as function(arg)
 * arg      - passed to function unchanged; the pool never frees it, so it
 *            must stay valid until the task has run
 *
 * Blocks while the queue is full.
 * Returns 0 on success, -1 if pool or function is NULL or the pool is
 * shutting down.
 */
int
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
    threadpool_task_t *slot;

    if (pool == NULL || function == NULL)
    {
        return -1;
    }

    pthread_mutex_lock(&(pool->lock));

    /* Queue full: wait until a worker frees a slot or the pool shuts down */
    while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
    {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (pool->shutdown)
    {
        pthread_mutex_unlock(&(pool->lock));
        return -1;
    }

    /* Store the task at the tail of the circular queue. The slot's old
     * contents are simply overwritten: the previous arg belongs to its
     * task (which may still be running), not to the pool. */
    slot = &(pool->task_queue[pool->queue_rear]);
    slot->function = function;
    slot->arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
    pool->queue_size++;

    /* The queue is no longer empty: wake one waiting worker */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    return 0;
}
Copy the code
main.c
Threadpool_t * THP = threadpool_create(10, 100, 100); printf("threadpool init ... . \n"); */ threadpool_add_task(THP, do_work, (void *)p); / /... . /* destroy */ threadpool_destroy(THP);Copy the code