The threaded message communication mechanism built from Handler, MessageQueue, and Looper is ubiquitous in Android development, but most developers only skim the Java-layer implementation and are hazy on the details. This post examines the implementation of Android messaging from the Java layer down to the Native layer.

The message mechanism is fairly abstract, so its overall structure is simple: it covers only the generation and dispatch of messages. Unlike the Input subsystem, there is no sorting, filtering, and so on. The overall structure is shown below:

Android Java layer messaging mechanism

Message generation

In the Java layer, messages come from user-created Message objects, from Runnable objects that get wrapped into Messages, or from obtainMessage calls that draw on the Message pool, a singly linked list maintained inside the Message class. The head of that list is the static Message field sPool, and the pool holds at most MAX_POOL_SIZE (50) messages:

Reusing Messages from the pool avoids the overhead of constantly allocating new Message objects. When a Message is recycled, its fields are cleared and, as long as the pool holds fewer than MAX_POOL_SIZE entries, it is pushed back onto the head of the list so the next obtain() call can reuse it.
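The pooling itself is easy to follow. Below is a trimmed-down sketch based on the AOSP Message class (only the pooling logic is kept; other fields and checks are omitted):

// Trimmed-down sketch of Message pooling (based on AOSP's Message.obtain()/recycleUnchecked()).
public static Message obtain() {
    synchronized (sPoolSync) {
        if (sPool != null) {
            Message m = sPool;      // pop the head of the pool's linked list
            sPool = m.next;
            m.next = null;
            m.flags = 0;            // clear the in-use flag
            sPoolSize--;
            return m;
        }
    }
    return new Message();           // pool empty: allocate a fresh Message
}

void recycleUnchecked() {
    flags = FLAG_IN_USE;            // marked in use until obtain() hands it out again
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    target = null;
    callback = null;

    synchronized (sPoolSync) {
        if (sPoolSize < MAX_POOL_SIZE) {
            next = sPool;           // push the cleared Message back onto the head
            sPool = this;
            sPoolSize++;
        }
    }
}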

BlockingRunnable

The Java-layer Handler also contains a static inner class, BlockingRunnable:

private static final class BlockingRunnable implements Runnable {
    private final Runnable mTask;
    private boolean mDone;

    public BlockingRunnable(Runnable task) {
        mTask = task;
    }

    @Override
    public void run() {
        try {
            mTask.run();
        } finally {
            synchronized (this) {
                mDone = true;
                notifyAll();
            }
        }
    }

    public boolean postAndWait(Handler handler, long timeout) {
        if (!handler.post(this)) {
            return false;
        }

        synchronized (this) {
            if (timeout > 0) {
                final long expirationTime = SystemClock.uptimeMillis() + timeout;
                while (!mDone) {
                    long delay = expirationTime - SystemClock.uptimeMillis();
                    if (delay <= 0) {
                        return false; // timeout
                    }
                    try {
                        wait(delay);
                    } catch (InterruptedException ex) {
                    }
                }
            } else {
                while (!mDone) {
                    try {
                        wait();
                    } catch (InterruptedException ex) {
                    }
                }
            }
        }
        return true;
    }
}

As you can see, BlockingRunnable wraps the Runnable passed into its constructor. Calling BlockingRunnable's postAndWait does the following:

  1. Return false if posting the BlockingRunnable fails
  2. Block the thread that posted the BlockingRunnable
  3. If timeout is greater than 0, compute the task's expiration time. As long as the wrapped Runnable has not finished, compute the remaining time and call wait(delay) so the posting thread keeps waiting; if the timeout expires first, return false, otherwise the wait ends once the wrapped Runnable has been processed (mDone is true)
  4. If timeout is less than or equal to 0 and the wrapped Runnable has not finished, wait until it finishes (mDone is true)

The usage notes and risks of this mechanism can be found in the comment on the runWithScissors method; I won't translate them here.
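For completeness: runWithScissors is a hidden (@hide) API used only inside the framework (window management code, for example), so ordinary apps cannot call it. Purely as a hypothetical sketch of the pattern it implements, post a task to another thread's Looper and block the calling thread until it finishes or times out; here targetLooper and initializeSomething() are assumed names:

// Hypothetical sketch only: runWithScissors is @hide and not callable from normal app code.
// targetLooper is assumed to be the Looper of some worker thread; initializeSomething() is
// assumed work that must complete before the caller continues.
Handler targetHandler = new Handler(targetLooper);

boolean completed = targetHandler.runWithScissors(() -> {
    // Runs on the worker thread while the calling thread blocks in BlockingRunnable.postAndWait().
    initializeSomething();
}, 500 /* give up after 500 ms */);

if (!completed) {
    // Either the post failed or the timeout expired before the task finished.
}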

Message delivery and processing

A Message is delivered into the MessageQueue by Handler's sendMessageAtTime, which calls MessageQueue's enqueueMessage. Before going any further, we first need to understand how a Handler is created, because the rest of the story depends on it.

Handler creation and initialization

Handler initialization is nothing remarkable: it saves references to the Callback and to mLooper's MessageQueue, and records whether the Handler sends all of its messages asynchronously. There is, however, a memory-leak check worth learning from: when FIND_POTENTIAL_LEAKS is enabled, the constructor checks for a potential leak as follows:

  1. Get the class of the current Handler
  2. Check whether it is an anonymous, member, or local class whose modifiers do not include static
  3. If so, print a warning log indicating a possible memory leak
public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}
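That warning is also why long-lived Handlers are usually written as static nested classes that hold only a weak reference to the outer object. A common pattern, shown here as my own illustration (MyActivity and SafeHandler are not framework code):

import android.app.Activity;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import java.lang.ref.WeakReference;

public class MyActivity extends Activity {
    // Static nested class: no implicit reference to the enclosing Activity, so a
    // pending Message cannot keep the Activity (and its view tree) alive.
    private static class SafeHandler extends Handler {
        private final WeakReference<MyActivity> mActivityRef;

        SafeHandler(MyActivity activity) {
            super(Looper.getMainLooper());
            mActivityRef = new WeakReference<>(activity);
        }

        @Override
        public void handleMessage(Message msg) {
            MyActivity activity = mActivityRef.get();
            if (activity != null) {
                // Safe to touch the Activity here.
            }
        }
    }

    private final SafeHandler mHandler = new SafeHandler(this);
}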

If Handler creation is so simple, why does it matter for what comes later? Notice that the Looper is fetched from sThreadLocal.

ThreadLocal – Maintains the uniqueness of objects in a thread

ThreadLocal is a class that creates thread-local variables.

Normally, we create variables that can be accessed and modified by any thread. Variables created using ThreadLocal can only be accessed by the current thread and cannot be accessed or modified by other threads. Its implementation principle is as follows:

As you can see, ThreadLocalRef is just a reference to the same ThreadLocal object; to keep the lines from getting messy, I drew the ThreadLocal object as two boxes, but it is really one object. That single ThreadLocal serves as the key in the ThreadLocalMap of ThreadA, ThreadB, and so on up to ThreadN.

ThreadLocalMap

ThreadLocalMap is a custom hash map, used only inside the Thread class, that holds the values of ThreadLocal variables. To prevent the ThreadLocal keys from becoming unreclaimable during GC, each entry holds its key through a WeakReference. Other than that there is nothing special about ThreadLocalMap, so I won't go into detail.

One thing to note: ThreadLocalMap can lead to memory leaks, but the root cause is not ThreadLocalMap itself; it is careless code. Because the ThreadLocal used as the map key is only weakly referenced, it can be reclaimed during GC, leaving an entry whose key is null while its value is still strongly reachable from the thread. If we never remove the entry after we are done with the ThreadLocal, that value leaks. You can avoid the problem simply by calling remove() when you are finished with a ThreadLocal, as you should anyway.

Entry

Entry, as an element of ThreadLocalMap, represents a key-value pair: weak references to ThreadLocal are keys, and objects to be stored with ThreadLocal are values.

static class Entry extends WeakReference<ThreadLocal> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal k, Object v) {
        super(k);
        value = v;
    }
}

ThreadLocal summary

In short, a "local variable that other threads cannot modify" means that each thread maintains its own ThreadLocalMap, keyed by the ThreadLocal object and holding that thread's value; isolation and consistency come from per-thread storage of key-value pairs rather than from locks.
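A minimal, self-contained example (not from the Android source) showing this per-thread isolation:

public class ThreadLocalDemo {
    // Each thread that touches sCounter gets its own copy, initialized to 0.
    private static final ThreadLocal<Integer> sCounter = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        Runnable task = () -> {
            sCounter.set(sCounter.get() + 1);
            System.out.println(Thread.currentThread().getName() + " -> " + sCounter.get());
            sCounter.remove(); // clean up to avoid the leak described above
        };
        new Thread(task, "thread-A").start(); // prints "thread-A -> 1"
        new Thread(task, "thread-B").start(); // prints "thread-B -> 1", unaffected by thread-A
    }
}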

Looper

Since a thread has only one Looper, what does a Looper contain? As the source shows, the Looper constructor is private, so a Looper can only be obtained through prepare(), which enforces exactly one Looper per thread, consistent with the one-to-one thread <-> Looper mapping.

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

From the Looper member variables we know that Looper contains the following:

  • sMainLooper: the Looper of the application's main thread; it is not set when preparing Loopers for other threads
  • mQueue: the MessageQueue associated with this Looper
  • mThread: the thread associated with this Looper
  • sThreadLocal: the key of the thread-local variable that stores each thread's Looper

From this we know that a thread corresponds to a Looper, and a Looper corresponds to a MessageQueue
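To make this concrete, here is the classic pattern for giving a worker thread its own Looper and Handler, essentially the example from Looper's own Javadoc (and what HandlerThread does for you):

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

class LooperThread extends Thread {
    public Handler mHandler;

    @Override
    public void run() {
        Looper.prepare();                 // create this thread's Looper + MessageQueue
        mHandler = new Handler(Looper.myLooper()) {
            @Override
            public void handleMessage(Message msg) {
                // process messages sent to this thread here
            }
        };
        Looper.loop();                    // start looping; blocks until quit() is called
    }
}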

---------------- Now, back to where message delivery left off ----------------

To recap: a Message is delivered into the MessageQueue by Handler's sendMessageAtTime, which calls MessageQueue's enqueueMessage.

Now that we know the Message ends up in the MessageQueue, what happens to it once it arrives? enqueueMessage is fairly long but is really just a queue-insertion routine, so instead of quoting it all, here is what it does (a simplified sketch follows the list):

  1. Validity checks (the Message must have a target and must not already be in use)
  2. Mark the Message as in use
  3. Insert the Message into the queue in order of its delivery time (when)
  4. Wake up the native MessageQueue if needed
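A trimmed-down sketch of the core insertion logic, based on the AOSP MessageQueue (quit handling and the extra wake-up rules for asynchronous messages behind a barrier are omitted):

// Trimmed-down sketch of MessageQueue.enqueueMessage (based on AOSP).
boolean enqueueMessage(Message msg, long when) {
    synchronized (this) {
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head of the queue: wake the loop if it is currently blocked in next().
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Insert into the middle of the list, which is kept sorted by delivery time.
            needWake = false;
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
            }
            msg.next = p;      // invariant: p == prev.next
            prev.next = msg;
        }
        if (needWake) {
            nativeWake(mPtr);  // step 4: wake up the native MessageQueue
        }
    }
    return true;
}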

Adding a Barrier Message to a MessageQueue will block all synchronous messages whose execution time is later than the Barrier Message, leaving asynchronous messages unaffected. This will continue until the Barrier Message is removed.

Tip: In the figure, green indicates that Message can be retrieved and executed, red indicates that Message cannot be retrieved and executed
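Sync barriers themselves are framework-internal (postSyncBarrier is hidden; ViewRootImpl uses one around drawing), but apps can mark their own messages as asynchronous so a barrier does not stall them. A small illustration; handler usage is my own example and doWork() is an assumed placeholder:

// Illustration only: asynchronous messages are exempt from sync barriers.
Handler asyncHandler = Handler.createAsync(Looper.getMainLooper()); // API 28+: every message it sends is async

// Equivalent for a single message on an ordinary Handler:
Handler handler = new Handler(Looper.getMainLooper());
Message msg = Message.obtain(handler, () -> doWork());
msg.setAsynchronous(true);                                          // API 22+
handler.sendMessageAtTime(msg, SystemClock.uptimeMillis());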

Compare MessageQueue's enqueueMessage with postSyncBarrier(): an ordinary Message must have a target Handler, whereas the barrier Message built inside postSyncBarrier deliberately has none:

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    ……
}

private int postSyncBarrier(long when) {
    // Enqueue a new sync barrier token.
    // We don't need to wake the queue because the purpose of a barrier is to stall it.
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;

        Message prev = null;
        Message p = mMessages;
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

Message distribution

Now that we know how a Message is delivered into the MessageQueue, let's look at how messages are pulled out of it again. After a Looper has been prepared, its loop() method must be called to start dispatching. The only reason you never do this yourself on the main thread is that the framework already calls prepare() and loop() for the main thread when it creates its Looper.

After we create a Looper for another thread, calling the loop() method does the following:

  1. Loop, fetching the next Message from the MessageQueue
  2. Dispatch the Message to its target Handler via that Handler's dispatchMessage method
  3. Clear the Message and recycle it back into the Message pool for later reuse
public static void loop() {
    ……
    for (;;) {
        Message msg = queue.next(); // might block
        ……
        try {
            msg.target.dispatchMessage(msg);
        } finally {
            ……
        }
        ……
        msg.recycleUnchecked();
    }
}

Handler's dispatchMessage processes a Message according to the following priority:

  1. If the Message has its own callback, the Message is handed to that callback
  2. Otherwise, if the Handler was given a Callback, the Message is handed to it first; if it returns true, processing stops there
  3. Otherwise, the Message is handed to the Handler's handleMessage
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
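A small illustration of the three hooks in priority order (my own example, not framework code):

// Illustration only: the three dispatch hooks in priority order.
Handler handler = new Handler(Looper.getMainLooper(), msg -> {
    // 2. Handler-level Callback: consulted only when the Message has no callback of its own.
    //    Returning true consumes the message before handleMessage() sees it.
    return msg.what == 1;
}) {
    @Override
    public void handleMessage(Message msg) {
        // 3. Reached only if neither callback consumed the message.
    }
};

handler.post(() -> { /* 1. Message callback: handled before both hooks above. */ });
handler.sendEmptyMessage(1); // consumed by the Handler Callback
handler.sendEmptyMessage(2); // falls through to handleMessage()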

The MessageQueue class actually fronts both a Java-layer and a Native-layer queue: the Java-layer queue is the time-ordered linked list headed by mMessages, while mPtr holds the pointer to the native message queue. Its next() method does the following (a simplified sketch follows the list):

  1. Call nativePollOnce, which lets the Native-layer MessageQueue handle Native-layer messages (and fd events) before the Java-layer messages are considered; this call may block
  2. If a barrier Message (i.e., a Message whose target is null) is found while walking the queue in time order, all synchronous messages behind it are skipped and only asynchronous messages are considered
  3. If the selected Message is not yet due, compute the difference between now and its target time and block for that long before retrieving it
  4. Otherwise, mark the Message as in use, unlink it from the queue, and return it
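A trimmed-down sketch of next(), based on the AOSP MessageQueue (idle handlers and quit handling are omitted):

// Trimmed-down sketch of MessageQueue.next() (based on AOSP).
Message next() {
    int nextPollTimeoutMillis = 0;
    for (;;) {
        nativePollOnce(mPtr, nextPollTimeoutMillis); // may block; also drives native messages
        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Barrier at the head: skip forward to the next asynchronous message.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Not due yet: wait only as long as needed on the next poll.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Due now: unlink it from the list and hand it out.
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    msg.markInUse();
                    return msg;
                }
            } else {
                nextPollTimeoutMillis = -1; // no messages: block indefinitely
            }
        }
    }
}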

The entire process of Java layer Android messaging can be summarized as follows:

If you have read through the Java-layer code, you may have noticed there is also a MessengerImpl for cross-process Message communication. I won't cover it here: it is just a thin cross-process wrapper, and underneath it still relies on the same Handler, Looper, and MessageQueue.

Android Native layer messaging mechanism

The Android messaging mechanism in the Native layer is actually very similar to the Java layer and keeps the Handler, Looper, and MessageQueue structure. However, the Native-layer concepts of Message, Handler, and MessageQueue are greatly weakened and are basically just "empty shells"; the core logic all lives in Looper.

The remaining differences are minor implementation details; the source code holds the specifics. The overall structure is shown below:

Message generation

In the Native layer, messages come in two forms: MessageEnvelope objects and epoll_event entries that wrap file-descriptor (fd) information (the Java layer can register fd listeners on its MessageQueue, and the Native layer naturally can too).

fd

For an fd that is to be monitored, Looper::addFd does the following:

  1. Validity checks
  2. Wrap the relevant information into a Request and initialize an epoll_event from it
  3. Register the fd and the epoll_event (built from the Request in step 2) with the current Looper's mEpollFd
  4. Handle any errors that occur
  5. Update mRequests
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    ……

    { // acquire lock
        AutoMutex _l(mLock);
        ……
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);
            ……
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, &eventItem);
            ……
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

MessageEnvelope

MessageEnvelope is much simpler than the fd path. When one of the native Looper's sendMessage-family functions is called, the uptime, the MessageHandler, and the Native-layer Message are wrapped into a MessageEnvelope, which is then inserted into mMessageEnvelopes in time order.

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) {
    ……
    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);

        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
        ……
    }
    ……
}

Message delivery and processing

As mentioned above, when the Java-layer MessageQueue processes messages it first calls nativePollOnce(), which lands in NativeMessageQueue::pollOnce(), which in turn calls the native Looper's pollOnce():

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    ……
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    ……
    mLooper->pollOnce(timeoutMillis);
    ……
}

Before we look at the pollOnce method for Native Looper, let’s see if there are any differences between Native Looper and Java Looper.

Native Looper

Like the Java-layer Looper, the Native-layer Looper also has to be prepared, and it too is an object stored in a thread-local variable, one per thread. So how are thread-local variables implemented in the Native layer?

Linux TSD (Thread-Specific Data)

The idea behind thread-local variables in the Native layer is similar to the Java layer: a global pthread_keys array holds the keys of thread-local variables. seq marks whether an entry is in use, and destr is a function pointer that acts as a destructor, releasing the thread's value for that key when the thread exits.

static struct pthread_key_struct pthread_keys[PTHREAD_KEYS_MAX] = { { 0, NULL } };

int pthread_key_create(pthread_key_t *key, void (*destr_function) (void *));

struct pthread_key_struct
{
  /* Sequence numbers. Even numbers indicated vacant entries. Note that zero
     is even. We use uintptr_t to not require padding on 32- and 64-bit
     machines. On 64-bit machines it helps to avoid wrapping, too. */
  uintptr_t seq;

  /* Destructor for the data. */
  void (*destr) (void *);
};

When a pthread is created, it maintains an array of pointers to the data blocks of its thread-local variables. The overall structure is as follows:

Looper creation

When Looper is created, it does the following:

  1. mWakeEventFd is created with eventfd() for inter-thread communication; whenever the Looper needs to be woken, a 1 is written to it
  2. mEpollFd, the epoll instance, is created, and the epoll_event it listens for is initialized
  3. mWakeEventFd is registered with mEpollFd via epoll_ctl, so the Looper is woken whenever mWakeEventFd becomes readable
  4. If mRequests is not empty, fd-listening Requests were registered earlier; each Request in mRequests is converted into an epoll_event and registered with mEpollFd via epoll_ctl, and these too wake the Looper when they become readable
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    ……
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    ……
    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    ……
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
    ……
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
        ……
    }
}

pollOnce

For the native Looper, finding pollOnce's definition is a little less obvious: Looper.h declares an inline pollOnce(int timeoutMillis) that forwards to pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData), which does the following:

  1. Responses already sitting in mResponses, i.e., events that came from fds, are handled first
  2. If there is no Response to return, call pollInner
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                ……
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            ……
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

The pollInner function is longer, and it does the following:

  1. Adjust timeoutMillis according to when the next Native-layer Message is due
  2. Clear mResponses
  3. Call epoll_wait to fetch the epoll events that need processing
  4. Update mPolling around the wait
  5. Perform validity checks on the result
  6. If an event's fd is mWakeEventFd, this is the Looper's wake event: drain it and wake the Looper up
  7. Otherwise, look up the Request registered for that fd, translate the epoll event mask, and wrap the pair into a Response pushed onto mResponses
  8. Loop over mMessageEnvelopes from the head, and for every envelope whose uptime has arrived, hand its Message to the corresponding Native-layer MessageHandler
  9. Finally, loop over mResponses and invoke the callback of every Response that has one

That concludes our walk through the Android messaging mechanism.

digression

If you find my sharing helpful, buy me a snack/cup of coffee