日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

C++中線程池ThreadPool源碼解析_C 語言

作者:喂喂喂–學編程 ? 更新時間: 2022-10-30 編程語言

什么是線程

線程是進程中的?個執?單元,負責當前進程中程序的執?,?個進程中?少有?個線程。?個進程中是可以有多個線程的,這個應?程序也可以稱之為多線程程序。多線程程序作為一種多任務、并發的工作方式

并發與并?

早期計算機的 CPU 都是單核的,一個 CPU 在同一時間只能執行一個進程/線程,當系統中有多個進程/線程等待執行時,CPU 只能執行完一個再執行下一個。為了提高 CPU 利用率,減少等待時間,人們提出了一種 CPU 并發工作的理論.

并發:指兩個或多個事件在同?個時間段內發?,當系統中有多個進程/線程等待執行時,CPU只能執行完一個再執行下一個。

并?:指兩個或多個事件在同?時刻發?(同時發?),多核 CPU 的每個核心都可以獨立地執行一個任務,而且多個核心之間不會相互干擾。在不同核心上執行的多個任務,是真正地同時運行,這種狀態就叫做并行。。

什么是線程池

顧名思義:線程池就是線程的池子,有很多線程,但是數量不會超過池子的限制。需要用到多執行流進行任務出路的時候,就從池子中取出一個線程去處理,線程池就類似于一個實現了消費者業務的生產者與消費者模型。

本質上:這就是一個基于生產者消費者模型來實現的線程池,那么同樣遵守三種規則,生產者和生產者之間存在互斥,處理任務的線程之間存在互斥關系,生產者和消費者之間存在同步和互斥關系

線程池解決什么問題

線程池維護者多個線程,等待著分配可并發執行的任務,可以避免在短時間創建和銷毀大量線程帶來時間成本。

總結為三點:

1.避免線程因為不限制創建數量導致的資源耗盡風險

2.任務隊列緩沖任務,支持忙線不均的作用

3.節省了大量頻繁創建/銷毀線程的時間成本

怎么用線程池

下面展示一些 threadpool實現,源碼來自openharmony。

/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef NETSTACK_THREAD_POOL
#define NETSTACK_THREAD_POOL
#include <atomic>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>
namespace OHOS::NetStack {
template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
public:
    /**
     * disallow default constructor
     */
    ThreadPool() = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(const ThreadPool &) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool(ThreadPool &&) = delete;
    /**
     * disallow copy and move
     */
    ThreadPool &operator=(ThreadPool &&) = delete;
    /**
     * make DEFAULT_THREAD_NUM threads
     * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
     */
    explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    {
        for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
            std::thread([this] { RunTask(); }).detach();
        }
    }
    /**
     * if ~ThreadPool, terminate all thread
     */
    ~ThreadPool()
    {
        // set needRun_ = false, and notify all the thread to wake and terminate
        needRun_ = false;
        while (runningNum_ > 0) {
            needRunCondition_.notify_all();
        }
    }
    /**
     * push it to taskQueue_ and notify a thread to run it
     * @param task new task to Execute
     */
    void Push(const Task &task)
    {
        PushTask(task);
        if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
            std::thread([this] { RunTask(); }).detach();
        }
        needRunCondition_.notify_all();
    }
private:
    bool IsQueueEmpty()
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return taskQueue_.empty();
    }
    bool GetTask(Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        // if taskQueue_ is empty, means timeout
        if (taskQueue_.empty()) {
            return false;
        }
        // if run to this line, means that taskQueue_ is not empty
        task = taskQueue_.top();
        taskQueue_.pop();
        return true;
    }
    void PushTask(const Task &task)
    {
        std::lock_guard<std::mutex> guard(mutex_);
        taskQueue_.push(task);
    }
    class NumWrapper {
    public:
        NumWrapper() = delete;
        explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
        {
            ++num_;
        }
        ~NumWrapper()
        {
            --num_;
        }
    private:
        std::atomic<uint32_t> &num_;
    };
    void Sleep()
    {
        std::mutex needRunMutex;
        std::unique_lock<std::mutex> lock(needRunMutex);
        /**
         * if the thread is waiting, it is idle
         * if wake up, this thread is not idle:
         *     1 this thread should return
         *     2 this thread should run task
         *     3 this thread should go to next loop
         */
        NumWrapper idleWrapper(idleThreadNum_);
        (void)idleWrapper;
        needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
                                   [this] { return !needRun_ || !IsQueueEmpty(); });
    }
    void RunTask()
    {
        NumWrapper runningWrapper(runningNum_);
        (void)runningWrapper;
        while (needRun_) {
            Task task;
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            Sleep();
            if (!needRun_) {
                return;
            }
            if (GetTask(task)) {
                task.Execute();
                continue;
            }
            if (runningNum_ > DEFAULT_THREAD_NUM) {
                return;
            }
        }
    }
private:
    /**
     * other thread put a task to the taskQueue_
     */
    std::mutex mutex_;
    std::priority_queue<Task> taskQueue_;
    /**
     * 1 terminate the thread if it is idle for timeout_ seconds
     * 2 wait for the thread started util timeout_
     * 3 wait for the thread notified util timeout_
     * 4 wait for the thread terminated util timeout_
     */
    uint32_t timeout_;
    /**
     * if idleThreadNum_ is zero, make a new thread
     */
    std::atomic<uint32_t> idleThreadNum_;
    /**
     * when ThreadPool object is deleted, wait until runningNum_ is zero.
     */
    std::atomic<uint32_t> runningNum_;
    /**
     * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
     */
    std::atomic_bool needRun_;
    std::condition_variable needRunCondition_;
};
} // namespace OHOS::NetStack
#endif /* NETSTACK_THREAD_POOL */

這份源碼的實現,沒有使用一些較難理解的語法,基本上就是使用線程+優先級隊列實現的。提前創建指定數目的線程,每次取一個任務并執行。任務隊列負責存放線程需要處理的任務,工作線程負責從任務隊列中取出和運行任務,可以看成是一個生產者和多個消費者的模型。

#include "doctest.h"
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
#include <stdexcept>
DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
//#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
//#define DOCTEST_CONFIG_DISABLE
#include <string>
#include <iostream>
#include "thread_pool.h"
//
// Created by Administrator on 2022/8/10.
//
class Task {
public:
    Task() = default;
    explicit Task(std::string context){
        mContext = context;
    }
    bool operator<(const Task &e) const{
        return priority_ < e.priority_;
    }
    void Execute(){
        std::lock_guard<std::mutex> guard(mutex_);
        std::cout <<  "task is execute,name is:"<<mContext<<std::endl;
    }
public:
    uint32_t priority_;
private:
    std::string mContext;
    static std::mutex mutex_;
};
#define DEFAULT_THREAD_NUM 3
#define MAX_THREAD_NUM 6
#define TIME_OUT 500
std::mutex Task::mutex_;
static int threadpoolTest(){
    static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT);
    Task task1("name_1");
    Task task2("name_2");
    Task task3("name_3");
    Task task4("name_4");
    threadPool_.Push(task1);
    threadPool_.Push(task2);
    threadPool_.Push(task3);
    threadPool_.Push(task4);
    return 0;
}
TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    threadpoolTest();
}

以上該版本thread_pool的簡單使用示例,可以看到使用稍微麻煩了些。必須定義格式如下的task類,必須實現operator<和Execute()方法,不過整體實現還是很不錯的,通俗易懂!

總結

線程池的應用場景:當有大量的數據請求,需要多執行流并發/并行處理時,可以采用線程池來處理任務,可避免大量線程頻繁創建或銷毀所帶來的時間成本,也可避免在峰值壓力下,系統資源耗盡的風險。

原文鏈接:https://blog.csdn.net/weixin_44834554/article/details/126668425

欄目分類
最近更新