網(wǎng)站首頁 編程語言 正文
引言
在分析底層源碼時,時不時會碰到 Looper::wake() 或者 Looper::pollOnce() 這樣的代碼,之前大概知道是 Native 層的消息循環(huán)機(jī)制。為了以后我也能夠使用它,我決定還是徹底分析一遍源碼。
本文只涉及一個文件,路徑如下
system/core/libutils/Looper.cpp
Looper的創(chuàng)建
在 Java 層,有一個線程的子類 HandlerThread,它可以創(chuàng)建一個線程,并且使用消息機(jī)制,其中的關(guān)鍵兩步是 Looper::prepare() 和 Looper::loop()。
Looper::prepare() 會創(chuàng)建一個與線程綁定的 Looper 對象,這個綁定是通過 ThreadLocal實(shí)現(xiàn)的,而 Looper::loop() 會讓線程進(jìn)入無限循環(huán)來處理消息。
在 Native 層,也有一個 Looper 類,可以通過 Looper::prepare() 來創(chuàng)建 Looper 對象,代碼如下
sp<Looper> Looper::prepare(int opts) {
// opts決定Looper::addFd()的參數(shù)callback是否可以為空
bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
// 通過pthread_getspecific()獲取線程本地存儲中的Looper對象
sp<Looper> looper = Looper::getForThread();
if (looper == nullptr) {
looper = new Looper(allowNonCallbacks);
// 通過pthread_setspecific()把Looper對象保存到線程本地存儲中
Looper::setForThread(looper);
}
return looper;
}
Native 層的線程在首次調(diào)用 Looper::prepare() 時,會創(chuàng)建 Looper 對象,并通過 pthread_setspecific() 把它保存到線程本地存儲中,后面再獲取 Looper 對象時,通過 pthread_getspecific() 從線程本地存儲中獲取。
這不就是 Java 的 ThreadLocal 的功能嗎?
現(xiàn)在讓我們來看下 Looper 對象的創(chuàng)建過程
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 1. 創(chuàng)建 eventfd 對象,用于喚醒 Looper
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
AutoMutex _l(mLock);
// 2. 創(chuàng)建 epoll 對象,并監(jiān)聽剛才創(chuàng)建的 eventfd 的輸入事件
rebuildEpollLocked();
}
首先創(chuàng)建了一個eventfd 對象,由 mWakeEventFd 代表,它用于喚醒 Looper 。如何喚醒呢? 繼續(xù)往下看。
然后調(diào)用 rebuildEpollLocked() 創(chuàng)建一個 epoll 對象,并監(jiān)聽剛才創(chuàng)建的 mWakeEventFd 的 I/O 事件,代碼如下
void Looper::rebuildEpollLocked() {
// ...
// 創(chuàng)建epoll對象
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
// 監(jiān)聽 mWakeEventFd 輸入事件
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i < mRequests.size(); i++) {
// ...
}
}
epoll 對象監(jiān)聽了 mWakeEventFd 的可讀事件。在后面的分析中,我們將看到,當(dāng) Looper 開始輪詢時,會調(diào)用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 寫入數(shù)據(jù)時,epoll_wait() 將返回,那么 Looper 就被喚醒。
發(fā)送消息與監(jiān)聽請求
Native 層的 Looper 可以處理兩種類型的事件,一種是消息( Message ),另一種是請求( Request )。下面我們來看看如何向 Looper 發(fā)送消息,如何讓 Looper 監(jiān)聽請求。
發(fā)送消息
通過 Looper::sendMessageXXX() 這一類函數(shù),可以向 Looper 發(fā)送消息
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
// 通過鎖,可以保存 mMessageEnvelopes 的線程安全
AutoMutex _l(mLock);
// 獲取消息要插入的位置
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
// 1. 把信息保存到 mMessageEnvelopes 中
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// mSendingMessage 表明 Looper 正在處理消息,因此不用喚醒Looper
if (mSendingMessage) {
return;
}
} // release lock
// 2. 如有必要,就喚醒Looper
if (i == 0) {
wake();
}
}
當(dāng) Looper 接收到消息時,它會把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么會調(diào)用 Looper::wake() 喚醒 Looper 來處理消息。
前面我們大概地說明了下如何通過 mWakeEventFd 這個 eventfd 對象喚醒 Looper,現(xiàn)在讓我們來看下喚醒是如何實(shí)現(xiàn)的
void Looper::wake() {
uint64_t inc = 1;
// 向 mWakeFd 中寫入數(shù)據(jù)
// TEMP_FAILURE_RETRY 是一個重試機(jī)制,確保不會因?yàn)橄到y(tǒng)中斷而導(dǎo)致數(shù)據(jù)沒有寫入
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
// 確保寫入的是 unsigned 64-bit 的數(shù)據(jù)
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
原來就是向 mWakeEventFd 寫數(shù)據(jù)。在后面我們將看到,當(dāng) Looper 進(jìn)入輪詢時, 當(dāng)epoll_wait() 檢測到 mWakeEventFd 有數(shù)據(jù)可讀時,就會從阻塞中醒來,從而達(dá)到 mWakeEventFd 喚醒 Looper 的目的。
監(jiān)聽請求
現(xiàn)在我們來看下如何讓 Looper 監(jiān)聽請求,它是通過 Looper::addFd() 實(shí)現(xiàn)的
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
// 1. 確認(rèn) ident 的值
if (!callback.get()) { // 回調(diào)為空
if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在創(chuàng)建Looper時初始化的
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) { // 回調(diào)為空,ident的值不能小于0
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else { // 回調(diào)不為空
// POLL_CALLBACK值為-2,表示請求通過回調(diào)處理請求
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
// 2. 包裝成一個 Request
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 3. 用 epoll 對象監(jiān)聽 fd 的事件,并把請求保存到 mRequests 中
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
// ...
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
Looper::addFd() 的實(shí)質(zhì)就是用 epoll 對象監(jiān)聽指定 fd 的 I/O 事件。為何我把這一過程稱之為 監(jiān)聽請求 呢 ? 因?yàn)檫@個函數(shù)把它的參數(shù)包裝成一個 Request 對象,并保存到 mRequests 容器 中。
Looper 進(jìn)行輪詢時,epoll_wait() 會阻塞,當(dāng) fd 指向的文件有 I/O 事件時,epoll_wait() 將會獲取到 fd 的可讀事件,因此 Looper 會被喚醒。
當(dāng) Looper 檢測到有請求到來時,一般是通過回調(diào)處理的,也就是這里的參數(shù) callback。當(dāng)然,也可以不設(shè)置回調(diào),當(dāng)有請求到來時,交給外部的調(diào)用者去處理,我們將會在后面看到。
根據(jù)以上知識,我們來理解 Looper::addFd() 的第一步。當(dāng) callback 參數(shù)為空時,ident 必須大于或等于0,否則為 POLL_CALLBACK,注意,這的值是 -2。 因此呢,當(dāng)我們檢測到一個請求的 ident 大于或等于0時,這個請求肯定不是通過回調(diào)處理的。這一點(diǎn)非常重要,我們將會在后面用到。
Looper 處理消息或請求
Native 層的 Looper 是通過 Looper::pollOnce() 或 Looper::pollAll() 來統(tǒng)一處理消息和請求的,我們挑 Looper::pollOnce() 這個函數(shù)來分析下
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) { // 無限循環(huán)
// 1. 處理那些不是通過callback處理的請求
while (mResponseIndex < mResponses.size()) {
// ...
}
// 2. 處理pollInner()輪詢的結(jié)果
if (result != 0) {
// ...
}
// 3. epoll 等待并處理事件(如果有事件到來)
result = pollInner(timeoutMillis);
}
}
當(dāng)首次調(diào)用 Looper::pollOnce() 時,第一步和第二步肯定不會發(fā)生,那么我們先來看下第三步,這個函數(shù)比較長,我們一步步解析
int Looper::pollInner(int timeoutMillis) {
// 省略計(jì)算 timeoutMillis 的代碼
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 1. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
}
第一步,通過 epoll_wait() 阻塞地等待它監(jiān)聽的 fd 的 I/O 就緒, 此時 Looper 進(jìn)入休眠。
那么怎么喚醒 Looper 呢? 根據(jù)前面的分析,epoll 對象監(jiān)聽了 mWakeEventFd 以及 通過 Looper::addFd() 添加的 fd。 那么向這些被監(jiān)聽的 fd 寫入數(shù)據(jù),就可以喚醒 Looper。例如,Looper::wake() 就是通過向 mWakeEventFd 寫入數(shù)據(jù)來喚醒 Looper。
那么現(xiàn)在我們來看下 Looper 被喚醒后的處理流程
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待I/O就緒
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
// 2.1 處理被消息喚醒的情況
if (epollEvents & EPOLLIN) {
// 清理eventfd中的數(shù)據(jù)
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 2.2 處理被請求喚醒的情況
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
// 根據(jù)epoll觸發(fā)的事件類型,填充events相應(yīng)的位
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 創(chuàng)建Response對象,保存兩個參數(shù),然后把Response對象保存到mResponses中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
}
當(dāng) mWakeEventFd 的 I/O 就緒,就會走到2.1步,之后會讀取 mWakeEventFd 中的數(shù)據(jù),讀取的數(shù)據(jù)并沒有什么用,只是清理數(shù)據(jù)而已。而這一步,大部分情況 是由于消息的到來,而極少情況是并不是因?yàn)橄⒌牡絹恚且驗(yàn)榫€程有緊急事情需要處理,所以必須要喚醒。
當(dāng)通過Looper::addFd() 添加的 fd 就緒時,就會走到 2.2 步,這一步一定是因?yàn)檎埱蟮絹砹恕K鼤?chuàng)建 Reponse 對象,并保存,代碼如下
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
這里我們要注意下,mResponses 容器表示待處理請求的集合,這些請求會在后面處理,讓我們接著往下看。
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待I/O就緒
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
// ...
}
Done: ;
mNextMessageUptime = LLONG_MAX;
// 3. 處理消息
while (mMessageEnvelopes.size() != 0) { // 循環(huán)處理消息
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// 3.1 取出隊(duì)頭消息
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// 3.2 隊(duì)頭消息處理的時間點(diǎn)小于當(dāng)前時間點(diǎn),表示要立即處理消息
{
// 獲取handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
// 消息交給handler處理
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
// POLL_CALLBACK 表示消息被回調(diào)處理
result = POLL_CALLBACK;
} else {
// 3.2 隊(duì)頭的消息處理的時間點(diǎn)大于當(dāng)前時間,表示還沒有到處理的時間點(diǎn),就退出處理消息的循環(huán)
// mNextMessageUptime 表示下一個消息要處理的時間點(diǎn),當(dāng)通過break退出循環(huán)后,
// 在外層的下一次循調(diào)用pollInner()時,會通過 mNextMessageUptime 計(jì)算 epoll_wait 的超時時間
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
}
根據(jù)前面分析的消息發(fā)送的過程,消息保存在 mMessageEnvelopes 中。那么這里的第三步,很明顯是在處理消息。通過循環(huán),不斷取出消息,然后把消息的 messageEnvelope.uptime 與當(dāng)前時間進(jìn)行比較,如果小于當(dāng)前時間,就證明要立馬處理消息了,否則這些消息只能在下一次輪詢中再處理。
處理完了消息,現(xiàn)在來處理請求
int Looper::pollInner(int timeoutMillis) {
// ...
// 1. 等待事件
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// ...
// 2. 處理 epoll 事件
for (int i = 0; i < eventCount; i++) {
// ...
}
Done: ;
// 3. 處理消息
while (mMessageEnvelopes.size() != 0) { // 循環(huán)處理消息
// ...
}
// 4. 循環(huán)處理請求
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// 檢測請求是否通過回調(diào)處理
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
// 表明消息被回調(diào)處理了
result = POLL_CALLBACK;
}
}
// 返回結(jié)果
return result;
}
剛剛我們還提到,mResponse 中保存了待處理的請求。現(xiàn)在通過循環(huán),不斷取出請求來處理。處理請求有一個條件,那就是請求必須有回調(diào),否則不處理。 再回顧前面分析 監(jiān)聽請求 的代碼,當(dāng)Looper::addFd() 的參數(shù) callback 不為空時,Request.ident 的值為 POLL_CALLBACK,表明請求需要通過回調(diào)處理。
Looper::pollInner() 函數(shù)分析完畢,現(xiàn)在再回到 Looper::pollOnce()
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) { // 無限循環(huán)
// 1. 處理那些不是通過callback處理的請求
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
// 從Looper::addFd()分析可知,只有當(dāng)callback為空的情況下,ident的值>=0,否則為POLL_CALLBACK(-2)
// 因此,這里處理的是那些沒有通過callback處理的請求
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 因?yàn)長ooper無法通過callback處理,所以把這些元數(shù)據(jù)交給調(diào)用者處理
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
// 注意,這里返回的值大于0
return ident;
}
}
// 2. 處理pollInner()輪詢的結(jié)果
// result的值有很多種,但是都為負(fù)數(shù)(注意,上面處理那些不是通過callback處理的請求,返回正值)
// 1. POLL_WAKE(-1): 表示epoll_wait()是被eventfd喚醒的
// 1. POLL_ERROR(-4): 表示epoll_wait()出錯
// 2. POLL_TIMEOUT(-3) : 表示epoll_wait()超時
// 3. POLL_CALLBACK(-2) : 表示消息或請求是通過回調(diào)處理的
if (result != 0) {
// 消息或事件無論是否被callback處理,這些傳入的參數(shù)都沒有意義,因此清空
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
// 注意,返回的是負(fù)值
return result;
}
// 3. epoll 等待并處理事件(如果有事件到來)
result = pollInner(timeoutMillis);
}
}
從整體看,當(dāng) pollInner() 返回后,就會調(diào)用第一步和第二步來處理結(jié)果。
首先來看第一步,根據(jù)前面 監(jiān)聽請求 的分析,當(dāng) Looper::addFd() 的參數(shù) callback 為空時,Request.ident 的值才大于等于0。Looper::pollInner 只通過回調(diào)來處理請求,而對于那些沒有回調(diào)的請求呢?那就是在這里處理。而處理的方式是直接把元數(shù)據(jù)返回給調(diào)用者,那么意思就很明顯了,讓調(diào)用者自己處理。
再來看第二步,直接返回 Looper::pollInner() 的結(jié)果,并把參數(shù)清0。因?yàn)闊o論返回的什么結(jié)果,這些參數(shù)都沒有意義了,這一點(diǎn)請大家自己體會。
關(guān)于第一步和第二步,還有一點(diǎn)需要關(guān)注,第一步的返回值是正值,而第二步返回值是負(fù)值。
結(jié)束
本文對 Native 的 Looper 的主要函數(shù)進(jìn)行分析,揭開了 Native 層消息機(jī)制的核心,但是目前我并不能給一個很好例子來理解本文的內(nèi)容。需要大家在分析 Native 層源碼時慢慢體會。
可能有人會問,你為何不以 Java 層的消息機(jī)制為例來引出 Native 層的消息機(jī)制呢? 因?yàn)檫@樣廢話太多。
原文鏈接:https://juejin.cn/post/7044787749299159076
相關(guān)推薦
- 2023-03-18 k8s?service?nodePort無法訪問的問題解決_云其它
- 2022-04-23 Python中ini配置文件讀寫的實(shí)現(xiàn)_python
- 2022-11-20 利用Go語言快速實(shí)現(xiàn)一個極簡任務(wù)調(diào)度系統(tǒng)_Golang
- 2022-03-27 Python中Tkinter布局管理grid的使用_python
- 2022-08-25 python數(shù)學(xué)建模之三大模型與十大常用算法詳情_python
- 2022-11-16 注意import和from?import?的區(qū)別及說明_python
- 2022-08-10 C#多線程之線程綁定ThreadLocal類_C#教程
- 2022-07-30 Linux常見命令-搜索查詢類,find 查找文件或者目錄,locate 快速定位文件路徑,grep
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支