網站首頁 編程語言 正文
本文實例為大家分享了Python實現線程池之線程安全隊列的具體代碼,供大家參考,具體內容如下
一、線程池組成
一個完整的線程池由下面幾部分組成,線程安全隊列、任務對象、線程處理對象、線程池對象。其中一個線程安全的隊列是實現線程池和任務隊列的基礎,本節我們通過threading包中的互斥量threading.Lock()和條件變量threading.Condition()來實現一個簡單的、讀取安全的線程隊列。
二、線程安全隊列的實現
包括put、pop、get等方法,為保證線程安全,讀寫操作時要添加互斥鎖;并且pop操作可以設置等待時間以阻塞當前獲取元素的線程,當新元素寫入隊列時通過條件變量通知解除等待操作。
class ThreadSafeQueue(object):
? ? def __init__(self, max_size=0):
? ? ? ? self.queue = []
? ? ? ? self.max_size = max_size ?# max_size為0表示無限大
? ? ? ? self.lock = threading.Lock() ?# 互斥量
? ? ? ? self.condition = threading.Condition() ?# 條件變量
? ? def size(self):
? ? ? ? """
? ? ? ? 獲取當前隊列的大小
? ? ? ? :return: 隊列長度
? ? ? ? """
? ? ? ? # 加鎖
? ? ? ? self.lock.acquire()
? ? ? ? size = len(self.queue)
? ? ? ? self.lock.release()
? ? ? ? return size
? ? def put(self, item):
? ? ? ? """
? ? ? ? 將單個元素放入隊列
? ? ? ? :param item:
? ? ? ? :return:
? ? ? ? """
? ? ? ? # 隊列已滿 max_size為0表示無限大
? ? ? ? if self.max_size != 0 and self.size() >= self.max_size:
? ? ? ? ? ? return ThreadSafeException()
? ? ? ? # 加鎖
? ? ? ? self.lock.acquire()
? ? ? ? self.queue.append(item)
? ? ? ? self.lock.release()
? ? ? ? self.condition.acquire()
? ? ? ? # 通知等待讀取的線程
? ? ? ? self.condition.notify()
? ? ? ? self.condition.release()
? ? ? ? return item
? ? def batch_put(self, item_list):
? ? ? ? """
? ? ? ? 批量添加元素
? ? ? ? :param item_list:
? ? ? ? :return:
? ? ? ? """
? ? ? ? if not isinstance(item_list, list):
? ? ? ? ? ? item_list = list(item_list)
? ? ? ? res = [self.put(item) for item in item_list]
? ? ? ? return res
? ? def pop(self, block=False, timeout=0):
? ? ? ? """
? ? ? ? 從隊列頭部取出元素
? ? ? ? :param block: 是否阻塞線程
? ? ? ? :param timeout: 等待時間
? ? ? ? :return:
? ? ? ? """
? ? ? ? if self.size() == 0:
? ? ? ? ? ? if block:
? ? ? ? ? ? ? ? self.condition.acquire()
? ? ? ? ? ? ? ? self.condition.wait(timeout)
? ? ? ? ? ? ? ? self.condition.release()
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? return None
? ? ? ? # 加鎖
? ? ? ? self.lock.acquire()
? ? ? ? item = None
? ? ? ? if len(self.queue):
? ? ? ? ? ? item = self.queue.pop()
? ? ? ? self.lock.release()
? ? ? ? return item
? ? def get(self, index):
? ? ? ? """
? ? ? ? 獲取指定位置的元素
? ? ? ? :param index:
? ? ? ? :return:
? ? ? ? """
? ? ? ? if self.size() == 0 or index >= self.size():
? ? ? ? ? ? return None
? ? ? ? # 加鎖
? ? ? ? self.lock.acquire()
? ? ? ? item = self.queue[index]
? ? ? ? self.lock.release()
? ? ? ? return item
class ThreadSafeException(Exception):
? ? pass
三、測試邏輯
3.1、測試阻塞邏輯
def thread_queue_test_1():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer():
? ? ? ? while True:
? ? ? ? ? ? print('current time before pop is %d' % time.time())
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=3)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('get value from queue is %s' % item)
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(item)
? ? ? ? ? ? print('current time after pop is %d' % time.time())
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer)
? ? t1.start()
? ? t2.start()
? ? t1.join()
? ? t2.join()
測試結果:
我們可以看到生產者線程每隔2s向隊列寫入一個元素,消費者線程當無數據時默認阻塞3s。通過執行時間發現消費者線程確實發生了阻塞,當生產者寫入數據時結束當前等待操作。
3.2、測試讀寫加鎖邏輯
def thread_queue_test_2():
? ? thread_queue = ThreadSafeQueue(10)
? ? def producer():
? ? ? ? while True:
? ? ? ? ? ? thread_queue.put(random.randint(0, 10))
? ? ? ? ? ? time.sleep(2)
? ? def consumer(name):
? ? ? ? while True:
? ? ? ? ? ? item = thread_queue.pop(block=True, timeout=1)
? ? ? ? ? ? # item = thread_queue.get(2)
? ? ? ? ? ? if item is not None:
? ? ? ? ? ? ? ? print('%s get value from queue is %s' % (name, item))
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print('%s get value from queue is None' % name)
? ? t1 = threading.Thread(target=producer)
? ? t2 = threading.Thread(target=consumer, args=('thread1',))
? ? t3 = threading.Thread(target=consumer, args=('thread2',))
? ? t1.start()
? ? t2.start()
? ? t3.start()
? ? t1.join()
? ? t2.join()
? ? t3.join()
測試結果:
生產者還是每2s生成一個元素寫入隊列,消費者開啟兩個線程進行消費,默認阻塞時間為1s,打印結果顯示通過加鎖確保每次只有一個線程能獲取數據,保證了線程讀寫的安全。
原文鏈接:https://blog.csdn.net/wang_xiaowang/article/details/105933224
相關推薦
- 2022-12-06 靜態pod?創建使用示例詳解_docker
- 2023-04-06 Pytorch中關于model.eval()的作用及分析_python
- 2023-06-21 python?import?引用上上上級包的三種方法_python
- 2022-04-24 Pygame?Surface創建圖像的實現_python
- 2022-09-22 vector 迭代器失效問題
- 2022-09-17 ASP.NET?Core實現AES-GCM加密算法_實用技巧
- 2022-04-14 Qt自定義控件實現儀表盤_C 語言
- 2022-09-22 在容器內獲取 Pod 信息
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支