Background

Developers who are familiar with Android will know Handler, Looper, MessageQueue and Message well. Android is a message-driven system, and we use message queues in many places in daily development. Android wraps this in a powerful, easy-to-use message-processing API, but how can we implement the same functionality in the C++ layer?

We don't need a complicated IPC mechanism; a simple message queue is enough. Android's message queue mechanism is too complicated to reimplement in full, and much of audio and video work doesn't need that complexity. We only need to carve out the parts we use.

Android Message Queue

The Android message distribution classes are:

  • HandlerThread
  • Looper
  • Handler
  • MessageQueue
  • Message

The code for building a normal message distribution mechanism is as follows:

HandlerThread thread = new HandlerThread("Message Thread");
thread.start();
Handler handler = new Handler(thread.getLooper());
// ...
handler.sendMessage(....)

The general process is as follows:

  • Create a HandlerThread instance; a Looper instance is constructed inside it once the thread runs
  • Calling start() on the HandlerThread instance starts the thread, which enters the message loop in Looper
  • The Handler instance holds the newly created Looper instance
  • The Looper instance builds a MessageQueue
  • Each time a Handler sends a message, the message is added to the message queue via the Looper instance held by the Handler
  • The Looper loops over the queue and processes the messages

The simple process is shown as follows:

In Looper.java, the loop function runs an infinite loop that keeps consuming messages from the message queue when there are any and waits when there are none.

The brief analysis above gives a clear picture of Android's native message queue mechanism. Some parts of it are more complicated than an audio and video SDK needs, which I will come back to below. Based on this analysis, let's build a message queue mechanism for the C++ layer.
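The consume-or-wait behaviour can be sketched in a few lines of C++. This is only an illustration and not Android's Looper or the code later in this article: MiniLoop, Post, Quit, Run and RunDemo are hypothetical names, and it uses std::mutex and std::condition_variable instead of pthread.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a message loop (not Android's Looper).
class MiniLoop {
 public:
  // Producer side: enqueue a message code and wake the loop.
  void Post(int what) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(what);
    }
    cond_.notify_one();
  }

  // Ask the loop to stop once the queue has been drained.
  void Quit() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quitting_ = true;
    }
    cond_.notify_one();
  }

  // Consumer side: wait while the queue is empty, handle messages otherwise.
  // Returns the message codes in the order they were handled.
  std::vector<int> Run() {
    std::vector<int> handled;
    for (;;) {
      std::unique_lock<std::mutex> lock(mutex_);
      cond_.wait(lock, [this] { return quitting_ || !queue_.empty(); });
      if (queue_.empty()) {
        break;  // only reachable when quitting_ is set
      }
      int what = queue_.front();
      queue_.pop_front();
      lock.unlock();
      handled.push_back(what);  // "handle" the message outside the lock
    }
    return handled;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<int> queue_;
  bool quitting_ = false;
};

// Posts three messages from the calling thread while a second thread loops.
std::vector<int> RunDemo() {
  MiniLoop loop;
  std::vector<int> handled;
  std::thread consumer([&] { handled = loop.Run(); });
  loop.Post(1);
  loop.Post(2);
  loop.Post(3);
  loop.Quit();
  consumer.join();
  return handled;
}
```

The predicate passed to wait() is what makes the loop sleep when the queue is empty and wake as soon as a message or a quit request arrives.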

C++ message queue

Following the same structure, we define several files in C++:

  • handler_thread.cc
  • handler.cc
  • looper.cc
  • message_queue.cc
  • message.cc

Each file provides basically the same functionality as its Android counterpart. Let's take a quick look at the code first and elaborate on it later.

1.handler_thread

handler_thread.h

// Include guard; needed because these headers include each other.
#pragma once

#include <pthread.h>
#include <string>
#include "handler.h"
#include "looper.h"

namespace thread {

class HandlerThread {
public:
  static HandlerThread *Create(std::string name);

  // Thread entry point: prepares the looper and runs the message loop.
  void RunInternal();

private:
  HandlerThread(std::string name);

public:
  ~HandlerThread();

  void Quit();

  bool QuitSafely();

  Looper *GetLooper();

private:
  std::string name_;
  pthread_t thread_;
  pthread_mutex_t mutex_;
  pthread_cond_t cond_;
  Looper *looper_;
  bool exiting_;
  bool exited_;
};

}  // namespace thread

handler_thread.cc

#include "handler_thread.h"
#include "log.h"

namespace thread {

HandlerThread *HandlerThread::Create(std::string name) {
  return new HandlerThread(name);
}

static void *RunTask(void *context) {
  auto handler_thread = static_cast<HandlerThread *>(context);
  handler_thread->RunInternal();
  return nullptr;
}

HandlerThread::HandlerThread(std::string name)
  : name_(name)
  , looper_(nullptr)
  , exiting_(false)
  , exited_(false) {
  pthread_mutex_init(&mutex_, nullptr);
  pthread_cond_init(&cond_, nullptr);
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_create(&thread_, &attr, RunTask, (void *) this);
  pthread_attr_destroy(&attr);
}

void HandlerThread::RunInternal() {
  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  exited_ = false;
  pthread_mutex_unlock(&mutex_);

  Looper::Prepare();

  // Publish the looper and wake up any thread blocked in GetLooper().
  pthread_mutex_lock(&mutex_);
  looper_ = Looper::MyLooper();
  pthread_cond_broadcast(&cond_);
  pthread_mutex_unlock(&mutex_);

  Looper::Loop();
  Looper::Exit();

  pthread_mutex_lock(&mutex_);
  exiting_ = false;
  // looper_ is kept (not reset) so that the destructor can delete it;
  // GetLooper() stops handing it out once exited_ is set.
  exited_ = true;
  pthread_mutex_unlock(&mutex_);
}

HandlerThread::~HandlerThread() {
  // GetLooper() waits until the thread has published its looper, so the
  // quit request cannot be lost; it returns nullptr if the loop has ended.
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(true);
  }
  pthread_join(thread_, nullptr);
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&cond_);
  if (looper_) {
    delete looper_;
    looper_ = nullptr;
  }
}

void HandlerThread::Quit() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(false);
  }
}

bool HandlerThread::QuitSafely() {
  pthread_mutex_lock(&mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&mutex_);
    return false;
  }
  exiting_ = true;
  pthread_mutex_unlock(&mutex_);
  Looper *looper = GetLooper();
  if (looper) {
    looper->Quit(true);
    return true;
  }
  return false;
}

Looper *HandlerThread::GetLooper() {
  pthread_mutex_lock(&mutex_);
  if (exited_) {
    LOGE("Thread has been exited");
    pthread_mutex_unlock(&mutex_);
    return nullptr;
  }
  // Loop instead of a single check: pthread_cond_wait may wake up spuriously.
  while (looper_ == nullptr) {
    LOGE("Thread should wait");
    pthread_cond_wait(&cond_, &mutex_);
  }
  Looper *looper = looper_;
  pthread_mutex_unlock(&mutex_);
  return looper;
}

}  // namespace thread

2.handler

handler.h

// Include guard; needed because these headers include each other.
#pragma once

#include "looper.h"
#include "message.h"

namespace thread {

class Looper;

class Message;

class HandlerCallback {
public:
  virtual ~HandlerCallback() {}

  virtual void HandleMessage(Message *msg) {}
};

class Handler {
public:
  Handler(Looper *looper, HandlerCallback *callback);

  ~Handler();

  /**
   * Send a message
   * @param msg
   */
  void SendMessage(Message *msg);

  /**
   * Dispatch a message to the callback
   * @param msg
   */
  void DispatchMessage(Message *msg);

  /**
   * Remove the messages whose what field equals what
   * @param what
   */
  void RemoveMessage(int what);

  /**
   * The number of messages in the queue
   * @return
   */
  int Size();

private:
  Looper *looper_;
  HandlerCallback *callback_;
};

}  // namespace thread

handler.cc

#include "handler.h"
#include "log.h"

/**
 * HandlerThread holds a Looper
 * Handler holds a Looper
 * Handler sends messages through the Looper, which loops over them
 */
namespace thread {

Handler::Handler(Looper *looper, HandlerCallback *callback)
  : looper_(looper)
  , callback_(callback) {
}

Handler::~Handler() {
}

void Handler::SendMessage(Message *msg) {
  if (looper_) {
    msg->target = this;
    looper_->SendMessage(msg);
  }
}

void Handler::DispatchMessage(Message *msg) {
  if (callback_) {
    callback_->HandleMessage(msg);
  }
}

void Handler::RemoveMessage(int what) {
  if (looper_) {
    looper_->RemoveMessage(what);
  }
}

int Handler::Size() {
  if (looper_) {
    return looper_->Size();
  }
  return 0;
}

}  // namespace thread

3.looper

looper.h

// Include guard; needed because these headers include each other.
#pragma once

#include "message.h"
#include "message_queue.h"
#include <pthread.h>
#include <cstdint>
#include <cstdlib>
#include <map>
#include <mutex>

namespace thread {

class Message;

class MessageQueue;

class Looper {
public:
  Looper();

  ~Looper();

  /**
   * Create a message looper instance for the current thread
   */
  static void Prepare();

  /**
   * Start the message loop
   */
  static void Loop();

  static Looper *MyLooper();

  static int64_t MyLooperId();

  static void Exit();

  /**
   * Quit the loop. If safely is true, the loop exits only after the
   * messages still in the queue have been processed.
   * @param safely
   */
  void Quit(bool safely);

  /**
   * Log the messages currently in the queue
   */
  void Dump();

  /**
   * The number of messages in the queue
   * @return
   */
  int Size();

  void SendMessage(Message *msg);

  void RemoveMessage(int what);

private:
  /**
   * The message loop itself
   */
  void LoopInternal();

  void EnqueueMessage(Message *msg);

  Message *Take();

private:
  bool exiting_;
  bool exited_;
  bool exit_safely_;
  bool looping_;
  pthread_mutex_t variable_mutex_;
  MessageQueue *message_queue_;
};

class LooperManager {
public:
  friend Looper;

  static LooperManager *GetInstance();

public:
  LooperManager();

  ~LooperManager();

  Looper *Create(int64_t tid);

  Looper *Get(int64_t tid);

  void Remove(int64_t tid);

  int Size();

private:
  static LooperManager *instance_;
  std::map<int64_t, Looper *> looper_map_;
  std::mutex looper_mutex_;
};

}  // namespace thread

looper.cc

#include "looper.h"
#include "thread.h"
#include "log.h"
#include <cassert>
#include "time_utils.h"

namespace thread {

Looper::Looper()
  : exiting_(false)
  , exited_(false)
  , exit_safely_(false)
  , looping_(false) {
  message_queue_ = new MessageQueue();
  pthread_mutex_init(&variable_mutex_, nullptr);
}

Looper::~Looper() {
  // The looper owns its queue; the queue destructor deletes pending messages.
  delete message_queue_;
  message_queue_ = nullptr;
  pthread_mutex_destroy(&variable_mutex_);
}

void Looper::Prepare() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Create(tid);
  if (looper == nullptr) {
    LOGE("Current thread looper has been called");
  }
}

void Looper::Loop() {
  MyLooper()->LoopInternal();
}

Looper *Looper::MyLooper() {
  int64_t tid = Thread::CurrentThreadId();
  Looper *looper = LooperManager::GetInstance()->Get(tid);
  if (looper == nullptr) {
    LOGE("Please invoke Looper::Prepare first");
  }
  assert(looper);
  return looper;
}

int64_t Looper::MyLooperId() {
  return reinterpret_cast<int64_t>(MyLooper());
}

void Looper::Exit() {
  int64_t tid = Thread::CurrentThreadId();
  LooperManager::GetInstance()->Remove(tid);
}

void Looper::Quit(bool safely) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  exit_safely_ = safely;
  exiting_ = true;
  pthread_mutex_unlock(&variable_mutex_);
  // Wake the loop in case it is blocked waiting for a message.
  message_queue_->Notify();
}

void Looper::Dump() {
  message_queue_->Dump();
}

int Looper::Size() {
  return message_queue_->Size();
}

void Looper::SendMessage(Message *msg) {
  pthread_mutex_lock(&variable_mutex_);
  if (exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  pthread_mutex_unlock(&variable_mutex_);
  EnqueueMessage(msg);
}

void Looper::RemoveMessage(int what) {
  message_queue_->RemoveMessage(what);
}

void Looper::LoopInternal() {
  pthread_mutex_lock(&variable_mutex_);
  if (looping_ || exiting_ || exited_) {
    pthread_mutex_unlock(&variable_mutex_);
    return;
  }
  looping_ = true;
  pthread_mutex_unlock(&variable_mutex_);

  for (;;) {
    // Blocks until a message arrives or Notify() is called.
    Message *msg = Take();
    if (msg) {
      if (msg->target) {
        msg->target->DispatchMessage(msg);
      }
      delete msg;
    }
    pthread_mutex_lock(&variable_mutex_);
    if (exit_safely_) {
      // Safe quit: leave only after the queue has been drained.
      if (exiting_ && message_queue_->Size() == 0) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    } else {
      if (exiting_) {
        pthread_mutex_unlock(&variable_mutex_);
        break;
      }
    }
    pthread_mutex_unlock(&variable_mutex_);
  }

  // Discard whatever is still queued after the loop has ended.
  int64_t time = TimeUtils::GetCurrentTimeUs();
  while (message_queue_->Size() > 0) {
    Message *msg = message_queue_->Take();
    if (msg) {
      delete msg;
    }
  }
  message_queue_->Clear();
  LOGI("Clear message_queue cost time=%lld us",
       static_cast<long long>(TimeUtils::GetCurrentTimeUs() - time));

  pthread_mutex_lock(&variable_mutex_);
  exiting_ = false;
  exited_ = true;
  looping_ = false;
  pthread_mutex_unlock(&variable_mutex_);
}

void Looper::EnqueueMessage(Message *msg) {
  /// TODO
  message_queue_->Offer(msg);
}

Message *Looper::Take() {
  return message_queue_->Take();
}

/// ------------------------------------------------------------------

LooperManager *LooperManager::instance_ = new LooperManager();

LooperManager::LooperManager() {
}

LooperManager::~LooperManager() {
}

LooperManager *LooperManager::GetInstance() {
  return instance_;
}

Looper *LooperManager::Create(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    Looper *looper = new Looper();
    looper_map_[tid] = looper;
    return looper;
  }
  return nullptr;
}

Looper *LooperManager::Get(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it == looper_map_.end()) {
    return nullptr;
  }
  return it->second;
}

void LooperManager::Remove(int64_t tid) {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  auto it = looper_map_.find(tid);
  if (it != looper_map_.end()) {
    looper_map_.erase(it);
  }
}

int LooperManager::Size() {
  std::lock_guard<std::mutex> guard(looper_mutex_);
  return static_cast<int>(looper_map_.size());
}

}  // namespace thread

4.message_queue

message_queue.h

// Include guard; needed because these headers include each other.
#pragma once

#include "message.h"
#include <pthread.h>
#include <list>

namespace thread {

class Message;

class MessageQueue {
public:
  MessageQueue();

  ~MessageQueue();

  /**
   * Append a message to the back of the queue
   * @param msg
   */
  void Offer(Message *msg);

  /**
   * Insert a message at the front of the queue
   * @param msg
   */
  void OfferAtFront(Message *msg);

  /**
   * Get a message
   * @return
   */
  Message *Take();

  /**
   * Wake up a thread blocked in Take()
   */
  void Notify();

  int Size();

  bool IsEmpty();

  void Clear();

  /**
   * Delete the messages whose what field equals what
   * @param what
   */
  void RemoveMessage(int what);

  void Dump();

private:
  pthread_mutex_t queue_mutex_;
  pthread_cond_t queue_cond_;
  std::list<Message *> queue_;
  bool is_destroyed_;
};

}  // namespace thread

message_queue.cc

#include "message_queue.h"
#include "log.h"
#include <sstream>

namespace thread {

MessageQueue::MessageQueue()
  : is_destroyed_(false) {
  pthread_mutex_init(&queue_mutex_, nullptr);
  pthread_cond_init(&queue_cond_, nullptr);
}

MessageQueue::~MessageQueue() {
  LOGI("Enter");
  pthread_mutex_lock(&queue_mutex_);
  is_destroyed_ = true;
  pthread_mutex_unlock(&queue_mutex_);
  Clear();
  pthread_mutex_destroy(&queue_mutex_);
  pthread_cond_destroy(&queue_cond_);
  LOGI("Leave");
}

void MessageQueue::Offer(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_back(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::OfferAtFront(Message *msg) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  queue_.push_front(msg);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

Message *MessageQueue::Take() {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  // Wait once; Notify() may wake us with an empty queue (for example on
  // quit), in which case nullptr is returned and the caller decides.
  if (queue_.empty()) {
    pthread_cond_wait(&queue_cond_, &queue_mutex_);
  }
  if (queue_.empty()) {
    pthread_mutex_unlock(&queue_mutex_);
    return nullptr;
  }
  Message *msg = queue_.front();
  queue_.pop_front();
  pthread_mutex_unlock(&queue_mutex_);
  return msg;
}

void MessageQueue::Notify() {
  pthread_mutex_lock(&queue_mutex_);
  pthread_cond_broadcast(&queue_cond_);
  pthread_mutex_unlock(&queue_mutex_);
}

int MessageQueue::Size() {
  // Locked because the loop thread and the senders call this concurrently.
  pthread_mutex_lock(&queue_mutex_);
  int size = static_cast<int>(queue_.size());
  pthread_mutex_unlock(&queue_mutex_);
  return size;
}

bool MessageQueue::IsEmpty() {
  pthread_mutex_lock(&queue_mutex_);
  bool empty = queue_.empty();
  pthread_mutex_unlock(&queue_mutex_);
  return empty;
}

void MessageQueue::Clear() {
  Notify();
  pthread_mutex_lock(&queue_mutex_);
  while (!queue_.empty()) {
    Message *msg = queue_.front();
    queue_.pop_front();
    if (msg) {
      delete msg;
    }
  }
  queue_.clear();
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::RemoveMessage(int what) {
  pthread_mutex_lock(&queue_mutex_);
  if (is_destroyed_) {
    pthread_mutex_unlock(&queue_mutex_);
    return;
  }
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    if (what == msg->what) {
      delete msg;
      it = queue_.erase(it);
      continue;
    }
    ++it;
  }
  pthread_mutex_unlock(&queue_mutex_);
}

void MessageQueue::Dump() {
  std::ostringstream os;
  pthread_mutex_lock(&queue_mutex_);
  std::list<Message *>::iterator it = queue_.begin();
  while (it != queue_.end()) {
    Message *msg = *it;
    os << msg->what << "\n";
    ++it;
  }
  pthread_mutex_unlock(&queue_mutex_);
  LOGI("Result=%s", os.str().c_str());
}

}  // namespace thread

5.message

message.h

// Include guard; needed because these headers include each other.
#pragma once

#include "handler.h"
#include <string>

namespace thread {

class Handler;

class Message {
public:
  Message();

  ~Message();

public:
  int what;
  int arg1;
  int arg2;
  int arg3;
  int arg4;
  int arg5;
  int arg6;
  int arg7;
  void *obj1;
  void *obj2;

public:
  Handler *target;
};

}  // namespace thread

message.cc

#include "message.h"
#include "log.h"

namespace thread {

Message::Message()
  : what(-1)
  , arg1(-1)
  , arg2(-1)
  , arg3(-1)
  , arg4(-1)
  , arg5(-1)
  , arg6(-1)
  , arg7(-1)
  , obj1(nullptr)
  , obj2(nullptr)
  , target(nullptr) {
}

Message::~Message() {
  /**
   * obj1, obj2 and target should not be destroyed
   * in the Message destructor.
   */
}

}  // namespace thread

How to use C++ message queue

Initialization:

  std::string name("AV Message Queue");
  thread::HandlerThread *handler_thread = thread::HandlerThread::Create(name);
  thread::Handler *handler = new thread::Handler(handler_thread->GetLooper(), this);

The class that passes this as the callback must inherit from thread::HandlerCallback and override HandleMessage(thread::Message *msg).

Don't forget to destroy the handler_thread and handler pointers in the destructor.
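A callback that overrides HandleMessage might look like the sketch below. To keep it compilable on its own, it declares trimmed stand-ins for thread::Message and thread::HandlerCallback instead of including the real headers. The Player class and the kMsgStart and kMsgStop codes are hypothetical and only illustrate dispatching on msg->what.

```cpp
#include <string>
#include <vector>

// Trimmed stand-ins for the article's types, so the sketch compiles alone.
namespace thread {

struct Message {
  int what = -1;
  void *obj1 = nullptr;
};

class HandlerCallback {
 public:
  virtual ~HandlerCallback() = default;
  virtual void HandleMessage(Message * /*msg*/) {}
};

}  // namespace thread

// Hypothetical message codes.
enum { kMsgStart = 1, kMsgStop = 2 };

// Hypothetical owner class: it would pass `this` to the Handler constructor.
class Player : public thread::HandlerCallback {
 public:
  // Called on the looper thread for every message sent through the handler.
  void HandleMessage(thread::Message *msg) override {
    switch (msg->what) {
      case kMsgStart:
        log_.push_back("start");
        break;
      case kMsgStop:
        log_.push_back("stop");
        break;
      default:
        log_.push_back("unknown");
        break;
    }
    // Do not delete msg here: in the article's LoopInternal() the looper
    // deletes the message after dispatching it.
  }

  const std::vector<std::string> &log() const { return log_; }

 private:
  std::vector<std::string> log_;
};
```

Since HandleMessage runs on the looper thread, any state it shares with other threads needs its own synchronization.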

Send a message:

  thread::Message *msg = new thread::Message();
  msg->what = MSG_WHAT;
  msg->obj1 = XXXX;
  handler->SendMessage(msg);

Open issues

  • Synchronous waiting message processing
  • Delayed message processing

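Delayed message processing needs a linked-list structure in which messages can be inserted in order of delivery time. At present the queue is only used as a double-ended queue (messages are added at the back or at the front), which is sufficient for the audio and video SDK for now. I will add these features if that changes.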
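As a rough idea of what the linked-list version could look like, the sketch below keeps a std::list sorted by delivery time. It is not part of the SDK: DelayedMessage, DelayedQueue, when_us, TakeDue and NextDelayUs are hypothetical names, and locking is left out to keep the ordering logic visible.

```cpp
#include <cstdint>
#include <list>

// Hypothetical message carrying an absolute delivery time in microseconds.
struct DelayedMessage {
  int what;
  int64_t when_us;
};

class DelayedQueue {
 public:
  // Insert so that the list stays sorted by when_us; messages with the same
  // delivery time keep their insertion order.
  void Offer(const DelayedMessage &msg) {
    auto it = queue_.begin();
    while (it != queue_.end() && it->when_us <= msg.when_us) {
      ++it;
    }
    queue_.insert(it, msg);
  }

  // Pop the head if it is due at now_us. Returns false when nothing is due.
  bool TakeDue(int64_t now_us, DelayedMessage *out) {
    if (queue_.empty() || queue_.front().when_us > now_us) {
      return false;
    }
    *out = queue_.front();
    queue_.pop_front();
    return true;
  }

  // Microseconds until the head is due, 0 if it is already due, or -1 when
  // the queue is empty. A loop could use this to bound a timed wait.
  int64_t NextDelayUs(int64_t now_us) const {
    if (queue_.empty()) {
      return -1;
    }
    int64_t delay = queue_.front().when_us - now_us;
    return delay > 0 ? delay : 0;
  }

 private:
  std::list<DelayedMessage> queue_;
};
```

Insertion is linear in the queue length, which is usually acceptable for the short queues of an SDK control thread.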