日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無先后,達(dá)者為師

網(wǎng)站首頁 編程語言 正文

python中的線程池threadpool_python

作者:虛壞叔叔 ? 更新時(shí)間: 2022-12-12 編程語言

python線程池threadpool

今天在學(xué)習(xí)python進(jìn)程與線程時(shí),無意間發(fā)現(xiàn)了線程池threadpool模塊

模塊使用非常簡單,前提是得需要熟悉線程池的工作原理。

我們知道系統(tǒng)處理任務(wù)時(shí),需要為每個(gè)請求創(chuàng)建和銷毀對象。當(dāng)有大量并發(fā)任務(wù)需要處理時(shí),再使用傳統(tǒng)的多線程就會(huì)造成大量的資源創(chuàng)建銷毀導(dǎo)致服務(wù)器效率的下降。

這時(shí)候,線程池就派上用場了。線程池技術(shù)為線程創(chuàng)建、銷毀的開銷問題和系統(tǒng)資源不足問題提供了很好的解決方案。

優(yōu)點(diǎn)

(1)可以控制產(chǎn)生線程的數(shù)量。通過預(yù)先創(chuàng)建一定數(shù)量的工作線程并限制其數(shù)量,控制線程對象的內(nèi)存消耗。

(2)降低系統(tǒng)開銷和資源消耗。通過對多個(gè)請求重用線程,線程創(chuàng)建、銷毀的開銷被分?jǐn)偟搅硕鄠€(gè)請求上。另外通過限制線程數(shù)量,降低虛擬機(jī)在垃圾回收方面的開銷。

(3)提高系統(tǒng)響應(yīng)速度。線程事先已被創(chuàng)建,請求到達(dá)時(shí)可直接進(jìn)行處理,消除了因線程創(chuàng)建所帶來的延遲,另外多個(gè)線程可并發(fā)處理。

線程池的基本實(shí)現(xiàn)方法

(1)線程池管理器。創(chuàng)建并維護(hù)線程池,根據(jù)需要調(diào)整池的大小,并監(jiān)控線程泄漏現(xiàn)象。

(2)工作線程。它是一個(gè)可以循環(huán)執(zhí)行任務(wù)的線程,沒有任務(wù)時(shí)處于 Wait 狀態(tài),新任務(wù)到達(dá)時(shí)可被喚醒。

(3)任務(wù)隊(duì)列。它提供一種緩沖機(jī)制,用以臨時(shí)存放待處理的任務(wù),同時(shí)作為并發(fā)線程的 monitor 對象。

(4)任務(wù)接口。它是每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,工作線程通過該接口調(diào)度任務(wù)的執(zhí)行。

構(gòu)建線程池管理器時(shí),首先初始化任務(wù)隊(duì)列(Queue),運(yùn)行時(shí)通過調(diào)用添加任務(wù)的方法將任務(wù)添加到任務(wù)隊(duì)列中。

之后創(chuàng)建并啟動(dòng)一定數(shù)量的工作線程,將這些線程保存在線程隊(duì)列中。線程池管理器在運(yùn)行時(shí)可根據(jù)需要增加或減少工作線程數(shù)量。

工作線程運(yùn)行時(shí)首先鎖定任務(wù)隊(duì)列,以保證多線程對任務(wù)隊(duì)列的正確并發(fā)訪問,如隊(duì)列中有待處理的任務(wù),工作線程取走一個(gè)任務(wù)并釋放對任務(wù)隊(duì)列的鎖定,以便其他線程實(shí)現(xiàn)對任務(wù)隊(duì)列的訪問和處理。

在獲取任務(wù)之后工作線程調(diào)用任務(wù)接口完成對任務(wù)的處理。當(dāng)任務(wù)隊(duì)列為空時(shí),工作線程加入到任務(wù)隊(duì)列的等待線程列表中,此時(shí)工作線程處于 Wait 狀態(tài),幾乎不占 CPU 資源。

一旦新的任務(wù)到達(dá),通過調(diào)用任務(wù)列表對象的notify方法,從等待線程列表中喚醒一個(gè)工作線程以對任務(wù)進(jìn)行處理。

通過這種協(xié)作模式,既節(jié)省了線程創(chuàng)建、銷毀的開銷,又保證了對任務(wù)的并發(fā)處理,提高了系統(tǒng)的響應(yīng)速度。

簡而言之:

就是把并發(fā)執(zhí)行的任務(wù)傳遞給一個(gè)線程池,來替代為每個(gè)并發(fā)執(zhí)行的任務(wù)都啟動(dòng)一個(gè)新的線程。只要池里有空閑的線程,任務(wù)就會(huì)分配給一個(gè)線程執(zhí)行。

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable,list_of_args,callback)
[pool.putRequest(req) for req in requests]
pool.wait()
  • 第一行的意思是創(chuàng)建一個(gè)可存放poolsize個(gè)數(shù)目的線程的線程池。
  • 第二行的意思是調(diào)用makeRequests創(chuàng)建請求。 some_callable是需要開啟多線程處理的函數(shù),list_of_args是函數(shù)參數(shù),callback是可選參數(shù)回調(diào),默認(rèn)是無。
  • 第三行的意思是把運(yùn)行多線程的函數(shù)放入線程池中。
  • 最后一行的意思是等待所有的線程完成工作后退出。

通過分析源代碼,其實(shí)發(fā)現(xiàn)里面的內(nèi)容很簡單。

import sys
import threading
import Queue
import traceback

# exceptions
class NoResultsPending(Exception):
? ? """All work requests have been processed."""
? ? pass

class NoWorkersAvailable(Exception):
? ? """No worker threads available to process remaining requests."""
? ? pass


# internal module helper functions
def _handle_thread_exception(request, exc_info):
? ? """Default exception handler callback function.

? ? This just prints the exception info via ``traceback.print_exception``.

? ? """
? ? traceback.print_exception(*exc_info)


# utility functions
def makeRequests(callable_, args_list, callback=None, ?#用來創(chuàng)建多個(gè)任務(wù)請求 callback是回調(diào)函數(shù)處理結(jié)果,exc_callback是用來處理發(fā)生的異常
? ? ? ? exc_callback=_handle_thread_exception):
? ? """Create several work requests for same callable with different arguments.

? ? Convenience function for creating several work requests for the same
? ? callable where each invocation of the callable receives different values
? ? for its arguments.

? ? ``args_list`` contains the parameters for each invocation of callable.
? ? Each item in ``args_list`` should be either a 2-item tuple of the list of
? ? positional arguments and a dictionary of keyword arguments or a single,
? ? non-tuple argument.

? ? See docstring for ``WorkRequest`` for info on ``callback`` and
? ? ``exc_callback``.

? ? """
? ? requests = []
? ? for item in args_list:
? ? ? ? if isinstance(item, tuple):
? ? ? ? ? ? requests.append(
? ? ? ? ? ? ? ? WorkRequest(callable_, item[0], item[1], callback=callback,
? ? ? ? ? ? ? ? ? ? exc_callback=exc_callback)
? ? ? ? ? ? )
? ? ? ? else:
? ? ? ? ? ? requests.append(
? ? ? ? ? ? ? ? WorkRequest(callable_, [item], None, callback=callback,
? ? ? ? ? ? ? ? ? ? exc_callback=exc_callback)
? ? ? ? ? ? )
? ? return requests


# classes
class WorkerThread(threading.Thread): ? ? #工作線程
? ? """Background thread connected to the requests/results queues.

? ? A worker thread sits in the background and picks up work requests from
? ? one queue and puts the results in another until it is dismissed.

? ? """

? ? def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
? ? ? ? """Set up thread in daemonic mode and start it immediatedly.

? ? ? ? ``requests_queue`` and ``results_queue`` are instances of
? ? ? ? ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
? ? ? ? worker thread.

? ? ? ? """
? ? ? ? threading.Thread.__init__(self, **kwds)
? ? ? ? self.setDaemon(1)
? ? ? ? self._requests_queue = requests_queue ? ? #任務(wù)隊(duì)列
? ? ? ? self._results_queue = results_queue ? ? ? #結(jié)果隊(duì)列
? ? ? ? self._poll_timeout = poll_timeout
? ? ? ? self._dismissed = threading.Event()
? ? ? ? self.start()

? ? def run(self):
? ? ? ? """Repeatedly process the job queue until told to exit."""
? ? ? ? while True:
? ? ? ? ? ? if self._dismissed.isSet(): ?#如果標(biāo)識(shí)位設(shè)為True,則表示線程非阻塞
? ? ? ? ? ? ? ? # we are dismissed, break out of loop
? ? ? ? ? ? ? ? break
? ? ? ? ? ? # get next work request. If we don't get a new request from the
? ? ? ? ? ? # queue after self._poll_timout seconds, we jump to the start of
? ? ? ? ? ? # the while loop again, to give the thread a chance to exit.
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? request = self._requests_queue.get(True, self._poll_timeout)#獲取待處理任務(wù),block設(shè)為True,標(biāo)識(shí)線程同步 ,并設(shè)置超時(shí)時(shí)間
? ? ? ? ? ? except Queue.Empty:
? ? ? ? ? ? ? ? continue
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? if self._dismissed.isSet():再次判斷,因?yàn)樵谌∪蝿?wù)期間,線程有可能被掛起
? ? ? ? ? ? ? ? ? ? # we are dismissed, put back request in queue and exit loop
? ? ? ? ? ? ? ? ? ? self._requests_queue.put(request) #添加任務(wù)到任務(wù)隊(duì)列
? ? ? ? ? ? ? ? ? ? break
? ? ? ? ? ? ? ? try:
? ? ? ? ? ? ? ? ? ? result = request.callable(*request.args, **request.kwds)
? ? ? ? ? ? ? ? ? ? self._results_queue.put((request, result))
? ? ? ? ? ? ? ? except:
? ? ? ? ? ? ? ? ? ? request.exception = True
? ? ? ? ? ? ? ? ? ? self._results_queue.put((request, sys.exc_info()))

? ? def dismiss(self):
? ? ? ? """Sets a flag to tell the thread to exit when done with current job."""
? ? ? ? self._dismissed.set()


class WorkRequest: ? ? ?#創(chuàng)建單個(gè)任務(wù)請求
? ? """A request to execute a callable for putting in the request queue later.

? ? See the module function ``makeRequests`` for the common case
? ? where you want to build several ``WorkRequest`` objects for the same
? ? callable but with different arguments for each call.

? ? """

? ? def __init__(self, callable_, args=None, kwds=None, requestID=None,
? ? ? ? ? ? callback=None, exc_callback=_handle_thread_exception):
? ? ? ? """Create a work request for a callable and attach callbacks.

? ? ? ? A work request consists of the a callable to be executed by a
? ? ? ? worker thread, a list of positional arguments, a dictionary
? ? ? ? of keyword arguments.

? ? ? ? A ``callback`` function can be specified, that is called when the
? ? ? ? results of the request are picked up from the result queue. It must
? ? ? ? accept two anonymous arguments, the ``WorkRequest`` object and the
? ? ? ? results of the callable, in that order. If you want to pass additional
? ? ? ? information to the callback, just stick it on the request object.

? ? ? ? You can also give custom callback for when an exception occurs with
? ? ? ? the ``exc_callback`` keyword parameter. It should also accept two
? ? ? ? anonymous arguments, the ``WorkRequest`` and a tuple with the exception
? ? ? ? details as returned by ``sys.exc_info()``. The default implementation
? ? ? ? of this callback just prints the exception info via
? ? ? ? ``traceback.print_exception``. If you want no exception handler
? ? ? ? callback, just pass in ``None``.

? ? ? ? ``requestID``, if given, must be hashable since it is used by
? ? ? ? ``ThreadPool`` object to store the results of that work request in a
? ? ? ? dictionary. It defaults to the return value of ``id(self)``.

? ? ? ? """
? ? ? ? if requestID is None:
? ? ? ? ? ? self.requestID = id(self) #id返回對象的內(nèi)存地址
? ? ? ? else:
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? self.requestID = hash(requestID) #哈希處理
? ? ? ? ? ? except TypeError:
? ? ? ? ? ? ? ? raise TypeError("requestID must be hashable.")
? ? ? ? self.exception = False
? ? ? ? self.callback = callback
? ? ? ? self.exc_callback = exc_callback
? ? ? ? self.callable = callable_
? ? ? ? self.args = args or []
? ? ? ? self.kwds = kwds or {}

? ? def __str__(self):
? ? ? ? return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
? ? ? ? ? ? (self.requestID, self.args, self.kwds, self.exception)

class ThreadPool: ?#線程池管理器
? ? """A thread pool, distributing work requests and collecting results.

? ? See the module docstring for more information.

? ? """

? ? def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
? ? ? ? """Set up the thread pool and start num_workers worker threads.

? ? ? ? ``num_workers`` is the number of worker threads to start initially.

? ? ? ? If ``q_size > 0`` the size of the work *request queue* is limited and
? ? ? ? the thread pool blocks when the queue is full and it tries to put
? ? ? ? more work requests in it (see ``putRequest`` method), unless you also
? ? ? ? use a positive ``timeout`` value for ``putRequest``.

? ? ? ? If ``resq_size > 0`` the size of the *results queue* is limited and the
? ? ? ? worker threads will block when the queue is full and they try to put
? ? ? ? new results in it.

? ? ? ? .. warning:
? ? ? ? ? ? If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
? ? ? ? ? ? the possibilty of a deadlock, when the results queue is not pulled
? ? ? ? ? ? regularly and too many jobs are put in the work requests queue.
? ? ? ? ? ? To prevent this, always set ``timeout > 0`` when calling
? ? ? ? ? ? ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

? ? ? ? """
? ? ? ? self._requests_queue = Queue.Queue(q_size) ?#任務(wù)隊(duì)列
? ? ? ? self._results_queue = Queue.Queue(resq_size) #結(jié)果隊(duì)列
? ? ? ? self.workers = [] ?#工作線程
? ? ? ? self.dismissedWorkers = [] #睡眠線程
? ? ? ? self.workRequests = {} ?#一個(gè)字典 鍵是id 值是request
? ? ? ? self.createWorkers(num_workers, poll_timeout)

? ? def createWorkers(self, num_workers, poll_timeout=5):
? ? ? ? """Add num_workers worker threads to the pool.

? ? ? ? ``poll_timout`` sets the interval in seconds (int or float) for how
? ? ? ? ofte threads should check whether they are dismissed, while waiting for
? ? ? ? requests.

? ? ? ? """
? ? ? ? for i in range(num_workers):
? ? ? ? ? ? self.workers.append(WorkerThread(self._requests_queue,
? ? ? ? ? ? ? ? self._results_queue, poll_timeout=poll_timeout))

? ? def dismissWorkers(self, num_workers, do_join=False):
? ? ? ? """Tell num_workers worker threads to quit after their current task."""
? ? ? ? dismiss_list = []
? ? ? ? for i in range(min(num_workers, len(self.workers))):
? ? ? ? ? ? worker = self.workers.pop()
? ? ? ? ? ? worker.dismiss()
? ? ? ? ? ? dismiss_list.append(worker)

? ? ? ? if do_join:
? ? ? ? ? ? for worker in dismiss_list:
? ? ? ? ? ? ? ? worker.join()
? ? ? ? else:
? ? ? ? ? ? self.dismissedWorkers.extend(dismiss_list)

? ? def joinAllDismissedWorkers(self):
? ? ? ? """Perform Thread.join() on all worker threads that have been dismissed.
? ? ? ? """
? ? ? ? for worker in self.dismissedWorkers:
? ? ? ? ? ? worker.join()
? ? ? ? self.dismissedWorkers = []

? ? def putRequest(self, request, block=True, timeout=None):
? ? ? ? """Put work request into work queue and save its id for later."""
? ? ? ? assert isinstance(request, WorkRequest)
? ? ? ? # don't reuse old work requests
? ? ? ? assert not getattr(request, 'exception', None)
? ? ? ? self._requests_queue.put(request, block, timeout)
? ? ? ? self.workRequests[request.requestID] = request ?#確立一對一對應(yīng)關(guān)系 一個(gè)id對應(yīng)一個(gè)request

? ? def poll(self, block=False):#處理任務(wù),
? ? ? ? """Process any new results in the queue."""
? ? ? ? while True:
? ? ? ? ? ? # still results pending?
? ? ? ? ? ? if not self.workRequests: #沒有任務(wù)
? ? ? ? ? ? ? ? raise NoResultsPending
? ? ? ? ? ? # are there still workers to process remaining requests?
? ? ? ? ? ? elif block and not self.workers:#無工作線程
? ? ? ? ? ? ? ? raise NoWorkersAvailable
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? # get back next results
? ? ? ? ? ? ? ? request, result = self._results_queue.get(block=block)
? ? ? ? ? ? ? ? # has an exception occured?
? ? ? ? ? ? ? ? if request.exception and request.exc_callback:
? ? ? ? ? ? ? ? ? ? request.exc_callback(request, result)
? ? ? ? ? ? ? ? # hand results to callback, if any
? ? ? ? ? ? ? ? if request.callback and not \
? ? ? ? ? ? ? ? ? ? ? ?(request.exception and request.exc_callback):
? ? ? ? ? ? ? ? ? ? request.callback(request, result)
? ? ? ? ? ? ? ? del self.workRequests[request.requestID]
? ? ? ? ? ? except Queue.Empty:
? ? ? ? ? ? ? ? break

? ? def wait(self):
? ? ? ? """Wait for results, blocking until all have arrived."""
? ? ? ? while 1:
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? self.poll(True)
? ? ? ? ? ? except NoResultsPending:
? ? ? ? ? ? ? ? break

有三個(gè)類 ThreadPool,workRequest,workThread

第一步我們需要建立一個(gè)線程池調(diào)度ThreadPool實(shí)例(根據(jù)參數(shù)而產(chǎn)生多個(gè)線程works),然后再通過makeRequests創(chuàng)建具有多個(gè)不同參數(shù)的任務(wù)請求workRequest,然后把任務(wù)請求用putRequest放入線程池中的任務(wù)隊(duì)列中,此時(shí)線程workThread就會(huì)得到任務(wù)callable,然后進(jìn)行處理后得到結(jié)果,存入結(jié)果隊(duì)列。如果存在callback就對結(jié)果調(diào)用函數(shù)。

注意:結(jié)果隊(duì)列中的元素是元組(request,result)這樣就一一對應(yīng)了。

python線程池使用樣例

import os
import threading
import time
from threading import Thread
from concurrent.futures import ThreadPoolExecutor


threadPool = ThreadPoolExecutor(max_workers=5,thread_name_prefix="test_")

def test(v1,v2):
    print(threading.current_thread().name,v1,v2)
    time.sleep(2)

if __name__=='__main__':

    for i in range(0,10):
        threadPool.submit(test,i,i+1)
    threadPool.shutdown(wait=True)

原文鏈接:https://blog.csdn.net/biggbang/article/details/122547980

欄目分類
最近更新