The message mechanism is one of the two cornerstones of Android, alongside the Binder IPC mechanism. The Android system relies heavily on message-driven interaction: for example, starting any of the four major components (Activity, Service, BroadcastReceiver, and ContentProvider) depends on the message mechanism, so in a sense Android itself can be called message driven.

At the Java level, the message mechanism mainly involves four classes: Handler, Looper, MessageQueue, and Message.

  • Message: messages can be generated by hardware (such as key presses and touches) or by software. A Message holds its target, the Handler that will process it;
  • MessageQueue: the queue's main job is to insert messages into the pool (MessageQueue.enqueueMessage) and take them out (MessageQueue.next). MessageQueue keeps the pending messages in a singly linked list;
  • Handler: a helper class that sends message events to the pool (Handler.sendMessage) and handles them (Handler.handleMessage). A Handler holds a Looper and a MessageQueue;
  • Looper: runs continuously (Looper.loop), dispatching each message to its target Handler. A Looper holds a MessageQueue.

How the Handler mechanism is used

A typical usage scenario is as follows:

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

class LooperThread extends Thread {
    public Handler mHandler;

    public void run() {
        // Step 1: Initialize Looper
        Looper.prepare();

        // Step 2: Initialize Handler
        mHandler = new Handler() {
            public void handleMessage(Message msg) {
                // Different types of messages are processed here
            }
        };

        // Step 3: Start the message loop
        Looper.loop();
    }
}
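
Because the real Handler, Looper, and Message classes need an Android runtime, here is a plain-Java sketch of the same three steps. MiniMessage, MiniHandler, MiniLooper, and MiniLooperThread are hypothetical, simplified stand-ins, not android.os classes: a LinkedBlockingQueue replaces the time-ordered MessageQueue and its native epoll wait, and a sentinel message replaces the real quit logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for android.os.Message: a payload plus the Handler that will process it.
final class MiniMessage {
    final int what;
    final MiniHandler target; // null only for the quit sentinel
    MiniMessage(int what, MiniHandler target) { this.what = what; this.target = target; }
}

// Hypothetical stand-in for android.os.Handler: binds to the current thread's MiniLooper.
abstract class MiniHandler {
    private final BlockingQueue<MiniMessage> queue;

    MiniHandler() {
        MiniLooper looper = MiniLooper.myLooper();
        if (looper == null) {
            throw new RuntimeException("MiniLooper.prepare() has not been called on this thread");
        }
        queue = looper.queue; // same idea as mQueue = mLooper.mQueue
    }

    void sendMessage(int what) { queue.add(new MiniMessage(what, this)); }

    abstract void handleMessage(MiniMessage msg);
}

// Hypothetical stand-in for android.os.Looper: at most one per thread, kept in a ThreadLocal.
final class MiniLooper {
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();
    private static final MiniMessage QUIT = new MiniMessage(-1, null); // sketch-only sentinel
    final BlockingQueue<MiniMessage> queue = new LinkedBlockingQueue<>();

    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper());
    }

    static MiniLooper myLooper() { return sThreadLocal.get(); }

    void quit() { queue.add(QUIT); }

    static void loop() {
        MiniLooper me = myLooper();
        while (true) {
            MiniMessage msg;
            try {
                msg = me.queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                return;
            }
            if (msg == QUIT) return;
            msg.target.handleMessage(msg); // dispatch to the message's target handler
        }
    }
}

// The LooperThread pattern: prepare, create a handler, then loop.
final class MiniLooperThread extends Thread {
    final List<Integer> handled = new ArrayList<>(); // written only by the looper thread
    final CountDownLatch ready = new CountDownLatch(1);
    volatile MiniLooper looper;
    volatile MiniHandler handler;

    @Override
    public void run() {
        MiniLooper.prepare();           // Step 1
        looper = MiniLooper.myLooper();
        handler = new MiniHandler() {   // Step 2
            @Override
            void handleMessage(MiniMessage msg) { handled.add(msg.what); }
        };
        ready.countDown();
        MiniLooper.loop();              // Step 3
    }

    // Sends two messages from the calling thread, quits, and returns what the looper handled.
    static List<Integer> demo() {
        MiniLooperThread t = new MiniLooperThread();
        t.start();
        try {
            t.ready.await();
            t.handler.sendMessage(1);
            t.handler.sendMessage(2);
            t.looper.quit();
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return t.handled;
    }
}
```

demo() returns [1, 2]: both messages are handled on the looper thread, in the order they were sent, before the sentinel ends loop().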

Let’s take a look at how the system source code uses Handler:

// API version 28
// android.app.ActivityThread.java

public static void main(String[] args) {
    // ... omit some irrelevant code

    // Step 1: Initialize main thread Looper
    Looper.prepareMainLooper();

    // ... omit some irrelevant code

    // Step 2: Initialize the system default Handler
    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }

    // ... omit some irrelevant code

    // Step 3: Start the main thread message loop
    Looper.loop();

    throw new RuntimeException("Main thread loop unexpectedly exited");
}

Comparing the two snippets, every step is the same except how the Looper is initialized. Calling prepare() with no arguments calls the overload prepare(true), meaning this Looper is allowed to quit. The system instead calls prepareMainLooper(), which calls prepare(false), meaning the main thread's Looper is not allowed to quit.

Step 1: Initialize Looper

The prepare(boolean) method is declared as follows:

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

Each thread may call this method only once: if the thread's TLS already holds a Looper, a second call throws an exception. The Looper created here is stored in the current thread's TLS (thread-local storage) area.
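
The one-Looper-per-thread rule and the quitAllowed flag can be sketched in plain Java. SketchLooper and onFreshThread are hypothetical names; the error strings mirror those in AOSP's Looper.prepare() and MessageQueue.quit(), but everything else (for example, the real prepareMainLooper() also recording sMainLooper) is simplified away. Each scenario runs on a new thread because TLS is per thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of Looper.prepare(boolean quitAllowed); not the real android.os.Looper.
final class SketchLooper {
    private static final ThreadLocal<SketchLooper> sThreadLocal = new ThreadLocal<>();
    private final boolean quitAllowed;
    private boolean quitting;

    private SketchLooper(boolean quitAllowed) {
        this.quitAllowed = quitAllowed;
    }

    static void prepare() {
        prepare(true); // ordinary threads may quit their looper
    }

    static void prepareMainLooper() {
        prepare(false); // the main thread's looper may not quit
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new SketchLooper(quitAllowed));
    }

    static SketchLooper myLooper() {
        return sThreadLocal.get();
    }

    // Mirrors the quitAllowed check the real MessageQueue performs when asked to quit.
    void quit() {
        if (!quitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
        quitting = true;
    }

    boolean isQuitting() {
        return quitting;
    }

    // Runs action on a new thread (empty TLS); returns the exception message, or "ok".
    static String onFreshThread(Runnable action) {
        AtomicReference<String> outcome = new AtomicReference<>("ok");
        Thread t = new Thread(() -> {
            try {
                action.run();
            } catch (RuntimeException e) {
                outcome.set(e.getMessage());
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }
}
```

Calling prepare() twice on one thread reports "Only one Looper may be created per thread", quitting a looper created by prepareMainLooper() reports "Main thread not allowed to quit.", and quitting a worker looper succeeds.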

ThreadLocal

ThreadLocal (thread-local storage, TLS): each thread has its own private local storage area, and threads cannot access each other's TLS.

Here is ThreadLocal.set(T value):

// API version 28
// java.lang.ThreadLocal.java

public void set(T value) {
    Thread t = Thread.currentThread(); // Get the current thread
    ThreadLocalMap map = getMap(t); // Find the current thread's local store, i.e. the t.threadLocals field
    if (map != null)
        // Store the value in TLS, using this ThreadLocal object as the key
        map.set(this, value);
    else
        // The thread's local store does not exist yet: create a ThreadLocalMap holding the value
        createMap(t, value);
}

And here is ThreadLocal.get():

// API version 28
// java.lang.ThreadLocal.java

public T get() {
    Thread t = Thread.currentThread(); // Get the current thread
    ThreadLocalMap map = getMap(t); // Find the current thread's local store, i.e. the t.threadLocals field
    if (map != null) {
        // Use this ThreadLocal object as the key to look up the value stored in TLS
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // No map or no entry yet: initialize it with initialValue(), which returns null by default
    return setInitialValue();
}
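
To make the set()/get() flow concrete, here is a hypothetical SimpleThreadLocal that follows the same steps: look up the current thread's map, use the ThreadLocal object itself as the key, and fall back to setInitialValue(). Plain Java code cannot add a field to Thread, so a synchronized WeakHashMap keyed by Thread stands in for Thread.threadLocals; the real class uses its own ThreadLocalMap instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;

// Hypothetical model of ThreadLocal; STORAGE stands in for the per-thread Thread.threadLocals field.
class SimpleThreadLocal<T> {
    private static final Map<Thread, Map<SimpleThreadLocal<?>, Object>> STORAGE = new WeakHashMap<>();

    // Default initial value, as in ThreadLocal.initialValue()
    protected T initialValue() {
        return null;
    }

    private static Map<SimpleThreadLocal<?>, Object> getMap(Thread t) {
        synchronized (STORAGE) {
            return STORAGE.get(t);
        }
    }

    private static Map<SimpleThreadLocal<?>, Object> createMap(Thread t) {
        Map<SimpleThreadLocal<?>, Object> map = new HashMap<>(); // only ever touched by thread t
        synchronized (STORAGE) {
            STORAGE.put(t, map);
        }
        return map;
    }

    public void set(T value) {
        Thread t = Thread.currentThread();
        Map<SimpleThreadLocal<?>, Object> map = getMap(t);
        if (map == null) {
            map = createMap(t);
        }
        map.put(this, value); // this SimpleThreadLocal object is the key
    }

    @SuppressWarnings("unchecked")
    public T get() {
        Map<SimpleThreadLocal<?>, Object> map = getMap(Thread.currentThread());
        if (map != null && map.containsKey(this)) {
            return (T) map.get(this);
        }
        return setInitialValue();
    }

    private T setInitialValue() {
        T value = initialValue();
        set(value);
        return value;
    }
}

final class SimpleThreadLocalDemo {
    static final SimpleThreadLocal<String> NAME = new SimpleThreadLocal<>();

    // Two threads store different values under the same key; the caller never calls set().
    static String[] run() {
        String[] seen = new String[3];
        Thread a = new Thread(() -> { NAME.set("A"); seen[0] = NAME.get(); });
        Thread b = new Thread(() -> { NAME.set("B"); seen[1] = NAME.get(); });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        seen[2] = NAME.get(); // falls through to initialValue()
        return seen;
    }
}
```

Each worker reads back only its own value ("A" and "B"), while the calling thread, which never called set(), gets the initial value null.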

ThreadLocal's get() and set() work on a generic type T. Looper declares sThreadLocal as a ThreadLocal<Looper>:

// API version 28
// android.os.Looper.java

// sThreadLocal.get() will return null unless you've called prepare().
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

The Looper constructor:

// API version 28
// android.os.Looper.java

private Looper(boolean quitAllowed) {
    // Create the MessageQueue
    mQueue = new MessageQueue(quitAllowed);
    // Record the thread that created this Looper
    mThread = Thread.currentThread();
}

The MessageQueue constructor:

// API version 28
// android.os.MessageQueue.java

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    // Initialize the message queue via native methods, where mPtr is a pointer reference for use by the Native layer
    mPtr = nativeInit();
}

MessageQueue relies on many native methods. Besides backing those methods, the native layer also has a complete message mechanism of its own for handling native messages. In the message mechanism as a whole, MessageQueue is the link between the Java layer and the native layer; in other words, both the Java layer and the native layer can add messages to it.

The native methods in MessageQueue are as follows:

// API version 28
// android.os.MessageQueue.java

private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

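nativeInit() calls the JNI function android_os_MessageQueue_nativeInit(), which creates a NativeMessageQueue object; the NativeMessageQueue in turn creates the native Looper (Looper.cpp) for the current thread if that thread does not have one yet.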

The native Looper's constructor calls Looper::rebuildEpollLocked(), whose source is as follows:

// API version 28
// system/core/libutils/Looper.cpp

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        close(mEpollFd); // Close the old epoll instance
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT); // Create a new epoll instance and register the WAKE pipe
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    // Set the unused data area to 0
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN; // Readable events
    eventItem.data.fd = mWakeEventFd;
    // Add the wake event (mWakeEventFd) to the epoll instance (mEpollFd)
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        // Add events from the Request queue to the epoll instance, respectively
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}
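
The native code above builds one epoll instance and registers the wake fd for readability, so that writing to that fd wakes a thread blocked in epoll_wait(). Java NIO has a close analog: a Selector (on Linux the JDK's default Selector is typically epoll-based) plus a Pipe registered for OP_READ. WakeableSelector is a hypothetical class for illustration, not an Android API, and the Pipe stands in for mWakeEventFd.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical analog of the native Looper's epoll set plus wake fd (not an Android API).
final class WakeableSelector implements AutoCloseable {
    private final Selector selector; // plays the role of mEpollFd
    private final Pipe wakePipe;     // plays the role of mWakeEventFd

    WakeableSelector() {
        try {
            selector = Selector.open();                                 // ~ epoll_create()
            wakePipe = Pipe.open();
            wakePipe.source().configureBlocking(false);                 // selectable channels must be non-blocking
            wakePipe.source().register(selector, SelectionKey.OP_READ); // ~ epoll_ctl(ADD, EPOLLIN)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // ~ Looper::wake(): make the wake channel readable so a blocked poll returns.
    void wake() {
        try {
            wakePipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // ~ pollInner(): waits up to timeoutMillis, which must be > 0 here
    // (Java treats select(0) as "wait forever"). Returns true if woken, false on timeout.
    boolean pollOnce(long timeoutMillis) {
        try {
            if (selector.select(timeoutMillis) == 0) {
                return false;                       // ~ POLL_TIMEOUT
            }
            selector.selectedKeys().clear();
            ByteBuffer buf = ByteBuffer.allocate(16);
            while (wakePipe.source().read(buf) > 0) {
                buf.clear();                        // ~ awoken(): drain the wake channel
            }
            return true;                            // ~ POLL_WAKE
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            selector.close();
            wakePipe.source().close();
            wakePipe.sink().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Draining the wake channel after each wake-up matters for the same reason awoken() exists natively: with level-triggered readiness, unread data would make every later poll return immediately.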

The native code above relies on a very important mechanism: epoll, Linux's I/O multiplexing mechanism.

Epoll mechanism

select, poll, and epoll are all I/O multiplexing mechanisms in the Linux kernel that can monitor multiple descriptors at once. When a descriptor becomes ready (readable or writable), the program is notified so it can perform the corresponding read or write. In essence all three are synchronous I/O: after being notified, the program still performs the read or write itself, and that call blocks.

Drawbacks of select:

  • The select call requires passing in an fd array that is copied into the kernel on every call, which can waste significant resources under high concurrency. (This could be optimized to avoid the copy.)

  • In the kernel, select still checks readiness by traversing every file descriptor. This is a synchronous process, although it avoids the cost of a system-call context switch per descriptor. (The kernel could instead use asynchronous event notification.)

  • select only returns the number of readable descriptors; the user must traverse the set to find out which ones. (This could be optimized to return only the ready descriptors, so the user does no wasted traversal.)

  • Limited number of file descriptors: a single process can monitor at most FD_SETSIZE descriptors, usually 1024 on Linux. The limit can be raised by changing the macro and recompiling, but select is still inefficient at that scale.

  • Severe performance degradation: The I/O performance deteriorates linearly as the number of monitored descriptors increases.

This design lets one thread handle many client connections (file descriptors) while reducing system-call overhead: monitoring many descriptors costs a single select system call plus one read or write call per ready descriptor.

Drawbacks of poll:

  • The main difference between poll and select is that poll removes the 1024-descriptor limit. However, both must iterate over all descriptors on return to find the ready sockets. When many clients are connected at once, only a few may be ready at any moment, so performance still degrades linearly as the number of monitored descriptors grows.

epoll was introduced in Linux kernel 2.6 as an enhanced version of select and poll. It is more flexible and has no fixed limit on the number of descriptors. epoll uses a single file descriptor to manage many descriptors and keeps the user's descriptor events in a kernel event table, so each descriptor is copied between user space and kernel space only once. As the most efficient I/O multiplexing mechanism in Linux, epoll waits in one place for I/O events on many file handles.

epoll is the best of the three: it solves several of the problems of select and poll.

epoll improves on each of the three select drawbacks listed above:

  • The set of file descriptors is kept in the kernel, so the user does not pass it in again on every call, only tells the kernel what has changed.
  • Instead of polling to find ready descriptors, the kernel registers a callback for each descriptor and is notified asynchronously when an I/O event occurs.
  • The kernel returns only the descriptors that have I/O events, so the user does not need to traverse the entire set.
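
The effect of keeping registrations in the kernel and returning only ready descriptors can be observed from Java NIO, assuming an epoll-backed Selector on Linux (other platforms use other backends behind the same API). In this hypothetical demo, ReadySetDemo registers every pipe once, makes a few of them readable, and only those come back in selectedKeys().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: register N pipes once, write to some, and report which ones come back ready.
final class ReadySetDemo {
    static List<Integer> readyAfterWritingTo(int pipeCount, int... writeTo) {
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < pipeCount; i++) {
                Pipe p = Pipe.open();
                pipes.add(p);
                p.source().configureBlocking(false);
                // Registered once and remembered by the selector (~ epoll_ctl ADD); attachment = index
                p.source().register(selector, SelectionKey.OP_READ, i);
            }
            for (int i : writeTo) {
                pipes.get(i).sink().write(ByteBuffer.wrap(new byte[] {1}));
            }
            selector.select(200); // ~ epoll_wait with a 200 ms timeout
            List<Integer> ready = new ArrayList<>();
            for (SelectionKey key : selector.selectedKeys()) {
                ready.add((Integer) key.attachment()); // only ready channels appear here
            }
            ready.sort(null);
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Pipe p : pipes) {
                try {
                    p.source().close();
                    p.sink().close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }
}
```

With five pipes and writes to pipes 1 and 3, the result is [1, 3]; the caller never inspects the three idle pipes.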

select and poll each expose a single call; epoll has three: epoll_create(), epoll_ctl(), and epoll_wait().

epoll_create()
int epoll_create(int size);

size was originally a hint for the number of descriptors to monitor. Since Linux 2.6.8 the kernel grows the set dynamically and ignores this value, but it must still be greater than 0. Creating an epoll instance occupies a file descriptor, so you must close() it when you are done with it, or the process may run out of fds.

epoll_ctl()
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

Performs operation op on the file descriptor fd that you want to monitor, such as adding fd to the epoll instance.

  • epfd: the return value of epoll_create();
  • op: the operation, given by one of three macros that add, delete, or modify the monitored events of fd: EPOLL_CTL_ADD (add), EPOLL_CTL_DEL (delete), and EPOLL_CTL_MOD (modify);
  • fd: the file descriptor to monitor;
  • event: the events to monitor for fd (a struct epoll_event).

struct epoll_event is defined as follows:

struct epoll_event {
	__uint32_t events;  /* Epoll event */
	epoll_data_t data;  /* User available data */
};

events is a bit mask describing the events of interest on the file descriptor, built from the following values:

  • EPOLLIN: readable (this includes the peer SOCKET closing normally);
  • EPOLLOUT: writable;
  • EPOLLERR: an error occurred;
  • EPOLLHUP: hang-up;
  • EPOLLPRI: urgent (high-priority) data is available to read, such as out-of-band data;
  • EPOLLET: put the fd in edge-triggered mode, as opposed to the default level-triggered mode;
  • EPOLLONESHOT: report only one event; after that event fires, the fd is no longer monitored until it is re-armed with EPOLL_CTL_MOD.

epoll_wait()
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

Waits for events to be reported. The function returns the number of ready events; 0 means the call timed out, and -1 indicates an error.

  • epfd: the epoll instance to wait on;
  • events: an output array that receives the ready events from the kernel;
  • maxevents: the maximum number of events to return per call; it must be greater than 0;
  • timeout: timeout in milliseconds; 0 returns immediately and -1 blocks indefinitely.
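
epoll_wait()'s timeout convention (-1 blocks until something is ready, 0 returns immediately, a positive value bounds the wait) is the same one MessageQueue later passes down as nextPollTimeoutMillis. Java's Selector spells these cases differently, and notably select(0) blocks forever instead of returning immediately, so the hypothetical helper below maps the epoll convention onto the NIO calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical helper: apply an epoll_wait()-style timeout to a Java NIO Selector.
final class TimeoutMapping {
    // -1 => wait until something is ready, 0 => return immediately, > 0 => wait at most that long.
    static int pollWithEpollTimeout(Selector selector, int timeoutMillis) {
        try {
            if (timeoutMillis < 0) {
                return selector.select();              // no timeout argument: wait indefinitely
            } else if (timeoutMillis == 0) {
                return selector.selectNow();           // NOT select(0), which also waits forever
            } else {
                return selector.select(timeoutMillis); // bounded wait
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // On an empty selector, both the immediate and the bounded poll report 0 ready channels.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            return new int[] {pollWithEpollTimeout(selector, 0), pollWithEpollTimeout(selector, 50)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```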

With select/poll, the kernel scans all monitored file descriptors only when the call is made. epoll instead registers each descriptor in advance with epoll_ctl(); once a descriptor becomes ready, the kernel uses a callback mechanism to mark it ready quickly, and the process is notified when it calls epoll_wait(). Replacing descriptor traversal with callbacks is the key to epoll's efficiency.

Advantages of epoll
  • No limit on the number of monitored descriptors: the maximum is the number of files that can be opened, which you can check with cat /proc/sys/fs/file-max. This number depends heavily on system memory; on a phone with 3 GB of RAM it is roughly 200,000 to 300,000;
  • I/O performance does not degrade as the number of monitored fds grows. Unlike the polling in select and poll, epoll works through a callback defined for each fd, and only ready fds invoke it;
  • With few idle or dead connections, epoll is not much more efficient than select/poll; with a large number of idle connections, however, epoll is far more efficient.

Step 2: Initialize the Handler

The Handler constructor:

// API version 28
// android.os.Handler.java

public Handler() {
    this(null, false);
}

public Handler(Callback callback, boolean async) {
    // When FIND_POTENTIAL_LEAKS is enabled, warn if this Handler subclass is an anonymous,
    // member (inner), or local class that is not static, because it may cause a memory leak
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }

    // Looper.prepare() must be called first; otherwise myLooper() returns null
    mLooper = Looper.myLooper(); // Get the Looper object from TLS of the current thread
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue; // Assign the message queue in Looper to the Handler internal member variable
    mCallback = callback; // Callback method
    mAsynchronous = async; // Sets whether to be an asynchronous Handler. When this value is true, all messages sent through this Handler are asynchronous messages
}
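
The FIND_POTENTIAL_LEAKS check exists because a non-static inner, anonymous, or local class keeps an implicit reference to its enclosing instance. A pending Message references its target Handler, so such a Handler (and whatever encloses it, for example an Activity) stays reachable until the message is processed. The hypothetical helper below reproduces the same reflection test in plain Java; Screen and its nested classes are made-up examples.

```java
import java.lang.reflect.Modifier;

// Hypothetical helper reproducing the reflection test from the Handler constructor.
final class LeakCheck {
    static boolean mightLeak(Class<?> klass) {
        boolean nested = klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass();
        return nested && (klass.getModifiers() & Modifier.STATIC) == 0;
    }
}

// Made-up example classes.
final class Screen {
    class InnerHandler { }          // non-static member: captures Screen.this, so it is flagged
    static class StaticHandler { }  // static nested: no implicit outer reference, not flagged
}
```

The usual fix follows from the check: declare the Handler subclass static and, if it needs its owner, hold the owner through a WeakReference.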

Constructing a Handler does nothing special: it simply takes the current thread's Looper, plus that Looper's MessageQueue, and assigns them to its own member variables.

Step 3: Start the message loop

Now look at Looper.loop():

// API version 28
// android.os.Looper.java

public static void loop() {
    final Looper me = myLooper(); // Get the Looper object stored in TLS
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue; // Get the message queue in the Looper object

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    // Make sure to base permission checks on local processes instead of calling processes.
    final long ident = Binder.clearCallingIdentity();

    // ... omit some irrelevant code

    for (;;) { // Enter the main loop method of the loop
    	// It may block when retrieving messages through MessageQueue
        Message msg = queue.next(); // might block
        if (msg == null) { // When the message is null, the message loop is terminated
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        final Printer logging = me.mLogging; // The default value is null. SetMessageLogging () can be used to specify output for debugging
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        // ... omit some irrelevant code

        try {
            msg.target.dispatchMessage(msg); // Message is distributed through the Handler in Message
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }

        // ... omit some irrelevant code

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        // Make sure that the thread identity is not changed during scheduling
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        // Recycle the Message into Message's internal cache pool
        msg.recycleUnchecked();
    }
}

From the source above, the core logic of loop() is:

  • Read the next Message from the MessageQueue;
  • Call dispatchMessage() on the Handler stored in the Message's target field to dispatch it;
  • Once the Message has been handled, recycle it into the cache pool for reuse.
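
The recycling step can be pictured with a simplified pool: a singly linked free list guarded by a lock and capped at 50 entries, the same limit AOSP's Message uses. PooledMessage is a hypothetical sketch; the real Message also tracks an in-use flag and clears many more fields when it is recycled.

```java
// Hypothetical sketch of Message's recycling pool; not the real android.os.Message.
final class PooledMessage {
    private static final int MAX_POOL_SIZE = 50;
    private static final Object sPoolSync = new Object();
    private static PooledMessage sPool;   // head of a singly linked free list
    private static int sPoolSize = 0;

    int what;
    Object obj;
    PooledMessage next;

    private PooledMessage() { }

    // Reuse a pooled instance if one is available, otherwise allocate a new one.
    static PooledMessage obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                PooledMessage m = sPool;
                sPool = m.next;
                m.next = null;
                sPoolSize--;
                return m;
            }
        }
        return new PooledMessage();
    }

    // Clear the payload and push this instance back onto the pool (if it is not full).
    void recycle() {
        what = 0;
        obj = null;
        synchronized (sPoolSync) {
            if (sPoolSize < MAX_POOL_SIZE) {
                next = sPool;
                sPool = this;
                sPoolSize++;
            }
        }
    }

    static int poolSize() {
        synchronized (sPoolSync) {
            return sPoolSize;
        }
    }
}
```

Pooling is why code is encouraged to create messages with obtain() rather than new: a busy looper then reuses a small set of objects instead of allocating one per message.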

Next, MessageQueue.next():

// API version 28
// android.os.MessageQueue.java

Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mPtr;
    if (ptr == 0) { // When the message loop has exited, null is returned
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        // This call is the core logic of the Looper thread sleep; This method returns when the nextPollTimeoutMillis has been waited for, or when the message queue has been awakened
        // When nextPollTimeoutMillis is -1, it indicates that there are no messages to be processed in the message queue. The message queue will sleep until it is woken up
        // When nextPollTimeoutMillis is 0, the native layer's epoll_wait() method is finally executed, which returns immediately and does not go to sleep
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message. Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier. Find the next asynchronous message in the queue.
                
                // Special message type, indicating that there is a synchronization barrier in the message queue; This logic finds the first asynchronous message behind the synchronization barrier
                // If no asynchronous message is found, nextPollTimeoutMillis is assigned to -1, and the message queue will block in the next poll
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready. Set a timeout to wake up when it is ready.
                    // The message is not due yet: sleep until its trigger time on the next poll
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }

                    // Remove the message from the message queue list
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg; // Successfully obtained the next message to execute
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1; // No messages: sleep indefinitely until woken
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) { // If the MessageQueue is quitting, return null
                dispose();
                return null;
            }

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            
            // The queue is idle. On the first idle pass (pendingIdleHandlerCount < 0),
            // get the number of IdleHandlers to run
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }

            // If there is no IdleHandler to execute, the current message queue is marked as blocked and the next poll is blocked
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run. Loop and wait some more.
                mBlocked = true;
                continue;
            }

            // Initialize mPendingIdleHandlers
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            // Assigns a value to the IdleHandler array task to be run, which is converted to the IdleHandler array via mIdleHandlers
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        // The current message queue is idle, execute IdleHandler logic
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // Release the reference to IdleHandler from the array

            boolean keep = false;
            try {
            	// Executes IdleHandler idle logic, which returns a Boolean value
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            // If keep is false, the IdleHandler is one-shot: remove it from mIdleHandlers
            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0; // Reset pendingIdleHandlerCount to 0 to ensure that this fetch is not repeated

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        
        // When the IdleHandler finishes executing, it may be time for the next message to be executed, so the next message will be fetched without waiting for the next polling
        // When nextPollTimeoutMillis is 0, the underlying logic of the nativePollOnce() method returns immediately without blocking sleep
        nextPollTimeoutMillis = 0;
    }
}
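
The message-selection part of next() can be reduced to a single-threaded sketch: a list sorted by when, a barrier entry (a message with no target) that hides synchronous messages, and the resulting nextPollTimeoutMillis. SketchQueue is hypothetical and leaves out locking, native waiting, quitting, and IdleHandlers.

```java
// Hypothetical single-threaded sketch of the selection logic in MessageQueue.next().
final class SketchQueue {
    static final class Msg {
        final String name;
        final long when;
        final boolean barrier;       // stands in for "target == null"
        final boolean asynchronous;  // stands in for msg.isAsynchronous()
        Msg next;

        Msg(String name, long when, boolean barrier, boolean asynchronous) {
            this.name = name;
            this.when = when;
            this.barrier = barrier;
            this.asynchronous = asynchronous;
        }
    }

    private Msg mMessages;           // head of the list, sorted by when
    int nextPollTimeoutMillis = -1;  // meaningful after poll() returns null

    // Insert after every message whose when is <= msg.when (FIFO for equal times).
    void enqueue(Msg msg) {
        if (mMessages == null || msg.when < mMessages.when) {
            msg.next = mMessages;
            mMessages = msg;
            return;
        }
        Msg prev = mMessages;
        while (prev.next != null && prev.next.when <= msg.when) {
            prev = prev.next;
        }
        msg.next = prev.next;
        prev.next = msg;
    }

    // Simplified removal that only handles a barrier sitting at the head of the list.
    void removeBarrierAtHead() {
        if (mMessages != null && mMessages.barrier) {
            mMessages = mMessages.next;
        }
    }

    // One pass of next()'s synchronized block: return a due message, or null and set the sleep time.
    Msg poll(long now) {
        Msg prevMsg = null;
        Msg msg = mMessages;
        if (msg != null && msg.barrier) {
            // Stalled by a barrier: skip synchronous messages, find the first asynchronous one
            do {
                prevMsg = msg;
                msg = msg.next;
            } while (msg != null && !msg.asynchronous);
        }
        if (msg == null) {
            nextPollTimeoutMillis = -1;  // nothing runnable: sleep until woken
            return null;
        }
        if (now < msg.when) {
            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
            return null;                 // not due yet: sleep until msg.when
        }
        if (prevMsg != null) {
            prevMsg.next = msg.next;
        } else {
            mMessages = msg.next;
        }
        msg.next = null;
        return msg;
    }
}
```

With a barrier at the head, an asynchronous message is returned even though an earlier synchronous one is due; once the async message is gone, poll() reports -1 until the barrier is removed.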

From the source analysis above, calling nativePollOnce() may put the current thread into a blocking sleep, depending on nextPollTimeoutMillis. nativePollOnce() goes through the JNI function android_os_MessageQueue_nativePollOnce() and NativeMessageQueue::pollOnce() into the native Looper::pollOnce(), which calls Looper::pollInner() and finally epoll_wait().

The core native logic, Looper::pollOnce(), is shown below:

// API version 28
// system/core/libutils/Looper.cpp

/**
 * @param timeoutMillis [timeout duration]
 * @param outFd         [file descriptor on which the event occurred]
 * @param outEvents     [events that occurred on outFd: EVENT_INPUT (readable), EVENT_OUTPUT (writable),
 *                       EVENT_ERROR (error), or EVENT_HANGUP (hang-up)]
 * @param outData       [context data]
 */
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // First, return any pending responses that have no callback
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) { // ident >= 0 means no callback, since POLL_CALLBACK is -2 (defined in Looper.h)
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        // Reprocess the internal polling
        result = pollInner(timeoutMillis);
    }
}

pollOnce() can return the following values:

  • POLL_WAKE: the poll was woken by wake(), i.e. by a write to the wake event fd (mWakeEventFd);
  • POLL_CALLBACK: one or more callbacks were invoked, for example because a monitored fd was triggered;
  • POLL_TIMEOUT: the wait timed out;
  • POLL_ERROR: an error occurred while waiting.
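
How pollInner() settles on one of these values can be summarized in a few lines. This is a hypothetical Java model, not Android code: POLL_CALLBACK = -2 comes from the comment in pollOnce() above, while the other constants (POLL_WAKE = -1, POLL_TIMEOUT = -3, POLL_ERROR = -4) and the rule that running any native message handler or fd callback after the Done label sets the result to POLL_CALLBACK are assumptions taken from AOSP's Looper.h and Looper.cpp.

```java
// Hypothetical model of how pollInner() chooses its return value; constants as assumed from Looper.h.
final class PollResult {
    static final int POLL_WAKE = -1;
    static final int POLL_CALLBACK = -2;
    static final int POLL_TIMEOUT = -3;
    static final int POLL_ERROR = -4;

    /**
     * @param eventCount       what epoll_wait() returned
     * @param interrupted      true if eventCount < 0 only because of EINTR
     * @param callbacksInvoked true if a native message handler or fd callback ran after Done
     */
    static int classify(int eventCount, boolean interrupted, boolean callbacksInvoked) {
        int result = POLL_WAKE;             // initial value; EINTR jumps to Done keeping it
        if (eventCount < 0 && !interrupted) {
            result = POLL_ERROR;
        } else if (eventCount == 0) {
            result = POLL_TIMEOUT;
        }
        if (callbacksInvoked) {
            result = POLL_CALLBACK;         // set while running callbacks after the Done label
        }
        return result;
    }
}
```

Note that an interrupted wait (EINTR) is reported as POLL_WAKE rather than POLL_ERROR, so pollOnce() simply returns and the Java layer polls again.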

Now look at Looper::pollInner():

int Looper::pollInner(int timeoutMillis) {
    // ... omit some irrelevant code

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true; // About to be in idle state

    struct epoll_event eventItems[EPOLL_MAX_EVENTS]; // EPOLL_MAX_EVENTS is 16: at most 16 events per wait
    // Core call: wait for an event or the timeout; it also returns when nativeWake() writes to the wake event fd
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false; // Indicates that the state is no longer idle

    // Acquire lock.
    mLock.lock(); // Acquire the lock

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked(); // Rebuild the epoll set, then jump straight to Done
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR; // epoll_wait() returned a negative count: an error occurred, jump to Done
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT; // If the number of epoll events is 0, a timeout occurs and the Done logic is switched directly
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    // Loop through to handle all events
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken(); // read and clear the wake eventfd
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;

                // Process the request, generate the corresponding response object, push into the response array
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    // Deliver the native-layer Messages that are due by calling their MessageHandler
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes. Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock(); // release the lock

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what);
#endif
                handler->handleMessage(message); // Handle native layer message events
            } // release handler

            mLock.lock(); // acquire the lock again
            mSendingMessage = false;
            result = POLL_CALLBACK; // a callback occurred
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock(); // release the lock

    // Invoke all response callbacks.
    // For every Response whose ident is POLL_CALLBACK, invoke its callback
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback. Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            
            // The callback method that handles the request
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq); // remove the fd
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear(); // drop the Response's reference to the callback
            result = POLL_CALLBACK; // a callback occurred
        }
    }
    return result;
}

From the source above, the main logic of pollInner() is as follows:

  1. First call epoll_wait(), a blocking call that returns when an event occurs or the timeout expires; in particular, it returns when nativeWake() writes to mWakeEventFd.
  2. Depending on what epoll_wait() returns:
  • POLL_ERROR: an error occurred; jump to Done.
  • POLL_TIMEOUT: the wait timed out; jump to Done.
  • Otherwise, handle each ready fd: if it is mWakeEventFd, call awoken() to read and clear it; for any other fd still registered in mRequests, build a Response from its Request and push it into the mResponses array.
  3. The code after the Done label:
  • First, process native-layer Messages by calling the native MessageHandler's handleMessage().
  • Then process the mResponses array, invoking the callback of every Response whose ident is POLL_CALLBACK.
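The wait/wake core of pollInner() can be reproduced outside Android with the same Linux primitives. The sketch below is a minimal, hypothetical `MiniLooper` (not the libutils class): it registers an eventfd with epoll, `wake()` adds 1 to the counter the way Looper::wake() does, and `pollOnce()` maps the epoll_wait() result onto the outcomes listed above. The numeric result codes are assumed to match Looper.h (POLL_WAKE = -1, POLL_TIMEOUT = -3, POLL_ERROR = -4); Requests, callbacks, and native Messages are left out.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdint>

// Result codes named after Looper's; the values are assumed to match Looper.h.
enum PollResult { kPollWake = -1, kPollTimeout = -3, kPollError = -4 };

// Hypothetical, stripped-down looper: only the wake fd is registered with epoll.
class MiniLooper {
public:
    MiniLooper() {
        wakeFd_ = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        epollFd_ = epoll_create1(EPOLL_CLOEXEC);
        if (wakeFd_ >= 0 && epollFd_ >= 0) {
            epoll_event ev{};
            ev.events = EPOLLIN;  // readable whenever the eventfd counter is non-zero
            ev.data.fd = wakeFd_;
            ok_ = epoll_ctl(epollFd_, EPOLL_CTL_ADD, wakeFd_, &ev) == 0;
        }
    }
    ~MiniLooper() {
        if (wakeFd_ >= 0) close(wakeFd_);
        if (epollFd_ >= 0) close(epollFd_);
    }
    MiniLooper(const MiniLooper&) = delete;
    MiniLooper& operator=(const MiniLooper&) = delete;

    bool ok() const { return ok_; }

    // Like Looper::wake(): add 1 to the eventfd counter, retrying on EINTR.
    void wake() {
        uint64_t inc = 1;
        ssize_t n;
        do { n = write(wakeFd_, &inc, sizeof inc); } while (n < 0 && errno == EINTR);
    }

    // Like the first half of pollInner(): block in epoll_wait(), then classify the result.
    int pollOnce(int timeoutMillis) {
        epoll_event events[16];  // EPOLL_MAX_EVENTS is 16 in Looper.cpp
        int count = epoll_wait(epollFd_, events, 16, timeoutMillis);
        if (count < 0) return errno == EINTR ? kPollWake : kPollError;
        if (count == 0) return kPollTimeout;
        for (int i = 0; i < count; i++) {
            if (events[i].data.fd == wakeFd_ && (events[i].events & EPOLLIN)) {
                awoken();
            }
        }
        return kPollWake;
    }

private:
    // Like Looper::awoken(): reading an eventfd returns the counter and resets it to 0.
    void awoken() {
        uint64_t counter;
        ssize_t n;
        do { n = read(wakeFd_, &counter, sizeof counter); } while (n < 0 && errno == EINTR);
    }

    int wakeFd_ = -1;
    int epollFd_ = -1;
    bool ok_ = false;
};
```

Two wake() calls made before a poll collapse into a single wakeup: the eventfd only accumulates a counter, and awoken() drains it with one read, after which the next non-blocking poll times out.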

This flow shows that a ready Request is not executed immediately: it is first collected into the Response array. In the Done section, native Messages are processed before Requests, which means native Messages have a higher priority than Requests.
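The ordering of the Done section can be modeled in a few lines. This is a simplified sketch under stated assumptions: `Envelope` and `PendingResponse` are hypothetical stand-ins for MessageEnvelope and Response, the envelope list is assumed to be sorted by uptime (as mMessageEnvelopes is), and locking is omitted. It only records the order in which work would run.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-ins for Looper's MessageEnvelope and Response (not the libutils types).
struct Envelope {
    int64_t uptime;    // when the native Message becomes due
    std::string what;  // label used only for tracing
};
struct PendingResponse {
    std::string fdLabel;  // which fd's callback would run
};

// Mirrors the order of the Done: block in pollInner(): deliver every native
// Message that is already due, then run the callbacks collected from epoll.
std::vector<std::string> runDone(std::vector<Envelope>& envelopes,
                                 const std::vector<PendingResponse>& responses,
                                 int64_t now, int64_t& nextMessageUptime) {
    std::vector<std::string> trace;
    nextMessageUptime = INT64_MAX;  // plays the role of LLONG_MAX in the original
    while (!envelopes.empty()) {
        if (envelopes.front().uptime <= now) {
            trace.push_back("message:" + envelopes.front().what);
            envelopes.erase(envelopes.begin());
        } else {
            // The first message that is not yet due decides the next wakeup time.
            nextMessageUptime = envelopes.front().uptime;
            break;
        }
    }
    for (const PendingResponse& r : responses) {
        trace.push_back("callback:" + r.fdLabel);
    }
    return trace;
}
```

With messages due at 10, 20, and 50 and one pending fd callback, calling runDone() at time 30 delivers the first two messages, then the callback, and leaves 50 as the next wakeup time.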

In addition, before calling pollInner(), pollOnce() first handles the Responses in the array that have no callback (those with ident >= 0, which are returned to its caller).

Step 4: Send the message

When we send a message through a Handler, whichever method we use (sendMessage(), sendMessageDelayed(), post(), sendMessageAtFrontOfQueue(), and so on), the call ends up in Handler's private method enqueueMessage(), which in turn calls MessageQueue.enqueueMessage().

Next, let's look at the source of MessageQueue.enqueueMessage():

// API version 28
// android.os.MessageQueue.java

boolean enqueueMessage(Message msg, long when) {
    // Every ordinary Message object must have a target
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) { // The queue is quitting: recycle the Message into the Message pool
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            // If p is null, there is no message in the current message queue.
            // If the current message to be inserted is the earliest in the queue, the message is inserted to the head of the message queue
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue. Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            
            // Insert the message in chronological order. Normally there is no need to wake up the
            // event queue; a wakeup is needed only when the queue is blocked, the head of the queue
            // is a synchronization barrier, and the message being inserted is asynchronous.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr); // wake up the native Looper when the queue needs waking
        }
    }
    return true;
}

From the Java-layer insertion code above, we can see:

  • If the queue is empty, or the new message is due before every message already queued (or when is 0), the message becomes the new head of the queue. If the queue is blocked in sleep, it must be woken up.
  • Otherwise the message is inserted in chronological order, and usually no wakeup is needed. The queue is woken only when it is blocked in sleep, the head of the queue is a synchronization barrier, and the message being inserted is asynchronous and earlier than any other asynchronous message already in the queue.
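To make the two insertion rules concrete, here is a minimal C++ model of the same logic, translated from the Java above. `Msg` and `MiniQueue` are hypothetical names; as in the Java code, a null `target` marks a synchronization barrier. Locking, the `mQuitting` check, and `markInUse()` are omitted, and `enqueue()` returns the `needWake` decision instead of calling `nativeWake()`.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of android.os.Message: a null target marks a sync barrier.
struct Msg {
    int64_t when = 0;
    bool async = false;
    const void* target = nullptr;
    Msg* next = nullptr;
};

// Hypothetical model of the insertion part of MessageQueue.enqueueMessage().
struct MiniQueue {
    Msg* head = nullptr;   // plays the role of mMessages
    bool blocked = false;  // plays the role of mBlocked (thread sleeping in nativePollOnce)

    // Inserts msg ordered by `when`; returns true if the caller should wake the looper.
    bool enqueue(Msg* msg, int64_t when) {
        msg->when = when;
        Msg* p = head;
        bool needWake;
        if (p == nullptr || when == 0 || when < p->when) {
            // New head: wake only if the thread is blocked.
            msg->next = p;
            head = msg;
            needWake = blocked;
        } else {
            // Middle or tail: wake only for an async message behind a barrier at the head...
            needWake = blocked && p->target == nullptr && msg->async;
            Msg* prev = nullptr;
            for (;;) {
                prev = p;
                p = p->next;
                if (p == nullptr || when < p->when) break;
                // ...and only if no earlier async message is already queued.
                if (needWake && p->async) needWake = false;
            }
            msg->next = p;  // invariant: p == prev->next
            prev->next = msg;
        }
        return needWake;
    }
};
```

In this model, a synchronous message appended behind an existing one never triggers a wakeup, while the first asynchronous message queued behind a barrier does; a second, later asynchronous message does not, because the earlier one already determines when the thread must wake.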

nativeWake() goes through JNI (android_os_MessageQueue_nativeWake()) to NativeMessageQueue::wake(), which in turn calls Looper::wake().

Looper::wake() is the core native-layer method:

// API version 28
// system/core/libutils/Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    // Add 1 to the counter of the eventfd mWakeEventFd, which makes it readable
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}

TEMP_FAILURE_RETRY is a macro that re-executes the call for as long as it fails with EINTR (interrupted by a signal). Once the write succeeds, mWakeEventFd becomes readable, so the thread sleeping in epoll_wait() returns and the message queue resumes processing messages.
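The retry-on-EINTR idea and the eventfd counter semantics can be checked in isolation. In the sketch below, `retryOnEintr`, `wakeFd`, and `drainFd` are hypothetical helpers (not Android functions): the first re-runs a system call while it fails with EINTR, like TEMP_FAILURE_RETRY; the second writes 1 like Looper::wake(); the third reads the accumulated counter like Looper::awoken(). The eventfd is assumed to be created with EFD_NONBLOCK, as Looper's is.

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdint>

// Re-run a system call while it fails with EINTR, the same idea as TEMP_FAILURE_RETRY.
template <typename F>
ssize_t retryOnEintr(F call) {
    ssize_t r;
    do {
        r = call();
    } while (r == -1 && errno == EINTR);
    return r;
}

// Sketch of Looper::wake(): add 1 to the eventfd counter.
// EAGAIN would mean the counter is saturated; the fd is then already readable.
bool wakeFd(int fd) {
    uint64_t inc = 1;
    ssize_t n = retryOnEintr([&] { return write(fd, &inc, sizeof inc); });
    return n == static_cast<ssize_t>(sizeof inc) || errno == EAGAIN;
}

// Sketch of Looper::awoken(): read the counter, which also resets it to 0.
// On a non-blocking eventfd whose counter is already 0, read() fails with EAGAIN.
uint64_t drainFd(int fd) {
    uint64_t counter = 0;
    ssize_t n = retryOnEintr([&] { return read(fd, &counter, sizeof counter); });
    return n == static_cast<ssize_t>(sizeof counter) ? counter : 0;
}
```

Writing 1 three times and then reading once yields 3, and a second read finds nothing: this is why several wake() calls between two polls cost only one wakeup.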

Conclusion

The Java MessageQueue stores a pointer to the native NativeMessageQueue object in its mPtr field. This makes MessageQueue the hub between the Java layer and the native layer, able to handle both Java-layer and native-layer messages. The mapping between the Java layer and the native layer is shown below:

In the picture above:

  • Red dotted lines: the Java and native MessageQueue are linked through JNI and can call into each other. Understanding this two-way relationship shows how Java code calls C++ code and how C++ code calls back into Java.
  • Blue dotted lines: the Java Handler, Looper, and Message classes have no direct link to the native layer; they merely play similar roles in the Java and native Handler message models. The two sides are independent, and each implements its own logic.
  • In the Native layer, WeakMessageHandler inherits from MessageHandler class. NativeMessageQueue inherits from MessageQueue.

In addition, messages are processed in this order: native-layer Messages first, then native-layer Requests, and finally Java-layer Messages. Understanding this order explains why a Java-layer message sometimes takes a long time to be handled even when the Java queue holds very few messages: the native work in the same loop iteration runs first.
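A toy timing model makes this delay visible. The sketch below is purely illustrative: `Work` and `traceIteration` are hypothetical, the costs are invented, and it simply assumes one loop iteration runs native Messages, then native fd callbacks, then Java Messages, one after another on the same thread, as described above.

```cpp
#include <cassert>
#include <initializer_list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical unit of work with a made-up cost in milliseconds.
struct Work {
    std::string name;
    int costMs;
};

// Toy model of one loop iteration: native Messages, then native fd callbacks
// (Requests), then Java Messages, all on the same thread.
// Returns (start time in ms since wakeup, name) for each piece of work.
std::vector<std::pair<int, std::string>> traceIteration(
        const std::vector<Work>& nativeMessages,
        const std::vector<Work>& nativeCallbacks,
        const std::vector<Work>& javaMessages) {
    std::vector<std::pair<int, std::string>> trace;
    int clock = 0;
    for (const auto* stage : {&nativeMessages, &nativeCallbacks, &javaMessages}) {
        for (const Work& w : *stage) {
            trace.emplace_back(clock, w.name);
            clock += w.costMs;  // the next piece of work waits for this one
        }
    }
    return trace;
}
```

With a 5 ms native Message and a 20 ms fd callback ahead of it, a single Java message in this model starts 25 ms after the wakeup, even though the Java queue held nothing else.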
