日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

Python讀取文件的四種方式的實(shí)例詳解_python

作者:天天開心學(xué)編程 ? 更新時間: 2022-05-28 編程語言

故事背景:最近在處理Wikipedia的數(shù)據(jù)時發(fā)現(xiàn)由于數(shù)據(jù)量過大,之前的文件讀取和數(shù)據(jù)處理方法幾乎不可用,或耗時非常久。今天學(xué)校安排統(tǒng)一核酸檢查,剛好和文件讀取的過程非常相似。正好借此機(jī)會和大家一起從頭梳理一下幾種文件讀取方法。

故事設(shè)定:現(xiàn)在學(xué)校要求對所有同學(xué)進(jìn)行核酸采集,每位同學(xué)先在宿舍內(nèi)等候防護(hù)人員(以下簡稱“大白”)叫號,叫到自己時去停車場排隊(duì)等候大白對自己進(jìn)行采集,采集完之后的樣本由大白統(tǒng)一有序收集并儲存。

名詞解釋:

  • 學(xué)生:所有的學(xué)生是一個大文件,每個學(xué)生是其中的一行數(shù)據(jù)
  • 宿舍:硬盤
  • 停車場:內(nèi)存
  • 核酸采集:數(shù)據(jù)處理
  • 樣本:處理后的數(shù)據(jù)
  • 大白:程序

學(xué)生數(shù)量特別少的情況

當(dāng)學(xué)生數(shù)量特別少時,可以考慮將所有學(xué)生統(tǒng)一叫到停車場等候,再依次進(jìn)行核酸采集。

方法一:簡單情況

此時的程序可以模擬為:

import time
from typing import List
 
 
def pick_all_students(dorm: str) -> List[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        students = fin.readlines()
        return students
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        students = pick_all_students(dorm)
        for student in students:
            sample = pick_sample(student)
            fout.write(f"{sample}\n")
            fout.flush()
 
 
if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

注意,在第19行中,大白一次性把所有同學(xué)都叫到了停車場中。這種做法在學(xué)生比較少時做起來很快,但是如果學(xué)生特別多,停車場裝不下怎么辦?

停車場空間不夠時怎么辦?

方法二:邊讀邊處理

一般來說,由于停車場空間有限,我們不會采用一次性把所有學(xué)生都叫到停車場中,而是會一個一個地處理,這樣可以節(jié)約內(nèi)存空間。

import time
from typing import Iterator
 
 
def pick_one_student(dorm: str) -> Iterator[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            yield student
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        for student in pick_one_student(dorm):
            sample = pick_sample(student)
            fout.write(f"{sample}\n")
            fout.flush()
 
 
if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

這里pick_one_student函數(shù)中的返回值是用yield返回的,一次只會返回一名同學(xué)。

不過,這種做法雖然確保了停車場不會滿員,但是這種做法在人數(shù)特別多的時候就不再適合了。雖然可以保證完成任務(wù),但由于每次只能采集一個同學(xué),程序的執(zhí)行并不高。特別是當(dāng)你的CPU有多個核時,會浪費(fèi)機(jī)器性能,出現(xiàn)一核有難,其它圍觀的現(xiàn)象。

怎么加快執(zhí)行效率?

大家可能也已經(jīng)注意到了,剛剛我們的場景中,不論采用哪種方法,都只有一名大白在工作。那我們能不能加派人手,從而提高效率呢?

答案當(dāng)然是可行的。我們現(xiàn)在先考慮增加兩名大白,使得一名大白專注于叫號,安排學(xué)生進(jìn)入停車場,另外一名大白專注于采集核酸,最后一名大白用于存儲核酸樣本。

方法三

import time
from multiprocessing import Queue, Process
from typing import Iterator
 
 
def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:
    print("pick_student: started")
 
    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")
 
    # end signal
    stu_queue.put(None)
    print("pick_student: finished")
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")
 
    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break
 
    # end signal
    store_queue.put(None)
    print("process: finished")
 
 
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")
 
    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}\n")
                fout.flush()
 
                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break
 
    print("store_sample: finished")
 
 
if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"
 
    stu_queue = Queue()
    store_queue = Queue()
 
    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
    process_p.start()
    read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)
    read_p.start()
 
    store_p.join()

這份代碼中,我們引入了多進(jìn)程的思路,將每個大白看作一個進(jìn)程,并使用了隊(duì)列Queue作為進(jìn)程間通信的媒介。stu_queue表示學(xué)生叫號進(jìn)停車場的隊(duì)列,store_queue表示已經(jīng)采集過的待存儲核酸樣本的隊(duì)列。

此外,為了控制進(jìn)程的停止,我們在pick_student和 process函數(shù)的最后都向各自隊(duì)列中添加了None作為結(jié)束標(biāo)志符。

假設(shè)有1w名學(xué)生(student_names.txt文件有1w行),經(jīng)過測試后發(fā)現(xiàn)上述方法的時間如下:

  • 方法一:1m40.716s
  • 方法二:1m40.717s
  • 方法三:1m41.097s

咦?不是做了分工嗎?怎么速度還變慢了?經(jīng)筆者觀察,這是因?yàn)榻刑柕拇蟀姿俣忍炝耍ㄎ募x取速度快)通常是TA已經(jīng)齊活了,另外倆人還在吭哧吭哧干活呢,體現(xiàn)不出來分工的優(yōu)勢。如果這個時候我們對法二和法三的叫號做延時操作,每個學(xué)生叫號之后停滯10ms再叫下一位學(xué)生,則方法三的處理時間幾乎不變,而方法二的時間則會延長至3m21.345s。

怎么加快處理速度?

上面提到,大白采核酸的時間較長,往往上一個人的核酸還沒采完,下一個人就已經(jīng)在后面等著了。我們能不能提高核酸采集這個動作(數(shù)據(jù)處理)的速度呢?其實(shí)一名大白執(zhí)行一次核酸采集的時間我們幾乎無法再縮短了,但是我們可以通過增加人手的方式,來達(dá)到這個目的。就像去銀行辦業(yè)務(wù),如果開放的窗口越多,那么每個人等待的時間就會越短。這里我們也采取類似的策略,增加核酸采集的窗口。

import time
from multiprocessing import Queue, Process, cpu_count
from typing import Iterator
 
 
def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:
    print("pick_student: started")
 
    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")
 
    # end signal
    for _ in range(num_workers):
        stu_queue.put(None)
 
    print("pick_student: finished")
 
 
def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample
 
 
def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")
 
    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break
 
    print("process: finished")
 
 
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")
 
    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}\n")
                fout.flush()
 
                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break
 
    print("store_sample: finished")
 
 
if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"
    num_process = max(1, cpu_count() - 1)
 
    maxsize = 10 * num_process
    stu_queue = Queue(maxsize=maxsize)
    store_queue = Queue(maxsize=maxsize)
 
    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_workers = []
    for _ in range(num_process):
        process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
        process_p.start()
        process_workers.append(process_p)
    read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)
    read_p.start()
 
    for worker in process_workers:
        worker.join()
 
    # end signal
    store_queue.put(None)
    store_p.join()

總耗時 0m4.160s !我們來具體看看其中的細(xì)節(jié)部分:

首先我們將CPU核數(shù) - 3作為采核酸的大白數(shù)量。這里減3是為其它工作進(jìn)程保留了一些資源,你也可以根據(jù)自己的具體情況做調(diào)整

這次我們在 Queue中增加了 maxsize參數(shù),這個參數(shù)是限制隊(duì)列的最大長度,這個參數(shù)通常與你的實(shí)際內(nèi)存情況有關(guān)。如果數(shù)據(jù)特別多時要考慮做些調(diào)整。這里我采用10倍的工作進(jìn)程數(shù)目作為隊(duì)列的長度

注意這里pick_student函數(shù)中要為每個后續(xù)的工作進(jìn)程都添加一個結(jié)束標(biāo)志,因此最后會有個for循環(huán)

我們把之前放在process函數(shù)中的結(jié)束標(biāo)志提取出來,放在了最外側(cè),使得所有工作進(jìn)程均結(jié)束之后再關(guān)閉最后的store_p進(jìn)程

結(jié)語

總結(jié)來說,如果你的數(shù)據(jù)集特別小,用法一;通常情況下用法二;數(shù)據(jù)集特別大時用法四。

原文鏈接:https://blog.csdn.net/m0_64355682/article/details/123729265

欄目分類
最近更新