網站首頁 編程語言 正文
join()與task_done()的關系
在網上大多關于join()與task_done()的結束原話是這樣的:
-
Queue.task_done()
在完成一項工作之后,Queue.task_done()函數向任務已經完成的隊列發送一個信號? -
Queue.join()
實際上意味著等到隊列為空,再執行別的操作
但是可能很多人還是不太理解,這里以我自己的理解來闡述這兩者的關聯。
理解
如果線程里每從隊列里取一次,但沒有執行task_done(),則join無法判斷隊列到底有沒有結束,在最后執行個join()是等不到結果的,會一直掛起。
可以理解為,每task_done一次 就從隊列里刪掉一個元素,這樣在最后join的時候根據隊列長度是否為零來判斷隊列是否結束,從而執行主線程。
下面看個自己寫的例子:
下面這個例子,會在join()的地方無限掛起,因為join在等隊列清空,但是由于沒有task_done,它認為隊列還沒有清空,還在一直等。
#!/usr/bin/env python # -*- coding:utf-8 -*- '''threading test''' import threading import queue from time import sleep #之所以為什么要用線程,因為線程可以start后繼續執行后面的主線程,可以put數據,如果不是線程直接在get阻塞。 class Mythread(threading.Thread): def __init__(self,que): threading.Thread.__init__(self) self.queue = que def run(self): while True: sleep(1) if self.queue.empty(): #判斷放到get前面,這樣可以,否則隊列最后一個取完后就空了,直接break,走不到print break item = self.queue.get() print(item,'!') #self.queue.task_done() return que = queue.Queue() tasks = [Mythread(que) for x in range(1)] for x in range(10): que.put(x) #快速生產 for x in tasks: t = Mythread(que) #把同一個隊列傳入2個線程 t.start() que.join() print('---success---')
如果把self.queue.task_done() ?注釋去掉,就會順利執行完主程序。
這就是“ Queue.task_done()函數向任務已經完成的隊列發送一個信號”這句話的意義,能夠讓join()函數能判斷出隊列還剩多少,是否清空了。
而事實上我們看下queue的源碼可以看出確實是執行一次未完成隊列減一:
def task_done(self): '''Indicate that a formerly enqueued task is complete. Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. ''' with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished
快速生產-快速消費
上面的演示代碼是快速生產-慢速消費的場景,我們可以直接用task_done()與join()配合,來讓empty()判斷出隊列是否已經結束。
當然,queue我們可以正確判斷是否已經清空,但是線程里的get隊列是不知道,如果沒有東西告訴它,隊列空了,因此get還會繼續阻塞,那么我們就需要在get程序中加一個判斷,如果empty()成立,break退出循環,否則get()還是會一直阻塞。
慢速生產-快速消費
但是如果生產者速度與消費者速度相當,或者生產速度小于消費速度,則靠task_done()來實現隊列減一則不靠譜,隊列會時常處于供不應求的狀態,常為empty,所以用empty來判斷則不靠譜。
那么這種情況會導致 join可以判斷出隊列結束了,但是線程里不能依靠empty()來判斷線程是否可以結束。
我們可以在消費隊列的每個線程最后塞入一個特定的“標記”,在消費的時候判斷,如果get到了這么一個“標記”,則可以判定隊列結束了,因為生產隊列都結束了,也不會再新增了。
代碼如下:
#!/usr/bin/env python # -*- coding:utf-8 -*- '''threading test''' import threading import queue from time import sleep #之所以為什么要用線程,因為線程可以start后繼續執行后面的主線程,可以put數據,如果不是線程直接在get阻塞。 class Mythread(threading.Thread): def __init__(self,que): threading.Thread.__init__(self) self.queue = que def run(self): while True: item = self.queue.get() self.queue.task_done() #這里要放到判斷前,否則取最后最后一個的時候已經為空,直接break,task_done執行不了,join()判斷隊列一直沒結束 if item == None: break print(item,'!') return que = queue.Queue() tasks = [Mythread(que) for x in range(1)] #快速生產 for x in tasks: t = Mythread(que) #把同一個隊列傳入2個線程 t.start() for x in range(10): sleep(1) que.put(x) for x in tasks: que.put(None) que.join() print('---success---')
注意點
put隊列完成的時候千萬不能用task_done(),否則會報錯:
task_done() called too many times
因為該方法僅僅表示get成功后,執行的一個標記。
總結
原文鏈接:https://nlplearning.blog.csdn.net/article/details/79887720
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2022-12-08 C++?Boost?Any示例分析使用_C 語言
- 2022-07-11 Verilog中reg和SystemVerilog中logic的區別
- 2023-07-15 es6中export和export default的區別
- 2022-06-27 python?使用ctypes調用C/C++?dll詳情_python
- 2022-08-23 .net中的DI框架AutoFac簡單介紹_實用技巧
- 2022-05-11 使用git命令上傳代碼_其它綜合
- 2022-08-28 SpringCloudAlibaba-3.分布式事務(Seata)
- 2022-08-17 python熱力圖實現的完整實例_python
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支