  • Implementation in Java
    • ThreadLocal
    • MessageQueue
      • Asynchronous messages and message barriers
        • MessageQueue.postSyncBarrier
        • Consumption of message barriers
        • The role of asynchronous messages
    • Looper
    • Handler
    • MessageQueue.IdleHandler
  • Native implementation
    • MessageQueue
    • Looper: creating the epoll instance
    • epoll
    • nativePollOnce
    • nativeWake
    • How postDelayed is implemented
  • HandlerThread
  • IntentService
  • References and questions

The outline

I strongly recommend reading Gityuan’s article

  • Handler
  • Looper
  • ThreadLocal
  • MessageQueue
  • HandlerThread
  • IntentService
  • MessageQueue.IdleHandler
    • Do you know Android's MessageQueue.IdleHandler?

In short: when the Looper's message queue temporarily has no messages to process, this interface is called back. If queueIdle() returns false, the handler is removed; if it returns true, it is called back again the next time the queue becomes idle.

/**
 * Callback interface for discovering when a thread is going to block
 * waiting for more messages.
 */
public static interface IdleHandler {
    /**
     * Called when the message queue has run out of messages and will now
     * wait for more.  Return true to keep your idle handler active, false
     * to have it removed.  This may be called if there are still messages
     * pending in the queue, but they are all scheduled to be dispatched
     * after the current time.
     */
    boolean queueIdle();
}

Implementation in Java

The four classes involved:

  • Handler
  • Looper
  • ThreadLocal
  • MessageQueue

ThreadLocal

Each thread has some variables associated with it, and ThreadLocal's job is to store them. All of them are stored through the internal static class Values. Although all threads access the same ThreadLocal object, each thread holds its own separate value:

public void set(T value) {
    Thread currentThread = Thread.currentThread();
    Values values = values(currentThread);
    if (values == null) {
        values = initializeValues(currentThread);
    }
    values.put(this, value);
}

In the set method above, the values method returns the localValues member variable for the current thread:

/** 
  * Gets Values instance for this thread and variable type. 
  */
Values values(Thread current) {    
      return current.localValues;
}

So what are the localValues inside the thread?

public class Thread implements Runnable {
    /**
     * Normal thread local values.
     */
    ThreadLocal.Values localValues;

    /* omit some code */
}

As you can see, localValues in Thread is of type ThreadLocal.Values, a class defined inside ThreadLocal. If the values() method returns null, the initializeValues method is executed. How is initializeValues implemented?

Values initializeValues(Thread current) {    
      return current.localValues = new Values();
}

The value is then stored in the current thread's localValues. So although every thread appears to access the same ThreadLocal, each thread reads back its own value.

Note: The internal implementation of ThreadLocal varies from SDK to SDK. For example, the implementation of ThreadLocal in version 6.0 is not the same as above, but the principle is the same

For example

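// JsonTestMetaData.java
public class JsonTestMetaData {
    public String json_str;
}

// MainActivity.java
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;

public class MainActivity extends Activity {

    private static final String TAG = "MainActivity";

    // One ThreadLocal object shared by all threads; each thread sees only its own value.
    ThreadLocal<JsonTestMetaData> local = new ThreadLocal<>();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        Log.d(TAG, "onCreate");

        // Value stored for the main thread.
        JsonTestMetaData data = new JsonTestMetaData();
        data.json_str = "main_thread";
        local.set(data);

        Log.e(TAG, local.get().json_str);

        // This thread stores its own value, which does not affect the main thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                JsonTestMetaData data = new JsonTestMetaData();
                data.json_str = "other_thread";
                local.set(data);
                Log.e(TAG, local.get().json_str);
            }
        }).start();

        // This thread never calls set(), so get() returns null for it.
        new Thread(new Runnable() {
            @Override
            public void run() {
                if (local.get() != null) {
                    Log.e(TAG, local.get().json_str);
                } else {
                    Log.e(TAG, "local.get() is null");
                }
            }
        }).start();

        Log.e(TAG, local.get().json_str);
    }
}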

Results obtained:

01-09 14:28:36.410 29303-29303/com.xx.app.javabcsxtest E/MainActivity: main_thread
01-09 14:28:36.412 29303-29303/com.xx.app.javabcsxtest E/MainActivity: main_thread
01-09 14:28:36.412 29303-29331/com.xx.app.javabcsxtest E/MainActivity: other_thread
01-09 14:28:36.413 29303-29332/com.xx.app.javabcsxtest E/MainActivity: local.get() is null

ThreadLocal may cause a memory leak if the thread never exits: the stored value stays reachable from the thread for as long as the thread is alive.
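To make the leak concrete, here is a minimal plain-Java sketch (not Android code). It assumes a long-lived pool thread; the names ThreadLocalCleanup, LOCAL and valueSeenByNextTask are made up for this illustration. A value set by one task is still attached to the thread when a later task runs on it, unless remove() is called.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalCleanup {
    // Hypothetical per-thread slot, used only for this illustration.
    static final ThreadLocal<String> LOCAL = new ThreadLocal<>();

    /**
     * Runs two tasks one after the other on the same worker thread and returns
     * what the second task reads from LOCAL. The first task sets a value and,
     * if cleanUp is true, removes it again before finishing.
     */
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                LOCAL.set("stale");
                try {
                    // ... work that relies on LOCAL would go here ...
                } finally {
                    if (cleanUp) {
                        LOCAL.remove(); // detach the value from this thread
                    }
                }
            };
            Callable<String> second = LOCAL::get;
            worker.submit(first).get();
            // Same thread, later task: the value is still there unless it was removed.
            return worker.submit(second).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenByNextTask(false)); // stale
        System.out.println(valueSeenByNextTask(true));  // null
    }
}
```

Calling remove() in a finally block is the usual way to keep values from outliving the work that needed them on threads that are reused.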

MessageQueue

MessageQueue is the message queue. It contains the member variable Message mMessages, which you can think of as the head of a linked list: despite the name, the storage is not a queue but a singly linked list. Internally it declares five native methods:

private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native static void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsIdling(long ptr);

The underlying work is done in native code, which is covered in a later section.

At the Java level, the two main methods are:

  • next, which gets the next message;
  • enqueueMessage, which inserts a message into the queue.
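The linked-list idea behind these two methods can be sketched in plain Java. This is a simplified model, not the Android source: MiniMessageQueue and Msg are hypothetical names, the caller passes the current time to next, and there is no blocking or locking.

```java
class MiniMessageQueue {
    /** Simplified stand-in for a message: a label, a due time and a link. */
    static final class Msg {
        final String what;
        final long when;
        Msg next;

        Msg(String what, long when) {
            this.what = what;
            this.when = when;
        }
    }

    private Msg head; // plays the role of mMessages

    /** Inserts msg so the list stays ordered by ascending when; ties keep insertion order. */
    void enqueue(Msg msg) {
        if (head == null || msg.when < head.when) {
            msg.next = head;
            head = msg;
            return;
        }
        Msg prev = head;
        while (prev.next != null && prev.next.when <= msg.when) {
            prev = prev.next;
        }
        msg.next = prev.next;
        prev.next = msg;
    }

    /** Removes and returns the head if it is due at time now, otherwise returns null. */
    Msg next(long now) {
        if (head == null || now < head.when) {
            return null;
        }
        Msg msg = head;
        head = msg.next;
        msg.next = null;
        return msg;
    }

    /** Enqueues four messages out of order and returns the order they come back in. */
    static String drainOrder() {
        MiniMessageQueue q = new MiniMessageQueue();
        q.enqueue(new Msg("c", 30));
        q.enqueue(new Msg("a", 10));
        q.enqueue(new Msg("b", 20));
        q.enqueue(new Msg("b2", 20));
        StringBuilder sb = new StringBuilder();
        for (Msg m = q.next(100); m != null; m = q.next(100)) {
            sb.append(m.what).append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // a b b2 c
    }
}
```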

Asynchronous messages and message barriers

The following code is found in the scheduleTraversals method of ViewRootImpl:

void scheduleTraversals() {
    if (!mTraversalScheduled) {
        mTraversalScheduled = true;
        mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
        mChoreographer.postCallback(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        if (!mUnbufferedInputDispatch) {
            scheduleConsumeBatchedInput();
        }
        notifyRendererOfFramePending();
        pokeDrawLockIfNeeded();
    }
}

Here a message barrier (sync barrier) is posted to the MessageQueue.

MessageQueue.postSyncBarrier

The code is relatively simple: it inserts a Message into the queue whose when is SystemClock.uptimeMillis() and which has no target:

public int postSyncBarrier() {
    return postSyncBarrier(SystemClock.uptimeMillis());
}

private int postSyncBarrier(long when) {
    // Enqueue a new sync barrier token.
    // We don't need to wake the queue because the purpose of a barrier is to stall it.
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;

        Message prev = null;
        Message p = mMessages;
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

Consumption of message barriers

In MessageQueue's next method, if the head of the list is a message barrier, the queue skips ahead to the first asynchronous message in the list and processes that one first:

Message next() {
    // ...
    nativePollOnce(ptr, nextPollTimeoutMillis);

    synchronized (this) {
        final long now = SystemClock.uptimeMillis();
        Message prevMsg = null;
        Message msg = mMessages;
        if (msg != null && msg.target == null) {
            // Stalled by a barrier: find the next asynchronous message in the queue.
            do {
                prevMsg = msg;
                msg = msg.next;
            } while (msg != null && !msg.isAsynchronous());
        }
        if (msg != null) {
            if (now < msg.when) {
                // Not due yet: compute how long to block in the next poll.
                nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
            } else {
                // The asynchronous message is removed from the list and returned
                // when it is time to process it.
                mBlocked = false;
                if (prevMsg != null) {
                    prevMsg.next = msg.next;
                } else {
                    mMessages = msg.next;
                }
                msg.next = null;
                if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                msg.markInUse();
                return msg;
            }
        } else {
            // No deliverable message: block indefinitely until woken.
            nextPollTimeoutMillis = -1;
        }
        // ...
    }
}
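The barrier behaviour can be reproduced in a small plain-Java model. This is a sketch under simplifying assumptions, not the Android implementation: BarrierDemo and Msg are hypothetical names, a barrier is modelled by a boolean flag instead of a null target, and every message is treated as already due.

```java
class BarrierDemo {
    /** Simplified message; barrier == true models "target == null". */
    static final class Msg {
        final String what;
        final boolean barrier;
        final boolean async;
        Msg next;

        Msg(String what, boolean barrier, boolean async) {
            this.what = what;
            this.barrier = barrier;
            this.async = async;
        }
    }

    private Msg head;

    /** Appends at the tail of the list. */
    void append(Msg msg) {
        if (head == null) {
            head = msg;
            return;
        }
        Msg p = head;
        while (p.next != null) {
            p = p.next;
        }
        p.next = msg;
    }

    /** If the head is a barrier, skips to the first async message; returns null when stalled. */
    Msg next() {
        Msg prev = null;
        Msg msg = head;
        if (msg != null && msg.barrier) {
            do {
                prev = msg;
                msg = msg.next;
            } while (msg != null && !msg.async);
        }
        if (msg == null) {
            return null; // nothing deliverable; the real queue would block here
        }
        if (prev != null) {
            prev.next = msg.next;
        } else {
            head = msg.next;
        }
        msg.next = null;
        return msg;
    }

    /** Drops the barrier if it is at the head, letting synchronous messages through again. */
    void removeBarrier() {
        if (head != null && head.barrier) {
            head = head.next;
        }
    }

    static String deliveryOrder() {
        BarrierDemo q = new BarrierDemo();
        q.append(new Msg("barrier", true, false));
        q.append(new Msg("sync1", false, false));
        q.append(new Msg("async1", false, true));
        q.append(new Msg("sync2", false, false));

        StringBuilder order = new StringBuilder();
        order.append(q.next().what);                         // async message passes the barrier
        order.append(q.next() == null ? " stalled" : " ?");  // sync messages are held back
        q.removeBarrier();
        order.append(' ').append(q.next().what);
        order.append(' ').append(q.next().what);
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(deliveryOrder()); // async1 stalled sync1 sync2
    }
}
```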

The role of asynchronous messages

Take a look at the Javadoc of Message's setAsynchronous method, quoted below.

A synchronization barrier is introduced while a View's invalidate request is waiting to be drawn. Asynchronous messages are not held back by the barrier, so they are processed first.

Certain operations, such as view invalidation, may introduce synchronization barriers into the {@link Looper}’s message queue to prevent subsequent messages from being delivered until some condition is met. In the case of view invalidation, messages which are posted after a call to {@link android.view.View#invalidate} are suspended by means of a synchronization barrier until the next frame is ready to be drawn. The synchronization barrier ensures that the invalidation request is completely handled before resuming.

Asynchronous messages are exempt from synchronization barriers. They typically represent interrupts, input events, and other signals that must be handled independently even while other work has been suspended.

Looper

What does Looper do? Looper’s job is to create a MessageQueue for a thread, bind to that thread, and execute a message loop for that thread.

The Looper holds references to a MessageQueue and a Thread.

The MessageQueue is created in the prepare method, and the loop is started in the loop method.

The Java-layer Looper and MessageQueue have corresponding classes in C++: Looper (native) and NativeMessageQueue respectively.

A thread has no Looper by default. To process messages on a thread, call prepare on it first and then call loop.

What does prepare do?

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

You can see that prepare() can only be called once per thread. At the end, it creates a Looper and stores it in a ThreadLocal. How is the Looper created?

private Looper(boolean quitAllowed) {    
      mQueue = new MessageQueue(quitAllowed);    
      mThread = Thread.currentThread();
}

As you can see, the constructor is private, a MessageQueue is created, and the mThread is the current thread. So how does Looper execute the message loop?

public static void loop() {
    /* omit some code */
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
    for (;;) {
        Message msg = queue.next(); // might block
        msg.target.dispatchMessage(msg);
        msg.recycleUnchecked();
    }
}

As you can see, an infinite loop keeps taking messages from the message queue and dispatching each one to its target.

The target of Message is actually a Handler
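The prepare/loop pattern can be imitated in plain Java to see how the pieces fit together. The sketch below is only an analogy: MiniLooper is a hypothetical class, it uses a BlockingQueue of Runnables instead of Messages and Handlers, and quitting is modelled with a sentinel task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniLooper {
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();
    // Sentinel task that tells loop() to return.
    private static final Runnable QUIT = new Runnable() {
        @Override public void run() { }
    };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    /** Creates the looper for the calling thread; allowed only once per thread. */
    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new IllegalStateException("Only one MiniLooper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper());
    }

    static MiniLooper myLooper() {
        return sThreadLocal.get();
    }

    /** Runs queued tasks on the calling thread until quit() is requested. */
    static void loop() {
        MiniLooper me = myLooper();
        if (me == null) {
            throw new IllegalStateException("prepare() wasn't called on this thread");
        }
        try {
            for (;;) {
                Runnable task = me.queue.take(); // blocks while the queue is empty
                if (task == QUIT) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sThreadLocal.remove();
        }
    }

    void post(Runnable task) { queue.add(task); }

    void quit() { queue.add(QUIT); }

    /** Starts a worker with its own looper, posts two tasks to it and returns what they logged. */
    static List<String> demo() {
        BlockingQueue<MiniLooper> handoff = new LinkedBlockingQueue<>();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Thread worker = new Thread(() -> {
            prepare();
            handoff.add(myLooper());
            loop();
        }, "mini-looper");
        worker.start();
        try {
            MiniLooper looper = handoff.take();
            looper.post(() -> log.add("first on " + Thread.currentThread().getName()));
            looper.post(() -> log.add("second on " + Thread.currentThread().getName()));
            looper.quit();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first on mini-looper, second on mini-looper]
    }
}
```

The tasks are posted from the calling thread but run on the worker, which is the same hand-off that a Handler bound to another thread's Looper performs.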

So, in a thread you write yourself, you can use:

class LooperThread extends Thread {
    public Handler mHandler;

    public void run() {
        Looper.prepare();

        mHandler = new Handler() {
            public void handleMessage(Message msg) {
                // process incoming messages here
            }
        };

        Looper.loop();
    }
}

However, this approach is too low-level and inconvenient in real code. Instead you can write:

HandlerThread

HandlerThread thread = new HandlerThread("new_handler_thread");
thread.start(); // must be started first: getLooper() returns null if the thread is not alive
Handler handler = new Handler(thread.getLooper(), new Handler.Callback() {
    @Override
    public boolean handleMessage(Message msg) {
        return false;
    }
});

HandlerThread extends Thread and creates a Looper of its own.

For details about HandlerThread, see Android HandlerThread

Most interaction with the message loop goes through Handlers, as described in the Handler section. Remember to call Looper's quit method when you no longer need the message loop.

Handler

A Handler sends and processes the Messages and Runnables associated with a MessageQueue. Each Handler is associated with a single thread, by default the thread on which you created it, and the MessageQueue it works with is that thread's MessageQueue.

Consider these constructors:

public Handler(Callback callback, boolean async) {
    /* omit some code */
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

public Handler(Looper looper, Callback callback, boolean async) {
    mLooper = looper;
    mQueue = looper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

Handler has several constructors, and they all end up calling one of these two. In the first one, the mLooper member is obtained from the static method Looper.myLooper(). What does myLooper do? As described in the Looper section, it returns the Looper associated with the current thread. If mLooper is null, an exception is thrown. You might ask: I created a Handler in my Activity without ever calling Looper.prepare(), so why did that work? Because by the time your application code runs, Android has already created the main thread's Looper through Looper's static method prepareMainLooper, which can only be executed once or it throws an exception. This Looper is bound to the thread, and mQueue is taken from mLooper.

When a message is dispatched, the callbacks are tried in the following order:

  • Is the Message's callback non-null? If so, it is run and dispatch ends. The Message callback is the Runnable passed in through the Handler's post method.
  • Otherwise, is the Handler's mCallback non-null? If so, it is called, and if it returns true, dispatch ends. Its type is Handler.Callback.
  • Otherwise, the Handler's handleMessage method is called.

/**
 * Handle system messages here.
 */
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}
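The three-level dispatch order can be exercised with a small plain-Java model. DispatchOrderDemo, MiniHandler, Msg and trace are hypothetical names introduced for this sketch; only the branching mirrors the dispatchMessage code above.

```java
import java.util.ArrayList;
import java.util.List;

class DispatchOrderDemo {
    interface Callback {
        boolean handleMessage(Msg msg);
    }

    static final class Msg {
        Runnable callback; // set when the message was created from a posted Runnable
    }

    static class MiniHandler {
        private final Callback mCallback;

        MiniHandler(Callback callback) {
            mCallback = callback;
        }

        void handleMessage(Msg msg) {
            // Subclasses override this, as with the real Handler.
        }

        void dispatchMessage(Msg msg) {
            if (msg.callback != null) {
                msg.callback.run();              // 1. the message's own Runnable
            } else {
                if (mCallback != null && mCallback.handleMessage(msg)) {
                    return;                      // 2. Handler.Callback consumed the message
                }
                handleMessage(msg);              // 3. fall back to handleMessage
            }
        }
    }

    /**
     * Dispatches one message and records which paths ran.
     * callbackResult == null means the handler has no Callback at all.
     */
    static List<String> trace(boolean withRunnable, Boolean callbackResult) {
        List<String> trace = new ArrayList<>();
        Callback cb = null;
        if (callbackResult != null) {
            cb = m -> {
                trace.add("Handler.Callback");
                return callbackResult;
            };
        }
        MiniHandler handler = new MiniHandler(cb) {
            @Override
            void handleMessage(Msg m) {
                trace.add("Handler.handleMessage");
            }
        };
        Msg msg = new Msg();
        if (withRunnable) {
            msg.callback = () -> trace.add("Message.callback");
        }
        handler.dispatchMessage(msg);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace(true, true));   // [Message.callback]
        System.out.println(trace(false, true));  // [Handler.Callback]
        System.out.println(trace(false, false)); // [Handler.Callback, Handler.handleMessage]
        System.out.println(trace(false, null));  // [Handler.handleMessage]
    }
}
```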

MessageQueue.IdleHandler

  • Do you know Android's MessageQueue.IdleHandler?

In short: when the Looper's message queue temporarily has nothing to process, the queueIdle() callback of each registered IdleHandler is invoked. Returning false removes the handler; returning true keeps it registered, so it is called again the next time the queue goes idle.
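The keep-or-remove contract can be shown with a plain-Java model. This is a sketch, not the Android API: IdleHandlerDemo and drainThenIdle are hypothetical names, and "idle" simply means the pending tasks have all been run. On Android itself, handlers are registered through MessageQueue.addIdleHandler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class IdleHandlerDemo {
    interface IdleHandler {
        boolean queueIdle();
    }

    private final Deque<Runnable> messages = new ArrayDeque<>();
    private final List<IdleHandler> idleHandlers = new ArrayList<>();

    void post(Runnable task) {
        messages.add(task);
    }

    void addIdleHandler(IdleHandler handler) {
        idleHandlers.add(handler);
    }

    /** Runs all pending tasks, then gives every registered idle handler one callback. */
    void drainThenIdle() {
        Runnable task;
        while ((task = messages.poll()) != null) {
            task.run();
        }
        // The queue is empty now. Iterate over a copy so handlers can be removed safely.
        for (IdleHandler handler : new ArrayList<>(idleHandlers)) {
            boolean keep = handler.queueIdle();
            if (!keep) {
                idleHandlers.remove(handler);
            }
        }
    }

    /** Returns {calls to the one-shot handler, calls to the repeating handler} after three idle passes. */
    static int[] demo() {
        IdleHandlerDemo queue = new IdleHandlerDemo();
        int[] calls = new int[2];
        queue.addIdleHandler(() -> { calls[0]++; return false; }); // removed after the first call
        queue.addIdleHandler(() -> { calls[1]++; return true; });  // stays registered
        for (int i = 0; i < 3; i++) {
            queue.post(() -> { });
            queue.drainThenIdle();
        }
        return calls;
    }

    public static void main(String[] args) {
        int[] calls = demo();
        System.out.println(calls[0] + " " + calls[1]); // 1 3
    }
}
```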

Native implementation

See Gityuan – Android messaging mechanism 2-Handler(Native layer)

MessageQueue

MessageQueue.java

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit(); // [2]
}

private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

The Java-layer mPtr actually holds the address of the native-layer NativeMessageQueue object, converted to a jlong with reinterpret_cast:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // Initialize the native message queue [3]
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

Unlike the Java layer, the native MessageQueue holds a Looper internally:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false); // Create the native Looper [4]
        Looper::setForThread(mLooper); // Save the native-layer Looper to TLS
    }
}

  • nativeInit() method:
    • Creates the NativeMessageQueue object, increments its reference count, and saves the NativeMessageQueue pointer in mPtr of the Java-layer MessageQueue
    • Creates the native Looper object
    • Calls epoll_create()/epoll_ctl() to listen for readable events on mWakeEventFd and on the file descriptors in mRequests
  • nativeDestroy() method:
    • Calls RefBase::decStrong() to decrement the object's reference count
    • When the reference count reaches 0, the NativeMessageQueue object is deleted
  • nativePollOnce() method:
    • Calls Looper::pollOnce(), which idles in epoll_wait() until an event occurs or the timeout expires
  • nativeWake() method:
    • Calls Looper::wake(), which writes to mWakeEventFd to wake up the waiting epoll_wait() call

Looper: creating the epoll instance

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK); // Construct the wake event fd
    AutoMutex _l(mLock);
    rebuildEpollLocked(); // Rebuild the epoll events [5]
}

eventfd

void Looper::rebuildEpollLocked() {
    if (mEpollFd >= 0) {
        close(mEpollFd); // Close the old epoll instance
    }
    // Create a new epoll instance and register the wake fd
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event));
    eventItem.events = EPOLLIN; // Readable events
    eventItem.data.fd = mWakeEventFd;
    // Add the wake-up event (mWakeEventFd) to the epoll instance (mEpollFd)
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        // Add each pending request's fd to the epoll instance as well
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
    }
}

epoll

Gityuan – select/poll/epoll comparative analysis

The maximum number of file descriptors epoll can monitor is the maximum number of files that can be opened, which you can check with cat /proc/sys/fs/file-max. This number generally depends on system memory; for a phone with 3 GB of RAM it is about 200,000 to 300,000.

I/O performance does not degrade as the number of monitored file descriptors grows. Unlike select and poll, which scan every descriptor, epoll is implemented through a callback registered on each descriptor, and only descriptors that are ready run their callback.

nativePollOnce

Message next() {
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    for (;;) {
        ...
        nativePollOnce(ptr, nextPollTimeoutMillis); // blocking call [2]
        ...
    }
}

android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    // Convert the mPtr passed down from the Java layer to a NativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis); // [3]
}

android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis); // [4]
    mPollObj = NULL;
    mPollEnv = NULL;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

Looper.h

inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL); // [5]
}

Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // First handle pending responses that have no callback
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                // ident >= 0 means the request has no callback
                // (requests with a callback use the negative POLL_CALLBACK ident)
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis); // [6]
    }
}

nativeWake

nativeWake calls Looper::wake(), which writes to mWakeEventFd (the epoll wake-up event):

boolean enqueueMessage(Message msg, long when) {
    ... // Insert the Message into the MessageQueue in chronological order
    if (needWake) {
        nativeWake(mPtr); // [2]
    }
}

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake(); // [3]
}

void NativeMessageQueue::wake() {
    mLooper->wake(); // [4]
}

void Looper::wake() {
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

How postDelayed is implemented

  • 1. postDelayed ends up in sendMessageAtTime with the current time plus the delay, which calls MessageQueue's enqueueMessage to add the message to the queue.
  • 2. MessageQueue's enqueueMessage walks the list and finds the insertion position from the Message's when. If the loop in next is blocked and the new message has to be handled before the current wake-up time, the native method nativeWake is called to wake up the NativeMessageQueue.
  • 3. nativePollOnce passes the timeout down to Looper.cpp, where it ends up as the last argument of epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); in other words, it is how long epoll blocks.
public final boolean postDelayed(Runnable r, long delayMillis)
{
    return sendMessageDelayed(getPostMessage(r), delayMillis);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
 public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}   
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
} 

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
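The interplay of the three steps (insert by when, block for at most the time until the head is due, wake early when a new head arrives) can be imitated in plain Java. In this sketch, which is an analogy rather than the Android implementation, wait(timeout) stands in for nativePollOnce/epoll_wait and notifyAll() stands in for nativeWake; DelayedQueueDemo and its members are hypothetical names.

```java
class DelayedQueueDemo {
    static final class Msg {
        final String what;
        final long when; // due time in milliseconds on the now() clock
        Msg next;

        Msg(String what, long when) {
            this.what = what;
            this.when = when;
        }
    }

    private Msg head;
    private boolean blocked; // true while next() is waiting

    static long now() {
        return System.nanoTime() / 1_000_000L;
    }

    /** Inserts by due time and wakes the consumer if a new head arrives while it is blocked. */
    synchronized void enqueue(String what, long delayMillis) {
        Msg msg = new Msg(what, now() + delayMillis);
        boolean needWake = false;
        if (head == null || msg.when < head.when) {
            msg.next = head;
            head = msg;
            needWake = blocked;
        } else {
            Msg prev = head;
            while (prev.next != null && prev.next.when <= msg.when) {
                prev = prev.next;
            }
            msg.next = prev.next;
            prev.next = msg;
        }
        if (needWake) {
            notifyAll(); // stands in for nativeWake
        }
    }

    /** Blocks until the head message is due, waiting at most (head.when - now) at a time. */
    synchronized Msg next() throws InterruptedException {
        for (;;) {
            long now = now();
            if (head != null && head.when <= now) {
                Msg msg = head;
                head = msg.next;
                msg.next = null;
                blocked = false;
                return msg;
            }
            blocked = true;
            if (head == null) {
                wait();                // like a timeout of -1: wait until woken
            } else {
                wait(head.when - now); // like nativePollOnce(ptr, timeout); always > 0 here
            }
        }
    }

    /** Queues a slow message, lets another thread add an immediate one, returns the delivery order. */
    static String demo() {
        DelayedQueueDemo queue = new DelayedQueueDemo();
        queue.enqueue("slow", 300);
        Thread producer = new Thread(() -> queue.enqueue("fast", 0));
        producer.start();
        try {
            String first = queue.next().what;
            String second = queue.next().what;
            producer.join();
            return first + " " + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // fast slow
    }
}
```

The loop around wait() also covers spurious wake-ups: after every return it re-reads the clock and the head before deciding whether to deliver or to wait again.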

HandlerThread

HandlerThread calls Looper.prepare() in its run() method. It has two important methods:

public Looper getLooper()

public Handler getThreadHandler()

Call quit() when you are done with it.

IntentService

onCreate() creates a HandlerThread and starts it, then creates a ServiceHandler on that HandlerThread's Looper. The onHandleIntent method you override is therefore executed on that worker thread, not on the main thread.

References and questions

  • Why doesn't the infinite loop in Looper.loop() freeze Android's main thread?
  • Gityuan – Android Messaging mechanism 1-Handler(Java Layer)
  • Gityuan – Android Messaging 2-Handler(Native Layer)
  • Gityuan – select/poll/epoll comparative analysis
  • Gityuan – Source code interpretation of epoll kernel mechanism