This paper analyzes the message processing mechanism of Android, mainly for the asynchronous message processing model composed of Handler, Looper and MessageQueue. First, think about the materials needed by this model:

  • Message queue: Messages sent by handlers are not executed immediately and therefore require a queue to maintain
  • Worker thread: A thread that is constantly retrieving messages and performing callbacks is called a Looper thread
  • Mutual exclusion, where different threads insert messages to the same message queue, requires synchronization
  • Synchronization mechanism when empty message queue, producer consumer model

The above three parts can be simply summarized as follows:

Looper runs model.jpg

All APP UI threads are Looper threads. Each Looper thread maintains a message queue. Other threads, such as Binder threads or custom threads, can use Handler objects to send messages, such as click events, to the message queue thread that the Handler is attached to. All are processed by InputManagerService, communicated with the Binder, sent to the Binder thread on the App side, which then sends a Message to the UI thread. In other words, the Handler inserts a Message into the UI MessageQueue. At the same time, Other threads can also send messages to the UI thread through Handler, obviously there is a need to synchronize, above is the simple description of Android message processing model, after tracking the source code, brief analysis of the specific implementation, as well as some small means inside, first, from the common use of Handler, analysis of its implementation principle,

A basic use of Handler – Message insertion

The key point1>
    Handler hanlder=new Handler();
    <The key point 2>
    hanlder.post(new Runnable() {
        @Override
        public void run() {
            //TODO 
        }
    });Copy the code

There are two things to note here, but let’s look at key point 1, the creation of a Handler object. It might not seem like anything very obvious, but if you create a Handler on a normal thread, you’re going to get an exception, because a normal thread can’t create a Handler object, it has to be a Looper thread to create a Handler object. Take a look at its constructor:

public Handler(Callback callback, boolean async) {

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}Copy the code

As you can see from the code above, looper.mylooper () must be non-empty or RuntimeException will be thrown. When will looper.mylooper () be non-empty?

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

private static void prepare(boolean quitAllowed) {
    if(sThreadLocal.get() ! =null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}Copy the code

The above two functions involve a slightly distorted data storage model, so don’t analyze it. Just remember that only the thread that called Looper.prepare can generate a thread-only Looper object. While preparing creates a new Looper object, the key message queue object is created:

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}Copy the code

After that, a thread has MessageQueue, and while loop.loop () hasn’t been called to turn the thread into a Loop thread, the new Handler is fine. Then look at the hanlder.post function, which creates a Message(if needed) and inserts the Message into the MessageQueue for the loop thread to pick up and execute.

   public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }

  private static Message getPostMessage(Runnable r) {
    Message m = Message.obtain();
    m.callback = r;
    return m;
}

// Static method, synchronization
public static Message obtain() {
    synchronized (sPoolSync) {
        if(sPool ! =null) {
            Message m = sPool;
            sPool = m.next;
            m.next = null;
            m.flags = 0; // clear in-use flag
            sPoolSize--;
            returnm; }}return new Message();
}Copy the code

The default thread pool size is 50. Of course, it is also possible to create all new messages without using the thread pool. The main purpose of using the thread pool is to improve efficiency and avoid creating objects repeatedly. Because handlers interact with messages so frequently, there are two common methods used by Message thread pools: Obtain (), which is used to retrieve a clean Message from a thread pool, and recycle(), which is used to clean up used messages and put them back into the thread pool. After that, the Message is inserted into the MessageQueue via the Looper object, and sendMessageAtTime is eventually called by the Handler for sending messages

 public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}   

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}Copy the code

MAsynchronous =false (mAsynchronous=false); as you can see, the Handler finally inserts the MessageQueue enqueueMessage function.

boolean enqueueMessage(Message msg, long when)  {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
      // Synchronization is required
    synchronized (this) {
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0|| when < p.when) { <! Key points -1-->
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else{<! Key points -2-->
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }}
            msg.next = p; // invariant: p == prev.nextprev.next = msg; } <! Key points -3-->
        if(needWake) { nativeWake(mPtr); }}return true; }Copy the code

It is obvious that enqueueMessage needs to be synchronized because there is a scenario where multiple threads insert messages into the MessageQueue of a Loop thread. If mMessages is empty, it means there is no Message. If the current inserted Message does not need to be delayed, or the delay is smaller than the delay of the mMessages header, The current inserted message needs to be placed in the header, and whether to wake up the queue needs to be determined according to the status of the current Loop thread. We will return to this later when we talk about the Loop thread. The first non-empty Message whose Delay event is smaller than the current Message is inserted in front of it. The Loop thread should not wake up when the Message is inserted into the queue if it is sleeping. Asynchronous Message processing is more special. Let’s not discuss it. Finally, look at key 3. If you need to wake up the Loop thread, wake it up through nativeWake.

MessageQueue Execution of a Message

Now that the sending part of the message has two requirements of the message model: message queue + mutex, let’s look at the other two requirements, Loop thread + synchronization mechanism of the consumer model. A MessageQueue can only make sense if it is coupled with a Loop thread (an infinite Loop thread). A normal thread must be able to become a Loop thread through Looper’s Loop function, which in addition to being an infinite Loop contains the logic to extract the message from MessageQueue and execute it. Take a look at this function:

public static void loop() {
  ` <! -- Keypoint 1 Make sure MessageQueue is ready --> Final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); }... <! --> for (;;) {<! Message MSG = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } <! - 4 key to perform message callback - > MSG. Target. DispatchMessage (MSG); . <! --> msg.recycleunchecked (); }}Copy the code

Take a look at key point 1, which ensures that the current thread has called looper. prepare and is ready for the MessageQueue MessageQueue. Key point 2 is to turn the thread into a Looper thread with an infinite loop that reads and executes messages. Key point 3 is the function that extracts the message from MessageQueue. If there is no message on the current MessageQueue, the Loop thread will block until another thread inserts the message, waking up the current thread. If the Message is read successfully, go to key 4 and execute the callback function of the target object. After execution, enter key 5 and recycle the Message object and put it into the Message cache pool. Go straight to key point 3, message extraction and blocking:

   Message next() {

        int pendingIdleHandlerCount = - 1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for(;;) {<! Key points -1--> nativePollOnce(PTR, nextPollTimeoutMillis);<! -- Key point 2 synchronization mutex -->
            synchronized (this) {
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
          <! -- Key 3 is there a barier-->if (msg ! = null && msg.target == null) { do { prevMsg = msg; msg = msg.next; } while (msg ! = null && ! msg.isAsynchronous()); }<! Key 4 whether the first message needs to block wait and calculate the block wait time -->if (msg ! = null) { if (now< msg.when{/ /Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now.Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg! =null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        msg.markInUse(a);return msg; }}else{<!--The key point5You have to wait forever-->
                    nextPollTimeoutMillis = -1;
                }         
          <! If there is no Message that can be executed immediately, check to see if there is an IdleHandler that needs to be processed. If there is no Message, return, block wait, if there is an IdleHandler-->
                if (pendingIdleHandlerCount < 0
                        && (mMessages= =null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount< =0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }
                if (mPendingIdleHandlers= =null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount.4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }
            <!--The key point7To deal withIdleHandler-->
            for (int i = 0; i < pendingIdleHandlerCount; i{+ +)final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler
                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG,"IdleHandler threw exception", t);
                }
                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler); }}} <!--processedIdleHandler, need to rejudgeMessageThe queuenextPollTimeoutMillisThe assignment for0-->pendingIdleHandlerCount = 0; nextPollTimeoutMillis = 0; }}Copy the code

Let’s take a look at key point 1. NativePollOnce is a native function. Its main function is to set a scheduled sleep, and its parameter timeoutMillis

  • TimeoutMillis =0: Returns directly without sleep
  • TimeoutMillis >0: If sleep exceeds timeoutMillis, return
  • TimeoutMillis =-1: Sleep until another thread wakes it up

NextPollTimeoutMillis =0, so the for loop must not block the first time. If it can find a message indicating that the Delay countdown has ended, it returns the message. Otherwise, it executes a second loop, sleeping until the first Delay in the header has ended. So the next function must return a Message object. Before looking at the nativePollOnce function of MessageQueue, we first go through the whole process. Then we look at key point 2, which actually involves a mutual exclusion problem to prevent multiple threads from retrieving messages from the MessageQueue at the same time. Key point 3 is mainly to see whether asynchronous messages need to be processed. See if the received message needs to be executed immediately, return the current message if it needs to be executed immediately, and calculate the wait time if it needs to wait. And then finally, if you need to wait, you need to see if the IdleHandler list is empty, if it’s not empty, you need to process the IdleHandler list, and finally, recalculate.

Then analyze the nativePollOnce function, which can be regarded as the entrance of sleep blocking. This function is a native function, involving Looper and MessageQueue of native layer, because MessageQueue of Java layer is only a simple class. There is no mechanism to handle sleep and wake up, so let’s start with the Java layer MessageQueue constructor, which is involved in thread blocking:

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}Copy the code

The nativeInit function of MessageQueue creates NativeMessageQueue and Looper in the Native layer. However, for the Java layer, the NativeMessageQueue of the Native layer only deals with the sleep and wake up of the thread. Messages sent by the Java layer are still processed at the Java layer:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if(! nativeMessageQueue) { jniThrowRuntimeException(env,"Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(- 1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { <! Key points -1-->
    <! -- eventfd creates an event object -->
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
if (mEpollFd >= 0) {
    close(mEpollFd);
}
mEpollFd = epoll_create(EPOLL_SIZE_HINT);

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

for (size_t i = 0; i < mRequests.size(a);i{+ +)const Request& request = mRequests.valueAt(i);
    struct epoll_event eventItem;
    request.initEventItem(&eventItem);
    int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD.request.fd, & eventItem);
    if (epollResult < 0) {
        ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
              request.fd.strerror(errno)); }}Copy the code

}

The eventfd function creates an eventFD, which is a counter related FD. If the counter is not zero, a readable event occurs. After read, the counter is cleared and the counter is incresed. The returned fd can perform the following operations: Native layer has a set of MessageQueue and Looper. Java layer has a set of MessageQueue and Looper

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

}Copy the code

The Java layer has its own message queue, and pollOnce does not update the Java layer object. So what does the Native layer’s message queue do for the Java layer? None of the MessageQueue in the Native layer has the ability to send messages. However, later Native added the function of sending messages, but we do not use it in daily development. However, if there is a message in the Native layer, it will preferentially execute the message of the Native layer

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0; . result = pollInner(timeoutMillis); }}Copy the code

The pollInner function is longer, mainly by using epoll_wait to listen on the upper pipe or eventFD, waiting for timeout or other threads to wake up, but not much analysis

     int Looper::pollInner(int timeoutMillis) {

       mPolling = true;
        <! -- Key point 1-->
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
         <! -- Key point 2-->
        mPolling = false;
        mLock.lock();
           <! -- keypoint 3: check the write operation on that fd -->                for (int i = 0; i < eventCount; i{+ +)int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            <!--The key point5Wake up thefdThere is a write operation return onJavaLayer continues execution-->
            if (fd == mWakeEventFd) {
                if (epollEvents & EPOLLIN) {
                    awoken();
                } else { } } 
                else {
              <! -- Keypoint 6 Local MessageQueue has a message, execute local message -->}}Copy the code

Epoll_create, epoll_ctl, epoll_wait, close, etc. A thread blocking listens for multiple FD handles, and if one of them has a write operation, the current thread will wake up. In order to deal with the producer/consumer communication model between multiple threads, look at the 7.0 source code for native layer implementation of synchronization logic:

Looper Java layer and Native layer relationship 7.0.jpg

In earlier versions of Android, the synchronization logic was implemented using pipe communication, but the idea is the same. Take a look at the 4.3 code

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds);
    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];
    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0."Could not create epoll instance. errno=%d", errno);

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}Copy the code

Looper Java layer and native layer relationship 4.3.jpg

summary

  • The principle of loop thread sleep: find the next message to be executed in MessageQueue. If there is no message, it needs to wait indefinitely for other threads to insert the message to wake up. If there is a message, it calculates the time to wait for the next message to be executed and blocks the wait until timeout.
  • There are two message queues in Java layer and Native layer: the Java layer is mainly for business logic, and the Native layer is mainly for sleep and wake up
  • Sleep and wake means: the early version through the pipeline, later, such as 6.0, 7.0, is through eventFD to achieve the same idea.