日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

python?協程并發數控制_python

作者:??夢想橡皮擦???? ? 更新時間: 2022-07-09 編程語言

前言:

本篇博客要采集的站點:【看歷史,通天下-歷史劇網】

目標數據是該站點下的熱門歷史事件,列表頁分頁規則如下所示:

http://www.lishiju.net/hotevents/p0
http://www.lishiju.net/hotevents/p1
http://www.lishiju.net/hotevents/p2

首先我們通過普通的多線程,對該數據進行采集,由于本文主要目的是學習如何控制并發數,所以每頁僅輸出歷史事件的標題內容。

普通的多線程代碼:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.__url = url
    def run(self):
        res = requests.get(url=self.__url)
        soup = BeautifulSoup(res.text, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
        print("")
if __name__ == "__main__":
    start_time = time.perf_counter()
    threads = []
    for i in range(111):  # 創建了110個線程。
        threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
    for t in threads:
        t.start()  # 啟動了110個線程。
    for t in threads:
        t.join()  # 等待線程結束
    print("累計耗時:", time.perf_counter() - start_time)
    # 累計耗時: 1.537718624

上述代碼同時開啟所有線程,累計耗時?1.5 秒,程序采集結束。

多線程之信號量

python 信號量(Semaphore)用來控制線程并發數,信號量管理一個內置的計數器。 信號量對象每次調用其?acquire()方法時,信號量計數器執行?-1?操作,調用?release()方法,計數器執行?+1?操作,當計數器等于 0 時,acquire()方法會阻塞線程,一直等到其它線程調用?release()后,計數器重新?+1,線程的阻塞才會解除。

使用?threading.Semaphore()創建一個信號量對象。

修改上述并發代碼:

import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.__url = url
    def run(self):
        if semaphore.acquire():  # 計數器 -1
            print("正在采集:", self.__url)
            res = requests.get(url=self.__url)
            soup = BeautifulSoup(res.text, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
            print("")
            semaphore.release()  # 計數器 +1
if __name__ == "__main__":
    semaphore = threading.Semaphore(5)  # 控制每次最多執行 5 個線程
    start_time = time.perf_counter()
    threads = []
    for i in range(111):  # 創建了110個線程。
        threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
    for t in threads:
        t.start()  # 啟動了110個線程。
    for t in threads:
        t.join()  # 等待線程結束
    print("累計耗時:", time.perf_counter() - start_time)
    # 累計耗時: 2.8005530640000003

當控制并發線程數量之后,累計耗時變多。

補充知識點之 GIL:

GIL是 python 里面的全局解釋器鎖(互斥鎖),在同一進程,同一時間下,只能運行一個線程,這就導致了同一個進程下多個線程,只能實現并發而不能實現并行。

需要注意 python 語言并沒有全局解釋鎖,只是因為歷史的原因,在?CPython解析器中,無法移除?GIL,所以使用?CPython解析器,是會受到互斥鎖影響的。

還有一點是在編寫爬蟲程序時,多線程比單線程性能是有所提升的,因為遇到 I/O 阻塞會自動釋放?GIL鎖。

協程中使用信號量控制并發

下面將信號量管理并發數,應用到協程代碼中,在正式編寫前,使用協程寫法重構上述代碼。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(url):
    print("正在采集:", url)
    async with aiohttp.request('GET', url) as res:
        html = await res.text()
        soup = BeautifulSoup(html, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
async def main():
    tasks = [asyncio.ensure_future(get_title("http://www.lishiju.net/hotevents/p{}".format(i))) for i in range(111)]
    dones, pendings = await asyncio.wait(tasks)
    # for task in dones:
    #     print(len(task.result()))
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運行時間為:", time.perf_counter() - start_time)
    # 代碼運行時間為: 1.6422313430000002

代碼一次性并發 110 個協程,耗時?1.6 秒執行完畢,接下來就對上述代碼,增加信號量管理代碼。

核心代碼是?semaphore = asyncio.Semaphore(10),控制事件循環中并發的協程數量。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(semaphore, url):
    async with semaphore:
        print("正在采集:", url)
        async with aiohttp.request('GET', url) as res:
            html = await res.text()
            soup = BeautifulSoup(html, 'html.parser')
            title_tags = soup.find_all(attrs={'class': 'item-title'})
            event_names = [item.a.text for item in title_tags]
            print(event_names)
async def main():
    semaphore = asyncio.Semaphore(10)  # 控制每次最多執行 10 個線程
    tasks = [asyncio.ensure_future(get_title(semaphore, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
             range(111)]
    dones, pendings = await asyncio.wait(tasks)
    # for task in dones:
    #     print(len(task.result()))
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運行時間為:", time.perf_counter() - start_time)
    # 代碼運行時間為: 2.227831242

aiohttp 中 TCPConnector 連接池

既然上述代碼已經用到了?aiohttp?模塊,該模塊下通過限制同時連接數,也可以控制線程并發數量,不過這個不是很好驗證,所以從數據上進行驗證,先設置控制并發數為 2,測試代碼運行時間為?5.56?秒,然后修改并發數為 10,得到的時間為?1.4?秒,與協程信號量控制并發數得到的時間一致。所以使用?TCPConnector?連接池控制并發數也是有效的。

import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(session, url):
    async with session.get(url) as res:
        print("正在采集:", url)
        html = await res.text()
        soup = BeautifulSoup(html, 'html.parser')
        title_tags = soup.find_all(attrs={'class': 'item-title'})
        event_names = [item.a.text for item in title_tags]
        print(event_names)
async def main():
    connector = aiohttp.TCPConnector(limit=1)  # 限制同時連接數
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [asyncio.ensure_future(get_title(session, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
                 range(111)]
        await asyncio.wait(tasks)
if __name__ == '__main__':
    start_time = time.perf_counter()
    asyncio.run(main())
    print("代碼運行時間為:", time.perf_counter() - start_time)

原文鏈接:https://juejin.cn/post/7068824518902906916

欄目分類
最近更新