網站首頁 編程語言 正文
故事背景:最近在處理Wikipedia的數據時發現由于數據量過大,之前的文件讀取和數據處理方法幾乎不可用,或耗時非常久。今天學校安排統一核酸檢查,剛好和文件讀取的過程非常相似。正好借此機會和大家一起從頭梳理一下幾種文件讀取方法。
故事設定:現在學校要求對所有同學進行核酸采集,每位同學先在宿舍內等候防護人員(以下簡稱“大白”)叫號,叫到自己時去停車場排隊等候大白對自己進行采集,采集完之后的樣本由大白統一有序收集并儲存。
名詞解釋:
- 學生:所有的學生是一個大文件,每個學生是其中的一行數據
- 宿舍:硬盤
- 停車場:內存
- 核酸采集:數據處理
- 樣本:處理后的數據
- 大白:程序
學生數量特別少的情況
當學生數量特別少時,可以考慮將所有學生統一叫到停車場等候,再依次進行核酸采集。
方法一:簡單情況
此時的程序可以模擬為:
import time from typing import List def pick_all_students(dorm: str) -> List[str]: with open(dorm, "rt", encoding="utf8") as fin: students = fin.readlines() return students def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(dorm: str, sample_storeroom: str) -> None: with open(sample_storeroom, "wt", encoding="utf8") as fout: students = pick_all_students(dorm) for student in students: sample = pick_sample(student) fout.write(f"{sample}\n") fout.flush() if __name__ == "__main__": process( "student_names.txt", "sample_storeroom.txt" )
注意,在第19行中,大白一次性把所有同學都叫到了停車場中。這種做法在學生比較少時做起來很快,但是如果學生特別多,停車場裝不下怎么辦?
停車場空間不夠時怎么辦?
方法二:邊讀邊處理
一般來說,由于停車場空間有限,我們不會采用一次性把所有學生都叫到停車場中,而是會一個一個地處理,這樣可以節約內存空間。
import time from typing import Iterator def pick_one_student(dorm: str) -> Iterator[str]: with open(dorm, "rt", encoding="utf8") as fin: for student in fin: yield student def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(dorm: str, sample_storeroom: str) -> None: with open(sample_storeroom, "wt", encoding="utf8") as fout: for student in pick_one_student(dorm): sample = pick_sample(student) fout.write(f"{sample}\n") fout.flush() if __name__ == "__main__": process( "student_names.txt", "sample_storeroom.txt" )
這里pick_one_student函數中的返回值是用yield返回的,一次只會返回一名同學。
不過,這種做法雖然確保了停車場不會滿員,但是這種做法在人數特別多的時候就不再適合了。雖然可以保證完成任務,但由于每次只能采集一個同學,程序的執行并不高。特別是當你的CPU有多個核時,會浪費機器性能,出現一核有難,其它圍觀的現象。
怎么加快執行效率?
大家可能也已經注意到了,剛剛我們的場景中,不論采用哪種方法,都只有一名大白在工作。那我們能不能加派人手,從而提高效率呢?
答案當然是可行的。我們現在先考慮增加兩名大白,使得一名大白專注于叫號,安排學生進入停車場,另外一名大白專注于采集核酸,最后一名大白用于存儲核酸樣本。
方法三
import time from multiprocessing import Queue, Process from typing import Iterator def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]: print("pick_student: started") picked_num = 0 with open(dorm, "rt", encoding="utf8") as fin: for student in fin: stu_queue.put(student) picked_num += 1 if picked_num % 500 == 0: print(f"pick_student: {picked_num}") # end signal stu_queue.put(None) print("pick_student: finished") def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(stu_queue: Queue, store_queue: Queue) -> None: print("process: started") process_num = 0 while True: student = stu_queue.get() if student is not None: sample = pick_sample(student) store_queue.put(sample) process_num += 1 if process_num % 500 == 0: print(f"process: {process_num}") else: break # end signal store_queue.put(None) print("process: finished") def store_sample(store_queue: Queue, sample_storeroom: str) -> None: print("store_sample: started") store_num = 0 with open(sample_storeroom, "wt", encoding="utf8") as fout: while True: sample = store_queue.get() if sample is not None: fout.write(f"{sample}\n") fout.flush() store_num += 1 if store_num % 500 == 0: print(f"store_sample: {store_num}") else: break print("store_sample: finished") if __name__ == "__main__": dorm = "student_names.txt" sample_storeroom = "sample_storeroom.txt" stu_queue = Queue() store_queue = Queue() store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True) store_p.start() process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True) process_p.start() read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True) read_p.start() store_p.join()
這份代碼中,我們引入了多進程的思路,將每個大白看作一個進程,并使用了隊列Queue作為進程間通信的媒介。stu_queue表示學生叫號進停車場的隊列,store_queue表示已經采集過的待存儲核酸樣本的隊列。
此外,為了控制進程的停止,我們在pick_student和 process函數的最后都向各自隊列中添加了None作為結束標志符。
假設有1w名學生(student_names.txt文件有1w行),經過測試后發現上述方法的時間如下:
- 方法一:1m40.716s
- 方法二:1m40.717s
- 方法三:1m41.097s
咦?不是做了分工嗎?怎么速度還變慢了?經筆者觀察,這是因為叫號的大白速度太快了(文件讀取速度快)通常是TA已經齊活了,另外倆人還在吭哧吭哧干活呢,體現不出來分工的優勢。如果這個時候我們對法二和法三的叫號做延時操作,每個學生叫號之后停滯10ms再叫下一位學生,則方法三的處理時間幾乎不變,而方法二的時間則會延長至3m21.345s。
怎么加快處理速度?
上面提到,大白采核酸的時間較長,往往上一個人的核酸還沒采完,下一個人就已經在后面等著了。我們能不能提高核酸采集這個動作(數據處理)的速度呢?其實一名大白執行一次核酸采集的時間我們幾乎無法再縮短了,但是我們可以通過增加人手的方式,來達到這個目的。就像去銀行辦業務,如果開放的窗口越多,那么每個人等待的時間就會越短。這里我們也采取類似的策略,增加核酸采集的窗口。
import time from multiprocessing import Queue, Process, cpu_count from typing import Iterator def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]: print("pick_student: started") picked_num = 0 with open(dorm, "rt", encoding="utf8") as fin: for student in fin: stu_queue.put(student) picked_num += 1 if picked_num % 500 == 0: print(f"pick_student: {picked_num}") # end signal for _ in range(num_workers): stu_queue.put(None) print("pick_student: finished") def pick_sample(student: str) -> str: time.sleep(0.01) sample = f"{student.strip()}'s sample" return sample def process(stu_queue: Queue, store_queue: Queue) -> None: print("process: started") process_num = 0 while True: student = stu_queue.get() if student is not None: sample = pick_sample(student) store_queue.put(sample) process_num += 1 if process_num % 500 == 0: print(f"process: {process_num}") else: break print("process: finished") def store_sample(store_queue: Queue, sample_storeroom: str) -> None: print("store_sample: started") store_num = 0 with open(sample_storeroom, "wt", encoding="utf8") as fout: while True: sample = store_queue.get() if sample is not None: fout.write(f"{sample}\n") fout.flush() store_num += 1 if store_num % 500 == 0: print(f"store_sample: {store_num}") else: break print("store_sample: finished") if __name__ == "__main__": dorm = "student_names.txt" sample_storeroom = "sample_storeroom.txt" num_process = max(1, cpu_count() - 1) maxsize = 10 * num_process stu_queue = Queue(maxsize=maxsize) store_queue = Queue(maxsize=maxsize) store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True) store_p.start() process_workers = [] for _ in range(num_process): process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True) process_p.start() process_workers.append(process_p) read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True) read_p.start() for worker in process_workers: worker.join() # end signal store_queue.put(None) store_p.join()
總耗時 0m4.160s !我們來具體看看其中的細節部分:
首先我們將CPU核數 - 3作為采核酸的大白數量。這里減3是為其它工作進程保留了一些資源,你也可以根據自己的具體情況做調整
這次我們在 Queue中增加了 maxsize參數,這個參數是限制隊列的最大長度,這個參數通常與你的實際內存情況有關。如果數據特別多時要考慮做些調整。這里我采用10倍的工作進程數目作為隊列的長度
注意這里pick_student函數中要為每個后續的工作進程都添加一個結束標志,因此最后會有個for循環
我們把之前放在process函數中的結束標志提取出來,放在了最外側,使得所有工作進程均結束之后再關閉最后的store_p進程
結語
總結來說,如果你的數據集特別小,用法一;通常情況下用法二;數據集特別大時用法四。
原文鏈接:https://blog.csdn.net/m0_64355682/article/details/123729265
相關推薦
- 2022-04-05 Springboot為什么加載不上application.yml的配置文件
- 2022-08-31 Centos安裝python3與scapy模塊的問題及解決方法_python
- 2022-04-20 教你python?中如何取出colomap部分的顏色范圍_python
- 2022-08-10 python中ThreadPoolExecutor線程池和ProcessPoolExecutor進程
- 2022-06-25 React服務端渲染和同構的實現_React
- 2022-08-10 Python多任務版靜態Web服務器實現示例_python
- 2022-11-06 Android淺析viewBinding和DataBinding_Android
- 2022-05-25 Jenkins 簡單的從git上構建一個maven項目
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支