日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

python?協(xié)程并發(fā)數(shù)控制_python

作者:??夢(mèng)想橡皮擦???? ? 更新時(shí)間: 2022-07-09 編程語(yǔ)言

前言:

本篇博客要采集的站點(diǎn):【看歷史,通天下-歷史劇網(wǎng)】

目標(biāo)數(shù)據(jù)是該站點(diǎn)下的熱門(mén)歷史事件,列表頁(yè)分頁(yè)規(guī)則如下所示:

http://www.lishiju.net/hotevents/p0
http://www.lishiju.net/hotevents/p1
http://www.lishiju.net/hotevents/p2

首先我們通過(guò)普通的多線程,對(duì)該數(shù)據(jù)進(jìn)行采集,由于本文主要目的是學(xué)習(xí)如何控制并發(fā)數(shù),所以每頁(yè)僅輸出歷史事件的標(biāo)題內(nèi)容。

普通的多線程代碼:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.__url = url
    def run(self):
        res = requests.get(url=self.__url)
        soup = BeautifulSoup(res.text, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
        print("")
if __name__ == "__main__":
    start_time = time.perf_counter()
    threads = []
    for i in range(111):  # 創(chuàng)建了110個(gè)線程。
        threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
    for t in threads:
        t.start()  # 啟動(dòng)了110個(gè)線程。
    for t in threads:
        t.join()  # 等待線程結(jié)束
    print("累計(jì)耗時(shí):", time.perf_counter() - start_time)
    # 累計(jì)耗時(shí): 1.537718624

上述代碼同時(shí)開(kāi)啟所有線程,累計(jì)耗時(shí)?1.5 秒,程序采集結(jié)束。

多線程之信號(hào)量

python 信號(hào)量(Semaphore)用來(lái)控制線程并發(fā)數(shù),信號(hào)量管理一個(gè)內(nèi)置的計(jì)數(shù)器。 信號(hào)量對(duì)象每次調(diào)用其?acquire()方法時(shí),信號(hào)量計(jì)數(shù)器執(zhí)行?-1?操作,調(diào)用?release()方法,計(jì)數(shù)器執(zhí)行?+1?操作,當(dāng)計(jì)數(shù)器等于 0 時(shí),acquire()方法會(huì)阻塞線程,一直等到其它線程調(diào)用?release()后,計(jì)數(shù)器重新?+1,線程的阻塞才會(huì)解除。

使用?threading.Semaphore()創(chuàng)建一個(gè)信號(hào)量對(duì)象。

修改上述并發(fā)代碼:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.__url = url
    def run(self):
        if semaphore.acquire():  # 計(jì)數(shù)器 -1
            print("正在采集:", self.__url)
            res = requests.get(url=self.__url)
            soup = BeautifulSoup(res.text, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
            print("")
            semaphore.release()  # 計(jì)數(shù)器 +1
if __name__ == "__main__":
    semaphore = threading.Semaphore(5)  # 控制每次最多執(zhí)行 5 個(gè)線程
    start_time = time.perf_counter()
    threads = []
    for i in range(111):  # 創(chuàng)建了110個(gè)線程。
        threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
    for t in threads:
        t.start()  # 啟動(dòng)了110個(gè)線程。
    for t in threads:
        t.join()  # 等待線程結(jié)束
    print("累計(jì)耗時(shí):", time.perf_counter() - start_time)
    # 累計(jì)耗時(shí): 2.8005530640000003

當(dāng)控制并發(fā)線程數(shù)量之后,累計(jì)耗時(shí)變多。

補(bǔ)充知識(shí)點(diǎn)之 GIL:

GIL是 python 里面的全局解釋器鎖(互斥鎖),在同一進(jìn)程,同一時(shí)間下,只能運(yùn)行一個(gè)線程,這就導(dǎo)致了同一個(gè)進(jìn)程下多個(gè)線程,只能實(shí)現(xiàn)并發(fā)而不能實(shí)現(xiàn)并行。

需要注意 python 語(yǔ)言并沒(méi)有全局解釋鎖,只是因?yàn)闅v史的原因,在?CPython解析器中,無(wú)法移除?GIL,所以使用?CPython解析器,是會(huì)受到互斥鎖影響的。

還有一點(diǎn)是在編寫(xiě)爬蟲(chóng)程序時(shí),多線程比單線程性能是有所提升的,因?yàn)橛龅?I/O 阻塞會(huì)自動(dòng)釋放?GIL鎖。

協(xié)程中使用信號(hào)量控制并發(fā)

下面將信號(hào)量管理并發(fā)數(shù),應(yīng)用到協(xié)程代碼中,在正式編寫(xiě)前,使用協(xié)程寫(xiě)法重構(gòu)上述代碼。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(url):
    print("正在采集:", url)
    async with aiohttp.request('GET', url) as res:
        html = await res.text()
        soup = BeautifulSoup(html, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
async def main():
    tasks = [asyncio.ensure_future(get_title("http://www.lishiju.net/hotevents/p{}".format(i))) for i in range(111)]
    dones, pendings = await asyncio.wait(tasks)
    # for task in dones:
    #     print(len(task.result()))
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運(yùn)行時(shí)間為:", time.perf_counter() - start_time)
    # 代碼運(yùn)行時(shí)間為: 1.6422313430000002

代碼一次性并發(fā) 110 個(gè)協(xié)程,耗時(shí)?1.6 秒執(zhí)行完畢,接下來(lái)就對(duì)上述代碼,增加信號(hào)量管理代碼。

核心代碼是?semaphore = asyncio.Semaphore(10),控制事件循環(huán)中并發(fā)的協(xié)程數(shù)量。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(semaphore, url):
    async with semaphore:
        print("正在采集:", url)
        async with aiohttp.request('GET', url) as res:
            html = await res.text()
            soup = BeautifulSoup(html, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
async def main():
    semaphore = asyncio.Semaphore(10)  # 控制每次最多執(zhí)行 10 個(gè)線程
    tasks = [asyncio.ensure_future(get_title(semaphore, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
             range(111)]
    dones, pendings = await asyncio.wait(tasks)
    # for task in dones:
    #     print(len(task.result()))
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運(yùn)行時(shí)間為:", time.perf_counter() - start_time)
    # 代碼運(yùn)行時(shí)間為: 2.227831242

aiohttp 中 TCPConnector 連接池

既然上述代碼已經(jīng)用到了?aiohttp?模塊,該模塊下通過(guò)限制同時(shí)連接數(shù),也可以控制線程并發(fā)數(shù)量,不過(guò)這個(gè)不是很好驗(yàn)證,所以從數(shù)據(jù)上進(jìn)行驗(yàn)證,先設(shè)置控制并發(fā)數(shù)為 2,測(cè)試代碼運(yùn)行時(shí)間為?5.56?秒,然后修改并發(fā)數(shù)為 10,得到的時(shí)間為?1.4?秒,與協(xié)程信號(hào)量控制并發(fā)數(shù)得到的時(shí)間一致。所以使用?TCPConnector?連接池控制并發(fā)數(shù)也是有效的。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(session, url):
    async with session.get(url) as res:
        print("正在采集:", url)
        html = await res.text()
        soup = BeautifulSoup(html, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
async def main():
    connector = aiohttp.TCPConnector(limit=1)  # 限制同時(shí)連接數(shù)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [asyncio.ensure_future(get_title(session, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
                 range(111)]
        await asyncio.wait(tasks)
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運(yùn)行時(shí)間為:", time.perf_counter() - start_time)

原文鏈接:https://juejin.cn/post/7068824518902906916

欄目分類
最近更新