Android interthread messaging mechanism
The principle of creating message queues for threads
Can a Handler be created on a child thread?
Create handler in child thread:
Thread {
Handler()
}.start()
Copy the code
The following exception is thrown:
Handler cannot be created in a thread that has not called prepare.
Why is this exception thrown? Let’s go to the source code.
Here is the Handler constructor:
public Handler(a) {
this(null.false);
}
Copy the code
public Handler(Callback callback, boolean async) {...// Get the current thread's Looper object, which is stored in ThreadLocal
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
Copy the code
Looper is created in the prepare method of the Looper class:
// There is one parameter, meaning whether to allow exit
private static void prepare(boolean quitAllowed) {
if(sThreadLocal.get() ! =null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
Copy the code
// Create looper for the main thread
public static void prepareMainLooper(a) {
// The main thread looper cannot exit, so pass the parameter false
prepare(false);
synchronized (Looper.class) {
if(sMainLooper ! =null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
// The main thread looper is created successfully and stored in a static variable, which can be retrieved at any timesMainLooper = myLooper(); }}Copy the code
Looper constructor:
private Looper(boolean quitAllowed) {
/ / create a MessageQueue
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
Copy the code
MessageQueue constructor:
MessageQueue(boolean quitAllowed) {
// Save the variable to allow exit
mQuitAllowed = quitAllowed;
// The real initialization is done in the native layer
mPtr = nativeInit();
}
Copy the code
The following figure shows the relationship between threads, Looper, MessageQueue, and Handler:
A Looper is stored in a ThreadLocal. A Looper is stored in a MessageQueue. MessageQueue receives messages from different handlers and polls them. If a Message exists, it is passed to the corresponding Handler, whose relationship to Message is determined by a target attribute in the Message.
Then continue to see the initialization process of MessageQueue’s native layer:
// Native layer initialization method
jlong android_os_MessageQueue_nativeInt(JNIEnv* env, jclss clazz){
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(a);return reinterpret_cast<jlong>(naticeMessageQueue);
}
Copy the code
// MessageQueue constructor for natvie layer
NativeMessageQueue::NativeMessageQueue(){
mLooper = Looper::getForThread(a);// Get local cache
// If the local cache is empty
// Create a Natice layer Looper and place it in the local cache
if(mLooper == NULL){
mLooper = new Looper(false);
Looper::setForThread(mLooper); }}Copy the code
Then there is the Looper constructor for the native layer:
Looper::Looper(bool allowNonCallbacks){
mWakeEventFd = eventfd(0,EFD_NONBLOCK); / / create the fd
rebuildEpollLocked(a); }Copy the code
Native layer which is the core part of the message loop, in its constructor, create a fd, in the early version, here is not to create fd, but USES the ordinary pipe, because the pipeline involves two copy of the memory, so in later editions of better performance of fd, its internal is a counter, Two copies are avoided.
Then there is the rebuildEpollLocked() function, which adds the above fd to epoll’s listening queue:
void Looper::rebuildEpollLocked(a){
// Create an epoll
mEpollFd = epoll_create(EPOLL_SIZE_HINT); .// Add the fd to the epoll listener queue to listen for its read events
struct epoll_event eventItem;
memset(&eventItem,0.sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeEventFd,&eventItem); . }Copy the code
The real listening is in the loop below:
int Looper::pollOnce(int timeoutMills, int* outFd,...){
for(;;) {pollInner(timeoutMills); }}Copy the code
int Looper::pollInner(int timeoutMills){
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// block the method and wait for the event to occur
int eventCount = epoll_wait(mEpollFd, eventItems,...) ;// If you have time, do it in a loop
for(int i = 0; i < eventCount; i++){
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if(fd == mWakeEventFd && (epollEvents & EPOLLIN)){
// If the event is the same as the read event, it will be consumed
awoken(a);// The method of consumption}}}Copy the code
What is the difference between the main thread Looper and the child thread Looper?
Looper of child thread can exit, Looper of main thread cannot exit.
What is the relationship between Looper and MessageQueue and Handler?
In Java layer, Looper and MessageQueue are one-to-one relationship, while in native layer, NativeMessageQueue contains a native layer Looper, which is also one-to-one relationship.
4. How was MessageQueue created?
Java layer MessageQueue constructor will call a native function to create a Native layer NativeMessageQueue, NativeMessageQueue will create a Looper,Looper will create an EventFD, And create epoll, and then add the fd’s readable events to epoll.
Second, the message cycle process
The message loop is Looper’s loop function:
public static void loop(a) {
// Get the current thread's looper
finalLooper me = myLooper(); .// Get the MessageQueue corresponding to Looper
finalMessageQueue queue = me.mQueue; .for (;;) {
// In a loop, the next message is continually fetched
Message msg = queue.next(); // might block if the queue is empty
if (msg == null) {
// No message indicates that the message queue is quitting.
// No message indicating that Looper is finished, return
return;
}
try {
// Use the corresponding MSG handler to distribute the message.
msg.target.dispatchMessage(msg);
} finally{... }...// Reclaim message, that is, reset the state of message and put it into the object pool (single linked list)msg.recycleUnchecked(); }}Copy the code
The two most important steps are fetching and distributing messages.
Take a look at the message first: the next() method in MessageQueue
Message next(a) {...int nextPollTimeoutMillis = 0;
for(;;) {...// Block here, waiting for another thread to send a message, or after the timeout, wake up and return
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if(msg ! =null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
// This is a message barrier
do {
prevMsg = msg;
msg = msg.next;
} while(msg ! =null && !msg.isAsynchronous());
}
// The following is the normal message processing flow
if(msg ! =null) {
if (now < msg.when) {
// If the message is not ready, set the timeout and wake up when the message is ready
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Get the message if it is ready
mBlocked = false;
if(prevMsg ! =null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
// The message is marked for use
msg.markInUse();
returnmsg; }}else {
// If there is no message, the queue is empty and the timeout is set to -1
nextPollTimeoutMillis = -1; }... }}Copy the code
NativePollOnce:
void android_os_MessageQueue_naticePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis){
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env,obj,timeoutMills);
}
Copy the code
void NativeMessageQueue::pollOnce(JNIEnv* env,jobject pollObj,int timeoutMillis){...// Core, loop
mLooper->pollOnce(timeoutMillis); . }Copy the code
As can be seen, Looper’s pollOnce function of the Native layer is finally called, and the pollOnce function finally calls its pollInner function, which is the final method to listen to messages.
3. Message sending process
Handler sendMessage, senMessageEmpty sendMessageDelayed, post, postDelay, postAt method and so on, finally all calls to:
public final boolean sendMessage(Message msg) {
return sendMessageDelayed(msg, 0);
}
Copy the code
public final boolean sendMessageDelayed(Message msg, long delayMillis){
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
Copy the code
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
// A reference to messageQueue is savedMessageQueue queue = mQueue; .// Pass message and timestamp information into MQ
return enqueueMessage(queue, msg, uptimeMillis);
}
Copy the code
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
// Set the target of message to this, that is, the handler that sends the message
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Copy the code
Then go to enqueueMessage of MQ:
boolean enqueueMessage(Message msg, long when) {...synchronized (this) {... msg.markInUse(); msg.when = when; Message p = mMessages;boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr ! = 0 because mQuitting is false.
if(needWake) { nativeWake(mPtr); }}return true;
}
Copy the code
NativeWake writes messages to the underlying implementation of the queue, that is, writes a number to eventFD in the Looper of the native layer, and then the FD can receive the event.
void android_os_MessageQueue_nativeWake(JNIEnv* env,jclass clazz, jlong ptr){
NativeMessageQueue* naticeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake(a); }Copy the code
NativeMessageQueue finally calls its internal Looper’s wake method:
void Looper::wake(a){
uint64_t inc = 1;
// Write a number to mWakeEventFd
write(mWakeEventFd,&inc,sizeof(uint64_t));
}
Copy the code
To sum up, the process of adding a message starts with the enqueueMessage method of MessageQueue in the Java layer. This method adds the message to the MessageQueue and calls the wake() method of the local MQ corresponding to the message. Finally, a number is written to a counter FD through the local Looper to notify the queue of the message addition, and the counter FD is listened in the loop of the local Looper to wake up the native method called by the upper layer to get the message and process it.
4. The process of message distribution
Message distribution is in the Handler’s dispatchMessage
/** * Handle system messages here. */
public void dispatchMessage(Message msg) {
if(msg.callback ! =null) {
// See if MSG has a callback. If so, call handleCallback
handleCallback(msg);
} else {
// if MSG has no callback, see if there is a global mCallback
if(mCallback ! =null) {
// If so, the global handleMessage will be called, and the logic will be based on the return value of this callback.
// If false is returned, the handleMessage method will be executed
if (mCallback.handleMessage(msg)) {
return; }}// Our own copy of the handleMessage methodhandleMessage(msg); }}Copy the code