日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

在?Python?中利用Pool?進(jìn)行多處理_python

作者:宇宙之一粟 ? 更新時(shí)間: 2022-06-22 編程語言

為什么要引入線程池

如果在程序中經(jīng)常要用到線程,頻繁的創(chuàng)建和銷毀線程會(huì)浪費(fèi)很多硬件資源,
所以需要把線程和任務(wù)分離。線程可以反復(fù)利用,省去了重復(fù)創(chuàng)建的麻煩。
在 Process 類中,我們必須顯式地創(chuàng)建流程。但是,Pool 類更方便,您不必手動(dòng)管理它。創(chuàng)建池對(duì)象的語法是 ?multiprocessing.Pool(processes, initializer, initargs, maxtasksperchild, context)?? 。所有參數(shù)都是可選的。

  • processes 表示您要?jiǎng)?chuàng)建的工作進(jìn)程的數(shù)量。默認(rèn)值通過 os.cpu_count() 獲取。
  • initializer第二個(gè)初始化器參數(shù)是一個(gè)用于初始化的函數(shù)。
  • initargs 是傳遞給它的參數(shù)。
  • maxtasksperchild 表示分配給每個(gè)子進(jìn)程的任務(wù)數(shù)。在完成該數(shù)量的任務(wù)之后,該進(jìn)程將被一個(gè)新的工作進(jìn)程替換。指定它的好處是任何未使用的資源都將被釋放。如果未提供任何內(nèi)容,則只要池存在,進(jìn)程就會(huì)存在。
import time
from multiprocessing import Pool


def square(x):
print(f"start process:{x}")
square = x * x
print(f"square {x}:{square}")
time.sleep(1)
print(f"end process:{x}")


if __name__ == "__main__":
starttime = time.time()
pool = Pool()
pool.map(square, range(0, 5))
pool.close()
endtime = time.time()
print(f"Time taken {endtime-starttime} seconds")

結(jié)果為:

start process:0
start process:1
square 1:1
square 0:0
end process:1
start process:2
end process:0
start process:3
square 2:4
square 3:9
end process:3
end process:2
start process:4
square 4:16
end process:4
Time taken 3.0474610328674316 seconds

在這里,我們從多處理模塊中導(dǎo)入 Pool 類。在主函數(shù)中,我們創(chuàng)建了一個(gè) Pool 類的對(duì)象。 pool.map() 將我們想要并行化的函數(shù)和一個(gè)可迭代的函數(shù)作為參數(shù)。它在可迭代的每個(gè)項(xiàng)目上運(yùn)行給定的函數(shù)。它還接受一個(gè)可選的 chunksize 參數(shù),它將可迭代對(duì)象拆分為等于給定大小的塊,并將每個(gè)塊作為單獨(dú)的任務(wù)傳遞。 pool.close() 用于拒絕新任務(wù)。
我們可以看到花費(fèi)的時(shí)間大約是 3 秒。
?pool.imap()?? 與 ?pool.map()?? 方法幾乎相同。不同的是,每個(gè)項(xiàng)目的結(jié)果都是在準(zhǔn)備好后立即收到的,而不是等待所有項(xiàng)目都完成。此外, ?map()?? 方法將可迭代對(duì)象轉(zhuǎn)換為列表(如果不是)。但是, ?imap()?? 方法沒有。

來看下一個(gè)例子:

import time
from multiprocessing import Pool


def square(x):
print(f"start process {x}")
square = x * x
time.sleep(1)
print(f"end process {x}")
return square


if __name__ == "__main__":
pool = Pool()
a = pool.map(square, range(0, 5))
print(a)

運(yùn)行結(jié)果:

start process 0
start process 1
end process 0
start process 2
end process 1
start process 3
end process 2
start process 4
end process 3
end process 4
[0, 1, 4, 9, 16]

from concurrent.futures import ThreadPoolExecutor

def say_hello():
print("Hello")

executor = ThreadPoolExecutor(50)
for i in range(0, 10):
executor.submit(say_hello)

練習(xí)

利用 Python 多線程模擬商品秒殺過程,不可以出現(xiàn)超買和超賣的情況。假設(shè)A商品有50件參與秒殺活動(dòng),10分鐘秒殺自動(dòng)結(jié)束。

  • kill_total 商品總數(shù)
  • kill_num 成功搶購數(shù)
  • kill_flag 有效標(biāo)志位
  • kill_user 成功搶購的用戶ID
from redis_db import pool
import redis
import random
from concurrent.futures import ThreadPoolExecutor

s = set()
while True:
if len(s) == 1000:
break
num = random.randint(10000, 100000)
s.add(num)
print(s)

con = redis.Redis(
connection_pool=pool
)

try:
con.delete("kill_total", "kill_num", "kill_flag", "kill_user")
con.set("kill_total", 50)
con.set("kill_num", 0)
con.set("kill_flag", 1)
con.expire("kill_flag", 600)

except Exception as e:
print(e)
finally:
del con

executor = ThreadPoolExecutor(200)
def buy():
connection = redis.Redis(
connection_pool=pool
)
pipline = connection.pipline()
try:
if connection.exists("kill_flag") == 1:
pipline.watch("kill_num", "kill_user")
total = pipline.get("kill_total")
num = int(pipline.get("kill_num").decode("utf-8"))
if num < total:
pipline.multi()
pipline.incr("kill_num")
user_id = s.pop()
pipline.rpush("kill_user", user_id)

pipline.execute()
except Exception as e:
print(e)
finally:
if "pipline" in dir():
pipline.reset()
del connection

for i in range(0, 1000):
executor.submit(buy)
print("秒殺活動(dòng)已經(jīng)結(jié)束")

原文鏈接:https://blog.51cto.com/yuzhou1su/5200609

欄目分類
最近更新