日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Python使用future處理并發問題方案詳解_python

作者:lijiachang8 ? 更新時間: 2023-04-17 編程語言

從Python3.2引入的concurrent.futures模塊,Python2.5以上需要在pypi中安裝futures包。

future指一種對象,表示異步執行的操作。這個概念的作用很大,是concurrent.futures模塊和asyncio包的基礎。

網絡下載的三種風格

為了高效的處理網絡IO,需要使用并發,因為網絡有很高的延遲,所以為了不浪費CPU周期去等待,最好再收到網絡響應之前去做其他的事情。

下面有三種示例程序,

第一個程序是依序下載的,第二個是使用theadpool來自concurrent.futures模塊,第三個是使用asyncio包

按照順序下載

下面示例是依次下載

import os
import time
import sys
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def sava_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content
def show(text):
    print(text, end=" ")
    sys.stdout.flush()  # 能在一行中顯示
def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        sava_flag(image, cc.lower() + ".gif")
    return len(cc_list)
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags download in {:.2f}s'
    print(msg.format(count, elapsed))
if __name__ == '__main__':
    main()

打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN?
20 flags download in 24.16s

知識點:

  • 按照慣例,requests不在標準庫中,在導入標準庫之后,用一個空行分隔開
  • sys.stdout.flush() : 顯示一個字符串,然后刷新sys.stdout,這樣能在一行消息中看到進度。Python中正常情況下,遇到換行才會刷新stdout緩沖。

使用conrurrent.futures模塊多線程下載

concurrent.futures模塊的主要特色是TheadPoolExecutor和ProcessPoolExecutor類,這兩個類實現的結構能分別在不同線程或者進程中執行可調用對象。

這兩個類內部維護著一個工作線程池或者進程池,以及要執行的任務隊列。不過,這個接口抽象的層級很高,無需關心任何實現細節。

下面展示如何使用TheadPoolExecutor.map方法,最簡單的方式實現并發下載。

import os
import time
import sys
from concurrent import futures
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
max_workers = 20  # 設定線程數
def sava_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content
def show(text):
    print(text, end=" ")
    sys.stdout.flush()  # 能在一行中顯示
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    sava_flag(image, cc.lower() + ".gif")
def download_many(cc_list):
    works = min(len(cc_list), max_workers)  # 取其中的最小值,以免創建多余的線程
    with futures.ThreadPoolExecutor(works) as executor:
        res = executor.map(download_one, cc_list)
    return len(list(res))
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags download in {:.2f}s'
    print(msg.format(count, elapsed))
if __name__ == '__main__':
    main()

打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK?
20 flags download in 2.92s

知識點:

  • 使用線程數實例化ThreadPoolExecute類。executor.__exit__方法會調用executor.shutdown(wait=True)方法,它會在所有線程執行完畢前阻塞線程。
  • executor.map方法的作用于內置map函數類似,區別是在多個線程中并發調用,此map方法會返回一個生成器,因此可以迭代,獲取各個函數返回的值。
  • return len(list(res))這里要注意下,讀取res也就是executor.map各個函數的返回值,如果線程中有異常,會在這里拋出,這與隱式調用next()函數從迭代器中獲取相應的返回值一樣。

使用asyncio異步下載

后續章節會介紹

future是什么

從Python3.4開始,標準庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future

這兩個類的作用相同:兩個future類的實例都可以表示已經完成或者尚未完成的延遲計算。這與Twister引擎中的Deferred類、Tornado框架中的Future類,以及多個JavaScript庫中的Promise對象類似。

future封裝待完成的操作,可以放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)。

通常情況下自己不應該創建future,而只能由并發框架concurrent.futures和asyncio實例化。

原因很簡單:future表示終將發生的事情,而確定某件時間發生的唯一方式是執行的時間(順序)已經排定。因此,只有排定把某件事情交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。

Executor.submit()方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,并返回一個future。

客戶端代碼不應該改變future的狀態,并發框架在future表示的延遲計算結束后會改變future的狀態,而我們無法控制計算何時結束。

兩種future都有.done()方法,這個方法并不阻塞,返回值是布爾值,指明future鏈接的可調用對象是否已經執行。

客戶端代碼通常不會詢問future是否運行結束,而是會等待通知。因

兩個Future類都有.add_done_callback()方法:這個方法只有一個參數,類型是可調用對象,future運行結束后會調用此可調用對象。

還有.result()方法,如果在future運行結束后調用的haunt,這個方法在兩個Future類的作用相同:返回可調用對象的結果,或者拋出異常(重新拋出執行可調用對象時拋出的異常)。可是,如果future沒有運行結束,result方法在兩個Future類中的行為相差很大:

對于concurrent.futures.Future實例來說,調用了f.result()方法會阻塞調用方所在的線程,直到有結果返回,此時的result方法,可以接受可選的timeout參數,如果在指定實現內future沒有運行完畢,會拋出TimeoutError異常。

對于asyncio.Future.result方法,不支持設置timeout超時時間,在那個庫中獲取future結果最好使用yield from結構。

這兩個庫中有幾個函數會返回future,其他函數則使用future以用戶易于理解的方式實現自身。

Executor.map方法是使用future:返回值是一個迭代器,迭代器的__next__方法調用各個future的result方法,因此得到各個future的結果。

concurrent.futures.as_completed函數參數是一個列表,返回值是一個迭代器,在future運行結束后產出future。

為了理解future,使用as_completed函數,把較為抽象的executor.map調換成兩個for循環:一個用戶創建并排定future(使用summit方法),一個用于獲取future的結果。

示例,一窺神秘的future。

...其余代碼省略
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    sava_flag(image, cc.lower() + ".gif")
    return cc
def download_many(cc_list):
    cc_list = cc_list[:5]  # 這次演示5個國家
    future_list = []
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # 線程池為3,便于觀察
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)  # submit方法排定可調用對象的執行時間,然后返回一個future,表示待執行操作
            future_list.append(future)
            print("{}: {}".format(cc, future))
        res = []
        for future in futures.as_completed(future_list):  # as_completed函數在future都執行完畢后,返回future
            result = future.result()  # 獲取future結果
            print('{} result: {!r}'.format(future, result))
            res.append(result)

打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'

知識點:

  • summit方法把一個可執行對象(函數),變為一個future對象,并且記錄了執行時間,表示待執行打操作。
  • futures.as_completed在所有的future執行完畢后,產出future對象。而后可以使用future.result()獲取結果
  • 直接打印future對象(調用future的repr()方法),會顯示future的狀態,例如running、pending(等待)、finished

GIL和阻塞型I/O

嚴格來說,上面實現的多線程并發腳本都不能實現并行下載。

使用concurrent.futures庫實現的示例,都會受到GIL(Global Interpreter Lock,全局解釋器鎖)的限制,腳本只能在單個線程中執行。

CPython解釋器本身就不是線程安全的,因此會有全局解釋器鎖GIL,一次只允許使用一個線程執行Python字節碼。

因此一個Python進程通常不能同時使用多個CPU核心。(在Jython和IronPython中沒有此限制,目前最快的PyPy解釋器也存在GIL) IronPython是.net實現的

Python代碼無法控制GIL,然而,標準庫中所以執行阻塞型IO操作的函數,在等待操作系統返回結果時都會釋放GIL。這意味著在Python語言這個層次上可以使用多線程,而IO密集型操作能從中受益:一個Python線程等待網絡響應時,阻塞型IO函數會釋放GIL,再運行一個線程。比如time.sleep()函數也會釋放GIL。

GIL簡化了CPython和C語言擴展的實現。得益于GIL,Python有很多C語言擴展。

使用concurrent.futures模塊多進程

在處理CPU密集型操作時,可以使用多進程,實現真正的并行計算。

使用ProcessPoolExecutor類把任務分配給多個Python進程處理。因此如果需要做CPU密集型操作,使用這個模塊多進程能繞開GIL,利用所有可用的CPU核心。

ProcessPoolExecutor和ThreadPoolExecutor類都實現了通用的Executor接口,因此使用concurrent.futures模塊能輕松的把基于線程的方案轉成基于進程的方案。

這兩個實現Executor接口的類,唯一的區別是,ThreadPoolExecutor.__init__方法需要max_workers參數,指定線程池中線程的數量。

但是在ProcessPoolExecutor類中,這個參數是可選的,而且大多數情況下使用默認值:os.cpu_count()函數返回的CPU數量。因為對于CPU密集型操作來說,不可能要求使用超過CPU數量的進程。

經過測試,使用ProcessPoolExecutor實例下載20個國旗的時間,要比ThreadPoolExecutor要慢,主要原因是我電腦是四核八線程,八個邏輯處理器,因此限制只有4個并發下載,而使用線程池的版本有20個工作線程。

ProcessPoolExecutor的價值體現在CPU密集型操作上。比如對于加密算法上,使用ProcessPoolExecutor類派生出四個工作進程后,性能可以提高兩倍。

如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python進行CPU密集型操作,應該試試PyPy,普遍快3.8~5.1倍。

實驗Executor.map方法

若想并發運行多個可調用對象,最簡單是是使用Executor.map方法。

示例,演示Executor.map方法的某些運作細節

import time
from concurrent import futures
def display(*args):
    """把參數打印前,加上時間顯示"""
    print(time.strftime('[%H:%M:%S]'), end=" ")
    print(*args)
def loiter(n):
    """開始時顯示一個消息,然后休眠n秒,最后再結束的時候在顯示一個消息
        消息使用制表符縮進,縮進量由n值確定
        loiter:徘徊,閑著,閑蕩
    """
    msg = '{}loiter({}):doing nothing for {}s'
    display(msg.format('\t' * n, n, n))
    time.sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t' * n, n))
    return n * 10  # 隨意返回一個結果
def main():
    display('Script starting.')  # 腳本開始
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))  # 把5個任務交個3個線程
    display('results :', results) # 打印調用executor.map的結果,是一個生成器
    display('waiting for individual results:')  # 等待個體結果
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))
if __name__ == '__main__':
    main()

打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08] ?? ?loiter(1):doing nothing for 1s
[17:40:08] ?? ??? ?loiter(2):doing nothing for 2s
[17:40:08][17:40:08] ?results :?? ??? ??? ?loiter(3):doing nothing for 3s?
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09] ?? ?loiter(1):done
[17:40:09] ?? ??? ??? ??? ?loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10] ?? ??? ?loiter(2):done
[17:40:10] result 2: 20
[17:40:11] ?? ??? ??? ?loiter(3):done
[17:40:11] result 3: 30
[17:40:13] ?? ??? ??? ??? ?loiter(4):done
[17:40:13] result 4: 40

知識點:

  • 示例中把5個任務交給executor(3個線程),其中前三個任務會立即開始;這是非阻塞調用。
  • 在for循環中會隱式調用next(results),這個函數又會在第一個任務的future上調用future.result()方法。result方法會阻塞,直到這個future運行結束。所以這個for循環每次迭代都會阻塞,等到結果出來后,才會繼續。
  • 每次的打印結果都可能不一樣。由于sleep函數總會釋放GIL,即使是sleep(0),所以loiter(1)有可能在loiter(0)結束之前開始運行,但是這個示例中沒有。三個線程是同時開始。
  • executor.map的結果是一個生成器,這個操作不會阻塞。
  • loiter(0)的結果result 0: 0打印沒有阻塞的原因是,在for循環之前future已經執行完成,可以看到輸出了done。

綜上,Executor.map函數易于使用,有個特征算是優點,但也可能沒用變成缺點,具體情況取決于需求:map函數返回的結果順序于調用開始的順序一致。

如果第一個任務生成結果用時10秒,而其他任務調用只用1秒,代碼就會阻塞10秒,獲取map方法返回生成器的第一個結果。在此之后,獲取后續結果時不會阻塞,因為后續的調用已經結束,所以,獲取循環所有結果,阻塞的用時等于最長的任務時間。

如果必須等待獲取所有結果后再處理的場景,這種行為沒問題;不過,通常更常用的方式是,不管提交的順序,只有有結果就獲取。這樣就要使用executor.submit方法和futures.as_completed函數結合起來使用。

executor.submit和futures.as_completed這個組合比executor.map更靈活:

  • 因為submit方法能處理不同的可調用對象和參數,而executor.map只能處理參數不同的同一個可調用對象
  • 此外,傳給future.as_completed函數的future集合可以來自多個Executor實例
  • futures.as_completed只返回已經運行結束的future

顯示下載進度條

Python內置庫有tqdm包,taqadum在阿拉伯語中的意思是進展。

tqdm可以在長循環中添加一個進度提示信息,用戶只需要封裝任意的迭代器 tqdm(iterator)

import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.01)
100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]

tqdm能處理任何可迭代對象,生成一個迭代器;使用這個迭代器時,顯示進度條和完成全部迭代

為了計算剩余時間,tqdm函數要獲取可以使用len函數的可迭代對象,或者在第二個參數中指定預期的元素數量。

例如,futures.as_completed函數的結果,就不支持len函數,只能使用tdqm的第二個參數total=來指定數量。

網絡下載增加錯誤處理和進度條

下面的示例中負責下載一個文件的函數(download_one)中使用相同的策略處理HTTP 404錯誤。其他異常則向上冒泡,交給download_many函數處理。

示例,負責下載的基本函數。

def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()  # 如果狀態碼不是200,產生一個HttpError的異常
    return resp.content
def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:  # 如果不是404異常,向上冒泡,傳給調用方
            raise
    else:
        sava_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'ok'
    if verbose:
        print(cc, msg)
    return Result(status, cc)

示例,依序下載的download_many函數

def download_many(cc_list, base_url, verbose, max_req):
    """實現依序下載"""
    counter = collections.Counter()  # 統計不同的下載狀態:HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # 如果不需要詳細模式,就使用進度條展示
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = "HTTP error {res.status_code} - {res.reason}"
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}:{}'.format(cc, error_msg))
    return counter

知識點:

requests.exceptions中有所有的requests相關的異常類,可以用來捕獲相關異常。

如果有響應信息后,產生的異常,異常對象exc.response的status_code狀態碼和reason異常原因

示例,多線程下載的download_many函數

default_concur_req = 30  # 默認的線程池大小
max_concur_req = 1000  # 最大并發請求數,這是一個安全措施
def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)  # submit排定一個可調用對象的執行時間,返回一個Future實例
            to_do_map[future] = cc  # 把各個Future實例映射到國家代碼上,在錯誤處理時使用
        done_iter = futures.as_completed(to_do_map) # 返回一個迭代器,在future運行結束后產出future
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是詳細模式,就顯示進度條,因為done_iter沒有len函數,只能通過total參數傳入
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = "Connection error"
            else:
                error_msg = ""
                status = res.status
            if error_msg:
                status = HTTPStatus.error
            counter[status] +=1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}:{}'.format(cc, error_msg))
        return counter

知識點:

對futures.as_completed函數的慣用法:構建一個字典,把各個future映射到其他數據上,future運行結束后可能會有用。比如上述示例,把future映射到國家代碼上。

線程和多進程的代替方案

對于多線程,如果futures.ThreadPoolExecutor類對某個作業來首不夠靈活,可能要使用到threading模塊中的組件(如Thread、Lock、Semaphore等)自行制定方案,

比如使用queue模塊創建線程安全的隊列,在線程之間傳遞數據。futures.ThreadPoolExecutor類已經封裝好了這些組件。

對于CPU密集型工作來說,要啟動多個進程,規避GIL。創建多個進程的最簡單方式是用futures.ProcessPoolExecutor類。如果使用場景較復雜,需要更高級的工具,multiprocessing模塊的API和threading模塊相仿,不過作業交給多個進程處理。

原文鏈接:https://blog.csdn.net/lijiachang8/article/details/128877478

欄目分類
最近更新