日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

Python?Asyncio調(diào)度原理詳情_python

作者:??so1n???? ? 更新時(shí)間: 2022-08-19 編程語(yǔ)言

前言

在文章《Python?Asyncio中Coroutines,Tasks,Future可等待對(duì)象的關(guān)系及作用》中介紹了Python的可等待對(duì)象作用,特別是Task對(duì)象在啟動(dòng)的時(shí)候可以自我驅(qū)動(dòng),但是一個(gè)Task對(duì)象只能驅(qū)動(dòng)一條執(zhí)行鏈,如果要多條鏈執(zhí)行(并發(fā)),還是需要EventLoop來(lái)安排驅(qū)動(dòng),接下來(lái)將通過(guò)Python.Asyncio庫(kù)的源碼來(lái)了解EventLoop是如何運(yùn)作的。

1.基本介紹

Python.Asyncio是一個(gè)大而全的庫(kù),它包括很多功能,而跟核心調(diào)度相關(guān)的邏輯除了三種可等待對(duì)象外,還有其它一些功能,它們分別位于runners.pybase_event.pyevent.py三個(gè)文件中。

runners.py文件有一個(gè)主要的類--Runner,它的主要職責(zé)是做好進(jìn)入?yún)f(xié)程模式的事件循環(huán)等到初始化工作,以及在退出協(xié)程模式時(shí)清理還在內(nèi)存的協(xié)程,生成器等對(duì)象。

協(xié)程模式只是為了能方便理解,對(duì)于計(jì)算機(jī)而言,并沒有這樣區(qū)分

event.py文件除了存放著EventLoop對(duì)象的接口以及獲取和設(shè)置EventLoop的函數(shù)外,還有兩個(gè)EventLoop可調(diào)度的對(duì)象,分別為HandlerTimerHandler,它們可以認(rèn)為是EvnetLoop調(diào)用其它對(duì)象的容器,用于連接待調(diào)度對(duì)象和事件循環(huán)的關(guān)系,不過(guò)它們的實(shí)現(xiàn)非常簡(jiǎn)單,對(duì)于Handler它的源碼如下:

# 已經(jīng)移除了一些不想關(guān)的代碼
class Handle:
    def __init__(self, callback, args, loop, context=None):
        # 初始化上下文,確保執(zhí)行的時(shí)候能找到Handle所在的上下文
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        # 設(shè)置當(dāng)前Handle為取消狀態(tài)
        if not self._cancelled:
            self._cancelled = True
            self._callback = None
            self._args = None
    def cancelled(self):
        return self._cancelled
    def _run(self):
        # 用于執(zhí)行真正的函數(shù),且通過(guò)context.run方法來(lái)確保在自己的上下文內(nèi)執(zhí)行。
        try:
            # 保持在自己持有的上下文中執(zhí)行對(duì)應(yīng)的回調(diào)
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args)
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            self._loop.call_exception_handler(context)

通過(guò)源碼可以發(fā)現(xiàn),Handle功能十分簡(jiǎn)單,提供了可以被取消以及可以在自己所處的上下文執(zhí)行的功能,而TimerHandle繼承于HandleHandle多了一些和時(shí)間以及排序相關(guān)的參數(shù),源碼如下:

class TimerHandle(Handle):
    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        self._when = when
        self._scheduled = False
    def __hash__(self):
        return hash(self._when)
    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented
    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented
    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented
    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented
    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented
    def cancel(self):
        if not self._cancelled:
            # 用于通知事件循環(huán)當(dāng)前Handle已經(jīng)退出了
            self._loop._timer_handle_cancelled(self)
        super().cancel()
    def when(self):
        return self._when

通過(guò)代碼可以發(fā)現(xiàn),這兩個(gè)對(duì)象十分簡(jiǎn)單,而我們?cè)谑褂?code>Python.Asyncio時(shí)并不會(huì)直接使用到這兩個(gè)對(duì)象,而是通過(guò)loop.call_xxx系列方法來(lái)把調(diào)用封裝成Handle對(duì)象,然后等待EventLoop執(zhí)行。 所以loop.call_xxx系列方法可以認(rèn)為是EventLoop的注冊(cè)操作,基本上所有非IO的異步操作都需要通過(guò)loop.call_xxx方法來(lái)把自己的調(diào)用注冊(cè)到EventLoop中,比如Task對(duì)象就在初始化后通過(guò)調(diào)用loop.call_soon方法來(lái)注冊(cè)到EventLoop中,loop.call_sonn的實(shí)現(xiàn)很簡(jiǎn)單,

它的源碼如下:

class BaseEventLoop:
    ...
    def call_soon(self, callback, *args, context=None):
        # 檢查是否事件循環(huán)是否關(guān)閉,如果是則直接拋出異常
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        return handle

   def _call_soon(self, callback, args, context):
        # 把調(diào)用封裝成一個(gè)handle,這樣方便被事件循環(huán)調(diào)用
        handle = events.Handle(callback, args, self, context)
        # 添加一個(gè)handle到_ready,等待被調(diào)用
        self._ready.append(handle)
        return handle

可以看到call_soon真正相關(guān)的代碼只有10幾行,它負(fù)責(zé)把一個(gè)調(diào)用封裝成一個(gè)Handle,并添加到self._reday中,從而實(shí)現(xiàn)把調(diào)用注冊(cè)到事件循環(huán)之中。

loop.call_xxx系列函數(shù)除了loop.call_soon系列函數(shù)外,還有另外兩個(gè)方法--loop.call_atloop.call_later,它們類似于loop.call_soon,不過(guò)多了一個(gè)時(shí)間參數(shù),來(lái)告訴EventLoop在什么時(shí)間后才可以調(diào)用,同時(shí)通過(guò)loop.call_atloop.call_later注冊(cè)的調(diào)用會(huì)通過(guò)Python的堆排序模塊headpq注冊(cè)到self._scheduled變量中,

具體代碼如下:

class BaseEventLoop:
    ...
    def call_later(self, delay, callback, *args, context=None):
        if delay is None:
            raise TypeError('delay must not be None')
        timer = self.call_at(self.time() + delay, callback, *args, context=context)
        return timer

    def call_at(self, when, callback, *args, context=None):
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        # 創(chuàng)建一個(gè)timer handle,然后添加到事件循環(huán)的_scheduled中,等待被調(diào)用
        timer = events.TimerHandle(when, callback, args, self, context)
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

2.EventLoop的調(diào)度實(shí)現(xiàn)

在文章《Python?Asyncio中Coroutines,Tasks,Future可等待對(duì)象的關(guān)系及作用》中已經(jīng)分析到了runner會(huì)通過(guò)loop.run_until_complete來(lái)調(diào)用mainTask從而開啟EventLoop的調(diào)度,所以在分析EventLoop的調(diào)度時(shí),應(yīng)該先從loop.run_until_complete入手,

對(duì)應(yīng)的源碼如下:

class BaseEventLoop:
    def run_until_complete(self, future):
        ...
        new_task = not futures.isfuture(future)
        # 把coroutine轉(zhuǎn)換成task,這樣事件循環(huán)就可以調(diào)度了,事件循環(huán)的最小調(diào)度單位為task
        # 需要注意的是此時(shí)事件循環(huán)并沒注冊(cè)到全局變量中,所以需要顯示的傳進(jìn)去,
        # 同時(shí)Task對(duì)象注冊(cè)的時(shí)候,已經(jīng)通過(guò)loop.call_soon把自己注冊(cè)到事件循環(huán)中,等待調(diào)度
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
        # 當(dāng)該task完成時(shí),意味著當(dāng)前事件循環(huán)失去了調(diào)度對(duì)象,無(wú)法繼續(xù)調(diào)度,所以需要關(guān)閉當(dāng)前事件循環(huán),程序會(huì)由協(xié)程模式返回到線程模式
        future.add_done_callback(_run_until_complete_cb)
        try:
            # 事件循環(huán)開始運(yùn)行
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def run_forever(self):
        # 進(jìn)行一些初始化工作
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        # 通過(guò)asyncgen鉤子來(lái)自動(dòng)關(guān)閉asyncgen函數(shù),這樣可以提醒用戶生成器還未關(guān)閉
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            # 設(shè)置當(dāng)前在運(yùn)行的事件循環(huán)到全局變量中,這樣就可以在任一階段獲取到當(dāng)前的事件循環(huán)了
            events._set_running_loop(self)
            while True:
                # 正真執(zhí)行任務(wù)的邏輯
                self._run_once()
                if self._stopping:
                    break
        finally:
            # 關(guān)閉循環(huán), 并且清理一些資源
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)

這段源碼并不復(fù)雜,它的主要邏輯是通過(guò)把Corotinue轉(zhuǎn)為一個(gè)Task對(duì)象,然后通過(guò)Task對(duì)象初始化時(shí)調(diào)用loop.call_sonn方法把自己注冊(cè)到EventLoop中,最后再通過(guò)loop.run_forever中的循環(huán)代碼一直運(yùn)行著,直到_stopping被標(biāo)記為True:

while True:
    # 正真執(zhí)行任務(wù)的邏輯
    self._run_once()
    if self._stopping:
        break

可以看出,這段代碼是確保事件循環(huán)能一直執(zhí)行著,自動(dòng)循環(huán)結(jié)束,而真正調(diào)度的核心是_run_once函數(shù),

它的源碼如下:

class BaseEventLoop:
    ...
    def _run_once(self):
        # self._scheduled是一個(gè)列表,它只存放TimerHandle
        sched_count = len(self._scheduled)
        ###############################
        # 第一階段,整理self._scheduled #
        ###############################
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # 當(dāng)待調(diào)度的任務(wù)數(shù)量超過(guò)100且待取消的任務(wù)占總?cè)蝿?wù)的50%時(shí),才進(jìn)入這個(gè)邏輯
            # 把需要取消的任務(wù)移除
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    # 設(shè)置handle的_cancelled為True,并且把handle從_scheduled中移除
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            # 重新排列堆
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # 需要取消的handle不多,則只會(huì)走這個(gè)邏輯,這里會(huì)把堆頂?shù)膆andle彈出,并標(biāo)記為不可調(diào)度,但不會(huì)訪問(wèn)整個(gè)堆
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        #################################
        # 第二階段,計(jì)算超時(shí)值以及等待事件IO #
        #################################
        timeout = None
        # 當(dāng)有準(zhǔn)備調(diào)度的handle或者是正在關(guān)閉時(shí),不等待,方便盡快的調(diào)度
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            # 如果堆有數(shù)據(jù)時(shí),通過(guò)堆頂?shù)膆andle計(jì)算最短的超時(shí)時(shí)間,但是最多不能超過(guò)MAXIMUM_SELECT_TIMEOUT,以免超過(guò)系統(tǒng)限制
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        # 事件循環(huán)等待事件,直到有事件或者超時(shí)
        event_list = self._selector.select(timeout)

        ##################################################
        # 第三階段,把滿足條件的TimeHandle放入到self._ready中 #
        ##################################################
        # 獲取得到的事件的回調(diào),然后裝填到_ready
        self._process_events(event_list)

        # 把一些在self._scheduled且滿足調(diào)度條件的handle放到_ready中,比如TimerHandle。
        # end_time為當(dāng)前時(shí)間+一個(gè)時(shí)間單位,猜測(cè)是能多處理一些這段時(shí)間內(nèi)產(chǎn)生的事件
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        ################################################################################
        # 第四階段,遍歷所有準(zhǔn)備調(diào)度的handle,并且通過(guò)handle的context來(lái)執(zhí)行handle對(duì)應(yīng)的callback #
        ################################################################################
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            # 如果handle已經(jīng)被取消,則不調(diào)用
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        # 執(zhí)行太久的回調(diào),記錄下來(lái),這些需要開發(fā)者自己優(yōu)化
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

通過(guò)源碼分析,可以很明確的知道調(diào)度邏輯中第一步是先規(guī)整self._scheduled,在規(guī)整的過(guò)程是使用堆排序來(lái)進(jìn)行的,因?yàn)槎雅判蛟谡{(diào)度的場(chǎng)景下效率是非常高的,不過(guò)這段規(guī)整代碼分成兩種,我猜測(cè)是當(dāng)需要取消的數(shù)量過(guò)多時(shí)直接遍歷的效率會(huì)更高。 在規(guī)整self._scheduled后,就進(jìn)入第二步,該步驟開始等待系統(tǒng)事件循環(huán)返回對(duì)應(yīng)的事件,如果self._ready中有數(shù)據(jù),就不做等待了,需要馬上到下一步驟,以便能趕緊安排調(diào)度。 在得到系統(tǒng)事件循環(huán)得到的事件后,就進(jìn)入到了第三步,該步驟會(huì)通過(guò)self._process_events方法處理對(duì)應(yīng)的事件,并把事件對(duì)應(yīng)的回調(diào)存放到了self._ready中,最后再遍歷self._ready中的所有Handle并逐一執(zhí)行(執(zhí)行時(shí)可以認(rèn)為EventLoop把控制權(quán)返回給對(duì)應(yīng)的調(diào)用邏輯),至此一個(gè)完整的調(diào)度邏輯就結(jié)束了,并進(jìn)入下一個(gè)調(diào)度邏輯。

3.網(wǎng)絡(luò)IO事件的處理

注:由于系統(tǒng)事件循環(huán)的限制,所以文件IO一般還是使用多線程來(lái)執(zhí)行,具體見:github.com/python/asyn…

在分析EventLoop調(diào)度實(shí)現(xiàn)的時(shí)候忽略了self._process_events的具體實(shí)現(xiàn)邏輯,因?yàn)?code>_process_events方法所在asyncio.base_event.py文件中的BaseEventLoop類并未有具體實(shí)現(xiàn)的,因?yàn)榫W(wǎng)絡(luò)IO相關(guān)的需要系統(tǒng)的事件循環(huán)來(lái)幫忙處理,所以與系統(tǒng)事件循環(huán)相關(guān)的邏輯都在asyncio.selector_events.py中的BaseSelectorEventLoop類中。BaseSelectorEventLoop類封裝了selector模塊與系統(tǒng)事件循環(huán)交互,使調(diào)用者不需要去考慮sock的創(chuàng)建以及sock產(chǎn)生的文件描述符的監(jiān)聽與注銷等操作,下面以BaseSelectorEventLoop中自帶的pipe為例子,分析BaseSelectorEventLoop是如何進(jìn)行網(wǎng)絡(luò)IO事件處理的。

在分析之前,先看一個(gè)例子,代碼如下:

import asyncio
import threading
def task():
    print("task")
def run_loop_inside_thread(loop):
    loop.run_forever()
loop = asyncio.get_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon(task)

如果直接運(yùn)行這個(gè)例子,它并不會(huì)輸出task(不過(guò)在IDE使用DEBUG模式下線程啟動(dòng)會(huì)慢一點(diǎn),所以會(huì)輸出的),因?yàn)樵谡{(diào)用loop.run_foreverEventLoop會(huì)一直卡在這段邏輯中:

event_list = self._selector.select(timeout)

所以調(diào)用loop.call_soon并不會(huì)使EventLoop馬上安排調(diào)度,而如果把call_soon換成call_soon_threadsafe則可以正常輸出,這是因?yàn)?code>call_soon_threadsafe中多了一個(gè)self._write_to_self的調(diào)用,它的源碼如下:

class BaseEventLoop:
    ...
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        self._write_to_self()
        return handle

由于這個(gè)調(diào)用是涉及到IO相關(guān)的,所以需要到BaseSelectorEventLoop類查看,接下來(lái)以pipe相關(guān)的網(wǎng)絡(luò)IO操作來(lái)分析EventLoop是如何處理IO事件的(只演示reader對(duì)象,writer對(duì)象操作與reader類似),

對(duì)應(yīng)的源碼如下:

class BaseSelectorEventLoop(base_events.BaseEventLoop):
    #######
    # 創(chuàng)建 #
    #######
    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            # 獲取最優(yōu)的selector
            selector = selectors.DefaultSelector()
        self._selector = selector
        # 創(chuàng)建pipe
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()
    def _make_self_pipe(self):
        # 創(chuàng)建Pipe對(duì)應(yīng)的sock 
        self._ssock, self._csock = socket.socketpair()
        # 設(shè)置sock為非阻塞
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # 阻塞服務(wù)端sock讀事件對(duì)應(yīng)的回調(diào)
        self._add_reader(self._ssock.fileno(), self._read_from_self)
    def _add_reader(self, fd, callback, *args):
        # 檢查事件循環(huán)是否關(guān)閉
        self._check_closed()
        # 封裝回調(diào)為handle對(duì)象
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # 如果沒有注冊(cè)到系統(tǒng)的事件循環(huán),則注冊(cè)
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            # 如果已經(jīng)注冊(cè)過(guò),則更新
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _read_from_self(self):
        # 負(fù)責(zé)消費(fèi)sock數(shù)據(jù)
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break
    #######
    # 刪除 #
    #######
    def _close_self_pipe(self):
        # 注銷Pipe對(duì)應(yīng)的描述符 
        self._remove_reader(self._ssock.fileno())
        # 關(guān)閉sock
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _remove_reader(self, fd):
        # 如果事件循環(huán)已經(jīng)關(guān)閉了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查詢文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在則返回
            return False
        else:
            # 存在則進(jìn)入移除的工作
            mask, (reader, writer) = key.events, key.data
            # 通過(guò)事件掩碼判斷是否有其它事件
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已經(jīng)注冊(cè)到selector的文件描述符
                self._selector.unregister(fd)
            else:
                # 移除已經(jīng)注冊(cè)到selector的文件描述符,并注冊(cè)新的事件
                self._selector.modify(fd, mask, (None, writer))

            # 如果reader不為空,則取消reader
            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

通過(guò)源碼中的創(chuàng)建部分可以看到,EventLoop在啟動(dòng)的時(shí)候會(huì)創(chuàng)建一對(duì)建立通信的sock,并設(shè)置為非阻塞,然后把對(duì)應(yīng)的回調(diào)封裝成一個(gè)Handle對(duì)象并注冊(cè)到系統(tǒng)事件循環(huán)中(刪除則進(jìn)行對(duì)應(yīng)的反向操作),之后系統(tǒng)事件循環(huán)就會(huì)一直監(jiān)聽對(duì)應(yīng)的事件,也就是EventLoop的執(zhí)行邏輯會(huì)阻塞在下面的調(diào)用中,等待事件響應(yīng):

event_list = self._selector.select(timeout)

這時(shí)如果執(zhí)行loop.call_soon_threadsafe,那么會(huì)通過(guò)write_to_self寫入一點(diǎn)信息:

    def _write_to_self(self):
        csock = self._csock
        if csock is None:
            return
        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the self-pipe socket", exc_info=True)

由于csock被寫入了數(shù)據(jù),那么它對(duì)應(yīng)的ssock就會(huì)收到一個(gè)讀事件,系統(tǒng)事件循環(huán)在收到這個(gè)事件通知后就會(huì)把數(shù)據(jù)返回,然后EventLoop就會(huì)獲得到對(duì)應(yīng)的數(shù)據(jù),并交給process_events方法進(jìn)行處理,

它的相關(guān)代碼如下:

class BaseSelectorEventLoop:
    def _process_events(self, event_list):
        for key, mask in event_list:
            # 從回調(diào)事件中獲取到對(duì)應(yīng)的數(shù)據(jù),key.data在注冊(cè)時(shí)是一個(gè)元祖,所以這里要對(duì)元祖進(jìn)行解包
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                # 得到reader handle,如果是被標(biāo)記為取消,就移除對(duì)應(yīng)的文件描述符
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    # 如果沒被標(biāo)記為取消,則安排到self._ready中
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                # 對(duì)于寫對(duì)象,也是同樣的道理。
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _add_callback(self, handle):
        # 把回調(diào)的handle添加到_ready中
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)

    def _remove_reader(self, fd):
        # 如果事件循環(huán)已經(jīng)關(guān)閉了,就不用操作了
        if self.is_closed():
            return False
        try:
            # 查詢文件描述符是否在selector中
            key = self._selector.get_key(fd)
        except KeyError:
            # 不存在則返回
            return False
        else:
            # 存在則進(jìn)入移除的工作
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # 移除已經(jīng)注冊(cè)到selector的文件描述符
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

從代碼中可以看出_process_events會(huì)對(duì)事件對(duì)應(yīng)的文件描述符進(jìn)行處理,并從事件回調(diào)中獲取到對(duì)應(yīng)的Handle對(duì)象添加到self._ready中,由EventLoop在接下來(lái)遍歷self._ready并執(zhí)行。

可以看到網(wǎng)絡(luò)IO事件的處理并不復(fù)雜,因?yàn)橄到y(tǒng)事件循環(huán)已經(jīng)為我們做了很多工作了,但是用戶所有與網(wǎng)絡(luò)IO相關(guān)的操作都需要有一個(gè)類似的操作,這樣是非常的繁瑣的,幸好asyncio庫(kù)已經(jīng)為我們做了封裝,我們只要調(diào)用就可以了,方便了很多。

原文鏈接:https://juejin.cn/post/7108367062463938597

欄目分類
最近更新