Android6.0的Looper源码分析(1)
1????? Looper簡介
Android在Java標準線程模型的基礎上,提供了消息驅動機制,用于多線程之間的通信。而其具體實現就是Looper。
Android Looper的實現主要包括了3個概念:Message,MessageQueue,Handler,Looper。其中Message就是表示一個可執行的任務。消息創建完畢通過消息處理器Handler在任意線程中發送添加至MessageQueue,最終在Looper線程逐個取出并調用handler.handleMessage()進行處理。
?
?
2?????Looper的初始化
這里可以嘗試分析Looper.java類的結構來推測Looper機制的實現原理。以下為Looper類的變量域:
| //這里可以簡單的將ThreadLocal類型的變量想象成一個Map,鍵值為線程號 static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>(); //?注意下面的static表示sMainLooper歸于Looper.Class ????private static Looper sMainLooper;?//注意static數據,進程間并非共享 ????//Looper的每個線程實例都有一個MessageQueue ??? final MessageQueue mQueue; ??? final Thread mThread; |
第一個變量sThreadLocal為ThreadLocal<Looper>類型的變量,它主要由兩個方法,set()和get();這里通過泛型指定了需要線程賦值的變量類型為Looper。簡單理解sThreadLocal .set()就是將當前線程的Looper副本值設定為指定值。sThreadLocal.get()將得到Looper實例在當前線程下的副本。(ThreadLocal的實現還有待研究,初步猜測其內部存在哈希Map,可以根據當前線程的線程號區分不同線程的變量)。通過ThreadLocal實現了線程級單例。
第二個變量為static的sMainLooper,存放的應該是主線程(即UI線程的Looper),類型設計為static,這樣通過Looper.getMainLooper()的方法在任何線程都能獲得該Looper,從而更新UI。
第三個參數為java層的Massage隊列,Handler.sendMessage()就是將Message添加到此隊列以供Looper.loop()。在接下來的分析將會發現,java層的MessageQueue的新建會導致Native層的NativeMessageQueue的創建,進而在導致Native層Looper的創建。
第四個參數,就是Looper所在線程的引用。
將一個線程改造成Looper線程很容易就可以實現,如下;
| ? ??class LooperThread extends Thread { ? ??????public Handler mHandler; ? ? ??????public void run() { ? ??????????Looper.prepare(); ? ? ??????????mHandler = new Handler() {//構造方法內部綁定了當前Looper線程 ? ??????????????public void handleMessage(Message msg) { ? ??????????????????//?在這里處理send進來的消息 ? ??????????????} ? ??????????}; ? ? ??????????Looper.loop(); ??????? } ? ??} |
首先分析Looper的準備工作prepare()。
| ??? public static void prepare() { ??????? prepare(true); ?? ?} ? ??? private static void prepare(boolean quitAllowed) {//保證Looper的線程級單例 ??????? if (sThreadLocal.get() != null) { ??????????? throw new RuntimeException("Only one Looper may be created per thread"); ??????? } ????????sThreadLocal.set(new Looper(quitAllowed));//這里創建了Looper的線程單例 ??? } |
Looper線程單例的創建會導致MessageQueue的創建,MessageQueue內有一個Message類型的變量sMessages,因此可以想到MessageQueue在java層是通過鏈表實現的。以下為MessageQueue的構造函數:
| MessageQueue(boolean quitAllowed) { ??????? mQuitAllowed = quitAllowed; //通過JNI調用了Native層的相關函數,導致了NativeMessageQueue的創建 ????????mPtr = nativeInit();??? } |
可以看到MessageQueue在構造的時候通過JNI調用了Native層的C++函數,從而對Looper在Native層進行必要的初始化操作。同時java MessageQueue獲得了一個指向Native層的指針mPtr,從而可以通過mPtr方便的調用底層的相關方法。NativeInit對應android_os_MessageQueue.cpp中的以下函數。
| static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { ????//在Native層又創建了NativeMessageQueue ????NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); ??? if (!nativeMessageQueue) { ??????? jniThrowRuntimeException(env, "Unable to allocate native queue"); ??????? return 0; ??? } ? ??? nativeMessageQueue->incStrong(env); ??? //這里的返回給java層的mPtr,因此mPtr實際上是Java MessageQueue與 //nativeMessageQueue的橋梁,這里比老版本實現更為簡潔 ????return reinterpret_cast<jlong>(nativeMessageQueue); } | ? |
此時Java層和Native層的MessageQueue被mPtr連接起來了,NativeMessageQueue只是java層MessageQueue在Ntive層的體現,其本身并沒有實現Queue的數據結構,而是從其父類MessageQueue中繼承了mLooper變量。與java層類似,這個Looper也是線程級單例。以下為NativeMessageQueue的構造函數:
| NativeMessageQueue::NativeMessageQueue() : ??????? mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { ??? mLooper = Looper::getForThread(); ??? if (mLooper == NULL) { ????????mLooper = new Looper(false);//在Native層創建了Looper對象 ????????Looper::setForThread(mLooper);//同樣是線程級單例 ??? } } |
可以看到在Java層Looper的創建導致了MessageQueue的創建,而在Native層則剛好相反:NativeMessageQueue的創建導致了Looper的創建。而且Native層的Looper創建和Java層的也完全不一樣。它利用了Linux的epoll機制監測了Input的fd和喚醒fd。從功能上來講,這個喚醒fd才是真正處理java Message和Native Message的鑰匙。(注意5.0以上版本Looper的定義在System/core下)。
| Looper::Looper(bool allowNonCallbacks) : ??????? mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), ??????? mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), ??????? mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { ????//這是linux后來才有的東西,負責線程通信,替換了老版本的pipe mWakeEventFd = eventfd(0, EFD_NONBLOCK); ??? LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd.? errno=%d", errno); ? ??? AutoMutex _l(mLock); ????rebuildEpollLocked(); } |
進入rebuildEpollLocked
| void Looper::rebuildEpollLocked() { ????// Close old epoll instance if we have one. ??? if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ??????? ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif ??????? close(mEpollFd); ??? } ? ???// Allocate the new epoll instance and register the wake pipe. //采用linux的Epoll,與Select功能其實有點類似 ??? ????mEpollFd = epoll_create(EPOLL_SIZE_HINT); ??? LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.? errno=%d", errno); ? ??? struct epoll_event eventItem; ??? memset(& eventItem, 0, sizeof(epoll_event));?//?清空 ??? eventItem.events = EPOLLIN;//關注EPOLLIN事件,也就是可讀 ??? eventItem.data.fd = mWakeEventFd;//設置Fd ??? //?將mWakeEventFd的event添加到監聽隊列,這里其實只是為epoll_ctl放置一個喚醒機制 ??? int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); ??? LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance.? errno=%d", ??????????? errno); ????//這里主要添加的是Input事件如鍵盤,傳感器輸入,這里基本上由系統負責,很少主動去添加 ??? for (size_t i = 0; i < mRequests.size(); i++) { ??????? const Request& request = mRequests.valueAt(i); ??????? struct epoll_event eventItem; ??????? request.initEventItem(&eventItem); ? ??????? int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); ??????? if (epollResult < 0) { ??????????? ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", ??????????????????? request.fd, errno); ??????? } ??? } } |
這里一定要明白的是,添加的這些fd除了mWakeEventFd負責解除阻塞讓程序繼續運行,從而處理Native Message和Java Message外,其他fd與Message的處理其實,毫無關系(知道這點非常重要)。此時Java層與Native層的聯系如下圖所示:
?
3?????創建消息并發送消息
創建消息和發送消息一般是在Looper線程之外的另一個線程通過Handler發送。以下是Handler的滿參構造方法。
| ??? public Handler(Callback callback, boolean async) { ??????? if (FIND_POTENTIAL_LEAKS) {//調試接口,默認為false ??????????? final Class<? extends Handler> klass = getClass(); ??????????? if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && ??????????????????? (klass.getModifiers() & Modifier.STATIC) == 0) { ??????????????? Log.w(TAG, "The following Handler class should be static or leaks might occur: " + ??????????????????? klass.getCanonicalName()); ??????????? } ??????? } ????????//Handler綁定當前線程的Looper實例 ????????mLooper = Looper.myLooper(); ??????? if (mLooper == null) { ??????????? throw new RuntimeException( ??????????????? "Can't create handler inside thread that has not called Looper.prepare()"); ??????? } ????????mQueue = mLooper.mQueue;//sendMessage的目標隊列就是Looper的MessageQueue ??????? mCallback = callback;//Handler指定callback ??????? mAsynchronous = async;//是否異步 ??? } |
在每一個Handler的構造過程中,Handler通過“mLooper =Looper.myLooper();”悄悄的持有了當前所在的looper線程的一個引用。我們已經知道每個Looper都會有一個MessageQueue,這樣Handler,Looper,MessageQueue就被關聯起來了。
利用Handler發送消息之前需要新建一個Message。獲取Message一般可以采用Message類的static方法obtain()。此方法有很多重載方法,零參實現如下(多參重載只是對零參時未賦值的變量進行了賦值)
| public static Message obtain() { ??????? synchronized (sPoolSync) { ??????????? if (sPool != null) { ??????????????? Message m = sPool; ??????????????? sPool = m.next; ??????????????? m.next = null; ??????????????? m.flags = 0; // clear in-use flag ??????????????? sPoolSize--; ??????????????? return m; ??????????? } ??????? } ??????? return new Message(); ??? } |
接著就可以調用Handler(非Looper線程持有Handler引用)的sendMessage(msg)方法。前面已經提到,Handler內部持有一個Looper的引用,Looper內部有一個MessageQueue。這樣就實現了線程間的消息傳遞。當然除了sendMessage(msg)之外還有其他類似的發送消息的函數。其本質就是往MessageQueue里面添加Message。這里就不詳述了。
特別要指出的是Looper.loop()在消息隊列為空的情況下并不是阻塞在這個MessageQueue上,而是阻塞在Native層的epoll_wait上面。這樣會存在很多問題,一個最為重要的問題就是如果在阻塞的時候,突然接收到java Message,程序怎么立馬去處理這個Message?前面提到epoll監聽了Input的fd和mWakeEventFd。答案就在mWakeEventFd。
先來看每個sendMessage()或其他Send方法都會最終調用以下的這個方法。
| boolean enqueueMessage(Message msg, long when) { ??????? if (msg.target == null) { ??????????? throw new IllegalArgumentException("Message must have a target."); ??????? } ??????? if (msg.isInUse()) { ??????????? throw new IllegalStateException(msg + " This message is already in use."); ??????? } ? ??????? synchronized (this) { ??????????? if (mQuitting) { ??????????????? IllegalStateException e = new IllegalStateException( ??????????????????????? msg.target + " sending message to a Handler on a dead thread"); ??????????????? Log.w(TAG, e.getMessage(), e); ??????????????? msg.recycle(); ??????????????? return false; ??????????? } ? ? ??????????msg.markInUse(); ??????????? msg.when = when; ??????????? Message p = mMessages; ??????????? boolean needWake; ??????????? if (p == null || when == 0 || when < p.when) { ??????????????? // New head, wake up the event queue if blocked. ?????????? ?????msg.next = p; ??????????????? mMessages = msg; ??????????????? needWake = mBlocked; ??????????? } else { ????????????????// Inserted within the middle of the queue.? Usually we don't have to wake ??????????????? // up the event queue unless there is a barrier at the head of the queue ??????????????? // and the message is the earliest asynchronous message in the queue. ??????????????? needWake = mBlocked && p.target == null && msg.isAsynchronous(); ??????????????? Message prev; ??????????????? for (;;) { ??????????????????? prev = p; ??????????????????? p = p.next; ??????????????????? if (p == null || when < p.when) { ??????????????????????? break; ??????????????????? } ??????????????????? if (needWake && p.isAsynchronous()) { ??????????????????????? needWake = false; ??????????????????? } ??????????????? } ??????????????? msg.next = p;?// invariant: p == prev.next ??????????????? prev.next = msg; ??????????? } ? ????????????// We can assume mPtr != 0 because mQuitting is false. ????????????if (needWake) { ????????????????nativeWake(mPtr); ????????????} ??????? } ??????? return true; ??? } |
可以看到以上函數才是真正添加Message的實干函數。在每次添加完畢之后都在需needWake的時候去調用NativeWake(mPtr)。我們已經知道mPtr指向了Native層的NativeMessageQueue。NativeWake(mPtr)最終調用了該類的wake()方法。此方法向mWakeEventFd寫入了一個字節的內容。到底是什么內容并不重要,重要的是fd存在內容了,換句話說就是mWakeEventFd可讀了!因此epoll_wait返回。首先遍歷Native消息隊列(此時基本上為空遍歷),接著遍歷活動fd,這里只有一個活動fd就是mWakeEventFd,讀掉這一個字節的數據解除掉mWakeEventFd的可讀狀態。此時mWakeEventFd功成身退。程序已經從阻塞狀態解除了出來。程序返回到java層的MessageQueue.next()函數中,next函數返回即從MessageQueue中返回此msg,以做后續的處理。
首先來看Looper.loop()。
| ??? public static void loop() { ??????? final Looper me = myLooper(); ??????? if (me == null) { ??????????? throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); ??????? } ??????? final MessageQueue queue = me.mQueue; ? ????????// Make sure the identity of this thread is that of the local process, ??????? // and keep track of what that identity token actually is. ??????? Binder.clearCallingIdentity(); ??????? final long ident = Binder.clearCallingIdentity(); ? ????????for (;;) {//無限循環直到quit() ????????????Message msg = queue.next();//獲取下一個java Message ??????????? if (msg == null) { ????????????????// No message indicates that the message queue is quitting. ??????????????? return; ??????????? } ? ????????????// This must be in a local variable, in case a UI event sets the logger ??????????? Printer logging = me.mLogging; ??????????? if (logging != null) { ??????????????? logging.println(">>>>> Dispatching to " + msg.target + " " + ??????????????????????? msg.callback + ": " + msg.what); ??????????? } ? ????????????msg.target.dispatchMessage(msg);//java層的Message處理在這里 ? ??????????? if (logging != null) { ??????????????? logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); ??????????? } ? ??????????? // Make sure that during the course of dispatching the ??????????? // identity of the thread wasn't corrupted. ??????????? final long newIdent = Binder.clearCallingIdentity(); ??????????? if (ident != newIdent) { ??????????????? Log.wtf(TAG, "Thread identity changed from 0x" ??????????????????????? + Long.toHexString(ident) + " to 0x" ? ??????????????????????+ Long.toHexString(newIdent) + " while dispatching to " ??????????????????????? + msg.target.getClass().getName() + " " ??????????????????????? + msg.callback + " what=" + msg.what); ??????????? } ? ??????????? msg.recycleUnchecked(); ??????? } ??? } |
這里直接進入MessageQueue.next()
| Message next() { ????????// Return here if the message loop has already quit and been disposed. ??????? // This can happen if the application tries to restart a looper after quit ??????? // which is not supported. ??????? final long ptr = mPtr; ??????? if (ptr == 0) { ??????????? return null; ??????? } ? ??????? int pendingIdleHandlerCount = -1;?// -1 only during first iteration ????????int nextPollTimeoutMillis = 0;//這個參數向Native層epoll_wait指定時超時時間 ????????for (;;) { ??????????? if (nextPollTimeoutMillis != 0) {//此處作用有待研究 ??????????????? Binder.flushPendingCommands(); ??????????? } ? ????????????nativePollOnce(ptr, nextPollTimeoutMillis);//一般都是阻塞在這個函數 ? ??????????? synchronized (this) { ????????????????// Try to retrieve the next message.? Return if found. ??????????????? final long now = SystemClock.uptimeMillis(); ??????????????? Message prevMsg = null; ??????????????? Message msg = mMessages; ??????????????? if (msg != null && msg.target == null) { ????????????????// Stalled by a barrier.? Find the next asynchronous message in the queue. ??????????????????? do { ??????????????????????? prevMsg = msg; ??????????????????????? msg = msg.next; ??????????????????? } while (msg != null && !msg.isAsynchronous()); ??????????? ????} ??????????????? if (msg != null) { ??????????????????? if (now < msg.when) { ????????????????????????// Next message is not ready.? Set a timeout to wake up when it is ready. ??????????????????????? nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); ??????????????????? } else { ????????????????????????// Got a message. ??????????????????????? mBlocked = false; ??????????????????????? if (prevMsg != null) { ??????????????????????????? prevMsg.next = msg.next; ?????????????????? ?????} else { ??????????????????????????? mMessages = msg.next; ??????????????????????? } ??????????????????????? msg.next = null; ??????????????????????? if (DEBUG) Log.v(TAG, "Returning message: " + msg); ??????????????????????? msg.markInUse(); ??????? ????????????????return msg; ??????????????????? } ??????????????? } else { ????????????????????// No more messages. ??????????????????? nextPollTimeoutMillis = -1; ??????????????? } ? ????????????????// Process the quit message now that all pending messages have been?handled. ??????????????? if (mQuitting) { ??????????????????? dispose(); ??????????????????? return null; ??????????????? } ? ????????????????// If first time idle, then get the number of idlers to run. ??????????????? // Idle handles only run if the queue is empty or if the first message ??????????????? // in the queue (possibly a barrier) is due to be handled in the future. ??????????????? if (pendingIdleHandlerCount < 0 ??????????????????????? && (mMessages == null || now < mMessages.when)) { ??????????????????? pendingIdleHandlerCount = mIdleHandlers.size(); ??????????????? } ??????????????? if (pendingIdleHandlerCount <= 0) { ????????????????????// No idle handlers to run.? Loop and wait some more. ??????????????????? mBlocked = true; ??????? ????????????continue; ??????????????? } ? ??????????????? if (mPendingIdleHandlers == null) { ??????????????????? mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; ??????????????? } ??????????????? mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); ??????????? } ? ????????????// Run the idle handlers. ??????????? // We only ever reach this code block during the first iteration. ??????????? for (int i = 0; i < pendingIdleHandlerCount; i++) { ??????????????? final IdleHandler idler = mPendingIdleHandlers[i]; ??????????????? mPendingIdleHandlers[i] = null; // release the reference to the handler ? ??????????????? boolean keep = false; ??????????????? try { ??????????????????? keep = idler.queueIdle(); ??????????????? } catch (Throwable t) { ??????????????????? Log.wtf(TAG, "IdleHandler threw exception", t); ??????????????? } ? ??????????????? if (!keep) { ??????????????????? synchronized (this) { ??????????????????????? mIdleHandlers.remove(idler); ??????????????????? } ??????????????? } ??????????? } ? ????????????// Reset the idle handler count to 0 so we do not run them again. ??????????? pendingIdleHandlerCount = 0; ? ????????????// While calling an idle handler, a new message could have been delivered ??????????? // so go back and look again for a pending message without waiting. ??????????? nextPollTimeoutMillis = 0; ??????? } ??? } |
上面函數中最為重要的變量為nextPollTimeoutMillis。這個參數為Native層的epoll_wait指定了超時時間。為什么會存在這個epoll_wait超時時間呢?不是已經有一個mWakeEventFd已經可以喚醒epoll_wait了么?回答這個問題需要對Message加以分析,存在多種Message,其中一種Message為需要立即執行的消息。這樣的消息通過mWakeEventFd喚醒就可以了。另一種消息是延時消息,或者是在指定時間執行的消息。這樣的消息添加到MessageQueue后一般不需要立即執行,而是等一段時間才會去執行,通過一些必要的計算給epoll_wait()指定超時時間可以使得在需要執行這些定時任務的時候epoll_wait()返回。此函數就是實現了這樣的邏輯。
接著上面的之前的分析,Looper.loop()調用MessageQueue.next()。next()調用NativePollOnce從而進入Native層處理input和Native Message。NativePollOnce經過幾次轉調最終會落在mLooper.PollOnce(),如下:
| int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { ??? int result = 0; ????for (;;) {//首先對fd對應的的responses進行處理,后面會發現responses里都是活動fd ??????? while (mResponseIndex < mResponses.size()) { ??????????? const Response& response = mResponses.itemAt(mResponseIndex++); ??????????? int ident = response.request.ident; ??????????? if (ident >= 0) {//這里大于0標示沒有指定callback直接返回即可,有為-2 ??????????????? int fd = response.request.fd; ??????????????? int events = response.events; ??????????????? void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ??????????????? ALOGD("%p ~ pollOnce - returning signalled identifier %d: " ??????? ????????????????"fd=%d, events=0x%x, data=%p", ??????????????????????? this, ident, fd, events, data); #endif ??????????????? if (outFd != NULL) *outFd = fd; ??????????????? if (outEvents != NULL) *outEvents = events; ??????????????? if (outData != NULL) *outData = data; ??????????????? return ident; ??????????? } ??????? } // ??????? if (result != 0) {//注意這里處于循環內部,改變result的值是在后面的pollInner #if DEBUG_POLL_AND_WAKE ??????????? ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif ??????????? if (outFd != NULL) *outFd = 0; ??????????? if (outEvents != NULL) *outEvents = 0; ??????????? if (outData != NULL) *outData = NULL; ??????????? return result; ??????? } ? ????????result = pollInner(timeoutMillis);//內部epoll_wait ??? } } |
接著進入pollInner
| int Looper::pollInner(int timeoutMillis) { #if DEBUG_POLL_AND_WAKE ??? ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif ? ????// Adjust the timeout based on when the next message is due. ??? if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { ??????? nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); ??????? int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); ??????? if (messageTimeoutMillis >= 0 ??????????????? && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { ??????????? timeoutMillis = messageTimeoutMillis; ??????? } #if DEBUG_POLL_AND_WAKE ??????? ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d", ??????????????? this, mNextMessageUptime - now, timeoutMillis); #endif ??? } ? ????// Poll. ??? int result = POLL_WAKE; ??? mResponses.clear(); ??? mResponseIndex = 0; ? ????// We are about to idle. ??? mPolling = true; ? ??? struct epoll_event eventItems[EPOLL_MAX_EVENTS]; ??? int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); ? ????// No longer idling. ??? mPolling = false; ? ????//?獲得鎖,在Native Message的處理和添加邏輯上需要同步 ????mLock.lock(); ? ????//如果需要,重建epoll ??? if (mEpollRebuildRequired) { ??????? mEpollRebuildRequired = false; ??????? rebuildEpollLocked(); ??????? goto Done; ??? } ? ????// Check for poll error. ??? if (eventCount < 0) { ??????? if (errno == EINTR) { ??????????? goto Done; ??????? } ??????? ALOGW("Poll failed with an unexpected error, errno=%d", errno); ??????? result = POLL_ERROR; ??????? goto Done; ??? } ? ????// epoll超時 ??? if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ??????? ALOGD("%p ~ pollOnce - timeout", this); #endif ??????? result = POLL_TIMEOUT;//此值返回PollOnce,從而導致java定時Message執行 ??????? goto Done; ??? } ? ????// Handle all events. #if DEBUG_POLL_AND_WAKE ??? ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif ??? //首先處理活動的input設備和mWakeEventFd ??? for (int i = 0; i < eventCount; i++) { ??????? int fd = eventItems[i].data.fd; ??????? uint32_t epollEvents = eventItems[i].events; ??????? if (fd == mWakeEventFd) {//若果是喚醒fd有反應 ???????? ???if (epollEvents & EPOLLIN) { ????????????????awoken();//內部就是read,從而使fd可讀狀態被清除 ??????????? } else { ??????????????? ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); ??????????? } ??????? } else {//其他input fd處理,其實就是講活動fd放入到responses隊列中,等待處理 ??????????? ssize_t requestIndex = mRequests.indexOfKey(fd); ??????????? if (requestIndex >= 0) { ??????????????? int events = 0; ??????????????? if (epollEvents & EPOLLIN) events |= EVENT_INPUT; ??????????????? if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; ??????????????? if (epollEvents & EPOLLERR) events |= EVENT_ERROR; ??????????????? if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; ????????????????pushResponse(events, mRequests.valueAt(requestIndex)); ??????????? } else { ???? ???????????ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " ??????????????????????? "no longer registered.", epollEvents, fd); ??????????? } ??????? } ??? } Done: ; ? ????//?這里應該是處理Native層的Message ??? mNextMessageUptime = LLONG_MAX; ??? while (mMessageEnvelopes.size() != 0) { ??????? nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); ??????? const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); ??????? if (messageEnvelope.uptime <= now) { ????????????// Remove the envelope from the list. ??????????? // We keep a strong reference to the handler until the call to handleMessage ??????????? // finishes.? Then we drop it so that the handler can be deleted *before* ??????????? // we reacquire our lock. ??????????? {?// obtain handler ? ??????????????sp<MessageHandler> handler = messageEnvelope.handler; ??????????????? Message message = messageEnvelope.message; ??????????????? mMessageEnvelopes.removeAt(0); ??????????????? mSendingMessage = true; ??????????????? mLock.unlock(); ? #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ??????????????? ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", ??????????????????????? this, handler.get(), message.what); #endif ????????????????handler->handleMessage(message);//處理Native Message ????????? ??}?// release handler ? ??????????? mLock.lock(); ??????????? mSendingMessage = false; ??????????? result = POLL_CALLBACK; ??????? } else { ????????????// The last message left at the head of the queue determines the next wakeup time. ??????????? mNextMessageUptime = messageEnvelope.uptime; ??????????? break; ??????? } ??? } ? ????// Release lock. ??? mLock.unlock(); ? ????//?處理之前添加進responses的活動Input設備 ??? for (size_t i = 0; i < mResponses.size(); i++) { ??????? Response& response = mResponses.editItemAt(i); ??????? if (response.request.ident == POLL_CALLBACK) { ??????????? int fd = response.request.fd; ??????????? int events = response.events; ??????????? void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ??????????? ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", ??????????????????? this, response.request.callback.get(), fd, events, data); #endif ????????????// Invoke the callback.? Note that the file descriptor may be closed by ??????????? // the callback (and potentially even reused) before the function returns so ??????????? // we need to be a little careful when removing the file descriptor afterwards. ??????????? //這里處理了有callback的fd,沒有fd的處理可以推后到下次循環的pollOnce ????????????int callbackResult = response.request.callback->handleEvent(fd, events, data); ??????????? if (callbackResult == 0) { ??????????????? removeFd(fd, response.request.seq); ??????????? } ? ????????????// Clear the callback reference in the response structure promptly because we ?? ?????????// will not clear the response vector itself until the next poll. ??????????? response.request.callback.clear(); ??????????? result = POLL_CALLBACK; ??????? } ??? } ??? return result; } |
下面是Looper的處理結構圖。關鍵在于epoll。
?
這里很明顯涉及到3類消息的處理:
1,Java層的Message
2,Native層的Message
3,活動fd指向的Input設備
下面將對著三類消息一一進行分析。
4?????Java層 Message的處理
首先需要明確的是Java層Message的執行時機。在上一節的分析中已經分析過了,它是在Native層Message和fd之后。Looper.loop()阻塞的位置在MassageQueue.next()->pollOnce()->pollInner()->epoll_wait()。
1,???????????如果三類消息都為空,此時Java層send進來一個msg。sendMessage()將調用NativeWake喚醒epoll_wait()。從而回到Java層處理該msg。
2,???????????如果只有Java層有msg,且為定時任務,sendMessage時喚醒epoll_wait()。在下一次循環中為epoll_wait設置超時時間。(實際上邏輯更為復雜)。
3,???????????在循環時添加Java Message。epoll_wait立即返回。Msg在下一次循環被處理。
Java層Message的發送和處理流程大致如下圖所示:
5?????Native層 Message的處理
Native層Message的發送和處理流程大致如下圖所示:
從圖中可以發現,Native消息的發送過程和處理與java層Message的處理比較類似。都是在任意線程中新建一個Message,然后sendMessage(),所不同的是Native層的Looper沒有Handler,因此sendMessage只能通過Looper.sendMessage()。并且需要在SendMessage()時為該Message指定處理該Message的MessageHandler。而且Native層MessageQueue的實現mMessageEnvelopes本質上是Vector,這一點和Java層MessageQueue是不同的。同樣需要在sendMessage()的時候wake()。邏輯和Java層類似就不贅述了。
6?????活動fd對應的Input設備的處理
這類消息由epoll直接監聽fd,當input設備有活動時,epoll_wait()檢測到對應的fd可讀(或可寫)。從而對fd做處理。這類消息的處理比較分散,首先來看pollInner()。
| ? int Looper::pollInner(int timeoutMillis) { ??? …… int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); …… ??? for (int i = 0; i < eventCount; i++) { ??????? int fd = eventItems[i].data.fd; ??????? uint32_t epollEvents = eventItems[i].events; ??????? if (fd == mWakeEventFd) { ??????????? if (epollEvents & EPOLLIN) { ??????????????? awoken(); ??????????? } else { ??????????????? ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); ???????? ???} ??????? } else { ??????????? ssize_t requestIndex = mRequests.indexOfKey(fd); ??????????? if (requestIndex >= 0) { ??????????????? int events = 0; ??????????????? if (epollEvents & EPOLLIN) events |= EVENT_INPUT; ??????????????? if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; ??????????????? if (epollEvents & EPOLLERR) events |= EVENT_ERROR; ??????????????? if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; ????????????????//將活動的fd對應mRequests包裝成responses隊列 ????????????????pushResponse(events, mRequests.valueAt(requestIndex)); ??????????? } else { ??????????????? ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " ??????????????????????? "no longer registered.", epollEvents, fd); ??????????? } ??????? } } …… ??? } ? ????//?帶callback的responses處理 ??? for (size_t i = 0; i < mResponses.size(); i++) { ??????? Response& response = mResponses.editItemAt(i); ??????? if (response.request.ident == POLL_CALLBACK) { ??????????? int fd = response.request.fd; ??????????? int events = response.events; ? ??????????void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ??????????? ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", ??????????????????? this, response.request.callback.get(), fd, events, data); #endif ????????????// Invoke the callback.? Note that the file descriptor may be closed by ??????????? // the callback (and potentially even reused) before the function returns so ??????????? // we need to be a little careful when removing the file descriptor afterwards. ????????????int callbackResult = response.request.callback->handleEvent(fd, events, data); ??????????? if (callbackResult == 0) { ??????????????? removeFd(fd, response.request.seq); ??????????? } ? ????????????// Clear the callback reference in the response structure promptly because we ??????????? // will not clear the response vector itself until the next poll. ??????????? response.request.callback.clear(); ??????????? result = POLL_CALLBACK; ??????? } ??? } ??? return result; } |
可以看到,對于活躍fd已經包含了callback的response,直接調用了此callback的HandlerEvent()函數。那對于沒有指定Callback的活動responses在那處理呢?在下一次訓話中的PollOnce()。也就是下一次epoll_wait()之前。
| int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { ??? int result = 0; ??? for (;;) { ??????? while (mResponseIndex < mResponses.size()) { ??????????? const Response& response = mResponses.itemAt(mResponseIndex++); ??????????? int ident = response.request.ident; ??????????? if?(ident >= 0) {//這里大于0標示沒有指定callback直接返回即可,有為-2 ?????????????? ?int fd = response.request.fd; ??????????????? int events = response.events; ??????????????? void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ??????????????? ALOGD("%p ~ pollOnce - returning signalled identifier %d: " ??????????????????????? "fd=%d, events=0x%x, data=%p", ??????????????????????? this, ident, fd, events, data); #endif ??????????????? if (outFd != NULL) *outFd = fd; ??????????????? if (outEvents != NULL) *outEvents = events; ??????????????? if (outData != NULL) *outData = data; ?? ?????????????return ident;//對沒有callback的response直接返回ident(“沒有callback”) ??????????????? ??????????? } ??????? } ? ????????if (result != 0) { #if DEBUG_POLL_AND_WAKE ??????????? ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif ??????????? if (outFd != NULL) *outFd = 0; ??????????? if (outEvents != NULL) *outEvents = 0; ??????????? if (outData != NULL) *outData = NULL; ??????????? return result; ??????? } ? ????????result = pollInner(timeoutMillis); ??? } } |
注意pollOnce傳入此函數的后三個參數為指針,因此也可以被認為是“返回值”,上層由此獲得了一個活動fd的副本,以做后續處理。而此活動fd被responses.clear()掉。
接著還是來繼續分析自帶callback的request。這里面臨兩個問題:1,誰添加了這些request?2,這些request的callback->handleEvent()到底指向了那個函數?
對于第一個為題,可從后往前分析。epoll使用的是fd。這些fd在NativeInit中具體一點就是在Native Looper的構建中被添加進epoll監聽隊列中,如下
| void Looper::rebuildEpollLocked() { ????// Close old epoll instance if we have one. ??? if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ??????? ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif ??????? close(mEpollFd); ??? } ? ????// Allocate the new epoll instance and register the wake pipe. ??? mEpollFd = epoll_create(EPOLL_SIZE_HINT); ??? LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.? errno=%d", errno); ? ??? struct epoll_event eventItem; ??? memset(& eventItem, 0, sizeof(epoll_event));?// zero out unused members of data field union ??? eventItem.events = EPOLLIN; ??? eventItem.data.fd = mWakeEventFd; ??? int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); ??? LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance.? errno=%d", ??????????? errno); ???//就是這里 ????for (size_t i = 0; i < mRequests.size(); i++) { ??????? const Request& request = mRequests.valueAt(i); ??????? struct epoll_event eventItem; ??????? request.initEventItem(&eventItem); ? ????????int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); ??????? if (epollResult < 0) { ??????????? ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", ??????????????????? request.fd, errno); ??????? } ??? } } |
從以上程序可以發現這些fd都是mRequests中取出來的。而mRequests由Looper.addFd()添加。查看此函數的調用者發現,很多地方都有調用此函數。因此推測在Native層可以直接使用此函數,向epoll添加監聽fd。那java層能向epoll添加fd么?發現NativeInit在Native層對應的函數android_os_MessageQueue_nativeInit有一個鄰居如下。
| static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, ??????? jlong ptr, jint fd, jint events) { ??? NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); ??? nativeMessageQueue->setFileDescriptorEvents(fd, events); } |
進入setFileDescriptorEvents()
| void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) { ??? if (events) {//從這里判斷是添加還是刪除 ??????? int looperEvents = 0; ??????? if (events & CALLBACK_EVENT_INPUT) { ??????????? looperEvents |= Looper::EVENT_INPUT; ? ??????} ??????? if (events & CALLBACK_EVENT_OUTPUT) { ??????????? looperEvents |= Looper::EVENT_OUTPUT; ??????? } ????????mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, ????????????????reinterpret_cast<void*>(events));//添加fd,this指明callback為類自己 ??? } else { ????????mLooper->removeFd(fd);//這里刪除fd ??? } } |
因此在Java層也是可以向epoll添加fd的
| ??? private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events, ??????????? OnFileDescriptorEventListener listener) { ??????? final int fdNum = fd.getInt$(); ? ??????? int index = -1; ??????? FileDescriptorRecord record = null; ??????? if (mFileDescriptorRecords != null) { ??????????? index = mFileDescriptorRecords.indexOfKey(fdNum); ??????????? if (index >= 0) { ??????????????? record = mFileDescriptorRecords.valueAt(index); ??????????????? if (record != null && record.mEvents == events) { ??????????????????? return; ??????????????? } ??????????? } ??????? } ? ??????? if (events != 0) { ??????????? events |= OnFileDescriptorEventListener.EVENT_ERROR; ??????????? if (record == null) { ??????????????? if (mFileDescriptorRecords == null) { ??????????????????? mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>(); ??????????????? } ??????????????? record = new FileDescriptorRecord(fd, events, listener); ??????????????? mFileDescriptorRecords.put(fdNum, record); ??????????? } else { ??????????????? record.mListener = listener; ??????????????? record.mEvents = events; ??????????????? record.mSeq += 1; ??????????? } ????????????nativeSetFileDescriptorEvents(mPtr, fdNum, events);//添加或刪除fd ??????? } else if (record != null) { ??????????? record.mEvents = 0; ????????????mFileDescriptorRecords.removeAt(index);//猜測是java層的fd記錄 ??????? } ??? } |
由于在addFd時候指定自己也就是this是callback,因此到最后該fd處理的時候會進入NativeMessageQueue的handlerEvent()方法。
| int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) { ??? int events = 0; ??? if (looperEvents & Looper::EVENT_INPUT) { ??????? events |= CALLBACK_EVENT_INPUT; ??? } ??? if (looperEvents & Looper::EVENT_OUTPUT) { ??????? events |= CALLBACK_EVENT_OUTPUT; ??? } ??? if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) { ??????? events |= CALLBACK_EVENT_ERROR; ??? } ??? int oldWatchedEvents = reinterpret_cast<intptr_t>(data); ????int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,//調用java層代碼 ????????????gMessageQueueClassInfo.dispatchEvents, fd, events); ??? if (!newWatchedEvents) { ??????? return 0;?// unregister the fd ??? } ??? if (newWatchedEvents != oldWatchedEvents) { ??????? setFileDescriptorEvents(fd, newWatchedEvents); ??? } ??? return 1; } |
需要注意的是gMessageQueueClassInfo指向了java層的MessageQueue,因此MessageQueue的dispatchEvents()方法被調用。Message在java層被處理。
當然在Native層就可以實現fd的添加和處理。貌似這也是主要的途徑。Android6.0有好些個專門的類處理Input設備。如android_view_InputQueue、android_view_InputEventSender、android_view_InputEventReceiver等等。這里就不詳述了。留待以后研究。
原文地址: http://blog.csdn.net/a34140974/article/details/50638089
總結
以上是生活随笔為你收集整理的Android6.0的Looper源码分析(1)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android动态加载技术三个关键问题详
- 下一篇: Android6.0的SMS(短信)源码