網站首頁 編程語言 正文
什么是線程
線程是進程中的?個執?單元,負責當前進程中程序的執?,?個進程中?少有?個線程。?個進程中是可以有多個線程的,這個應?程序也可以稱之為多線程程序。多線程程序作為一種多任務、并發的工作方式
并發與并?
早期計算機的 CPU 都是單核的,一個 CPU 在同一時間只能執行一個進程/線程,當系統中有多個進程/線程等待執行時,CPU 只能執行完一個再執行下一個。為了提高 CPU 利用率,減少等待時間,人們提出了一種 CPU 并發工作的理論.
并發:指兩個或多個事件在同?個時間段內發?,當系統中有多個進程/線程等待執行時,CPU只能執行完一個再執行下一個。
并?:指兩個或多個事件在同?時刻發?(同時發?),多核 CPU 的每個核心都可以獨立地執行一個任務,而且多個核心之間不會相互干擾。在不同核心上執行的多個任務,是真正地同時運行,這種狀態就叫做并行。。
什么是線程池
顧名思義:線程池就是線程的池子,有很多線程,但是數量不會超過池子的限制。需要用到多執行流進行任務出路的時候,就從池子中取出一個線程去處理,線程池就類似于一個實現了消費者業務的生產者與消費者模型。
本質上:這就是一個基于生產者消費者模型來實現的線程池,那么同樣遵守三種規則,生產者和生產者之間存在互斥,處理任務的線程之間存在互斥關系,生產者和消費者之間存在同步和互斥關系
線程池解決什么問題
線程池維護者多個線程,等待著分配可并發執行的任務,可以避免在短時間創建和銷毀大量線程帶來時間成本。
總結為三點:
1.避免線程因為不限制創建數量導致的資源耗盡風險
2.任務隊列緩沖任務,支持忙線不均的作用
3.節省了大量頻繁創建/銷毀線程的時間成本
怎么用線程池
下面展示一些 threadpool
實現,源碼來自openharmony。
/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef NETSTACK_THREAD_POOL #define NETSTACK_THREAD_POOL #include <atomic> #include <condition_variable> #include <queue> #include <thread> #include <vector> namespace OHOS::NetStack { template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool { public: /** * disallow default constructor */ ThreadPool() = delete; /** * disallow copy and move */ ThreadPool(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool &operator=(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool(ThreadPool &&) = delete; /** * disallow copy and move */ ThreadPool &operator=(ThreadPool &&) = delete; /** * make DEFAULT_THREAD_NUM threads * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated */ explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true) { for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) { std::thread([this] { RunTask(); }).detach(); } } /** * if ~ThreadPool, terminate all thread */ ~ThreadPool() { // set needRun_ = false, and notify all the thread to wake and terminate needRun_ = false; while (runningNum_ > 0) { needRunCondition_.notify_all(); } } /** * push it to taskQueue_ and notify a thread to run it * @param task new task to Execute */ void Push(const Task &task) { PushTask(task); if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) { std::thread([this] { RunTask(); }).detach(); } needRunCondition_.notify_all(); } private: bool IsQueueEmpty() { std::lock_guard<std::mutex> guard(mutex_); return taskQueue_.empty(); } bool GetTask(Task &task) { std::lock_guard<std::mutex> guard(mutex_); // if taskQueue_ is empty, means timeout if (taskQueue_.empty()) { return false; } // if run to this line, means that taskQueue_ is not empty task = taskQueue_.top(); taskQueue_.pop(); return true; } void PushTask(const Task &task) { std::lock_guard<std::mutex> guard(mutex_); taskQueue_.push(task); } class NumWrapper { public: NumWrapper() = delete; explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num) { ++num_; } ~NumWrapper() { --num_; } private: std::atomic<uint32_t> &num_; }; void Sleep() { std::mutex needRunMutex; std::unique_lock<std::mutex> lock(needRunMutex); /** * if the thread is waiting, it is idle * if wake up, this thread is not idle: * 1 this thread should return * 2 this thread should run task * 3 this thread should go to next loop */ NumWrapper idleWrapper(idleThreadNum_); (void)idleWrapper; needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_), [this] { return !needRun_ || !IsQueueEmpty(); }); } void RunTask() { NumWrapper runningWrapper(runningNum_); (void)runningWrapper; while (needRun_) { Task task; if (GetTask(task)) { task.Execute(); continue; } Sleep(); if (!needRun_) { return; } if (GetTask(task)) { task.Execute(); continue; } if (runningNum_ > DEFAULT_THREAD_NUM) { return; } } } private: /** * other thread put a task to the taskQueue_ */ std::mutex mutex_; std::priority_queue<Task> taskQueue_; /** * 1 terminate the thread if it is idle for timeout_ seconds * 2 wait for the thread started util timeout_ * 3 wait for the thread notified util timeout_ * 4 wait for the thread terminated util timeout_ */ uint32_t timeout_; /** * if idleThreadNum_ is zero, make a new thread */ std::atomic<uint32_t> idleThreadNum_; /** * when ThreadPool object is deleted, wait until runningNum_ is zero. */ std::atomic<uint32_t> runningNum_; /** * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated */ std::atomic_bool needRun_; std::condition_variable needRunCondition_; }; } // namespace OHOS::NetStack #endif /* NETSTACK_THREAD_POOL */
這份源碼的實現,沒有使用一些較難理解的語法,基本上就是使用線程+優先級隊列實現的。提前創建指定數目的線程,每次取一個任務并執行。任務隊列負責存放線程需要處理的任務,工作線程負責從任務隊列中取出和運行任務,可以看成是一個生產者和多個消費者的模型。
#include "doctest.h" DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN #include <stdexcept> DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END //#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN //#define DOCTEST_CONFIG_DISABLE #include <string> #include <iostream> #include "thread_pool.h" // // Created by Administrator on 2022/8/10. // class Task { public: Task() = default; explicit Task(std::string context){ mContext = context; } bool operator<(const Task &e) const{ return priority_ < e.priority_; } void Execute(){ std::lock_guard<std::mutex> guard(mutex_); std::cout << "task is execute,name is:"<<mContext<<std::endl; } public: uint32_t priority_; private: std::string mContext; static std::mutex mutex_; }; #define DEFAULT_THREAD_NUM 3 #define MAX_THREAD_NUM 6 #define TIME_OUT 500 std::mutex Task::mutex_; static int threadpoolTest(){ static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT); Task task1("name_1"); Task task2("name_2"); Task task3("name_3"); Task task4("name_4"); threadPool_.Push(task1); threadPool_.Push(task2); threadPool_.Push(task3); threadPool_.Push(task4); return 0; } TEST_CASE("threadPool simple use example, test by doctest unit tool") { threadpoolTest(); }
以上該版本thread_pool的簡單使用示例,可以看到使用稍微麻煩了些。必須定義格式如下的task類,必須實現operator<和Execute()方法,不過整體實現還是很不錯的,通俗易懂!
總結
線程池的應用場景:當有大量的數據請求,需要多執行流并發/并行處理時,可以采用線程池來處理任務,可避免大量線程頻繁創建或銷毀所帶來的時間成本,也可避免在峰值壓力下,系統資源耗盡的風險。
原文鏈接:https://blog.csdn.net/weixin_44834554/article/details/126668425
相關推薦
- 2023-10-15 el-input有時候添加不了有時候刪不了
- 2022-10-02 C#使用is、as關鍵字以及顯式強轉實現引用類型轉換_C#教程
- 2022-04-28 WPF依賴屬性用法詳解_實用技巧
- 2024-01-14 “xxx“ is not an enclosing class 解決辦法
- 2022-02-23 C#使用log4net記錄日志_C#教程
- 2022-04-06 .NET?Core使用CZGL.SystemInfo庫獲取主機運行資源_基礎應用
- 2023-06-03 React實現一個倒計時hook組件實戰示例_React
- 2023-12-10 記錄一次導出Excel報表的錯誤
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支