日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

深入理解Linux內(nèi)核select多路復(fù)用原理

作者:程序員李哈 更新時(shí)間: 2022-08-13 編程語言

寫在前面:本文以Linux2.6.0的內(nèi)核源碼進(jìn)行講解,使用x86 32位機(jī)講解。

多路復(fù)用原理

講多路復(fù)用的原理,那么一定先要講沒有多路復(fù)用的弊端。傳統(tǒng)的阻塞式,進(jìn)程一旦io讀寫就開始阻塞,效率太低,導(dǎo)致整個(gè)系統(tǒng)的吞吐量急劇下降。那么,就有人要思考,居然要阻塞,那么我就使用線程去阻塞,那么一個(gè)系統(tǒng)的io操作如此之多,要多少個(gè)線程才能滿足呢?線程是不是會(huì)占用資源?當(dāng)線程多起來后分時(shí)復(fù)用的CPU需要一直切換上下文,切換上下文是不是在浪費(fèi)CPU資源呢,會(huì)導(dǎo)致大量的時(shí)間在切換上下文?

所以就引入了多路復(fù)用,把需要io讀寫的操作叫做一個(gè)事件(多個(gè)事件,所以叫多路),把這些io事件維護(hù)在一個(gè)隊(duì)列中,并且每個(gè)事件都綁定上一個(gè)回調(diào)函數(shù)(回調(diào)函數(shù)負(fù)責(zé)喚醒當(dāng)前阻塞的進(jìn)程),而這些事件的綁定回調(diào)函數(shù)和維護(hù)隊(duì)列的操作都是交給一個(gè)進(jìn)程來做(所以叫復(fù)用)。當(dāng)?shù)讓泳W(wǎng)卡沒有觸發(fā)讀寫事件的時(shí)候,這個(gè)進(jìn)程就去阻塞等待(讓出CPU的使用權(quán)),當(dāng)?shù)讓泳W(wǎng)卡建立好連接后會(huì)發(fā)出中斷信號(hào),此時(shí)中斷處理函數(shù)就會(huì)回調(diào)事件對(duì)應(yīng)的回調(diào)函數(shù),而回調(diào)函數(shù)就是讓進(jìn)程喚醒,喚醒后,進(jìn)程會(huì)去遍歷所有事件,得到那個(gè)準(zhǔn)備好了的事件,最終把所有事件返回給用戶態(tài)(所以用戶態(tài)還需要遍歷才能得到準(zhǔn)備好的事件)。

考慮到,文字描述比較難懂,特意畫了一張流程圖幫助讀者來理解(盡力了...)

此多路復(fù)用只是內(nèi)核中select的流程,poll和epoll的流程略有區(qū)別,poll和epoll后續(xù)會(huì)有文章來細(xì)講。

?

?

select系統(tǒng)調(diào)用源碼講解:

如何從用戶態(tài)進(jìn)入內(nèi)核態(tài),執(zhí)行系統(tǒng)調(diào)用的流程,這里就不進(jìn)行講解,那么直接講解sys_select的方法。? ? ? ?

其次,在Linux內(nèi)核中,VFS萬物皆文件的思想,一言以蔽之:不管是普通文件,網(wǎng)絡(luò)、外設(shè)設(shè)備等等都是抽象成一個(gè)文件,而文件在內(nèi)核中的實(shí)現(xiàn)file結(jié)構(gòu)體返回給用戶態(tài)不安全,所以都是返回的fd下標(biāo)給用戶態(tài)。

sys_select源碼:

// n是事件數(shù)量
// inp是輸入事件的位圖
// outp是輸出事件的位圖
// exp是不感興趣事件的位圖
// tvp是計(jì)時(shí)器
asmlinkage long
sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp)
{
	fd_set_bits fds;
	char *bits;
	long timeout;
	int ret, size, max_fdset;

	timeout = MAX_SCHEDULE_TIMEOUT;
	if (tvp) {
		time_t sec, usec;

		if ((ret = verify_area(VERIFY_READ, tvp, sizeof(*tvp)))
		    || (ret = __get_user(sec, &tvp->tv_sec))
		    || (ret = __get_user(usec, &tvp->tv_usec)))
			goto out_nofds;

		ret = -EINVAL;
		if (sec < 0 || usec < 0)
			goto out_nofds;

		if ((unsigned long) sec < MAX_SELECT_SECONDS) {
			timeout = ROUND_UP(usec, 1000000/HZ);
			timeout += sec * (unsigned long) HZ;
		}
	}

	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;

	/* max_fdset can increase, so grab it once to avoid race */
	max_fdset = current->·files->max_fdset;
	if (n > max_fdset)
		n = max_fdset;

	/*
	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	 * since we used fdset we need to allocate memory in units of
	 * long-words. 
	 */
	ret = -ENOMEM;

	// 根據(jù)n的數(shù)量,算出需要多少字節(jié)(也就是多少位)
	size = FDS_BYTES(n);

	// 開辟4 * 6的大小,用來存放fd_set_bits fds;
	bits = select_bits_alloc(size);
	
	if (!bits)
		goto out_nofds;

	// 推指針
	fds.in      = (unsigned long *)  bits;
	fds.out     = (unsigned long *) (bits +   size);
	fds.ex      = (unsigned long *) (bits + 2*size);
	fds.res_in  = (unsigned long *) (bits + 3*size);
	fds.res_out = (unsigned long *) (bits + 4*size);
	fds.res_ex  = (unsigned long *) (bits + 5*size);

	// 把用戶態(tài)的數(shù)據(jù)拷貝到內(nèi)核態(tài),因?yàn)橛脩魬B(tài)的數(shù)據(jù)不可信。
	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;

    // 清空返回給用戶態(tài)的位圖
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);

    // 調(diào)用do_select執(zhí)行具體邏輯
	ret = do_select(n, &fds, &timeout);

	if (tvp && !(current->personality & STICKY_TIMEOUTS)) {
		time_t sec = 0, usec = 0;
		if (timeout) {
			sec = timeout / HZ;
			usec = timeout % HZ;
			usec *= (1000000/HZ);
		}
		put_user(sec, &tvp->tv_sec);
		put_user(usec, &tvp->tv_usec);
	}

	if (ret < 0)
		goto out;
	if (!ret) {
		ret = -ERESTARTNOHAND;
		if (signal_pending(current))
			goto out;
		ret = 0;
	}

    // 把內(nèi)核態(tài)返回?cái)?shù)據(jù)的內(nèi)容拷貝到用戶態(tài).
	set_fd_set(n, inp, fds.res_in);
	set_fd_set(n, outp, fds.res_out);
	set_fd_set(n, exp, fds.res_ex);

out:
	select_bits_free(bits, size);
out_nofds:
	return ret;
}

方法參數(shù):

????????// n是事件數(shù)量
????????// inp是輸入事件的位圖
????????// outp是輸出事件的位圖
????????// exp是不感興趣事件的位圖(如果是不感興趣的事件,就會(huì)被忽略)
????????// tvp是計(jì)時(shí)器

fd_set __user:

fd的位圖形式,用一個(gè)bit位來代表一個(gè)fd,這樣非常非常節(jié)省空間。__user就代表是用戶態(tài)傳來的數(shù)據(jù).

這段代碼并不復(fù)雜,這里根據(jù)事件的數(shù)量,得出內(nèi)核要開辟多少個(gè)位圖大小來存放用戶態(tài)傳過來的數(shù)據(jù),大部分的操作都是把用戶態(tài)的內(nèi)容拷貝到內(nèi)核態(tài),再把內(nèi)核態(tài)的數(shù)據(jù)拷貝到用戶態(tài)。(為什么要把用戶態(tài)的數(shù)據(jù)給拷貝到內(nèi)核態(tài)呢?思考一個(gè)問題,這里是傳入的指針,指針具體指向的數(shù)據(jù)在用戶態(tài),而用戶態(tài)可能會(huì)修改數(shù)據(jù),導(dǎo)致數(shù)據(jù)不安全,所以,對(duì)于內(nèi)核態(tài)來說,用戶態(tài)的數(shù)據(jù)不可信,所以就需要進(jìn)行拷貝

do_select:

把用戶態(tài)的數(shù)據(jù)進(jìn)行拷貝以后,進(jìn)入到do_select方法中進(jìn)行具體的處理。

int do_select(int n, fd_set_bits *fds, long *timeout)
{
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i;
	long __timeout = *timeout;

 	spin_lock(&current->files->file_lock);
	retval = max_select_fd(n, fds);
	spin_unlock(&current->files->file_lock);

	if (retval < 0)
		return retval;
	n = retval;

	poll_initwait(&table);
	wait = &table.pt;
	if (!__timeout)
		wait = NULL;
	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

		set_current_state(TASK_INTERRUPTIBLE);

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;
			struct file_operations *f_op = NULL;
			struct file *file = NULL;

			// 這里是先++ 再*
			// 取數(shù)組的下一位。
			in = *inp++; out = *outp++; ex = *exp++;

			// 二進(jìn)制組合,最后還是為0,就代表這三個(gè)值都為0
			all_bits = in | out | ex;
			if (all_bits == 0) {
				i += __NFDBITS;
				continue;
			}

			for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {

				// 這個(gè)退出條件,代表所有的n都已經(jīng)遍歷完了。
				if (i >= n)
					break;
				if (!(bit & all_bits))
					continue;

				// 獲取到文件
				file = fget(i);
				
				if (file) {
					f_op = file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op && f_op->poll)
						//  這里面會(huì)把當(dāng)前fd和當(dāng)前current做綁定放入到一個(gè)隊(duì)列中等待
						mask = (*f_op->poll)(file, retval ? NULL : wait);
					fput(file);

					// in 必須有bit這一位?
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
					}

					// out 必須有bit這一位?
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
					}

					// ex 必須有bit這一位?
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
					}
				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
		}
		// shit
		wait = NULL;
		if (retval || !__timeout || signal_pending(current))
			break;
		if(table.error) {
			retval = table.error;
			break;
		}
		__timeout = schedule_timeout(__timeout);
	}
	__set_current_state(TASK_RUNNING);


	poll_freewait(&table);

	/*
	 * Up-to-date the caller timeout.
	 */
	*timeout = __timeout;
	return retval;
}

嗯,代碼量確實(shí)挺多的,多層for循環(huán)嵌套,但是沒關(guān)系,待筆者細(xì)細(xì)道來。

首先,我們得先要明白所有邏輯都在do_select方法中了,所以給事件掛回調(diào)的處理、隊(duì)列的處理都會(huì)在這里。

void poll_initwait(struct poll_wqueues *pwq)
{
?? ?init_poll_funcptr(&pwq->pt, __pollwait);
?? ?pwq->error = 0;
?? ?pwq->table = NULL;
}

函數(shù)指針的賦值,把__pollwait函數(shù)掛在了poll_wqueues結(jié)構(gòu)體中poll_table pt屬性中。

隨之進(jìn)入到for循環(huán)中。

第一層for循環(huán):都把select稱為輪訓(xùn)式多路復(fù)用,這里體現(xiàn)的淋漓盡致,因?yàn)樗姥h(huán)的機(jī)制+schedule_timeout(__timeout)切換上下文的方式(讓出CPU的使用權(quán),直到有中斷喚醒)

第二層for循環(huán)和第三層for循環(huán):就是來遍歷位圖,因?yàn)槲粓D的一位代表一個(gè)fd,所以通過fd獲取到file結(jié)構(gòu)體。

拿到file結(jié)構(gòu)體后,VFS虛擬文件+函數(shù)指針的魅力又體現(xiàn)出來了,因?yàn)橥ㄟ^if (f_op && f_op->poll)?就可以得知當(dāng)前的文件系統(tǒng)是否實(shí)現(xiàn)了poll函數(shù)指針。因?yàn)閷?duì)于poll函數(shù)指針而言,普通的文件系統(tǒng)是沒有實(shí)現(xiàn)的,而socket套接字的文件系統(tǒng)是有實(shí)現(xiàn)的。而下文會(huì)仔細(xì)道來。而poll返回的mask標(biāo)志位是可以判斷是否有讀寫事件,存在讀寫事件的話,res_in |= bit就把返回的位圖對(duì)應(yīng)置位,并且retval++,后續(xù)就會(huì)break;退出所有循環(huán),回到sys_select中。

所以看到mask = (*f_op->poll)(file, retval ? NULL : wait);具體的函數(shù)實(shí)現(xiàn)。

poll函數(shù)指針socket的具體實(shí)現(xiàn)

?

?我們直接看到tcp的實(shí)現(xiàn),但是此方法不過細(xì)講,我們能明白他內(nèi)部回調(diào)了之前poll_initwait方法掛上的__pollwait回調(diào)函數(shù)即可。

并且此方法,會(huì)返回do_select方法需要的mask標(biāo)志位。也就是可以理解為,當(dāng)tcp完成三次握手,建立上連接以后,會(huì)根據(jù)tcp協(xié)議分析出讀寫標(biāo)志位(這里網(wǎng)絡(luò)棧,不過細(xì)講,能明白會(huì)建立連接后會(huì)得到do_select需要的標(biāo)志位即可)。

所以看到__pollwait的實(shí)現(xiàn)。

__pollwait回調(diào)函數(shù):

void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
	// 騷操作獲取到結(jié)構(gòu)體的基址。
	struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);

	// 
	struct poll_table_page *table = p->table;

	// 隊(duì)列不存在,或者內(nèi)部元素為0.
	// 也就是初始化的操作
	if (!table || POLL_TABLE_FULL(table)) {
		struct poll_table_page *new_table;

		new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
		if (!new_table) {
			p->error = -ENOMEM;
			__set_current_state(TASK_RUNNING);
			return;
		}
		new_table->entry = new_table->entries;
		new_table->next = table;
		p->table = new_table;
		table = new_table;
	}

	/* Add a new entry */
	// 放入到隊(duì)列。
	{
		struct poll_table_entry * entry = table->entry;

		// 意思是poll_table_entry是一個(gè)連續(xù)的空間,數(shù)組?
		table->entry = entry+1;

		// 原子性加引用。代表被使用了。
	 	get_file(filp);
		
	 	entry->filp = filp;
		
		entry->wait_address = wait_address;

		// 初始化wait_queue_t wait;
		// 把對(duì)應(yīng)的進(jìn)程結(jié)構(gòu)體、回調(diào)鉤子賦值
		init_waitqueue_entry(&entry->wait, current);

		// wait_address這個(gè)是sock維護(hù)的隊(duì)列
		// &entry->wait這個(gè)是隊(duì)列中的元素
		// 所以這是放入到隊(duì)列中
		add_wait_queue(wait_address,&entry->wait);
	}
}

此方法就是添加到等待隊(duì)列的具體實(shí)現(xiàn)。上面是初始化的操作,下面是添加到等待隊(duì)列的操作。

看到init_waitqueue_entry方法的實(shí)現(xiàn).

static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
?? ?q->flags = 0;
?? ?q->task = p;
?? ?// 繼續(xù)放鉤子
?? ?// 被喚醒的鉤子。
?? ?q->func = default_wake_function;
}

這里給等待隊(duì)列中的元素又掛上了default_wake_function鉤子回調(diào)函數(shù)。

看到add_wait_queue方法的實(shí)現(xiàn).

//?wait_queue_head_t *q 是當(dāng)前tcp的等待隊(duì)列。

// wait_queue_t * wait 是當(dāng)前文件對(duì)應(yīng)的等待隊(duì)列的元素。

void add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)
{
?? ?unsigned long flags;

?? ?wait->flags &= ~WQ_FLAG_EXCLUSIVE;
?? ?spin_lock_irqsave(&q->lock, flags);
?? ?__add_wait_queue(q, wait);
?? ?spin_unlock_irqrestore(&q->lock, flags);
}

這里把當(dāng)前文件對(duì)應(yīng)的等待隊(duì)列的元素鏈到tcp的等待隊(duì)列,而等待隊(duì)列的元素中的func函數(shù)指針是掛上了default_wake_function鉤子。

等待底層網(wǎng)卡給CPU發(fā)出中斷信號(hào),CPU響應(yīng)中斷,中斷處理方法中去處理等待隊(duì)列。并且會(huì)回調(diào)default_wake_function鉤子(這里的論證太底層,偏硬件,筆者能力有限,筆者只能給出論據(jù),不能給出論證了....)

default_wake_function回調(diào)函數(shù):

int default_wake_function(wait_queue_t *curr, unsigned mode, int sync)
{
	task_t *p = curr->task;
	return try_to_wake_up(p, mode, sync);
}

這里就特別的簡(jiǎn)單了,把當(dāng)前進(jìn)程給喚醒。如何喚醒:把task_struct的state賦值為TASK_RUNNING,然后等待時(shí)鐘中斷的中斷處理函數(shù)調(diào)度此進(jìn)程。

然后當(dāng)前進(jìn)程蘇醒過來以后,又回到了do_select方法的schedule_timeout(__timeout); 中,此時(shí)又開始了新的一輪輪詢,而這輪輪詢是通過poll函數(shù)指針獲取到mask標(biāo)志位(因?yàn)楫?dāng)前中斷喚醒都是因?yàn)閠cp建立了連接通過default_wake_function把進(jìn)程給喚醒,所以已經(jīng)有讀寫事件的產(chǎn)生)然后通過mask判斷把對(duì)應(yīng)的位圖置位,然后返回給用戶態(tài)。

總結(jié):

select多路復(fù)用的弊端

  1. 文件大小的限制
  2. 多次輪訓(xùn)——內(nèi)核態(tài)的輪訓(xùn),返回的結(jié)果在用戶態(tài)還需要輪訓(xùn)
  3. 多次數(shù)據(jù)拷貝,不過是位圖占用空間不多。

最后,如果本帖對(duì)您有一定的幫助,希望能點(diǎn)贊+關(guān)注+收藏!您的支持是給我最大的動(dòng)力,后續(xù)會(huì)一直更新各種框架的使用和框架的源碼解讀~!

原文鏈接:https://blog.csdn.net/qq_43799161/article/details/126279876

欄目分類
最近更新