網(wǎng)站首頁 編程語言 正文
寫在前面:本文以Linux2.6.0的內(nèi)核源碼進(jìn)行講解,使用x86 32位機(jī)講解。
多路復(fù)用原理
講多路復(fù)用的原理,那么一定先要講沒有多路復(fù)用的弊端。傳統(tǒng)的阻塞式,進(jìn)程一旦io讀寫就開始阻塞,效率太低,導(dǎo)致整個(gè)系統(tǒng)的吞吐量急劇下降。那么,就有人要思考,居然要阻塞,那么我就使用線程去阻塞,那么一個(gè)系統(tǒng)的io操作如此之多,要多少個(gè)線程才能滿足呢?線程是不是會(huì)占用資源?當(dāng)線程多起來后分時(shí)復(fù)用的CPU需要一直切換上下文,切換上下文是不是在浪費(fèi)CPU資源呢,會(huì)導(dǎo)致大量的時(shí)間在切換上下文?
所以就引入了多路復(fù)用,把需要io讀寫的操作叫做一個(gè)事件(多個(gè)事件,所以叫多路),把這些io事件維護(hù)在一個(gè)隊(duì)列中,并且每個(gè)事件都綁定上一個(gè)回調(diào)函數(shù)(回調(diào)函數(shù)負(fù)責(zé)喚醒當(dāng)前阻塞的進(jìn)程),而這些事件的綁定回調(diào)函數(shù)和維護(hù)隊(duì)列的操作都是交給一個(gè)進(jìn)程來做(所以叫復(fù)用)。當(dāng)?shù)讓泳W(wǎng)卡沒有觸發(fā)讀寫事件的時(shí)候,這個(gè)進(jìn)程就去阻塞等待(讓出CPU的使用權(quán)),當(dāng)?shù)讓泳W(wǎng)卡建立好連接后會(huì)發(fā)出中斷信號(hào),此時(shí)中斷處理函數(shù)就會(huì)回調(diào)事件對(duì)應(yīng)的回調(diào)函數(shù),而回調(diào)函數(shù)就是讓進(jìn)程喚醒,喚醒后,進(jìn)程會(huì)去遍歷所有事件,得到那個(gè)準(zhǔn)備好了的事件,最終把所有事件返回給用戶態(tài)(所以用戶態(tài)還需要遍歷才能得到準(zhǔn)備好的事件)。
考慮到,文字描述比較難懂,特意畫了一張流程圖幫助讀者來理解(盡力了...)
此多路復(fù)用只是內(nèi)核中select的流程,poll和epoll的流程略有區(qū)別,poll和epoll后續(xù)會(huì)有文章來細(xì)講。
?
?
select系統(tǒng)調(diào)用源碼講解:
如何從用戶態(tài)進(jìn)入內(nèi)核態(tài),執(zhí)行系統(tǒng)調(diào)用的流程,這里就不進(jìn)行講解,那么直接講解sys_select的方法。? ? ? ?
其次,在Linux內(nèi)核中,VFS萬物皆文件的思想,一言以蔽之:不管是普通文件,網(wǎng)絡(luò)、外設(shè)設(shè)備等等都是抽象成一個(gè)文件,而文件在內(nèi)核中的實(shí)現(xiàn)file結(jié)構(gòu)體返回給用戶態(tài)不安全,所以都是返回的fd下標(biāo)給用戶態(tài)。
sys_select源碼:
// n是事件數(shù)量
// inp是輸入事件的位圖
// outp是輸出事件的位圖
// exp是不感興趣事件的位圖
// tvp是計(jì)時(shí)器
asmlinkage long
sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
{
fd_set_bits fds;
char *bits;
long timeout;
int ret, size, max_fdset;
timeout = MAX_SCHEDULE_TIMEOUT;
if (tvp) {
time_t sec, usec;
if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
|| (ret = __get_user(sec, &tvp->tv_sec))
|| (ret = __get_user(usec, &tvp->tv_usec)))
goto out_nofds;
ret = -EINVAL;
if (sec < 0 || usec < 0)
goto out_nofds;
if ((unsigned long) sec < MAX_SELECT_SECONDS) {
timeout = ROUND_UP(usec, 1000000/HZ);
timeout += sec * (unsigned long) HZ;
}
}
ret = -EINVAL;
if (n < 0)
goto out_nofds;
/* max_fdset can increase, so grab it once to avoid race */
max_fdset = current->·files->max_fdset;
if (n > max_fdset)
n = max_fdset;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
ret = -ENOMEM;
// 根據(jù)n的數(shù)量,算出需要多少字節(jié)(也就是多少位)
size = FDS_BYTES(n);
// 開辟4 * 6的大小,用來存放fd_set_bits fds;
bits = select_bits_alloc(size);
if (!bits)
goto out_nofds;
// 推指針
fds.in = (unsigned long *) bits;
fds.out = (unsigned long *) (bits + size);
fds.ex = (unsigned long *) (bits + 2*size);
fds.res_in = (unsigned long *) (bits + 3*size);
fds.res_out = (unsigned long *) (bits + 4*size);
fds.res_ex = (unsigned long *) (bits + 5*size);
// 把用戶態(tài)的數(shù)據(jù)拷貝到內(nèi)核態(tài),因?yàn)橛脩魬B(tài)的數(shù)據(jù)不可信。
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
// 清空返回給用戶態(tài)的位圖
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
// 調(diào)用do_select執(zhí)行具體邏輯
ret = do_select(n, &fds, &timeout);
if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
time_t sec = 0, usec = 0;
if (timeout) {
sec = timeout / HZ;
usec = timeout % HZ;
usec *= (1000000/HZ);
}
put_user(sec, &tvp->tv_sec);
put_user(usec, &tvp->tv_usec);
}
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
// 把內(nèi)核態(tài)返回?cái)?shù)據(jù)的內(nèi)容拷貝到用戶態(tài).
set_fd_set(n, inp, fds.res_in);
set_fd_set(n, outp, fds.res_out);
set_fd_set(n, exp, fds.res_ex);
out:
select_bits_free(bits, size);
out_nofds:
return ret;
}
方法參數(shù):
????????// n是事件數(shù)量
????????// inp是輸入事件的位圖
????????// outp是輸出事件的位圖
????????// exp是不感興趣事件的位圖(如果是不感興趣的事件,就會(huì)被忽略)
????????// tvp是計(jì)時(shí)器fd_set __user:
fd的位圖形式,用一個(gè)bit位來代表一個(gè)fd,這樣非常非常節(jié)省空間。__user就代表是用戶態(tài)傳來的數(shù)據(jù).
這段代碼并不復(fù)雜,這里根據(jù)事件的數(shù)量,得出內(nèi)核要開辟多少個(gè)位圖大小來存放用戶態(tài)傳過來的數(shù)據(jù),大部分的操作都是把用戶態(tài)的內(nèi)容拷貝到內(nèi)核態(tài),再把內(nèi)核態(tài)的數(shù)據(jù)拷貝到用戶態(tài)。(為什么要把用戶態(tài)的數(shù)據(jù)給拷貝到內(nèi)核態(tài)呢?思考一個(gè)問題,這里是傳入的指針,指針具體指向的數(shù)據(jù)在用戶態(tài),而用戶態(tài)可能會(huì)修改數(shù)據(jù),導(dǎo)致數(shù)據(jù)不安全,所以,對(duì)于內(nèi)核態(tài)來說,用戶態(tài)的數(shù)據(jù)不可信,所以就需要進(jìn)行拷貝)
do_select:
把用戶態(tài)的數(shù)據(jù)進(jìn)行拷貝以后,進(jìn)入到do_select方法中進(jìn)行具體的處理。
int do_select(int n, fd_set_bits *fds, long *timeout)
{
struct poll_wqueues table;
poll_table *wait;
int retval, i;
long __timeout = *timeout;
spin_lock(¤t->files->file_lock);
retval = max_select_fd(n, fds);
spin_unlock(¤t->files->file_lock);
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);
wait = &table.pt;
if (!__timeout)
wait = NULL;
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
set_current_state(TASK_INTERRUPTIBLE);
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
struct file_operations *f_op = NULL;
struct file *file = NULL;
// 這里是先++ 再*
// 取數(shù)組的下一位。
in = *inp++; out = *outp++; ex = *exp++;
// 二進(jìn)制組合,最后還是為0,就代表這三個(gè)值都為0
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS;
continue;
}
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
// 這個(gè)退出條件,代表所有的n都已經(jīng)遍歷完了。
if (i >= n)
break;
if (!(bit & all_bits))
continue;
// 獲取到文件
file = fget(i);
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll)
// 這里面會(huì)把當(dāng)前fd和當(dāng)前current做綁定放入到一個(gè)隊(duì)列中等待
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput(file);
// in 必須有bit這一位?
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
// out 必須有bit這一位?
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
// ex 必須有bit這一位?
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
}
// shit
wait = NULL;
if (retval || !__timeout || signal_pending(current))
break;
if(table.error) {
retval = table.error;
break;
}
__timeout = schedule_timeout(__timeout);
}
__set_current_state(TASK_RUNNING);
poll_freewait(&table);
/*
* Up-to-date the caller timeout.
*/
*timeout = __timeout;
return retval;
}
嗯,代碼量確實(shí)挺多的,多層for循環(huán)嵌套,但是沒關(guān)系,待筆者細(xì)細(xì)道來。
首先,我們得先要明白所有邏輯都在do_select方法中了,所以給事件掛回調(diào)的處理、隊(duì)列的處理都會(huì)在這里。
void poll_initwait(struct poll_wqueues *pwq)
{
?? ?init_poll_funcptr(&pwq->pt, __pollwait);
?? ?pwq->error = 0;
?? ?pwq->table = NULL;
}函數(shù)指針的賦值,把__pollwait函數(shù)掛在了poll_wqueues結(jié)構(gòu)體中poll_table pt屬性中。
隨之進(jìn)入到for循環(huán)中。
第一層for循環(huán):都把select稱為輪訓(xùn)式多路復(fù)用,這里體現(xiàn)的淋漓盡致,因?yàn)樗姥h(huán)的機(jī)制+schedule_timeout(__timeout)切換上下文的方式(讓出CPU的使用權(quán),直到有中斷喚醒)
第二層for循環(huán)和第三層for循環(huán):就是來遍歷位圖,因?yàn)槲粓D的一位代表一個(gè)fd,所以通過fd獲取到file結(jié)構(gòu)體。
拿到file結(jié)構(gòu)體后,VFS虛擬文件+函數(shù)指針的魅力又體現(xiàn)出來了,因?yàn)橥ㄟ^if (f_op && f_op->poll)?就可以得知當(dāng)前的文件系統(tǒng)是否實(shí)現(xiàn)了poll函數(shù)指針。因?yàn)閷?duì)于poll函數(shù)指針而言,普通的文件系統(tǒng)是沒有實(shí)現(xiàn)的,而socket套接字的文件系統(tǒng)是有實(shí)現(xiàn)的。而下文會(huì)仔細(xì)道來。而poll返回的mask標(biāo)志位是可以判斷是否有讀寫事件,存在讀寫事件的話,res_in |= bit就把返回的位圖對(duì)應(yīng)置位,并且retval++,后續(xù)就會(huì)break;退出所有循環(huán),回到sys_select中。
所以看到mask = (*f_op->poll)(file, retval ? NULL : wait);具體的函數(shù)實(shí)現(xiàn)。
poll函數(shù)指針socket的具體實(shí)現(xiàn)
?
?我們直接看到tcp的實(shí)現(xiàn),但是此方法不過細(xì)講,我們能明白他內(nèi)部回調(diào)了之前poll_initwait方法掛上的__pollwait回調(diào)函數(shù)即可。
并且此方法,會(huì)返回do_select方法需要的mask標(biāo)志位。也就是可以理解為,當(dāng)tcp完成三次握手,建立上連接以后,會(huì)根據(jù)tcp協(xié)議分析出讀寫標(biāo)志位(這里網(wǎng)絡(luò)棧,不過細(xì)講,能明白會(huì)建立連接后會(huì)得到do_select需要的標(biāo)志位即可)。
所以看到__pollwait的實(shí)現(xiàn)。
__pollwait回調(diào)函數(shù):
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
// 騷操作獲取到結(jié)構(gòu)體的基址。
struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
//
struct poll_table_page *table = p->table;
// 隊(duì)列不存在,或者內(nèi)部元素為0.
// 也就是初始化的操作
if (!table || POLL_TABLE_FULL(table)) {
struct poll_table_page *new_table;
new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
if (!new_table) {
p->error = -ENOMEM;
__set_current_state(TASK_RUNNING);
return;
}
new_table->entry = new_table->entries;
new_table->next = table;
p->table = new_table;
table = new_table;
}
/* Add a new entry */
// 放入到隊(duì)列。
{
struct poll_table_entry * entry = table->entry;
// 意思是poll_table_entry是一個(gè)連續(xù)的空間,數(shù)組?
table->entry = entry+1;
// 原子性加引用。代表被使用了。
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
// 初始化wait_queue_t wait;
// 把對(duì)應(yīng)的進(jìn)程結(jié)構(gòu)體、回調(diào)鉤子賦值
init_waitqueue_entry(&entry->wait, current);
// wait_address這個(gè)是sock維護(hù)的隊(duì)列
// &entry->wait這個(gè)是隊(duì)列中的元素
// 所以這是放入到隊(duì)列中
add_wait_queue(wait_address,&entry->wait);
}
}
此方法就是添加到等待隊(duì)列的具體實(shí)現(xiàn)。上面是初始化的操作,下面是添加到等待隊(duì)列的操作。
看到init_waitqueue_entry方法的實(shí)現(xiàn).
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
?? ?q->flags = 0;
?? ?q->task = p;
?? ?// 繼續(xù)放鉤子
?? ?// 被喚醒的鉤子。
?? ?q->func = default_wake_function;
}這里給等待隊(duì)列中的元素又掛上了default_wake_function鉤子回調(diào)函數(shù)。
看到add_wait_queue方法的實(shí)現(xiàn).
//?wait_queue_head_t *q 是當(dāng)前tcp的等待隊(duì)列。
// wait_queue_t * wait 是當(dāng)前文件對(duì)應(yīng)的等待隊(duì)列的元素。
void add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)
{
?? ?unsigned long flags;?? ?wait->flags &= ~WQ_FLAG_EXCLUSIVE;
?? ?spin_lock_irqsave(&q->lock, flags);
?? ?__add_wait_queue(q, wait);
?? ?spin_unlock_irqrestore(&q->lock, flags);
}這里把當(dāng)前文件對(duì)應(yīng)的等待隊(duì)列的元素鏈到tcp的等待隊(duì)列,而等待隊(duì)列的元素中的func函數(shù)指針是掛上了default_wake_function鉤子。
等待底層網(wǎng)卡給CPU發(fā)出中斷信號(hào),CPU響應(yīng)中斷,中斷處理方法中去處理等待隊(duì)列。并且會(huì)回調(diào)default_wake_function鉤子(這里的論證太底層,偏硬件,筆者能力有限,筆者只能給出論據(jù),不能給出論證了....)
default_wake_function回調(diào)函數(shù):
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync)
{
task_t *p = curr->task;
return try_to_wake_up(p, mode, sync);
}
這里就特別的簡(jiǎn)單了,把當(dāng)前進(jìn)程給喚醒。如何喚醒:把task_struct的state賦值為TASK_RUNNING,然后等待時(shí)鐘中斷的中斷處理函數(shù)調(diào)度此進(jìn)程。
然后當(dāng)前進(jìn)程蘇醒過來以后,又回到了do_select方法的schedule_timeout(__timeout); 中,此時(shí)又開始了新的一輪輪詢,而這輪輪詢是通過poll函數(shù)指針獲取到mask標(biāo)志位(因?yàn)楫?dāng)前中斷喚醒都是因?yàn)閠cp建立了連接通過default_wake_function把進(jìn)程給喚醒,所以已經(jīng)有讀寫事件的產(chǎn)生)然后通過mask判斷把對(duì)應(yīng)的位圖置位,然后返回給用戶態(tài)。
總結(jié):
select多路復(fù)用的弊端
- 文件大小的限制
- 多次輪訓(xùn)——內(nèi)核態(tài)的輪訓(xùn),返回的結(jié)果在用戶態(tài)還需要輪訓(xùn)
- 多次數(shù)據(jù)拷貝,不過是位圖占用空間不多。
最后,如果本帖對(duì)您有一定的幫助,希望能點(diǎn)贊+關(guān)注+收藏!您的支持是給我最大的動(dòng)力,后續(xù)會(huì)一直更新各種框架的使用和框架的源碼解讀~!
原文鏈接:https://blog.csdn.net/qq_43799161/article/details/126279876
相關(guān)推薦
- 2022-09-22 IO流技術(shù)中的File類
- 2022-10-10 C++私有繼承與EBO深入分析講解_C 語言
- 2022-04-16 WPF框架Prism中區(qū)域Region用法介紹_實(shí)用技巧
- 2022-09-29 OpenCV中Grabcut算法的具體使用_C 語言
- 2022-06-23 python基礎(chǔ)之while循環(huán)、for循環(huán)詳解及舉例_python
- 2022-03-20 C語言輸出任意邊長(zhǎng)的菱形(用c語言輸出一個(gè)菱形)
- 2022-09-15 Go位集合相關(guān)操作bitset庫(kù)安裝使用_Golang
- 2022-07-11 lambda表達(dá)式和Stream
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支