
The leader/follower pattern structures a thread pool so that only one thread at a time (the leader) waits for a task to arrive on the queue, while the other threads (the followers) line up to become the leader. When the leader fetches a task from the queue, it first promotes a follower to be the new leader and then takes on the role of a processing thread. While the new leader waits for the next element to be enqueued, several processing threads may be working on tasks simultaneously. After finishing its task, a processing thread returns to the follower role and waits to become the leader again.

 

There are several ways to promote a follower thread to a leader thread and determine which thread is currently the leader thread:

LIFO order: In many applications it does not matter which follower thread is promoted. In that case, follower threads can be promoted in last-in, first-out order, which maximizes CPU cache affinity: if the most recently active thread is run again with nearly the same code and data, the warm caches improve system performance. Implementing the LIFO promotion protocol, however, requires an additional data structure, such as a stack of waiting threads from which the most recently pushed thread is signaled (or otherwise woken) to become the leader, rather than just a native operating system synchronization object such as a semaphore (a sketch of such a stack appears after this list).

Priority order: In some applications threads have different priorities, and follower threads may have to be promoted according to priority. A priority queue can be used: when a promotion is needed, the highest-priority waiting thread is taken from the queue and signaled (or otherwise woken) to become the leader (a sketch of this variant also appears after this list).

Implementation-defined order: The most common way to promote a follower thread is to use a native operating system synchronizer (such as a semaphore or condition variable), which usually dispatches waiting threads in an implementation-defined order. The advantage of this protocol is that it maps directly onto native operating system synchronizers.
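For the LIFO protocol, a minimal sketch of the extra data structure might look like the code below. The LifoPromoter class and its method names are illustrative assumptions, not part of the thread pool implemented later in this article: each waiting follower pushes its own condition variable onto a stack, and the leader wakes the most recently pushed waiter.

#include <pthread.h>
#include <vector>

// Illustrative sketch of a LIFO promotion protocol (LifoPromoter is a
// hypothetical name, not used by the thread pool in this article). Each
// waiting follower pushes its own condition variable onto a stack; the
// leader wakes the most recently pushed waiter, preserving cache affinity.
class LifoPromoter
{
    struct Waiter
    {
        pthread_cond_t cond;
        bool promoted;
    };
public:
    LifoPromoter() { pthread_mutex_init(&m_mutex, NULL); }
    ~LifoPromoter() { pthread_mutex_destroy(&m_mutex); }

    // Called by a follower that wants to become the leader.
    void WaitForPromotion()
    {
        Waiter w;
        w.promoted = false;
        pthread_cond_init(&w.cond, NULL);
        pthread_mutex_lock(&m_mutex);
        m_waiters.push_back(&w);              // LIFO: the newest waiter goes on top
        while (!w.promoted)                   // loop to tolerate spurious wakeups
        {
            pthread_cond_wait(&w.cond, &m_mutex);
        }
        pthread_mutex_unlock(&m_mutex);
        pthread_cond_destroy(&w.cond);
    }

    // Called by the current leader to promote the most recent follower.
    void PromoteNewestFollower()
    {
        pthread_mutex_lock(&m_mutex);
        if (!m_waiters.empty())
        {
            Waiter *w = m_waiters.back();     // take the top of the stack
            m_waiters.pop_back();
            w->promoted = true;
            pthread_cond_signal(&w->cond);
        }
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t m_mutex;
    std::vector<Waiter*> m_waiters;           // stack of waiting follower threads
};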
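Similarly, priority-based promotion can keep the waiters in a priority queue instead of a stack. Again, this is only a sketch under assumed names (PriorityPromoter is not part of the article's code):

#include <pthread.h>
#include <queue>
#include <vector>

// Illustrative sketch of priority-based promotion (PriorityPromoter is a
// hypothetical name). Waiting followers register with a priority; the
// leader promotes the highest-priority waiter.
class PriorityPromoter
{
    struct Waiter
    {
        int priority;
        pthread_cond_t cond;
        bool promoted;
    };
    struct LowerPriority
    {
        bool operator()(const Waiter *a, const Waiter *b) const
        {
            return a->priority < b->priority;   // max-heap: highest priority on top
        }
    };
public:
    PriorityPromoter() { pthread_mutex_init(&m_mutex, NULL); }
    ~PriorityPromoter() { pthread_mutex_destroy(&m_mutex); }

    // Called by a follower; blocks until this thread is chosen as the leader.
    void WaitForPromotion(int priority)
    {
        Waiter w;
        w.priority = priority;
        w.promoted = false;
        pthread_cond_init(&w.cond, NULL);
        pthread_mutex_lock(&m_mutex);
        m_waiters.push(&w);
        while (!w.promoted)                     // loop to tolerate spurious wakeups
        {
            pthread_cond_wait(&w.cond, &m_mutex);
        }
        pthread_mutex_unlock(&m_mutex);
        pthread_cond_destroy(&w.cond);
    }

    // Called by the current leader to promote the highest-priority follower.
    void PromoteHighestPriority()
    {
        pthread_mutex_lock(&m_mutex);
        if (!m_waiters.empty())
        {
            Waiter *w = m_waiters.top();
            m_waiters.pop();
            w->promoted = true;
            pthread_cond_signal(&w->cond);
        }
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t m_mutex;
    std::priority_queue<Waiter*, std::vector<Waiter*>, LowerPriority> m_waiters;
};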

 

This article implements a thread pool in C++ using the third approach.

We start with a simple single-producer, single-consumer lock-free queue. When it is used from multiple threads it can be turned into a two-lock queue, and that is what the thread pool in this article uses. A slightly more complex multi-threaded lock-free Disruptor-style queue will be implemented in a later article.

 

 

 
#pragma once
#include <stdint.h>
#include <vector>

using std::vector;

// Single-producer/single-consumer ring buffer. The capacity is rounded to a
// power of two so that index wrap-around can be done with a bit mask.
template <class T>
class SimpleQueue
{
public:
    SimpleQueue(uint32_t nSize = 4096)
    {
        SetMaxSize(nSize);
        m_nInPos = 0;
        m_nOutPos = 0;
    }
    ~SimpleQueue()
    {
    }
    bool Put(T &in)
    {
        if (m_nInPos - m_nOutPos >= m_nMaxSize)     // queue is full
        {
            return false;
        }
        uint32_t nPos = m_nInPos & m_nMaxSizeLess;  // wrap the write index
        m_vDataBuffer[nPos] = in;
        ++m_nInPos;
        return true;
    }
    bool Fetch(T &out)
    {
        if (m_nInPos - m_nOutPos == 0)              // queue is empty
        {
            return false;
        }
        uint32_t nPos = m_nOutPos & m_nMaxSizeLess; // wrap the read index
        out = m_vDataBuffer[nPos];
        ++m_nOutPos;
        return true;
    }
    void SetMaxSize(uint32_t nMaxSize)
    {
        // Round the requested size down to the nearest power of two.
        m_nMaxSize = 1;
        while (nMaxSize >>= 1)
        {
            m_nMaxSize <<= 1;
        }
        m_nMaxSizeLess = m_nMaxSize - 1;
        m_vDataBuffer.resize(m_nMaxSize);
    }
    uint32_t MaxSize()
    {
        return m_nMaxSize;
    }
    uint32_t Size()
    {
        return m_nInPos - m_nOutPos;
    }
private:
    volatile uint32_t m_nInPos;   // total number of elements ever enqueued
    volatile uint32_t m_nOutPos;  // total number of elements ever dequeued
    uint32_t m_nMaxSize;          // capacity (a power of two)
    uint32_t m_nMaxSizeLess;      // capacity - 1, used as the index mask
    vector<T> m_vDataBuffer;
};
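To make the semantics concrete, here is a small single-threaded usage sketch of SimpleQueue; the main() driver is illustrative only and not part of the original code. With a requested size of 8 the capacity is 8 (sizes are rounded to a power of two), so the ninth Put() fails until something is fetched.

#include <stdio.h>
#include "SimpleQueue.h"

int main()
{
    SimpleQueue<int> queue(8);   // capacity rounds to a power of two: 8

    for (int i = 0; i < 10; ++i)
    {
        int value = i;
        if (!queue.Put(value))   // Put() fails once the ring buffer is full
        {
            printf("queue full at %d\n", i);
            break;
        }
    }

    int out;
    while (queue.Fetch(out))     // drain the queue in FIFO order
    {
        printf("fetched %d\n", out);
    }
    return 0;
}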

 

Next is the implementation of the leader/follower ThreadPool, starting with the ThreadPool.h header:

 

 
#pragma once
#include <pthread.h>
#include <stdio.h>
#include "SimpleQueue.h"

const int NO_CURRENT_LEADER = 0;

// A unit of work: a function pointer plus its argument.
struct Job
{
    void *arg;
    void *(*process)(void *arg);
};

class ThreadPool
{
public:
    ThreadPool(uint32_t nQueueSize = 4096, uint32_t nThreadNum = 8);
    ~ThreadPool();
    bool AddWorker(void *(*process)(void *arg), void *arg);
    void Destroy();
private:
    static void *WorkProcess(void *arg);   // worker thread entry point
    void JoinFollower();                   // wait until this thread becomes the leader
    void PromoteNewLeader();               // give up leadership and wake a follower
    SimpleQueue<Job*> m_oQueue;
    pthread_cond_t m_pQueueNotEmpty;
    pthread_cond_t m_pQueueNotFull;
    pthread_cond_t m_pQueueEmpty;
    pthread_cond_t m_pNoLeader;
    pthread_mutex_t m_pLeaderMutex;
    pthread_mutex_t m_pQueueHeadMutex;     // protects the producer side (AddWorker)
    pthread_mutex_t m_pQueueTailMutex;     // protects the consumer side (worker threads)
    bool m_bQueueClose;
    bool m_bPoolClose;
    pthread_t *m_vThreadID;
    pthread_t m_oLeaderID;
    uint32_t m_nThreadNum;
    uint32_t m_nMaxTaskNum;
};

Then there is the implementation in ThreadPool.cpp:

 

 

 
#include "ThreadPool.h"

using namespace std;

ThreadPool::ThreadPool(uint32_t nQueueSize, uint32_t nThreadNum)
    : m_oQueue(nQueueSize), m_oLeaderID(NO_CURRENT_LEADER)
{
    m_nThreadNum = nThreadNum;
    m_bQueueClose = false;
    m_bPoolClose = false;
    m_nMaxTaskNum = m_oQueue.MaxSize();
    pthread_cond_init(&m_pQueueNotEmpty, NULL);
    pthread_cond_init(&m_pQueueNotFull, NULL);
    pthread_cond_init(&m_pQueueEmpty, NULL);
    pthread_cond_init(&m_pNoLeader, NULL);
    pthread_mutex_init(&m_pLeaderMutex, NULL);
    pthread_mutex_init(&m_pQueueHeadMutex, NULL);
    pthread_mutex_init(&m_pQueueTailMutex, NULL);
    // Worker threads are created detached; they exit on their own when the pool closes.
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    m_vThreadID = new pthread_t[m_nThreadNum];
    for (size_t i = 0; i < nThreadNum; ++i)
    {
        pthread_create(&(m_vThreadID[i]), &attr, WorkProcess, this);
    }
}

ThreadPool::~ThreadPool()
{
    Destroy();

    pthread_cond_destroy(&m_pQueueNotEmpty);
    pthread_cond_destroy(&m_pQueueNotFull);
    pthread_cond_destroy(&m_pQueueEmpty);
    pthread_cond_destroy(&m_pNoLeader);
    pthread_mutex_destroy(&m_pQueueHeadMutex);
    pthread_mutex_destroy(&m_pQueueTailMutex);
    pthread_mutex_destroy(&m_pLeaderMutex);
}

void ThreadPool::Destroy()
{
    if (m_bPoolClose)
        return;
    // Close the queue so that no new tasks are accepted.
    m_bQueueClose = true;

    // Wait until the workers have drained the queue.
    pthread_mutex_lock(&m_pQueueTailMutex);
    while (m_oQueue.Size() != 0)
    {
        pthread_cond_wait(&m_pQueueEmpty, &m_pQueueTailMutex);
    }
    m_bPoolClose = true;
    pthread_mutex_unlock(&m_pQueueTailMutex);

    // Wake up all threads so they can exit.
    pthread_cond_broadcast(&m_pNoLeader);
    pthread_cond_broadcast(&m_pQueueNotEmpty);

    delete [] m_vThreadID;
}

bool ThreadPool::AddWorker(void *(*process)(void *arg), void *arg)
{
    if (m_bQueueClose)
        return false;
    Job *pNewJob = new Job;
    pNewJob->arg = arg;
    pNewJob->process = process;

    // Producer side: block while the queue is full.
    pthread_mutex_lock(&m_pQueueHeadMutex);
    while (m_oQueue.Size() >= m_nMaxTaskNum && !m_bQueueClose)
    {
        pthread_cond_wait(&m_pQueueNotFull, &m_pQueueHeadMutex);
    }
    if (m_bQueueClose)
    {
        delete pNewJob;
        pthread_mutex_unlock(&m_pQueueHeadMutex);
        return false;
    }
    m_oQueue.Put(pNewJob);
    pthread_mutex_unlock(&m_pQueueHeadMutex);
    pthread_cond_signal(&m_pQueueNotEmpty);
    return true;
}

void *ThreadPool::WorkProcess(void *arg)
{
    ThreadPool *pThreadPool = (ThreadPool*)arg;
    pThreadPool->JoinFollower();
    while (true)
    {
        // Only the leader thread waits on the task queue.
        pthread_mutex_lock(&(pThreadPool->m_pQueueTailMutex));
        while (pThreadPool->m_oQueue.Size() == 0 && !pThreadPool->m_bPoolClose)
        {
            pthread_cond_wait(&(pThreadPool->m_pQueueNotEmpty), &(pThreadPool->m_pQueueTailMutex));
        }
        pthread_mutex_unlock(&(pThreadPool->m_pQueueTailMutex));
        if (pThreadPool->m_bPoolClose)
        {
            pthread_exit(NULL);
        }
        Job *pJob;
        pThreadPool->m_oQueue.Fetch(pJob);
        if (pThreadPool->m_bQueueClose && pThreadPool->m_oQueue.Size() == 0)
        {
            pthread_cond_signal(&(pThreadPool->m_pQueueEmpty));
        }
        pthread_cond_signal(&(pThreadPool->m_pQueueNotFull));
        // Promote a follower to leader, then process the task as a worker.
        pThreadPool->PromoteNewLeader();
        pJob->process(pJob->arg);
        delete pJob;
        // Rejoin the followers and wait to become the leader again.
        pThreadPool->JoinFollower();
    }
    return NULL;
}

void ThreadPool::JoinFollower()
{
    pthread_mutex_lock(&m_pLeaderMutex);
    while (m_oLeaderID != NO_CURRENT_LEADER && !m_bPoolClose)
    {
        pthread_cond_wait(&m_pNoLeader, &m_pLeaderMutex);
    }
    if (m_bPoolClose)
    {
        pthread_mutex_unlock(&m_pLeaderMutex);
        pthread_exit(NULL);
    }
    m_oLeaderID = pthread_self();
    pthread_mutex_unlock(&m_pLeaderMutex);
}

void ThreadPool::PromoteNewLeader()
{
    m_oLeaderID = NO_CURRENT_LEADER;
    pthread_cond_signal(&m_pNoLeader);
}
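Finally, a minimal usage sketch of the thread pool; PrintTask and the main() driver are illustrative assumptions, not part of the original article. Tasks are submitted with AddWorker() and processed by the detached worker threads, which take turns acting as the leader.

#include <stdio.h>
#include <unistd.h>
#include "ThreadPool.h"

// A simple task: print the value passed in as the argument.
void *PrintTask(void *arg)
{
    long n = (long)arg;
    printf("processing task %ld\n", n);
    return NULL;
}

int main()
{
    ThreadPool pool(4096, 8);                 // 8 worker threads, queue of up to 4096 tasks

    for (long i = 0; i < 100; ++i)
    {
        pool.AddWorker(PrintTask, (void*)i);  // enqueue tasks; workers take turns leading
    }

    pool.Destroy();  // close the queue, wait for it to drain, and signal the workers to exit
    sleep(1);        // crude grace period: the workers are detached, so give them time to finish
    return 0;
}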

That's the implementation of the leader/follower thread pool. The next part will combine this thread pool with epoll to implement a simple epoll model.