Android interthread messaging mechanism

1. How a thread's message queue is created

Can a Handler be created on a child thread?

Create a Handler in a child thread:

Thread {
  Handler()
}.start()

The following exception is thrown:

RuntimeException: Can't create handler inside thread that has not called Looper.prepare()

Why is this exception thrown? Let’s go to the source code.

Here is the Handler constructor:

public Handler() {
    this(null, false);
}

public Handler(Callback callback, boolean async) {
    ...
    // Get the current thread's Looper object, which is stored in a ThreadLocal
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

The Looper is created in the prepare method of the Looper class, so a child thread must call Looper.prepare() before it creates a Handler:

// The single parameter indicates whether this Looper is allowed to quit
private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

// Create the Looper for the main thread
public static void prepareMainLooper() {
    // The main thread's Looper cannot quit, so pass false
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        // The main thread's Looper now exists; store it in a static field so it can be retrieved at any time
        sMainLooper = myLooper();
    }
}

Looper constructor:

private Looper(boolean quitAllowed) {
    // Create a MessageQueue
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

MessageQueue constructor:

MessageQueue(boolean quitAllowed) {
    // Record whether quitting is allowed
    mQuitAllowed = quitAllowed;
  	// The real initialization is done in the native layer
    mPtr = nativeInit();
}

The following figure shows the relationship between threads, Looper, MessageQueue, and Handler:

Each thread's Looper is stored in a ThreadLocal, and each Looper holds one MessageQueue. The MessageQueue receives messages from different Handlers, and the Looper polls it. When a Message is available, it is passed to the corresponding Handler; which Handler a Message belongs to is recorded in the Message's target field.
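The per-thread rule enforced by prepare() can be tried out without Android. The following is only a rough plain-Java model, not the Android source: MiniLooper and MiniLooperDemo are hypothetical names, and the model keeps just the ThreadLocal slot and the "only one per thread" check.

```java
// Simplified plain-Java model of Looper.prepare()/myLooper(); not the Android source.
final class MiniLooper {
    // One slot per thread, in the spirit of sThreadLocal in the Looper class.
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();

    final Thread thread = Thread.currentThread();
    final boolean quitAllowed;

    private MiniLooper(boolean quitAllowed) {
        this.quitAllowed = quitAllowed;
    }

    static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper(quitAllowed));
    }

    // Returns null when the calling thread never called prepare().
    static MiniLooper myLooper() {
        return sThreadLocal.get();
    }
}

final class MiniLooperDemo {
    // Runs the checks on a fresh thread and reports what that thread observed.
    static String probe() {
        StringBuilder out = new StringBuilder();
        Thread t = new Thread(() -> {
            out.append(MiniLooper.myLooper() == null ? "none" : "present");
            MiniLooper.prepare(true);
            out.append(MiniLooper.myLooper() != null ? ",prepared" : ",missing");
            try {
                MiniLooper.prepare(true); // second call on the same thread
                out.append(",second-ok");
            } catch (RuntimeException e) {
                out.append(",second-rejected");
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe()); // none,prepared,second-rejected
    }
}
```

A new thread starts with an empty slot, which is the situation in which the Handler constructor above throws.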

Next, look at how MessageQueue is initialized in the native layer:

// Native-layer initialization method
jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

// NativeMessageQueue constructor in the native layer
NativeMessageQueue::NativeMessageQueue() {
    mLooper = Looper::getForThread(); // Look up the thread-local cache
    // If the thread-local cache is empty,
    // create a native-layer Looper and store it in the cache
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

Then there is the Looper constructor for the native layer:

Looper::Looper(bool allowNonCallbacks) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK); // Create the eventfd
    rebuildEpollLocked();
}

The native-layer Looper is the core of the message loop. Its constructor creates an eventfd. Early versions used an ordinary pipe here instead, but a pipe involves two memory copies, so later versions switched to an eventfd, which performs better: internally it is just a counter, so the two copies are avoided.

Then there is the rebuildEpollLocked() function, which adds the above fd to epoll’s listening queue:

void Looper::rebuildEpollLocked() {
    // Create an epoll instance
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    ...
    // Add the fd to epoll's watch list and listen for its read events
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event));
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
    ...
}

The real listening is in the loop below:

int Looper::pollOnce(int timeoutMillis, int* outFd, ...) {
    for (;;) {
        // (result handling and the return path are omitted here)
        pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // Block here until an event occurs or the timeout expires
    int eventCount = epoll_wait(mEpollFd, eventItems, ...);
    // If any events occurred, handle them in a loop
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd && (epollEvents & EPOLLIN)) {
            // A read event on the wake fd: consume it
            awoken(); // Reads the fd to clear the event
        }
    }
}

What is the difference between the main thread Looper and the child thread Looper?

A child thread's Looper can quit; the main thread's Looper cannot.

What is the relationship between Looper and MessageQueue and Handler?

In the Java layer, Looper and MessageQueue have a one-to-one relationship. In the native layer, NativeMessageQueue holds a native Looper, which is also one-to-one. Handlers are many-to-one: different Handlers can send messages to the same MessageQueue.

How is MessageQueue created?

The Java-layer MessageQueue constructor calls a native function that creates a NativeMessageQueue in the native layer. The NativeMessageQueue creates a native Looper, and that Looper creates an eventfd and an epoll instance, then registers the eventfd's readable event with epoll.

2. The message loop process

The message loop is Looper’s loop function:

public static void loop() {
    // Get the current thread's Looper
    final Looper me = myLooper();
    ...
    // Get the MessageQueue that belongs to this Looper
    final MessageQueue queue = me.mQueue;
    ...
    for (;;) {
        // Keep fetching the next message in a loop
        Message msg = queue.next(); // might block if the queue is empty
        if (msg == null) {
            // No message indicates that the message queue is quitting,
            // so the Looper is finished: return
            return;
        }
        try {
            // Dispatch the message through its target Handler
            msg.target.dispatchMessage(msg);
        } finally {
            ...
        }
        ...
        // Recycle the message: reset its state and put it into the object pool (a singly linked list)
        msg.recycleUnchecked();
    }
}

The two most important steps are fetching and distributing messages.
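The fetch-then-dispatch cycle can be illustrated with a small plain-Java sketch. It is not how Android implements it: a java.util.concurrent.BlockingQueue stands in for MessageQueue and its native polling, a QUIT sentinel stands in for next() returning null, and LoopSketch, Target, and Item are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of the fetch/dispatch cycle; all names here are hypothetical.
final class LoopSketch {
    // Stand-in for Handler: whoever receives the message.
    interface Target {
        void dispatch(String payload);
    }

    // Stand-in for Message: a payload plus the target that must receive it.
    private static final class Item {
        final Target target;
        final String payload;

        Item(Target target, String payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    // Sentinel that plays the role of next() returning null when the queue quits.
    private static final Item QUIT = new Item(null, null);

    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();

    void send(Target target, String payload) {
        queue.add(new Item(target, payload));
    }

    void quit() {
        queue.add(QUIT);
    }

    // Fetch the next item (blocking while the queue is empty), then dispatch it.
    void loop() {
        for (;;) {
            Item item;
            try {
                item = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (item == QUIT) {
                return;
            }
            item.target.dispatch(item.payload);
        }
    }

    // Small demonstration: two targets share one loop.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        LoopSketch looper = new LoopSketch();
        looper.send(p -> log.add("A:" + p), "hello");
        looper.send(p -> log.add("B:" + p), "world");
        looper.quit();
        looper.loop();
        return log;
    }
}
```

Calling LoopSketch.demo() delivers each payload to the target stored alongside it, in sending order, and then returns when the sentinel is reached.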

First, fetching a message. This is the next() method in MessageQueue:

Message next() {
    ...
    int nextPollTimeoutMillis = 0;
    for (;;) {
        ...
        // Block here until another thread sends a message or the timeout expires, then wake up and continue
        nativePollOnce(ptr, nextPollTimeoutMillis); 
        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier. Find the next asynchronous message in the queue.
                // (A message whose target is null is a message barrier.)
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            // The following is the normal message processing flow
            if (msg != null) {
                if (now < msg.when) {
                    // If the message is not ready, set the timeout and wake up when the message is ready
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Get the message if it is ready
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    // The message is marked for use
                    msg.markInUse();
                    return msg;
                }
            } else {
                // If there is no message, the queue is empty and the timeout is set to -1
                nextPollTimeoutMillis = -1;
            }
            ...
        }
    }
}
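The selection logic inside next() (skip past a barrier to the first asynchronous message, then either deliver the message or compute how long to sleep) can be exercised in isolation. This is a minimal sketch under simplifying assumptions: QueuedMsg, NextModel, and pollDue are hypothetical names, a boolean barrier flag models msg.target == null, and there is no locking or native polling.

```java
// Plain-Java model of the selection logic in next(); names are hypothetical.
final class QueuedMsg {
    final String name;
    final long when;            // delivery time in milliseconds
    final boolean barrier;      // models msg.target == null
    final boolean asynchronous; // models msg.isAsynchronous()
    QueuedMsg next;

    QueuedMsg(String name, long when, boolean barrier, boolean asynchronous) {
        this.name = name;
        this.when = when;
        this.barrier = barrier;
        this.asynchronous = asynchronous;
    }
}

final class NextModel {
    QueuedMsg head;            // models mMessages
    int nextPollTimeoutMillis; // how long the caller should block next

    // Links the messages in the given order; pass them sorted by `when`.
    NextModel(QueuedMsg... msgs) {
        QueuedMsg tail = null;
        for (QueuedMsg m : msgs) {
            if (tail == null) {
                head = m;
            } else {
                tail.next = m;
            }
            tail = m;
        }
    }

    // Returns and unlinks the message due at `now`, or returns null and
    // sets nextPollTimeoutMillis (-1 means wait indefinitely).
    QueuedMsg pollDue(long now) {
        QueuedMsg prev = null;
        QueuedMsg msg = head;
        if (msg != null && msg.barrier) {
            // Stalled by a barrier: only an asynchronous message may pass.
            do {
                prev = msg;
                msg = msg.next;
            } while (msg != null && !msg.asynchronous);
        }
        if (msg == null) {
            nextPollTimeoutMillis = -1;
            return null;
        }
        if (now < msg.when) {
            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
            return null;
        }
        if (prev != null) {
            prev.next = msg.next;
        } else {
            head = msg.next;
        }
        msg.next = null;
        return msg;
    }
}
```

With a barrier at the head followed by a synchronous message at time 5 and an asynchronous one at time 10, pollDue(7) returns null with a timeout of 3, and pollDue(10) returns the asynchronous message while the synchronous one stays blocked behind the barrier.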

The native implementation behind nativePollOnce:

void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    ...
    // The core step: poll the native Looper
    mLooper->pollOnce(timeoutMillis);
    ...
}

As shown, the call ends up in the native Looper's pollOnce function, which in turn calls pollInner, and pollInner is where the thread actually waits for messages.

3. Message sending process

Handler's sendMessage, sendEmptyMessage, sendMessageDelayed, post, postDelayed, postAtTime, and similar methods all eventually go through the following call chain:

public final boolean sendMessage(Message msg) {
    return sendMessageDelayed(msg, 0);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    // Keep a reference to the MessageQueue
    MessageQueue queue = mQueue;
    ...
    // Pass the message and its timestamp to the MessageQueue
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
  	// Set the target of message to this, that is, the handler that sends the message
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

Next comes enqueueMessage in MessageQueue:

boolean enqueueMessage(Message msg, long when) {
    ...
    synchronized (this) {
        ...
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue. Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
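The time-ordered insert performed by enqueueMessage can be sketched on its own. The following is a simplified plain-Java model rather than the Android code: TimedMsg and SortedQueue are hypothetical names, the blocked field is assumed to be true to mimic a waiting reader, and the barrier/asynchronous wake-up case is left out.

```java
// Plain-Java model of the ordered insert in enqueueMessage(); names are hypothetical.
final class TimedMsg {
    final String name;
    final long when; // delivery time in milliseconds
    TimedMsg next;

    TimedMsg(String name, long when) {
        this.name = name;
        this.when = when;
    }
}

final class SortedQueue {
    TimedMsg head;          // models mMessages
    boolean blocked = true; // models mBlocked: assume the reader is currently waiting

    // Inserts msg by delivery time and returns whether the reader must be woken.
    boolean enqueue(TimedMsg msg) {
        TimedMsg p = head;
        if (p == null || msg.when == 0 || msg.when < p.when) {
            // New head: the reader's current timeout is now stale.
            msg.next = p;
            head = msg;
            return blocked;
        }
        // Walk to the first node with a strictly later time and insert before it.
        TimedMsg prev;
        for (;;) {
            prev = p;
            p = p.next;
            if (p == null || msg.when < p.when) {
                break;
            }
        }
        msg.next = p;
        prev.next = msg;
        return false; // the barrier/asynchronous wake-up case is omitted in this model
    }

    // Lists the queued names from head to tail.
    String order() {
        StringBuilder sb = new StringBuilder();
        for (TimedMsg m = head; m != null; m = m.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(m.name);
        }
        return sb.toString();
    }
}
```

Because the comparison is strictly "less than", a message with the same delivery time as an existing one is placed after it, so messages with equal times keep their sending order.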

nativeWake is the low-level step that wakes the queue: it writes a number to the eventfd in the native-layer Looper, after which the fd becomes readable and the waiting side receives the event.

void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

NativeMessageQueue finally calls its internal Looper’s wake method:

void Looper::wake() {
  uint64_t inc = 1;
  // Write a number to mWakeEventFd
  write(mWakeEventFd,&inc,sizeof(uint64_t));
}

To sum up, adding a message starts in the Java layer with MessageQueue's enqueueMessage method. It inserts the message into the queue and, when a wake-up is needed, calls the wake() method of the corresponding native MessageQueue. The native Looper then writes a number to the eventfd counter to signal that a message was added. Because the native Looper's poll loop is listening on that fd, the native method that the upper layer called to wait for messages wakes up, and the message can be fetched and processed.
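The wait-and-wake handshake can be imitated in plain Java. This is an analogy only, not the Android implementation: it swaps eventfd and epoll for a java.nio Pipe and Selector (closer to the older pipe-based design mentioned earlier), and WakePipe is a hypothetical name. The timeout follows the convention used in the text: negative waits indefinitely, zero returns immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Plain-Java analogy for the native wake mechanism; not the Android implementation.
final class WakePipe {
    private final Selector selector;     // plays the role of the epoll instance
    private final Pipe.SourceChannel in; // read end, watched for readability
    private final Pipe.SinkChannel out;  // write end, used by wake()

    WakePipe() {
        try {
            selector = Selector.open();
            Pipe pipe = Pipe.open();
            in = pipe.source();
            out = pipe.sink();
            in.configureBlocking(false);
            // Comparable to registering the wake fd for read events.
            in.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Comparable to Looper::wake(): make the watched channel readable.
    void wake() {
        try {
            out.write(ByteBuffer.wrap(new byte[] {1}));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Comparable to pollInner(): wait, then drain the wake bytes.
    // Returns true if a wake-up was observed, false on timeout.
    boolean pollOnce(long timeoutMillis) {
        try {
            int ready;
            if (timeoutMillis < 0) {
                ready = selector.select();      // wait indefinitely
            } else if (timeoutMillis == 0) {
                ready = selector.selectNow();   // do not wait
            } else {
                ready = selector.select(timeoutMillis);
            }
            selector.selectedKeys().clear();
            if (ready == 0) {
                return false;
            }
            ByteBuffer buf = ByteBuffer.allocate(16);
            while (in.read(buf) > 0) { // comparable to awoken(): consume the event
                buf.clear();
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A thread that calls pollOnce(-1) stays parked until some other thread calls wake(), which mirrors how a blocked nativePollOnce returns after nativeWake.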

4. The process of message distribution

Messages are dispatched in Handler's dispatchMessage method:

/**
 * Handle system messages here.
 */
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        // If the message carries its own callback, run it via handleCallback
        handleCallback(msg);
    } else {
        // Otherwise, check whether the Handler has a global mCallback
        if (mCallback != null) {
            // If so, call its handleMessage; what happens next depends on the return value:
            // true means the message was consumed, false falls through to the Handler's own handleMessage
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        // The handleMessage method that we override ourselves
        handleMessage(msg);
    }
}
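The three-level priority in dispatchMessage can be checked with a small stand-alone model. It is a sketch with hypothetical names (DispatchSketch and its Callback interface): a String stands in for Message, and a Runnable argument stands in for the message's own callback.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the dispatch order; names are hypothetical.
final class DispatchSketch {
    // Models the Handler-level callback (mCallback in the text).
    interface Callback {
        boolean handleMessage(String msg);
    }

    final List<String> log = new ArrayList<>();
    private final Callback callback;

    DispatchSketch(Callback callback) {
        this.callback = callback;
    }

    // Models a handleMessage() that a subclass would override.
    void handleMessage(String msg) {
        log.add("handleMessage:" + msg);
    }

    // msgCallback models the callback carried by the message itself.
    void dispatchMessage(String msg, Runnable msgCallback) {
        if (msgCallback != null) {
            msgCallback.run(); // 1. the message's own callback wins
            return;
        }
        if (callback != null && callback.handleMessage(msg)) {
            return;            // 2. the Handler-level callback consumed the message
        }
        handleMessage(msg);    // 3. fall back to handleMessage()
    }
}
```

A callback that returns false still sees the message first, and handleMessage() then runs as well; returning true stops the chain.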