網站首頁 編程語言 正文
從Python3.2引入的concurrent.futures模塊,Python2.5以上需要在pypi中安裝futures包。
future指一種對象,表示異步執行的操作。這個概念的作用很大,是concurrent.futures模塊和asyncio包的基礎。
網絡下載的三種風格
為了高效的處理網絡IO,需要使用并發,因為網絡有很高的延遲,所以為了不浪費CPU周期去等待,最好再收到網絡響應之前去做其他的事情。
下面有三種示例程序,
第一個程序是依序下載的,第二個是使用theadpool來自concurrent.futures模塊,第三個是使用asyncio包
按照順序下載
下面示例是依次下載
import os
import time
import sys
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def sava_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text):
print(text, end=" ")
sys.stdout.flush() # 能在一行中顯示
def download_many(cc_list):
for cc in sorted(cc_list):
image = get_flag(cc)
show(cc)
sava_flag(image, cc.lower() + ".gif")
return len(cc_list)
def main():
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags download in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main()
打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN?
20 flags download in 24.16s
知識點:
- 按照慣例,requests不在標準庫中,在導入標準庫之后,用一個空行分隔開
- sys.stdout.flush() : 顯示一個字符串,然后刷新sys.stdout,這樣能在一行消息中看到進度。Python中正常情況下,遇到換行才會刷新stdout緩沖。
使用conrurrent.futures模塊多線程下載
concurrent.futures模塊的主要特色是TheadPoolExecutor和ProcessPoolExecutor類,這兩個類實現的結構能分別在不同線程或者進程中執行可調用對象。
這兩個類內部維護著一個工作線程池或者進程池,以及要執行的任務隊列。不過,這個接口抽象的層級很高,無需關心任何實現細節。
下面展示如何使用TheadPoolExecutor.map方法,最簡單的方式實現并發下載。
import os
import time
import sys
from concurrent import futures
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
max_workers = 20 # 設定線程數
def sava_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def get_flag(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text):
print(text, end=" ")
sys.stdout.flush() # 能在一行中顯示
def download_one(cc):
image = get_flag(cc)
show(cc)
sava_flag(image, cc.lower() + ".gif")
def download_many(cc_list):
works = min(len(cc_list), max_workers) # 取其中的最小值,以免創建多余的線程
with futures.ThreadPoolExecutor(works) as executor:
res = executor.map(download_one, cc_list)
return len(list(res))
def main():
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags download in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main()
打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK?
20 flags download in 2.92s
知識點:
- 使用線程數實例化ThreadPoolExecute類。executor.__exit__方法會調用executor.shutdown(wait=True)方法,它會在所有線程執行完畢前阻塞線程。
- executor.map方法的作用于內置map函數類似,區別是在多個線程中并發調用,此map方法會返回一個生成器,因此可以迭代,獲取各個函數返回的值。
- return len(list(res))這里要注意下,讀取res也就是executor.map各個函數的返回值,如果線程中有異常,會在這里拋出,這與隱式調用next()函數從迭代器中獲取相應的返回值一樣。
使用asyncio異步下載
后續章節會介紹
future是什么
從Python3.4開始,標準庫中有兩個名為Future的類:concurrent.futures.Future和asyncio.Future
這兩個類的作用相同:兩個future類的實例都可以表示已經完成或者尚未完成的延遲計算。這與Twister引擎中的Deferred類、Tornado框架中的Future類,以及多個JavaScript庫中的Promise對象類似。
future封裝待完成的操作,可以放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)。
通常情況下自己不應該創建future,而只能由并發框架concurrent.futures和asyncio實例化。
原因很簡單:future表示終將發生的事情,而確定某件時間發生的唯一方式是執行的時間(順序)已經排定。因此,只有排定把某件事情交給concurrent.futures.Executor子類處理時,才會創建concurrent.futures.Future實例。
Executor.submit()方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,并返回一個future。
客戶端代碼不應該改變future的狀態,并發框架在future表示的延遲計算結束后會改變future的狀態,而我們無法控制計算何時結束。
兩種future都有.done()方法,這個方法并不阻塞,返回值是布爾值,指明future鏈接的可調用對象是否已經執行。
客戶端代碼通常不會詢問future是否運行結束,而是會等待通知。因
兩個Future類都有.add_done_callback()方法:這個方法只有一個參數,類型是可調用對象,future運行結束后會調用此可調用對象。
還有.result()方法,如果在future運行結束后調用的haunt,這個方法在兩個Future類的作用相同:返回可調用對象的結果,或者拋出異常(重新拋出執行可調用對象時拋出的異常)。可是,如果future沒有運行結束,result方法在兩個Future類中的行為相差很大:
對于concurrent.futures.Future實例來說,調用了f.result()方法會阻塞調用方所在的線程,直到有結果返回,此時的result方法,可以接受可選的timeout參數,如果在指定實現內future沒有運行完畢,會拋出TimeoutError異常。
對于asyncio.Future.result方法,不支持設置timeout超時時間,在那個庫中獲取future結果最好使用yield from結構。
這兩個庫中有幾個函數會返回future,其他函數則使用future以用戶易于理解的方式實現自身。
Executor.map方法是使用future:返回值是一個迭代器,迭代器的__next__方法調用各個future的result方法,因此得到各個future的結果。
concurrent.futures.as_completed函數參數是一個列表,返回值是一個迭代器,在future運行結束后產出future。
為了理解future,使用as_completed函數,把較為抽象的executor.map調換成兩個for循環:一個用戶創建并排定future(使用summit方法),一個用于獲取future的結果。
示例,一窺神秘的future。
...其余代碼省略
def download_one(cc):
image = get_flag(cc)
show(cc)
sava_flag(image, cc.lower() + ".gif")
return cc
def download_many(cc_list):
cc_list = cc_list[:5] # 這次演示5個國家
future_list = []
with futures.ThreadPoolExecutor(max_workers=3) as executor: # 線程池為3,便于觀察
for cc in sorted(cc_list):
future = executor.submit(download_one, cc) # submit方法排定可調用對象的執行時間,然后返回一個future,表示待執行操作
future_list.append(future)
print("{}: {}".format(cc, future))
res = []
for future in futures.as_completed(future_list): # as_completed函數在future都執行完畢后,返回future
result = future.result() # 獲取future結果
print('{} result: {!r}'.format(future, result))
res.append(result)
打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'
知識點:
- summit方法把一個可執行對象(函數),變為一個future對象,并且記錄了執行時間,表示待執行打操作。
- futures.as_completed在所有的future執行完畢后,產出future對象。而后可以使用future.result()獲取結果
- 直接打印future對象(調用future的repr()方法),會顯示future的狀態,例如running、pending(等待)、finished
GIL和阻塞型I/O
嚴格來說,上面實現的多線程并發腳本都不能實現并行下載。
使用concurrent.futures庫實現的示例,都會受到GIL(Global Interpreter Lock,全局解釋器鎖)的限制,腳本只能在單個線程中執行。
CPython解釋器本身就不是線程安全的,因此會有全局解釋器鎖GIL,一次只允許使用一個線程執行Python字節碼。
因此一個Python進程通常不能同時使用多個CPU核心。(在Jython和IronPython中沒有此限制,目前最快的PyPy解釋器也存在GIL) IronPython是.net實現的
Python代碼無法控制GIL,然而,標準庫中所以執行阻塞型IO操作的函數,在等待操作系統返回結果時都會釋放GIL。這意味著在Python語言這個層次上可以使用多線程,而IO密集型操作能從中受益:一個Python線程等待網絡響應時,阻塞型IO函數會釋放GIL,再運行一個線程。比如time.sleep()函數也會釋放GIL。
GIL簡化了CPython和C語言擴展的實現。得益于GIL,Python有很多C語言擴展。
使用concurrent.futures模塊多進程
在處理CPU密集型操作時,可以使用多進程,實現真正的并行計算。
使用ProcessPoolExecutor類把任務分配給多個Python進程處理。因此如果需要做CPU密集型操作,使用這個模塊多進程能繞開GIL,利用所有可用的CPU核心。
ProcessPoolExecutor和ThreadPoolExecutor類都實現了通用的Executor接口,因此使用concurrent.futures模塊能輕松的把基于線程的方案轉成基于進程的方案。
這兩個實現Executor接口的類,唯一的區別是,ThreadPoolExecutor.__init__方法需要max_workers參數,指定線程池中線程的數量。
但是在ProcessPoolExecutor類中,這個參數是可選的,而且大多數情況下使用默認值:os.cpu_count()函數返回的CPU數量。因為對于CPU密集型操作來說,不可能要求使用超過CPU數量的進程。
經過測試,使用ProcessPoolExecutor實例下載20個國旗的時間,要比ThreadPoolExecutor要慢,主要原因是我電腦是四核八線程,八個邏輯處理器,因此限制只有4個并發下載,而使用線程池的版本有20個工作線程。
ProcessPoolExecutor的價值體現在CPU密集型操作上。比如對于加密算法上,使用ProcessPoolExecutor類派生出四個工作進程后,性能可以提高兩倍。
如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python進行CPU密集型操作,應該試試PyPy,普遍快3.8~5.1倍。
實驗Executor.map方法
若想并發運行多個可調用對象,最簡單是是使用Executor.map方法。
示例,演示Executor.map方法的某些運作細節
import time
from concurrent import futures
def display(*args):
"""把參數打印前,加上時間顯示"""
print(time.strftime('[%H:%M:%S]'), end=" ")
print(*args)
def loiter(n):
"""開始時顯示一個消息,然后休眠n秒,最后再結束的時候在顯示一個消息
消息使用制表符縮進,縮進量由n值確定
loiter:徘徊,閑著,閑蕩
"""
msg = '{}loiter({}):doing nothing for {}s'
display(msg.format('\t' * n, n, n))
time.sleep(n)
msg = '{}loiter({}):done'
display(msg.format('\t' * n, n))
return n * 10 # 隨意返回一個結果
def main():
display('Script starting.') # 腳本開始
executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5)) # 把5個任務交個3個線程
display('results :', results) # 打印調用executor.map的結果,是一個生成器
display('waiting for individual results:') # 等待個體結果
for i, result in enumerate(results):
display('result {}: {}'.format(i, result))
if __name__ == '__main__':
main()
打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08] ?? ?loiter(1):doing nothing for 1s
[17:40:08] ?? ??? ?loiter(2):doing nothing for 2s
[17:40:08][17:40:08] ?results :?? ??? ??? ?loiter(3):doing nothing for 3s?
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09] ?? ?loiter(1):done
[17:40:09] ?? ??? ??? ??? ?loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10] ?? ??? ?loiter(2):done
[17:40:10] result 2: 20
[17:40:11] ?? ??? ??? ?loiter(3):done
[17:40:11] result 3: 30
[17:40:13] ?? ??? ??? ??? ?loiter(4):done
[17:40:13] result 4: 40
知識點:
- 示例中把5個任務交給executor(3個線程),其中前三個任務會立即開始;這是非阻塞調用。
- 在for循環中會隱式調用next(results),這個函數又會在第一個任務的future上調用future.result()方法。result方法會阻塞,直到這個future運行結束。所以這個for循環每次迭代都會阻塞,等到結果出來后,才會繼續。
- 每次的打印結果都可能不一樣。由于sleep函數總會釋放GIL,即使是sleep(0),所以loiter(1)有可能在loiter(0)結束之前開始運行,但是這個示例中沒有。三個線程是同時開始。
- executor.map的結果是一個生成器,這個操作不會阻塞。
- loiter(0)的結果result 0: 0打印沒有阻塞的原因是,在for循環之前future已經執行完成,可以看到輸出了done。
綜上,Executor.map函數易于使用,有個特征算是優點,但也可能沒用變成缺點,具體情況取決于需求:map函數返回的結果順序于調用開始的順序一致。
如果第一個任務生成結果用時10秒,而其他任務調用只用1秒,代碼就會阻塞10秒,獲取map方法返回生成器的第一個結果。在此之后,獲取后續結果時不會阻塞,因為后續的調用已經結束,所以,獲取循環所有結果,阻塞的用時等于最長的任務時間。
如果必須等待獲取所有結果后再處理的場景,這種行為沒問題;不過,通常更常用的方式是,不管提交的順序,只有有結果就獲取。這樣就要使用executor.submit方法和futures.as_completed函數結合起來使用。
executor.submit和futures.as_completed這個組合比executor.map更靈活:
- 因為submit方法能處理不同的可調用對象和參數,而executor.map只能處理參數不同的同一個可調用對象
- 此外,傳給future.as_completed函數的future集合可以來自多個Executor實例
- futures.as_completed只返回已經運行結束的future
顯示下載進度條
Python內置庫有tqdm包,taqadum在阿拉伯語中的意思是進展。
tqdm可以在長循環中添加一個進度提示信息,用戶只需要封裝任意的迭代器 tqdm(iterator)
import time
from tqdm import tqdm
for i in tqdm(range(1000)):
time.sleep(.01)
100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]
tqdm能處理任何可迭代對象,生成一個迭代器;使用這個迭代器時,顯示進度條和完成全部迭代
為了計算剩余時間,tqdm函數要獲取可以使用len函數的可迭代對象,或者在第二個參數中指定預期的元素數量。
例如,futures.as_completed函數的結果,就不支持len函數,只能使用tdqm的第二個參數total=來指定數量。
網絡下載增加錯誤處理和進度條
下面的示例中負責下載一個文件的函數(download_one)中使用相同的策略處理HTTP 404錯誤。其他異常則向上冒泡,交給download_many函數處理。
示例,負責下載的基本函數。
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
resp = requests.get(url)
if resp.status_code != 200:
resp.raise_for_status() # 如果狀態碼不是200,產生一個HttpError的異常
return resp.content
def download_one(cc, base_url, verbose=False):
try:
image = get_flag(base_url, cc)
except requests.exceptions.HTTPError as exc:
res = exc.response
if res.status_code == 404:
status = HTTPStatus.not_found
msg = 'not found'
else: # 如果不是404異常,向上冒泡,傳給調用方
raise
else:
sava_flag(image, cc.lower() + '.gif')
status = HTTPStatus.ok
msg = 'ok'
if verbose:
print(cc, msg)
return Result(status, cc)
示例,依序下載的download_many函數
def download_many(cc_list, base_url, verbose, max_req):
"""實現依序下載"""
counter = collections.Counter() # 統計不同的下載狀態:HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error
cc_iter = sorted(cc_list)
if not verbose:
cc_iter = tqdm.tqdm(cc_iter) # 如果不需要詳細模式,就使用進度條展示
for cc in cc_iter:
try:
res = download_one(cc, base_url, verbose)
except requests.exceptions.HTTPError as exc:
error_msg = "HTTP error {res.status_code} - {res.reason}"
error_msg = error_msg.format(res=exc.response)
except requests.exceptions.ConnectionError as exc:
error_msg = 'Connection error'
else:
error_msg = ''
status = res.status
if error_msg:
status = HTTPStatus.error
counter[status] += 1
if verbose and error_msg:
print('*** Error for {}:{}'.format(cc, error_msg))
return counter
知識點:
requests.exceptions中有所有的requests相關的異常類,可以用來捕獲相關異常。
如果有響應信息后,產生的異常,異常對象exc.response的status_code狀態碼和reason異常原因
示例,多線程下載的download_many函數
default_concur_req = 30 # 默認的線程池大小
max_concur_req = 1000 # 最大并發請求數,這是一個安全措施
def download_many(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
to_do_map = {}
for cc in sorted(cc_list):
future = executor.submit(download_one, cc, base_url, verbose) # submit排定一個可調用對象的執行時間,返回一個Future實例
to_do_map[future] = cc # 把各個Future實例映射到國家代碼上,在錯誤處理時使用
done_iter = futures.as_completed(to_do_map) # 返回一個迭代器,在future運行結束后產出future
if not verbose:
done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是詳細模式,就顯示進度條,因為done_iter沒有len函數,只能通過total參數傳入
for future in done_iter:
try:
res = future.result()
except requests.exceptions.HTTPError as exc:
error_msg = 'HTTP {res.status_code} - {res.reason}'
error_msg = error_msg.format(res=exc.response)
except requests.exceptions.ConnectionError as exc:
error_msg = "Connection error"
else:
error_msg = ""
status = res.status
if error_msg:
status = HTTPStatus.error
counter[status] +=1
if verbose and error_msg:
cc = to_do_map[future]
print('*** Error for {}:{}'.format(cc, error_msg))
return counter
知識點:
對futures.as_completed函數的慣用法:構建一個字典,把各個future映射到其他數據上,future運行結束后可能會有用。比如上述示例,把future映射到國家代碼上。
線程和多進程的代替方案
對于多線程,如果futures.ThreadPoolExecutor類對某個作業來首不夠靈活,可能要使用到threading模塊中的組件(如Thread、Lock、Semaphore等)自行制定方案,
比如使用queue模塊創建線程安全的隊列,在線程之間傳遞數據。futures.ThreadPoolExecutor類已經封裝好了這些組件。
對于CPU密集型工作來說,要啟動多個進程,規避GIL。創建多個進程的最簡單方式是用futures.ProcessPoolExecutor類。如果使用場景較復雜,需要更高級的工具,multiprocessing模塊的API和threading模塊相仿,不過作業交給多個進程處理。
原文鏈接:https://blog.csdn.net/lijiachang8/article/details/128877478
相關推薦
- 2022-09-01 MongoDB數據庫索引用法詳解_MongoDB
- 2023-07-08 QMessageBox顯示中文為亂碼
- 2023-07-25 使用Http請求調用第三方API
- 2023-03-20 淺談Redis在秒殺場景的作用_Redis
- 2022-05-11 解決GitLab中使用SSH的git clone總是提示輸入密碼且任何密碼都不對
- 2022-12-11 Golang?內存模型The?Go?Memory?Model_Golang
- 2022-05-13 Django-Rest framwork框架 序列化與反序列化
- 2022-06-02 Python學習之迭代器詳解_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支