Python thread pools: the threadpool module
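While studying processes and threads in Python today, I came across the threadpool thread pool module.
The module is very simple to use, provided you already understand how a thread pool works.
When a system handles tasks the traditional way, it has to create and destroy objects for every request. With a large number of concurrent tasks, plain multithreading spends much of its time creating and destroying resources, and server efficiency drops.
This is where a thread pool comes in. Thread pooling offers a good solution both to the overhead of creating and destroying threads and to the problem of running out of system resources.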
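Advantages
(1) The number of threads can be controlled. A fixed number of worker threads is created in advance and capped, which bounds the memory consumed by thread objects.
(2) Lower system overhead and resource consumption. Because threads are reused across requests, the cost of creating and destroying a thread is amortized over many requests. Capping the thread count also reduces the virtual machine's garbage collection overhead.
(3) Faster response. The threads already exist when a request arrives, so it can be handled immediately without the delay of thread creation, and several threads can process requests concurrently.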
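The reuse described in points (1) and (2) can be observed directly. The following is a minimal sketch using the standard library's concurrent.futures rather than the threadpool module; the task function handle and the worker prefix are made up for illustration. Twenty tasks are submitted, yet they all run on at most three threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def handle(task_id):
    """Hypothetical task: sleep briefly, then report which thread ran it."""
    time.sleep(0.01)
    return threading.current_thread().name


# The pool never creates more than max_workers threads, however many
# tasks are submitted; each thread is reused for several tasks.
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as pool:
    thread_names = list(pool.map(handle, range(20)))

distinct_threads = set(thread_names)
print(len(thread_names), "tasks ran on", len(distinct_threads), "threads")
```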
Basic implementation of a thread pool
(1) Thread pool manager. Creates and maintains the pool, resizes it as needed, and watches for thread leaks.
(2) Worker thread. A thread that executes tasks in a loop. It sits in the Wait state when there is no task and is woken when a new task arrives.
(3) Task queue. A buffer that temporarily holds pending tasks and also serves as the monitor object for the concurrent threads.
(4) Task interface. The interface every task must implement; worker threads use it to run the task.
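When the thread pool manager is constructed, it first initializes the task queue (Queue). At run time, tasks are added to the queue by calling the manager's add-task method.
It then creates and starts a fixed number of worker threads and keeps them in a thread list. The manager can add or remove worker threads at run time as needed.
A running worker thread first locks the task queue, so that concurrent access to the queue stays correct. If the queue holds a pending task, the worker takes one and releases the lock so that other threads can access the queue.
After taking a task, the worker calls the task interface to process it. When the task queue is empty, the worker joins the queue's list of waiting threads; it is then in the Wait state and uses almost no CPU.
As soon as a new task arrives, the task queue object's notify method is called, which wakes one worker from the waiting list to process the task.
This cooperation saves the cost of creating and destroying threads while still processing tasks concurrently, which improves the system's response time.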
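The lock, wait, and notify steps above can be sketched in a few lines with threading.Condition. This is an illustration of the general mechanism only, not the threadpool module's implementation (which relies on Queue instead); TinyPool, submit, close, and square are names invented for the example.

```python
import threading
from collections import deque


class TinyPool:
    """Illustrative pool: a fixed set of workers sharing one task queue."""

    def __init__(self, num_workers):
        self._tasks = deque()               # task queue
        self._cond = threading.Condition()  # lock plus list of waiting threads
        self._closed = False
        self._workers = [threading.Thread(target=self._work)
                         for _ in range(num_workers)]
        for worker in self._workers:
            worker.start()

    def submit(self, func, *args):
        with self._cond:                    # lock the task queue
            self._tasks.append((func, args))
            self._cond.notify()             # wake one waiting worker

    def _work(self):
        while True:
            with self._cond:
                # Empty queue: wait (releases the lock, uses almost no CPU).
                while not self._tasks and not self._closed:
                    self._cond.wait()
                if not self._tasks:
                    return                  # closed and fully drained
                func, args = self._tasks.popleft()
            func(*args)                     # run the task outside the lock

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()         # let every worker see the flag
        for worker in self._workers:
            worker.join()


results = []
results_lock = threading.Lock()


def square(n):
    with results_lock:
        results.append(n * n)


pool = TinyPool(3)
for n in range(5):
    pool.submit(square, n)
pool.close()
print(sorted(results))  # → [0, 1, 4, 9, 16]
```

The sketch leaves out error handling: a task that raises would end its worker thread, which a real pool has to guard against.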
In short:
Concurrent tasks are handed to a thread pool instead of starting a new thread for each one. As long as the pool has an idle thread, the task is assigned to it.
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable,list_of_args,callback)
[pool.putRequest(req) for req in requests]
pool.wait()
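- Line 1 creates a thread pool holding poolsize threads.
- Line 2 calls makeRequests to create the requests. some_callable is the function to run in the worker threads, list_of_args holds the arguments for each call, and callback is an optional callback that defaults to None.
- Line 3 puts each request into the pool's task queue.
- The last line blocks until all the submitted work has finished.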
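How each item of list_of_args is interpreted is easy to get wrong. According to the makeRequests source reproduced later in this article, a tuple item is read as (positional_args, keyword_args) and anything else is passed as a single positional argument. The sketch below mirrors that rule without importing threadpool; split_args and greet are hypothetical names used only for the demonstration.

```python
def split_args(item):
    """Hypothetical helper mirroring how makeRequests reads one item."""
    if isinstance(item, tuple):
        args, kwds = item                 # (positional list, keyword dict)
        return list(args), dict(kwds or {})
    return [item], {}                     # a single non-tuple argument


def greet(name, punctuation="."):
    return "hello " + name + punctuation


args_list = ["ann", (["bob"], {"punctuation": "!"})]
calls = [split_args(item) for item in args_list]
outputs = [greet(*args, **kwds) for args, kwds in calls]
print(outputs)  # → ['hello ann.', 'hello bob!']
```

One consequence: to pass several positional arguments, wrap them as ([a, b], None). A bare tuple (a, b) would be read as an arguments list plus a keyword dictionary.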
Reading the source code shows that the module is actually quite simple.
import sys
import threading
import Queue  # Python 2 module name; Python 3 renamed it to ``queue``
import traceback

# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass

class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
    pass

# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.
    This just prints the exception info via ``traceback.print_exception``.
    """
    traceback.print_exception(*exc_info)

# utility functions
# makeRequests builds several work requests at once. ``callback`` processes
# each result; ``exc_callback`` handles any exception that was raised.
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.
    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.
    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.
    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.
    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            requests.append(
                WorkRequest(callable_, item[0], item[1], callback=callback,
                    exc_callback=exc_callback)
            )
        else:
            requests.append(
                WorkRequest(callable_, [item], None, callback=callback,
                    exc_callback=exc_callback)
            )
    return requests

# classes
class WorkerThread(threading.Thread):  # worker thread
    """Background thread connected to the requests/results queues.
    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """
    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.
        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._requests_queue = requests_queue     # task queue
        self._results_queue = results_queue       # results queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.isSet():  # the event is set once dismiss() has been called
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                # blocking get: wait up to poll_timeout seconds for a request
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            else:
                # check again: dismiss() may have been called while this
                # thread was blocked waiting for a request
                if self._dismissed.isSet():
                    # we are dismissed, put back request in queue and exit loop
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()

class WorkRequest:  # a single work request
    """A request to execute a callable for putting in the request queue later.
    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.
    """
    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.
        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments, a dictionary
        of keyword arguments.
        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.
        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.
        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.
        """
        if requestID is None:
            self.requestID = id(self)  # id() is the object's memory address in CPython
        else:
            try:
                self.requestID = hash(requestID)  # store the hash of the given ID
            except TypeError:
                raise TypeError("requestID must be hashable.")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)

class ThreadPool:  # thread pool manager
    """A thread pool, distributing work requests and collecting results.
    See the module docstring for more information.
    """
    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.
        ``num_workers`` is the number of worker threads to start initially.
        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.
        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.
        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
        """
        self._requests_queue = Queue.Queue(q_size)    # task queue
        self._results_queue = Queue.Queue(resq_size)  # results queue
        self.workers = []           # active worker threads
        self.dismissedWorkers = []  # dismissed workers that have not been joined yet
        self.workRequests = {}      # dict mapping requestID -> request
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.
        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting for
        requests.
        """
        for i in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task."""
        dismiss_list = []
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        # one-to-one mapping: each requestID identifies exactly one request
        self.workRequests[request.requestID] = request

    def poll(self, block=False):  # collect results and run the callbacks
        """Process any new results in the queue."""
        while True:
            # still results pending?
            if not self.workRequests:  # no outstanding requests
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:  # no worker threads left
                raise NoWorkersAvailable
            try:
                # get back next results
                request, result = self._results_queue.get(block=block)
                # has an exception occurred?
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # hand results to callback, if any
                if request.callback and not \
                       (request.exception and request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while 1:
            try:
                self.poll(True)
            except NoResultsPending:
                break
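The way WorkerThread.run shuts down cooperatively (a threading.Event checked on every pass, combined with a get that times out) is worth isolating. Below is a minimal sketch of that pattern written for Python 3, so it uses queue and is_set instead of the Python 2 names in the listing; the worker function and its (func, arg) request format are simplifications invented for the example.

```python
import queue
import threading


def worker(requests, results, dismissed, poll_timeout=0.05):
    """Simplified stand-in for WorkerThread.run."""
    while not dismissed.is_set():
        try:
            # Wake up every poll_timeout seconds to re-check the flag.
            func, arg = requests.get(True, poll_timeout)
        except queue.Empty:
            continue
        if dismissed.is_set():
            requests.put((func, arg))   # hand the request back, then exit
            break
        results.put((arg, func(arg)))


requests, results = queue.Queue(), queue.Queue()
dismissed = threading.Event()
thread = threading.Thread(target=worker,
                          args=(requests, results, dismissed), daemon=True)
thread.start()

requests.put((abs, -7))
first = results.get(timeout=2)   # wait for the worker to process the request
dismissed.set()                  # ask the worker to exit
thread.join(timeout=2)
print(first, thread.is_alive())  # → (-7, 7) False
```

The poll timeout is a trade-off: a shorter value makes a dismissed worker exit sooner, at the cost of waking idle threads more often.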
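There are three classes: ThreadPool, WorkRequest, and WorkerThread.
First we create a ThreadPool instance, which starts the requested number of worker threads. Next, makeRequests builds several WorkRequest objects for the same callable with different arguments, and putRequest places them in the pool's task queue. A WorkerThread then picks up each request, runs its callable, and puts the result in the results queue. If a callback was given, it is called with the result when poll or wait picks the result up.
Note: each element of the results queue is a (request, result) tuple, so every result is matched to the request that produced it.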
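To make the (request, result) pairing concrete, here is a small single-threaded sketch of the result-collecting side, written for Python 3. Request, poll, pending, and remember are simplified stand-ins invented for illustration, not the module's actual classes, and the worker's part is simulated by putting results on the queue directly.

```python
import queue


class Request:
    """Hypothetical stand-in for WorkRequest."""

    def __init__(self, func, arg, callback):
        self.func, self.arg, self.callback = func, arg, callback
        self.request_id = id(self)


def poll(results_queue, pending):
    """Drain the results that are ready and call each request's callback."""
    while pending:
        try:
            request, result = results_queue.get(block=False)
        except queue.Empty:
            break
        request.callback(request, result)   # callback gets request and result
        del pending[request.request_id]     # the request is no longer pending


seen = []


def remember(request, result):
    seen.append((request.arg, result))


results_queue = queue.Queue()
pending = {}
for n in (2, 3):
    req = Request(pow, n, remember)
    pending[req.request_id] = req
    # This is the step a worker thread would perform after running the task.
    results_queue.put((req, req.func(n, 2)))

poll(results_queue, pending)
print(seen, len(pending))  # → [(2, 4), (3, 9)] 0
```

Because the request object travels with its result, the callback can read anything attached to the request, such as its arguments.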
Python thread pool usage example
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Pool with at most 5 worker threads; thread names start with "test_"
threadPool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="test_")

def test(v1, v2):
    # Print the name of the worker thread that runs this task
    print(threading.current_thread().name, v1, v2)
    time.sleep(2)

if __name__ == '__main__':
    for i in range(0, 10):
        threadPool.submit(test, i, i + 1)
    # Block until every submitted task has finished
    threadPool.shutdown(wait=True)
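The example above discards what the tasks return. As a possible extension, here is a sketch of collecting return values: submit returns a Future, and as_completed yields each future as soon as it finishes. The function add and the futures dictionary are made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add(v1, v2):
    return v1 + v2


collected = []
with ThreadPoolExecutor(max_workers=5) as executor:
    # Map each future back to the input that produced it.
    futures = {executor.submit(add, i, i + 1): i for i in range(10)}
    for future in as_completed(futures):
        # result() returns the task's value, or re-raises its exception.
        collected.append((futures[future], future.result()))

collected.sort()        # completion order is not guaranteed, so sort by input
print(collected[:3])    # → [(0, 1), (1, 3), (2, 5)]
```

The futures dictionary plays a role similar to the (request, result) tuples in threadpool: it ties every result back to the call that produced it.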
Original link: https://blog.csdn.net/biggbang/article/details/122547980