Contents

    • Preface
    • Observer pattern
    • Observer Pattern example (Thread pool)
    • Advantages of the observer pattern
    • Matters needing attention

Preface

For design patterns, this time I'm going to break from convention and pick out only the important ones. Forget the odds and ends.

Observer pattern

When the observer pattern comes up, threads naturally come to mind.

What is the observer pattern? As the name suggests, it's a trigger mechanism. Ever seen a grenade on TV? Some unlucky guy accidentally pulls the wire attached to a grenade, it goes off with a bang, and there is nothing left of him.

This is the observer pattern. Its elements are the monitor, the message, and the responder. The wire is the monitor; the message is the wire pulling the grenade's safety pin; the responder is the grenade, and the boom is its response.
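To make the three roles concrete, here is a minimal sketch of the grenade analogy. The names `Observer`, `Wire`, `Grenade`, and `demo` are made up for illustration and do not come from the thread pool code; treat it as one possible shape of the pattern, not the only one.

```cpp
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

// Responder interface (hypothetical name): anything that reacts to a message.
class Observer {
public:
    virtual ~Observer() {}
    virtual void onMessage(const std::string& msg) = 0;
};

// Monitor: keeps a list of responders and forwards the message to each of them.
class Wire {
public:
    void attach(Observer* o) { observers.push_back(o); }
    void pull() {
        for (Observer* o : observers) o->onMessage("pin pulled");
    }
private:
    std::vector<Observer*> observers;
};

// Responder: the grenade reacts when the message arrives.
class Grenade : public Observer {
public:
    bool exploded = false;
    void onMessage(const std::string& msg) override {
        exploded = true;
        std::cout << "message: " << msg << " -> boom" << std::endl;
    }
};

// Wires the pieces together and reports whether the responder reacted.
bool demo() {
    Wire wire;
    Grenade grenade;
    wire.attach(&grenade);
    wire.pull();
    assert(grenade.exploded);
    return grenade.exploded;
}
```

Calling `demo()` from `main` is enough to try it. Note that `Wire` only knows about the `Observer` interface, so other responders can be attached without touching it.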


Observer Pattern example (Thread pool)

You'll see it in the code below, because a classic example of the observer pattern is the thread pool…

//Pthread_Pool.h

#pragma once

#include <pthread.h>
#include <unistd.h>
#include <list>	// std::list is not thread-safe on its own, so access has to be protected by a lock
#include "Cond.h"	// Encapsulated condition variable class; it inherits from the encapsulated mutex class, so it works as both a lock and a condition variable

using namespace std;

// Task interface: every task must implement it so that a worker thread can schedule and run the task
class Task {
public:
    Task() {}
    virtual ~Task() {}
    virtual int run() = 0; // Left to subclasses
};

typedef list<Task*> list_task; // Task queue: a buffer for tasks that are waiting for a worker thread to wake up and process them

// Thread pool class
class Pthread_Pool {
public:
    Pthread_Pool(unsigned int max = 100, unsigned int min = 10, unsigned int wait = 60);
    ~Pthread_Pool();
    void addTask(Task* task);	// Add a new task to the task queue

private:
    static void* taskThread(void* arg);	// The worker thread
    void createThread();		// Create a thread
    void destroyThread();		// Destroy the thread pool

    unsigned int maxcount;		// Maximum number of threads
    unsigned int mincount; 		// Minimum number of threads
    unsigned int count;	 		// Number of threads currently in the pool
    unsigned int waitcount; 	// Number of waiting threads
    unsigned int waitsec;		// Wait time in seconds
    list_task	 taskList;      // Task queue
    Cond taskCond;    // Task lock, used when a thread takes a task
    Cond cond;        // Thread lock, used when creating threads
    bool Stop;                  // Whether the pool has been stopped: false when the pool object is initialized, true when the pool is destroyed
};
// Implementation of the Pthread_Pool class declared in the header above
#include "Pthread_Pool.h"
#include <stdio.h>	// perror, fprintf, printf

// Public interface 1: the constructor
Pthread_Pool::Pthread_Pool(unsigned int max, unsigned int min, unsigned int wait)
{
    // Set basic parameters
    count = 0;		// The pool starts out empty
    waitcount = 0;  // There is no waiting thread
    mincount = min;	// Number of core threads (default configuration)
    maxcount = max;	// Maximum number of threads (upper limit)
    waitsec = wait;	// Thread lifetime (a thread that receives no task within this time is retired)
    Stop = false;	// Allow operation

    // Create the initial set of threads
    cond.lock();
    for (unsigned i = 0; i < mincount; i++)
    {
        createThread();	// See the implementation of createThread()
    }
    cond.unlock();
}

Pthread_Pool::~Pthread_Pool()
{
    destroyThread();	// Destroy the thread pool
}

void Pthread_Pool::createThread()
{
    pthread_t tid;
    int ret = pthread_create(&tid, NULL, taskThread, (void*)this);
    // The new thread runs taskThread(); see the implementation of taskThread()

    // pthread_create returns 0 on success and a positive error number on failure (it does not set errno)
    if (ret != 0)
        fprintf(stderr, "pthread create error: %d\n", ret);
    else
        count++;
}

// The worker thread
void* Pthread_Pool::taskThread(void* arg)
{
    pthread_detach(pthread_self()); // Detach the thread so its resources are reclaimed automatically on exit
    Pthread_Pool* pool = (Pthread_Pool*)arg;
    while (1)
    {
        pool->cond.lock();
        // If no task is waiting
        if (pool->taskList.empty())
        {
            if (pool->Stop)	// The pool has been told to stop
            {
                pool->count--;	// One fewer thread in the pool
                pool->cond.unlock();
                pthread_exit(NULL); // This thread exits
            }

            pool->waitcount++;	// One more thread waiting for a task
            bool bSignal = pool->cond.timewait(pool->waitsec); // Wait to be woken by a new task, for at most waitsec seconds
            pool->waitcount--;	// No longer waiting

            // Retire surplus threads
            if (!bSignal && pool->count > pool->mincount)	// Timed out with nothing to do && more threads than the minimum
            {
                pool->count--;	// The loop gives plenty of chances to lay off idle threads
                pool->cond.unlock();
                pthread_exit(NULL);
            }
        }
        pool->cond.unlock();	// Remember to release the lock

        // Take a task if one is waiting. The emptiness check is done under the
        // task lock so that two workers cannot both claim the last task.
        Task* t = NULL;
        pool->taskCond.lock();	// Take the task lock
        if (!pool->taskList.empty())
        {
            t = pool->taskList.front();	// Take the task at the head of the queue
            pool->taskList.pop_front();	// Remove the claimed task
        }
        pool->taskCond.unlock();	// Remember to unlock

        if (t != NULL)
        {
            t->run();	// Run the task
            delete t;	// Delete it as soon as it is done
        }
    }
    pthread_exit(NULL);
}

// Public interface 2: add a task to the task queue
void Pthread_Pool::addTask(Task* task)
{
    if (Stop)	// The thread pool has stopped working
        return;

    // Add the new task to the task queue
    taskCond.lock();	// Take the task lock
    taskList.push_back(task);	// Add the task
    taskCond.unlock();	// Remember to unlock

    cond.lock();	// Take the thread lock
    if (waitcount)	// If there are idle threads
    {
        cond.signal();	// Wake one up
    }
    else if (count < maxcount)	// No idle threads; ending up here often usually means the pool is sized a bit wrong
    {
        createThread();	// Create one
        cond.signal();	// Then wake up
    }
    cond.unlock();
}

void Pthread_Pool::destroyThread()
{
    printf("destroy?\n");

#if 0   // Forcibly clean up
    list_task::iterator it = taskList.begin();
    for (; it != taskList.end(); it++)
    {
        Task* t = *it;
        delete t;
        t = NULL;
    }
    taskList.clear();
#endif

    // Wait for all threads to finish
    Stop = true;
    while (count > 0)
    {
        cond.lock();
        cond.broadcast();	// Broadcast: wake every waiting thread
        cond.unlock();
        sleep(1);
    }
}

The locks keep the threads synchronized, and the observer pattern shows up in the use of condition variables for waking threads. When a task arrives it is put on the task queue, and the pool checks whether there is an idle thread. If there is, one is woken up to handle the task. If there is not and the pool is still below its maximum size, a new thread is created. Otherwise the task simply waits in the queue until a worker gets to it.
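The listing never shows `Cond.h`, so here is a rough sketch of what such a wrapper might look like, built on a pthread mutex and condition variable. The class name `CondSketch`, the helper names `Mailbox`, `waiter`, and `demoWake`, and the meaning of `timewait`'s return value (true when woken, false on timeout) are my assumptions based on how the pool calls it, not the author's actual class.

```cpp
#include <cassert>
#include <pthread.h>
#include <time.h>

// Hypothetical stand-in for the Cond class used by the pool (Cond.h is not shown).
class CondSketch {
public:
    CondSketch() {
        pthread_mutex_init(&mutex_, NULL);
        pthread_cond_init(&cond_, NULL);
    }
    ~CondSketch() {
        pthread_cond_destroy(&cond_);
        pthread_mutex_destroy(&mutex_);
    }
    void lock()   { pthread_mutex_lock(&mutex_); }
    void unlock() { pthread_mutex_unlock(&mutex_); }

    // Caller must hold the lock. Assumed contract: true if woken before the
    // timeout (possibly spuriously), false if the timeout expired.
    bool timewait(unsigned int seconds) {
        struct timespec deadline;
        clock_gettime(CLOCK_REALTIME, &deadline);  // timedwait takes an absolute time
        deadline.tv_sec += seconds;
        return pthread_cond_timedwait(&cond_, &mutex_, &deadline) == 0;
    }
    void signal()    { pthread_cond_signal(&cond_); }
    void broadcast() { pthread_cond_broadcast(&cond_); }

private:
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;
};

// Shared state for the demo: the condition plus the flag it protects.
struct Mailbox {
    CondSketch cond;
    bool ready;
    bool seen;
    Mailbox() : ready(false), seen(false) {}
};

// The waiting side: sleeps until the flag is set or the timeout expires.
static void* waiter(void* arg) {
    Mailbox* box = static_cast<Mailbox*>(arg);
    box->cond.lock();
    while (!box->ready) {                     // re-check the flag after every wake-up
        if (!box->cond.timewait(5)) break;    // give up after 5 seconds
    }
    box->seen = box->ready;
    box->cond.unlock();
    return NULL;
}

// The notifying side: sets the flag under the lock, then signals.
bool demoWake() {
    Mailbox box;
    pthread_t tid;
    if (pthread_create(&tid, NULL, waiter, &box) != 0) return false;
    box.cond.lock();
    box.ready = true;
    box.cond.signal();
    box.cond.unlock();
    pthread_join(tid, NULL);
    return box.seen;
}
```

The waiter checks the flag in a loop rather than trusting a single wake-up, since a condition variable can wake spuriously and a signal sent before the wait starts would otherwise be lost. Build with `-pthread` where the toolchain needs it.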


Advantages of the observer pattern

  • The observer and the observed are coupled only through an abstraction, so either side can be extended independently.
  • It establishes a trigger mechanism.

Matters needing attention

  • If an object is both an observer and an observed object, things get more complicated. I have yet to come across a particularly tangled broadcast chain. Simple single-line broadcast chains are manageable (each chain involves three objects and can be handled with "mediator + observer").
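As a rough illustration of a simple single-line chain of three objects, the sketch below puts a middle object between a source and a final observer, so the middle object is both observer and observed. The names `Listener`, `Source`, `Sensor`, `Relay`, and `Logger` are invented for this example, and treating the relay as the "mediator" is my reading of the note above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Observer side (hypothetical name).
class Listener {
public:
    virtual ~Listener() {}
    virtual void notify(const std::string& event) = 0;
};

// Observed side (hypothetical name): holds listeners and publishes to them.
class Source {
public:
    void subscribe(Listener* l) { listeners.push_back(l); }
protected:
    void publish(const std::string& event) {
        for (Listener* l : listeners) l->notify(event);
    }
private:
    std::vector<Listener*> listeners;
};

// Head of the chain: observed only.
class Sensor : public Source {
public:
    void trigger() { publish("raw"); }
};

// Middle of the chain: observes the Sensor and is observed by the Logger.
class Relay : public Source, public Listener {
public:
    void notify(const std::string& event) override {
        publish("relayed:" + event);  // pass the event on to its own listeners
    }
};

// Tail of the chain: observer only; records what it receives.
class Logger : public Listener {
public:
    std::vector<std::string> seen;
    void notify(const std::string& event) override { seen.push_back(event); }
};
```

Subscribing a `Relay` to a `Sensor` and a `Logger` to the `Relay` gives the three-object chain. Keeping the chain one-directional matters: if the tail were subscribed back to the head, a single event would loop forever.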