網站首頁 編程語言 正文
Okio是一個IO庫,底層基于Java原生的輸入輸出流實現。但原生的輸入輸出流并沒有提供超時的檢測機制。而Okio實現了這個功能。建議讀者先閱讀 Android | 徹底理解 Okio 之源碼篇 ,然后再閱讀本篇內容會更好理解。
Timeout 類的設計
探討超時機制,首先要了解Timeout
這個類。Timeout
實現了Okio的同步超時檢測,這里的同步指的是“任務執行”和“超時檢測”是同步的,有順序的。同步超時不會直接中斷任務執行,它首先會檢查是否發生超時,然后決定是否中斷任務執行。throwIfReached
就是一個同步超時檢測的方法。
理解 timeout 與 deadline 的區別
timeout
中文意為“超時”,deadline
中文意為“最后期限”,它們是有明顯區別的。 Timeout
類中有一系列的timeoutXxx
方法,timeoutXxx
是用來設置**一次操作完成的最大等待時間。若這個操作在等待時間內沒有結束,則認為超時。 deadlineXxx
系列方法則是用來設置一項任務完成的最大等待時間。**意味著在未來多長時間內,需要將這項任務完成,否則認為超時。它可能包含一次或多次的操作。
讀取文件的例子
回顧下之前Okio讀取文件例子。
public void readFile() { try { FileInputStream fis = new FileInputStream("test.txt"); okio.Source source = Okio.source(fis); BufferedSource bs = Okio.buffer(source); source.timeout().deadline(1, TimeUnit.MILLISECONDS); String res = bs.readUtf8(); System.out.println(res); } catch (Exception e){ e.printStackTrace(); } }
在這個例子中,我們使用deadline
設置了超時時間為1ms,這意味著從現在開始,讀取文件的這項任務,必須在未來的1ms內完成,否則認為超時。而讀取文件的這項任務,就包含了多次的文件讀取操作。
搖骰子的例子
我們再來看下面這個搖骰子的程序。Dice
是一個骰子類,roll
方法表示搖骰子,搖出來的點數latestTotal
不會超過12。rollAtFixedRate
會開啟一個線程,每隔一段時間調用roll
方法搖一次骰子。awaitTotal
方法會當骰子的點數與我們傳遞進去的total
值一樣或者超時而結束。
private class Dice { Random random = new Random(); int latestTotal; // 搖骰子 public synchronized void roll() { latestTotal = 2 + random.nextInt(6) + random.nextInt(6); System.out.println("Rolled " + latestTotal); notifyAll(); } // 開啟一個線程,每隔一段時間執行 roll 方法 public void rollAtFixedRate(int period, TimeUnit timeUnit) { Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() { public void run() { roll(); } }, 0, period, timeUnit); } // 超時檢測 public synchronized void awaitTotal(Timeout timeout, int total) throws InterruptedIOException { while (latestTotal != total) { timeout.waitUntilNotified(this); } } }
timeout()
是一個測試骰子類的方法,在主線程中運行。該程序設置每隔3s搖一次骰子,主線程設置超時時間為6s,期望搖到的點數是20。因為設置的超時是timeoutXxx
系列的方法,所以這里超時的意思是“只要我搖一次骰子的時間不超過6s,那么我就不會超時,可以一直搖骰子”。因為搖出骰子的最大點數是12,而期望值是20,永遠也搖不出來20這個點數,且搖一次骰子的時間是3s多,也不滿足超時的時間。所以主線程就會一直處于等待狀態。
public void timeout(){ try { Dice dice = new Dice(); dice.rollAtFixedRate(3, TimeUnit.SECONDS); Timeout timeout = new Timeout(); timeout.timeout(6, TimeUnit.SECONDS); dice.awaitTotal(timeout, 20); } catch (Exception e) { e.printStackTrace(); } }
現在將timeout()
方法修改一下,將timeout.timeout(6, TimeUnit.SECONDS)
改為timeout.deadline(6, TimeUnit.SECONDS)
,之前我們說過deadlineXxx
設置的超時**意味著在未來多長時間內,需要將這項任務完成。**在搖骰子這里的意思就是“從現在開始,我只可以搖6s的骰子。超過這個時間你還在搖,則認為超時”。它關注的是可以搖多久的骰子,而不是搖一次骰子不能超過多久的時間。
public void timeout(){ try { Dice dice = new Dice(); dice.rollAtFixedRate(3, TimeUnit.SECONDS); Timeout timeout = new Timeout(); timeout.deadline(6, TimeUnit.SECONDS); dice.awaitTotal(timeout, 20); } catch (Exception e) { e.printStackTrace(); } }
上述程序,主線程會在6s后因超時而停止等待,結束運行。
等待直到喚醒
前面舉了兩個例子讓大家理解Okio中timeout
和deadline
的區別。在搖骰子的例子中用到了waitUntilNotified
這個方法來檢測超時,中文意思為“等待直到喚醒”。也就是Java多線程中經典的“等待-喚醒”機制,該機制常常用于多線程之間的通信。調用waitUntilNotified
方法的線程會一直處于等待狀態,除非被喚醒或者因超時而拋出異常。下面是該方法的源碼。
public final void waitUntilNotified(Object monitor) throws InterruptedIOException { try { boolean hasDeadline = hasDeadline(); long timeoutNanos = timeoutNanos(); // 若沒有設置 deadline && timeout,則一直等待直到喚醒 if (!hasDeadline && timeoutNanos == 0L) { monitor.wait(); // There is no timeout: wait forever. return; } // Compute how long we'll wait. // 計算等待的時長,若同時設置了deadline 和 timeout,則 deadline 優先 long waitNanos; long start = System.nanoTime(); if (hasDeadline && timeoutNanos != 0) { long deadlineNanos = deadlineNanoTime() - start; waitNanos = Math.min(timeoutNanos, deadlineNanos); } else if (hasDeadline) { waitNanos = deadlineNanoTime() - start; } else { waitNanos = timeoutNanos; } // Attempt to wait that long. This will break out early if the monitor is notified. long elapsedNanos = 0L; if (waitNanos > 0L) { long waitMillis = waitNanos / 1000000L; // 等待 waitNanos monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L)); // 計算從等待 waitNanos 到喚醒所用時間 elapsedNanos = System.nanoTime() - start; } // Throw if the timeout elapsed before the monitor was notified. // 若等待了 waitNanos 還沒喚醒,認為超時 if (elapsedNanos >= waitNanos) { throw new InterruptedIOException("timeout"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // Retain interrupted status. throw new InterruptedIOException("interrupted"); } }
查看waitUntilNotified
的源碼,我們發現該方法基于“等待-通知”機制,添加了多線程之間的超時檢測功能,一個線程用來執行具體的任務,一個線程調用該方法來檢測超時。在Okio中的管道就使用了waitUntilNotified
這個方法。
AsyncTimeout 類的設計
AsyncTimeout
內部維護一個單鏈表,節點的類型是AsyncTimeout
,以到超時之前的剩余時間升序排序,即超時的剩余時間越大,節點就在鏈表越后的位置。對鏈表的操作,使用了synchronized
關鍵字加類鎖,保證在同一時間,只有一個線程可以對鏈表進行修改訪問操作。
AsyncTimeout
實現了Okio的異步超時檢測。這里的異步指的是“任務執行”和“超時檢測”是異步的,在執行任務的同時,也在進行任務的“超時檢測”。你會覺得這和上面搖骰子的例子很像,一個線程執行任務,一個線程檢測超時。事實上,AsyncTimeout
也正是這樣實現的,它內部的Watchdog
線程就是用來檢測超時的。當我們要對一次操作或一項任務設置超時,使用成對的enter()
和exit()
,模板代碼如下。
enter(); // do something exit();
若上面do something
的操作超時,timedOut()
方法將會在Watchdog
線程被回調。可以看見,這種包裹性的模板代碼,靈活性很大,我們幾乎可以在其中放置任何想要檢測超時的一個或多個操作。
AsyncTimeout 成員變量
下面是AsyncTimeout
類主要的成員變量。
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); static @Nullable AsyncTimeout head; private boolean inQueue; private @Nullable AsyncTimeout next; private long timeoutAt;
-
IDLE_TIMEOUT_MILLIS
,在單鏈表中沒有節點時,Watchdog
線程等待的時間 -
head
,單鏈表的頭結點,是一個虛假節點。當鏈表中只存在該節點,認為該鏈表為空。 -
inQueue
,當前節點是否在鏈表中。 -
next
,當前節點的下一個節點。 -
timeoutAt
,以當前時間為基準,當前節點在將來何時超時。
AsyncTimeout 成員方法
scheduleTimeout 有序的將超時節點加入到鏈表中
scheduleTimeout
方法可以將一個超時節點按照超時的剩余時間有序的插入到鏈表當中。注意該方法使用synchronized
修飾,是一個同步方法,可以保證對鏈表的操作是線程安全的。
private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { // Start the watchdog thread and create the head node when the first timeout is scheduled. // 若 head 節點為 null, 初始化 head 并啟動 Watchdog 線程 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); } // 計算 node 節點的 timeoutAt 值 long now = System.nanoTime(); if (timeoutNanos != 0 && hasDeadline) { // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around, // Math.min() is undefined for absolute values, but meaningful for relative ones. node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); } else if (timeoutNanos != 0) { node.timeoutAt = now + timeoutNanos; } else if (hasDeadline) { node.timeoutAt = node.deadlineNanoTime(); } else { throw new AssertionError(); } // Insert the node in sorted order. // 返回 node 節點的超時剩余時間 long remainingNanos = node.remainingNanos(now); // 從 head 節點開始遍歷鏈表, 將 node 節點插入到合適的位置 for (AsyncTimeout prev = head; true; prev = prev.next) { // 若當前遍歷的節點下一個節點為 null 或者 node 節點的超時剩余時間小于下一個節點 if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { // 將 node 節點插入到鏈表 node.next = prev.next; prev.next = node; // 若當前遍歷的節點是 head, 喚醒 watchdog 線程 if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } }
Watchdog 線程
在scheduleTimeout
方法中,若head
為null
,則會初始化head
并啟動Watchdog
線程。Watchdog
是一個守護線程,因此它會隨著JVM進程的結束而結束。前面我們說過Watchdog
線程是用來檢測超時的,它會逐個檢查鏈表中的超時節點是否超時,直到鏈表中所有節點檢查完畢后結束運行。
private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { // 超時的節點 AsyncTimeout timedOut; // 加鎖,同步代碼塊 synchronized (AsyncTimeout.class) { // 等待節點超時 timedOut = awaitTimeout(); // Didn't find a node to interrupt. Try again. // 當前該節點沒有超時,繼續檢查 if (timedOut == null) continue; // The queue is completely empty. Let this thread exit and let another watchdog thread // get created on the next call to scheduleTimeout(). // 鏈表中已經沒有超時節點,結束運行 if (timedOut == head) { head = null; return; } } // Close the timed out node. // timedOut 節點超時,回調 timedOut() 方法 timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }
awaitTimeout 等待節點超時
在Watchdog
線程中會調用awaitTimeout
方法來等待檢測的節點超時,若檢測的節點沒有超時,該方法返回null
。否則返回超時的節點。
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { // Get the next eligible node. // 檢測的節點 AsyncTimeout node = head.next; // The queue is empty. Wait until either something is enqueued or the idle timeout elapses. // 若鏈表為空 if (node == null) { long startNanos = System.nanoTime(); // Watchdog 線程等待 60s,期間會釋放類鎖 AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); // 等待 60s 后若鏈表還為空則返回 head,否則返回 null return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } // node 節點超時剩余的時間 long waitNanos = node.remainingNanos(System.nanoTime()); // The head of the queue hasn't timed out yet. Await that. // node 節點超時剩余的時間 > 0,說明 node 還未超時,繼續等待 waitNanos 后返回 null if (waitNanos > 0) { // Waiting is made complicated by the fact that we work in nanoseconds, // but the API wants (millis, nanos) in two arguments. long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } // The head of the queue has timed out. Remove it. // node 節點超時了,將 node 從鏈表中移除并返回 head.next = node.next; node.next = null; return node; }
enter 進入超時檢測
分析完上面三個方法后再來看enter
就非常的簡單了,enter
內部調用了scheduleTimeout
方法來添加一個超時節點到鏈表當中,而Watchdog
線程隨即會開始檢測超時。
public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; // No timeout and no deadline? Don't bother with the queue. } // 更新 inQueue 為 true inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline); }
exit 退出超時檢測
前面說過,enter
和exit
在檢測超時是需要成對出現的。它們之間的代碼就是需要檢測超時的代碼。exit
方法的返回值表示enter
和exit
中間檢測的代碼是否超時。
public final boolean exit() { if (!inQueue) return false; // 更新 inQueue 為 false inQueue = false; return cancelScheduledTimeout(this); }
cancelScheduledTimeout
方法會將當前的超時節點從鏈表中移除。為了保證對鏈表的操作是線程安全的,該方法也是一個同步方法。我們知道在awaitTimeout
方法中,若某個節點超時了會將它從鏈表中移除。那么當調用cancelScheduledTimeout
發現node
不在鏈表中,則一定表明node
超時了。
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { // Remove the node from the linked list. // 若 node 在鏈表中,將其移除。 for (AsyncTimeout prev = head; prev != null; prev = prev.next) { if (prev.next == node) { prev.next = node.next; node.next = null; return false; } } // The node wasn't found in the linked list: it must have timed out! // node 不在鏈表中,則 node 一定超時了,返回 true return true; }
總結
本文詳細講解了Okio中超時機制的實現原理,主要是Timeout
和AsyncTimeout
類的源碼分析與解讀。相信大家已經掌握了這部分知識,現總結一下文中要點。
- Okio 基于等待-喚醒機制,使用
Watchdog
線程來檢測超時。 - 當要對某項操作或任務進行超時檢測時,將它們放到
enter
和exit
的中間。 - Okio 對鏈表的使用非常頻繁,在文件讀寫和超時檢測都使用到了鏈表這個結構。
原文鏈接:https://juejin.cn/post/7200406897078157368
相關推薦
- 2022-11-01 golang連接MongoDB數據庫及數據庫操作指南_Golang
- 2022-11-19 Kotlin協程之Flow異常示例處理_Android
- 2022-10-27 Apache?Hive?通用調優featch抓取機制?mr本地模式_Linux
- 2022-05-29 .NET中的字符串駐留池介紹_基礎應用
- 2022-09-23 Android?創建AIDL文件使用教程_Android
- 2022-09-25 注解@Autowired如何自動裝配
- 2022-09-26 RNN的手動推導與代碼逐行實現
- 2022-08-29 Python可視化神器pyecharts繪制漏斗圖_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支