網站首頁 編程語言 正文
前言
這段時間看了《C++并發編程實戰》的基礎內容,想著利用最近學的知識自己實現一個簡單的線程池。
什么是線程池
線程池(thread pool)是一種線程使用模式。線程過多或者頻繁創建和銷毀線程會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著管理器分配可并發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價,以及保證了線程的可復用性。線程池不僅能夠保證內核的充分利用,還能防止過分調度。
思路
個人對線程池的理解是:利用已經創建的固定數量的線程去執行指定的任務,從而避免線程重復創建和銷毀帶來的額外開銷。
C++11中,線程我們可以理解為對應一個thread對象,任務可以理解為要執行的函數,通常是耗時的函數。
我們的任務多少和順序并非固定的,因此需要有一個方法能添加指定的任務,任務存放的地方應該是一個任務隊列,因為我們的線程數量有限,當任務很多時同時執行的任務數量也有限,因此任務需要排隊,遵循先來后到的原則。
當要執行一個任務時,意味著先將這個任務從隊列取出,再執行相應任務,而“取出”動作的執行者是線程池中的線程,這意味我們的隊列需要考慮多個線程在同一隊列上執行“取出”操作的問題,實際上,取出任務操作和添加任務操作也不能同時進行,否則會產生競爭條件;另一方面,程序本身如果就是多線程的,多個線程同時添加任務的操作也應該是互斥的。
當沒有任務可以執行時,所有線程應該什么也不做,當出現了一個任務時,應該將這個任務分配到任一線程中執行。實現上我們固然可以使用輪詢的方式判斷當前隊列是否有任務,有則取出(即使加了互斥鎖似乎也無法避免競爭條件?),但這樣會消耗無謂的CPU資源,寫輪詢周期難以選取。其實,我們可以使用condition_variable代替輪詢。
上述任務的創建和取出其實就是經典的生產者消費者模型。
我們將上面的內容都封裝在一個類中,取名ThreadPool,用戶可以在構造ThreadPool對象時指定線程池大小,之后可以隨時添加要執行的任務。
實現
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(packaged_task<void()> &&task); private: vector<thread*> threadPool; deque<packaged_task<void()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool(int n) { for (int i = 0; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer,this); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(packaged_task<void()> &&task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while (true) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); packaged_task<void()> task=std::move(taskQueue.front()); taskQueue.pop_front(); lk.unlock(); task(); } }
這里我使用packaged_task作為任務,每當添加一個任務,就調用condition_variable::notify_one方法,調用condition_variable::wait的線程就會被喚醒,并檢查等待條件。這里有個小細節是notify_one在解鎖后執行,這樣避免線程喚醒后還要等待互斥鎖解鎖。
使用示例:
void Task1() { Sleep(1000); cout << "Task1"<<endl; } void Task5() { Sleep(5000); cout << "Task5" << endl; } class Worker { public: void run(); }; void Worker::run() { cout << "Worker::run start" << endl; Sleep(5000); cout << "Worker::run end" << endl; } int main() { ThreadPool pool(2); pool.pushTask(packaged_task<void()>(Task5)); pool.pushTask(packaged_task<void()>(Task1)); pool.pushTask(packaged_task<void()>(Task1)); Worker worker; pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker))); pool.pushTask(packaged_task<void()>([&](){worker.run();})); Sleep(20000); }
這個線程池目前有幾個缺點:
- 只能傳入調用形式為void()形式的函數或可調用對象,不能返回任務執行的值,只能通過其他方式同步任務執行結果(如果有)
- 傳入參數較為復雜,必須封裝一層packaged_task,調用對象方法時需要使用bind或者lambda表達式的方法封裝
以上缺點在當前版本的實現不予解決,日后另寫博文優化。
2021/12/29 更新之一:
事實上,我們只要將packaged_task改為funtion模板類,就可以簡化我們的調用參數:
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(function<void()> task); private: vector<thread*> threadPool; deque<function<void()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool(int n) { for (int i = 0; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer,this); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(function<void()> task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while (true) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); function<void()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); } }
調用代碼改為如下:
ThreadPool pool(2);
pool.pushTask(&Task5);
pool.pushTask(&Task1);
pool.pushTask(&Task1);
Worker worker;
pool.pushTask((bind(&Worker::run, &worker)));
pool.pushTask([&](){worker.run(); });//1
Sleep(15000);
我們可以執行指定的函數,也可以將要執行的代碼放入lambda表達式的函數體中,正如1處所示,這樣就能在其他線程中執行指定的代碼了。
2021/12/29 更新之二:
我們發現,main最后都要調用sleep函數來避免主線程在線程任務完成之前就退出,因此我們希望添加一個接口,等待線程所有任務完成,改進如下,其他函數同前:
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(function<void()> task); void waitAllTask(); private: vector<thread*> threadPool; deque<function<void()>> taskQueue; atomic<int> busyCount; bool bStop; void taskConsumer(); mutex taskQueueMutex; condition_variable taskQueueCond; condition_variable taskFinishedCond; }; void ThreadPool::taskConsumer() { while (!bStop) { unique_lock<mutex> lk(taskQueueMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); busyCount++; function<void()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); busyCount--; taskFinishedCond.notify_one(); } } void ThreadPool::waitAllTask() { unique_lock<mutex> lk(taskQueueMutex); taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任務均已完成 }
這樣我們只要調用waitAllTask就可以等待所有任務完成啦。
原文鏈接:https://blog.csdn.net/mrbone11/article/details/122153149
相關推薦
- 2022-09-10 Python實現自定義異常堆棧信息的示例代碼_python
- 2022-11-01 AndroidView與Compose框架交互實現介紹_Android
- 2022-02-18 TypeError: ‘Image‘ object does not support item as
- 2022-12-04 Jetpack?Compose慣性衰減動畫AnimateDecay詳解_Android
- 2021-11-22 C++?STL中五個常用算法使用教程及實例講解_C 語言
- 2022-08-22 詳解C#對Dictionary內容的通用操作_C#教程
- 2024-04-05 mybatis(mybatis-plus)報invalid bound statement (not
- 2022-10-13 Pygame?zero集合_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支