網站首頁 編程語言 正文
引言
在分析底層源碼時,時不時會碰到 Looper::wake() 或者 Looper::pollOnce() 這樣的代碼,之前大概知道是 Native 層的消息循環機制。為了以后我也能夠使用它,我決定還是徹底分析一遍源碼。
本文只涉及一個文件,路徑如下
system/core/libutils/Looper.cpp
Looper的創建
在 Java 層,有一個線程的子類 HandlerThread,它可以創建一個線程,并且使用消息機制,其中的關鍵兩步是 Looper::prepare() 和 Looper::loop()。
Looper::prepare() 會創建一個與線程綁定的 Looper 對象,這個綁定是通過 ThreadLocal實現的,而 Looper::loop() 會讓線程進入無限循環來處理消息。
在 Native 層,也有一個 Looper 類,可以通過 Looper::prepare() 來創建 Looper 對象,代碼如下
sp<Looper> Looper::prepare(int opts) {
// opts決定Looper::addFd()的參數callback是否可以為空
bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
// 通過pthread_getspecific()獲取線程本地存儲中的Looper對象
sp<Looper> looper = Looper::getForThread();
if (looper == nullptr) {
looper = new Looper(allowNonCallbacks);
// 通過pthread_setspecific()把Looper對象保存到線程本地存儲中
Looper::setForThread(looper);
}
return looper;
}
Native 層的線程在首次調用 Looper::prepare() 時,會創建 Looper 對象,并通過 pthread_setspecific() 把它保存到線程本地存儲中,后面再獲取 Looper 對象時,通過 pthread_getspecific() 從線程本地存儲中獲取。
這不就是 Java 的 ThreadLocal 的功能嗎?
現在讓我們來看下 Looper 對象的創建過程
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 1. 創建 eventfd 對象,用于喚醒 Looper
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
AutoMutex _l(mLock);
// 2. 創建 epoll 對象,并監聽剛才創建的 eventfd 的輸入事件
rebuildEpollLocked();
}
首先創建了一個eventfd 對象,由 mWakeEventFd 代表,它用于喚醒 Looper 。如何喚醒呢? 繼續往下看。
然后調用 rebuildEpollLocked() 創建一個 epoll 對象,并監聽剛才創建的 mWakeEventFd 的 I/O 事件,代碼如下
void Looper::rebuildEpollLocked() {
// ...
// 創建epoll對象
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
// 監聽 mWakeEventFd 輸入事件
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i < mRequests.size(); i++) {
// ...
}
}
epoll 對象監聽了 mWakeEventFd 的可讀事件。在后面的分析中,我們將看到,當 Looper 開始輪詢時,會調用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 寫入數據時,epoll_wait() 將返回,那么 Looper 就被喚醒。
發送消息與監聽請求
Native 層的 Looper 可以處理兩種類型的事件,一種是消息( Message ),另一種是請求( Request )。下面我們來看看如何向 Looper 發送消息,如何讓 Looper 監聽請求。
發送消息
通過 Looper::sendMessageXXX() 這一類函數,可以向 Looper 發送消息
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
// 通過鎖,可以保存 mMessageEnvelopes 的線程安全
AutoMutex _l(mLock);
// 獲取消息要插入的位置
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
// 1. 把信息保存到 mMessageEnvelopes 中
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// mSendingMessage 表明 Looper 正在處理消息,因此不用喚醒Looper
if (mSendingMessage) {
return;
}
} // release lock
// 2. 如有必要,就喚醒Looper
if (i == 0) {
wake();
}
}
當 Looper 接收到消息時,它會把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么會調用 Looper::wake() 喚醒 Looper 來處理消息。
前面我們大概地說明了下如何通過 mWakeEventFd 這個 eventfd 對象喚醒 Looper,現在讓我們來看下喚醒是如何實現的
void Looper::wake() {
uint64_t inc = 1;
// 向 mWakeFd 中寫入數據
// TEMP_FAILURE_RETRY 是一個重試機制,確保不會因為系統中斷而導致數據沒有寫入
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
// 確保寫入的是 unsigned 64-bit 的數據
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
原來就是向 mWakeEventFd 寫數據。在后面我們將看到,當 Looper 進入輪詢時, 當epoll_wait() 檢測到 mWakeEventFd 有數據可讀時,就會從阻塞中醒來,從而達到 mWakeEventFd 喚醒 Looper 的目的。
監聽請求
現在我們來看下如何讓 Looper 監聽請求,它是通過 Looper::addFd() 實現的
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
// 1. 確認 ident 的值
if (!callback.get()) { // 回調為空
if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在創建Looper時初始化的
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) { // 回調為空,ident的值不能小于0
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else { // 回調不為空
// POLL_CALLBACK值為-2,表示請求通過回調處理請求
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
// 2. 包裝成一個 Request
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 3. 用 epoll 對象監聽 fd 的事件,并把請求保存到 mRequests 中
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
// ...
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
Looper::addFd() 的實質就是用 epoll 對象監聽指定 fd 的 I/O 事件。為何我把這一過程稱之為 監聽請求 呢 ? 因為這個函數把它的參數包裝成一個 Request 對象,并保存到 mRequests 容器 中。
Looper 進行輪詢時,epoll_wait() 會阻塞,當 fd 指向的文件有 I/O 事件時,epoll_wait() 將會獲取到 fd 的可讀事件,因此 Looper 會被喚醒。
當 Looper 檢測到有請求到來時,一般是通過回調處理的,也就是這里的參數 callback。當然,也可以不設置回調,當有請求到來時,交給外部的調用者去處理,我們將會在后面看到。
根據以上知識,我們來理解 Looper::addFd() 的第一步。當 callback 參數為空時,ident 必須大于或等于0,否則為 POLL_CALLBACK,注意,這的值是 -2。 因此呢,當我們檢測到一個請求的 ident 大于或等于0時,這個請求肯定不是通過回調處理的。這一點非常重要,我們將會在后面用到。
Looper 處理消息或請求
Native 層的 Looper 是通過 Looper::pollOnce() 或 Looper::pollAll() 來統一處理消息和請求的,我們挑 Looper::pollOnce() 這個函數來分析下
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) { // 無限循環
// 1. 處理那些不是通過callback處理的請求
while (mResponseIndex < mResponses.size()) {
// ...
}
// 2. 處理pollInner()輪詢的結果
if (result != 0) {
// ...
}
// 3. epoll 等待并處理事件(如果有事件到來)
result = pollInner(timeoutMillis);
}
}
當首次調用 Looper::pollOnce() 時,第一步和第二步肯定不會發生,那么我們先來看下第三步,這個函數比較長,我們一步步解析
int Looper::pollInner(int timeoutMillis) {
// 省略計算 timeoutMillis 的代碼
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 1. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
}
第一步,通過 epoll_wait() 阻塞地等待它監聽的 fd 的 I/O 就緒, 此時 Looper 進入休眠。
那么怎么喚醒 Looper 呢? 根據前面的分析,epoll 對象監聽了 mWakeEventFd 以及 通過 Looper::addFd() 添加的 fd。 那么向這些被監聽的 fd 寫入數據,就可以喚醒 Looper。例如,Looper::wake() 就是通過向 mWakeEventFd 寫入數據來喚醒 Looper。
那么現在我們來看下 Looper 被喚醒后的處理流程
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待I/O就緒
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
// 2.1 處理被消息喚醒的情況
if (epollEvents & EPOLLIN) {
// 清理eventfd中的數據
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 2.2 處理被請求喚醒的情況
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
// 根據epoll觸發的事件類型,填充events相應的位
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 創建Response對象,保存兩個參數,然后把Response對象保存到mResponses中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
}
當 mWakeEventFd 的 I/O 就緒,就會走到2.1步,之后會讀取 mWakeEventFd 中的數據,讀取的數據并沒有什么用,只是清理數據而已。而這一步,大部分情況 是由于消息的到來,而極少情況是并不是因為消息的到來,而是因為線程有緊急事情需要處理,所以必須要喚醒。
當通過Looper::addFd() 添加的 fd 就緒時,就會走到 2.2 步,這一步一定是因為請求到來了。它會創建 Reponse 對象,并保存,代碼如下
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
這里我們要注意下,mResponses 容器表示待處理請求的集合,這些請求會在后面處理,讓我們接著往下看。
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待I/O就緒
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
// ...
}
Done: ;
mNextMessageUptime = LLONG_MAX;
// 3. 處理消息
while (mMessageEnvelopes.size() != 0) { // 循環處理消息
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// 3.1 取出隊頭消息
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// 3.2 隊頭消息處理的時間點小于當前時間點,表示要立即處理消息
{
// 獲取handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
// 消息交給handler處理
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
// POLL_CALLBACK 表示消息被回調處理
result = POLL_CALLBACK;
} else {
// 3.2 隊頭的消息處理的時間點大于當前時間,表示還沒有到處理的時間點,就退出處理消息的循環
// mNextMessageUptime 表示下一個消息要處理的時間點,當通過break退出循環后,
// 在外層的下一次循調用pollInner()時,會通過 mNextMessageUptime 計算 epoll_wait 的超時時間
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
}
根據前面分析的消息發送的過程,消息保存在 mMessageEnvelopes 中。那么這里的第三步,很明顯是在處理消息。通過循環,不斷取出消息,然后把消息的 messageEnvelope.uptime 與當前時間進行比較,如果小于當前時間,就證明要立馬處理消息了,否則這些消息只能在下一次輪詢中再處理。
處理完了消息,現在來處理請求
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
// ...
}
Done: ;
// 3. 處理消息
while (mMessageEnvelopes.size() != 0) { // 循環處理消息
// ...
}
// 4. 循環處理請求
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// 檢測請求是否通過回調處理
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
// 表明消息被回調處理了
result = POLL_CALLBACK;
}
}
// 返回結果
return result;
}
剛剛我們還提到,mResponse 中保存了待處理的請求。現在通過循環,不斷取出請求來處理。處理請求有一個條件,那就是請求必須有回調,否則不處理。 再回顧前面分析 監聽請求 的代碼,當Looper::addFd() 的參數 callback 不為空時,Request.ident 的值為 POLL_CALLBACK,表明請求需要通過回調處理。
Looper::pollInner() 函數分析完畢,現在再回到 Looper::pollOnce()
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) { // 無限循環
// 1. 處理那些不是通過callback處理的請求
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
// 從Looper::addFd()分析可知,只有當callback為空的情況下,ident的值>=0,否則為POLL_CALLBACK(-2)
// 因此,這里處理的是那些沒有通過callback處理的請求
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 因為Looper無法通過callback處理,所以把這些元數據交給調用者處理
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
// 注意,這里返回的值大于0
return ident;
}
}
// 2. 處理pollInner()輪詢的結果
// result的值有很多種,但是都為負數(注意,上面處理那些不是通過callback處理的請求,返回正值)
// 1. POLL_WAKE(-1): 表示epoll_wait()是被eventfd喚醒的
// 1. POLL_ERROR(-4): 表示epoll_wait()出錯
// 2. POLL_TIMEOUT(-3) : 表示epoll_wait()超時
// 3. POLL_CALLBACK(-2) : 表示消息或請求是通過回調處理的
if (result != 0) {
// 消息或事件無論是否被callback處理,這些傳入的參數都沒有意義,因此清空
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
// 注意,返回的是負值
return result;
}
// 3. epoll 等待并處理事件(如果有事件到來)
result = pollInner(timeoutMillis);
}
}
從整體看,當 pollInner() 返回后,就會調用第一步和第二步來處理結果。
首先來看第一步,根據前面 監聽請求 的分析,當 Looper::addFd() 的參數 callback 為空時,Request.ident 的值才大于等于0。Looper::pollInner 只通過回調來處理請求,而對于那些沒有回調的請求呢?那就是在這里處理。而處理的方式是直接把元數據返回給調用者,那么意思就很明顯了,讓調用者自己處理。
再來看第二步,直接返回 Looper::pollInner() 的結果,并把參數清0。因為無論返回的什么結果,這些參數都沒有意義了,這一點請大家自己體會。
關于第一步和第二步,還有一點需要關注,第一步的返回值是正值,而第二步返回值是負值。
結束
本文對 Native 的 Looper 的主要函數進行分析,揭開了 Native 層消息機制的核心,但是目前我并不能給一個很好例子來理解本文的內容。需要大家在分析 Native 層源碼時慢慢體會。
可能有人會問,你為何不以 Java 層的消息機制為例來引出 Native 層的消息機制呢? 因為這樣廢話太多。
原文鏈接:https://juejin.cn/post/7044787749299159076
相關推薦
- 2022-12-26 C++內存分區模型超詳細講解_C 語言
- 2022-09-08 pandas時間序列之如何將int轉換成datetime格式_python
- 2022-11-19 Compose?動畫藝術之屬性動畫探索_Android
- 2022-06-12 Python語法學習之線程的創建與常用方法詳解_python
- 2023-04-02 使用C#連接SQL?Server的詳細圖文教程_C#教程
- 2022-10-01 Python類和對象基礎入門介紹_python
- 2022-08-15 Python包裝之對象處理_python
- 2022-12-13 pandas中merge()函數的用法解讀_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支