網站首頁 編程語言 正文
前言
Multiprocessing.Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;
但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來執行它。
Pool類用于需要執行的目標很多,而手動限制進程數量又太繁瑣時,如果目標少且不用控制進程數量則可以用Process類。
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes: 是要使用的工作進程數。如果進程是None,那么使用返回的數字os.cpu_count()。也就是說根據本地的cpu個數決定,processes小于等于本地的cpu個數;
- initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
- maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活。
- context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。
實例方法
(1)apply(func [,args [,kwds ] ] )
使用參數args和關鍵字參數kwds調用func。它會阻塞,直到結果準備就緒。鑒于此塊,更適合并行執行工作。此外,func 僅在池中的一個工作程序中執行。
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=10)
for i in range(500):
'''
('\n'
' (1)遍歷500個可迭代對象,往進程池放一個子進程\n'
' (2)執行這個子進程,等子進程執行完畢,再往進程池放一個子進程,再執行。(同時只執行一個子進程)\n'
' for循環執行完畢,再執行print函數。\n'
' ')
'''
pool.apply(test, args=(i,)) #維持執行的進程總數為10,當一個進程執行完后啟動一個新進程.
print('test')
pool.close()
pool.join()
'''
1
2
3
4
5
6
7
8
Process finished with exit code -1
'''
for循環內執行的步驟順序,往進程池中添加一個子進程,執行子進程,等待執行完畢再添加一個子進程……等500個子進程都執行完了,再執行print。(從結果來看,并沒有多進程并發)
(2)apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ] )
異步進程池(非阻塞),返回結果對象的方法的變體。如果指定了回調,則它應該是可調用的,它接受單個參數。當結果變為就緒時,將對其應用回調,即除非調用失敗,在這種情況下將應用error_callback。如果指定了error_callback,那么它應該是一個可調用的,它接受一個參數。如果目標函數失敗,則使用異常實例調用error_callback。回調應立即完成,否則處理結果的線程將被阻止。
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=2)
for i in range(500):
'''
(1)循環遍歷,將500個子進程添加到進程池(相對父進程會阻塞)\n'
(2)每次執行2個子進程,等一個子進程執行完后,立馬啟動新的子進程。(相對父進程不阻塞)\n'
'''
pool.apply_async(test, args=(i,)) #維持執行的進程總數為10,當一個進程執行完后啟動一個新進程.
print('test')
pool.close()
pool.join()
'''
test
0
1
2
3
4
5
6
7
Process finished with exit code -1
'''
調用join之前,先調用close或者terminate方法,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束。
(3)map(func,iterable [,chunksize ] )
map()內置函數的并行等價物(盡管它只支持一個可迭代的參數)。它會阻塞,直到結果準備就緒。此方法將iterable內的每一個對象作為單獨的任務提交給進程池。可以通過將chunksize設置為正整數來指定這些塊的(近似)大小。
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = [1, 2, 3]
pool = Pool(processes=2) #定義最大的進程數
pool.map(test, lists) #p必須是一個可迭代變量。
pool.close()
pool.join()
'''
1
2
3
'''
(4)map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ] )
map()返回結果對象的方法的變體。需要傳入可迭代對象iterable
from multiprocessing import Pool
import time
def test(p):
print(p)
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=2)
# for i in range(500):
# '''
# (1)循環遍歷,將500個子進程添加到進程池(相對父進程會阻塞)\n'
# (2)每次執行2個子進程,等一個子進程執行完后,立馬啟動新的子進程。(相對父進程不阻塞)\n'
# '''
# pool.apply_async(test, args=(i,)) #維持執行的進程總數為10,當一個進程執行完后啟動一個新進程.
pool.map_async(test, range(500))
print('test')
pool.close()
pool.join()
'''
test
0
63
1
64
2
65
3
66
Process finished with exit code -1
'''
(5)imap(func,iterable [,chunksize ] )
返回迭代器,next()調用返回的迭代器的方法得到結果,imap()方法有一個可選的超時參數: next(timeout)將提高multiprocessing.TimeoutError如果結果不能內退回超時秒。
(6)close()
防止任何更多的任務被提交到池中。 一旦完成所有任務,工作進程將退出。
(7)terminate()
立即停止工作進程而不完成未完成的工作。當池對象被垃圾收集時,terminate()將立即調用。
(8)join()
等待工作進程退出。必須打電話close()或 terminate()使用之前join()。
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
'''
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
Traceback (most recent call last):
File "C:/Users/BruceWong/Desktop/develop/multiprocessingpool.py", line 19, in <module>
print(next(res))
TypeError: 'MapResult' object is not an iterator
Process finished with exit code 1
原文鏈接:https://blog.csdn.net/qdPython/article/details/127073777
相關推薦
- 2022-02-26 docker 分布式 lnmp 鏡像制作
- 2022-03-14 Response to preflight request doesn't pass access
- 2022-08-01 基于Python編寫一個二維碼生成器_python
- 2022-05-21 python中的變量命名規則詳情_python
- 2022-11-23 python?Multiprocessing.Pool進程池模塊詳解_python
- 2022-08-19 MapReduce讀取定長文件入庫Hive表Orc格式
- 2022-06-10 使用Android實現一個懸浮在軟鍵盤上的輸入欄_Android
- 2022-11-21 基于C++實現一個日期計算器_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支