日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Combine中錯誤處理和Scheduler使用詳解_Swift

作者:Layer ? 更新時間: 2023-01-31 編程語言

錯誤處理

到目前為止,在我們編寫的大部分代碼中,我們沒有處理錯誤,而處理的都是“happy path”。在前面的文章中,我們了解到,Combine Publisher 聲明了兩個約束:

  • Output定義 Publisher 發出的值的類型;
  • Failure 定義 Publisher 發出的失敗的類型。

現在,我們將深入了解 Failure 在 Publisher 中的作用。

Never

失敗類型為 Never 的 Publisher 表示永遠不會發出失敗。它為這些 Publisher 提供了強大的保證。這類 Publisher 可讓我們專注于使用值,同時絕對確保 Publisher 只有成功完成的事件。

在新的 Playground 頁面添加以下代碼:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
func example(_ desc: String, _ action:() -> Void) {
    print("--- (desc) ---")
    action()
}
var subscriptions = Set<AnyCancellable>()
example("Just") {
  Just("Hello")
}

我們創建了一個帶有 Hello 字符串值的 Just。 Just 是不會發出失敗的。 請按住 Command 并單擊 Just 初始化程序并選擇 Jump to Definition,查看定義:

In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.

Combine 對 Never 的障保證不僅是理論上的,而是深深植根于框架及其各種 API 中。Combine 提供了幾個 Operator,這些 Operator 僅在保證 Publisher 永遠不會發出失敗事件時才可用。第一個是 sink 的變體,只處理值:

example("Just") {
  Just("Hello")
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions)
}

在上面的示例中,我們使用 sink(receiveValue:) ,這種特定的重載使我們可以忽略 Publisher 的完成事件,而只處理其發出的值。

此重載僅適用于這類“可靠”的 Publisher。在錯誤處理方面,Combine 是智能且安全的,如果可能拋出錯誤,它會強制我們處理完成事件。要看到這一點,我們需要將 Never 的 Publisher 變成可能發出失敗事件的 Publisher。

setFailureType(to:)

func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error

Never Publisher 轉變為可能發出失敗事件的 Publisher 的第一種方法是使用 setFailureType。這是另一個僅適用于失敗類型為 Never 的 Publisher 的 Operator:

example("setFailureType") {
 &nbsp;Just("Hello")
 &nbsp; &nbsp;.setFailureType(to: MyError.self)
}

可以使用 .eraseToAnyPublisher(),來確認已改變的 Publisher 類型:

繼續修改上述代碼:

enum MyError: Error {
    case ohNo
}
example("setFailureType") {
    Just("Hello")
        .setFailureType(to: MyError.self)
        .sink(
            receiveCompletion: { completion in
                switch completion {
                case .failure(.ohNo):
                    print("Finished with OhNo!")
                case .finished:
                    print("Finished successfully!")
                }
            },
            receiveValue: { value in
                print("Got value: (value)")
            }
        )
        .store(in: &subscriptions)
}

現在我們只能使用 sink(receiveCompletion:receiveValue:)sink(receiveValue:) 重載不再可用,因為此 Publisher 可能會發出失敗事件。可以嘗試注釋掉 receiveCompletion查看編譯錯誤。

此外,失敗類型為為 MyError,這使我們可以針對.failure(.ohNo) 情況而無需進行不必要的強制轉換來處理該錯誤。

當然,setFailureType 的作用只是類型定義。 由于原始 Publisher 是 Just,因此實際上也不會引發任何錯誤。

assign(to:on:)

assign Operator 僅適用于不會發出失敗事件的 Publisher,與 setFailureType 相同。 向提供的 keypath 發送錯誤會導致未定義的行為。添加以下示例進行測試:

example("assign(to:on:)") {
    class Person {
        var name = "Unknown"
    }
    let person = Person()
    print(person.name)
    Just("Layer")
        .handleEvents(
            receiveCompletion: { _ in 
                print(person.name) 
            }
        )
        .assign(to: .name, on: person)
        .store(in: &subscriptions)
}

我們定義一個具有 name 屬性的 Person 類。創建一個 Person 實例并立即打印其 name。一旦 Publisher 發送完成事件,使用 handleEvents 再次打印此 name。最后,使用 assignname 設置為 Publisher 發出的值:

--- assign(to:on:) ---
Unknown
Layer

Just("Layer") 正下方添加以下行:

.setFailureType(to: Error.self)

這意味著它不再是 Publisher<String, Never>,而是現在的 Publisher<String, Error>。運行 Playground,我們將進行驗證:

Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent

assign(to:)

assign(to:on:) 有一個棘手的部分——它會 strong 捕獲提供給 on 參數的對象。在上一個示例之后添加以下代碼:

example("assign(to:)") {
  class MyViewModel: ObservableObject {
    @Published var currentDate = Date()
    init() {
      Timer.publish(every: 1, on: .main, in: .common)
        .autoconnect() 
        .prefix(3)
        .assign(to: .currentDate, on: self)
        .store(in: &subscriptions)
    }
  }
  let vm = MyViewModel()
  vm.$currentDate
    .sink(receiveValue: { print($0) })
    .store(in: &subscriptions)
}

我們 MyViewModel 中定義一個 @Published 屬性。 它的初始值為當前日期。在 init 中創建一個 Timer Publisher,它每秒發出當前日期。使用 prefix Operator 只接受 3 個更新。使用 assign(to:on:) 將每個日期更新給@Published 屬性。實例化 MyViewModelsink vm.$currentDate,并打印出每個值:

--- assign(to:) ---
2022-12-24 07:32:33 +0000
2022-12-24 07:32:34 +0000
2022-12-24 07:32:35 +0000
2022-12-24 07:32:36 +0000

看起來一切都很好。但是對assign(to:on:) 的調用創建了一個 strong 持有 self 的 Subscription。 導致 self 掛在Subscription 上,而 Subscription 掛在 self 上,創建了一個導致內存泄漏的引用循環。

因此引入了該 Operator 的另一個重載 assign(to:)。該 Operator 通過對 Publisher 的 inout 引用來將值分配給 @Published 屬性。因此以下兩行:

.assign(to: .currentDate, on: self)
.store(in: &subscriptions)

可以被替換為:

.assign(to: &$currentDate)

使用 assign(to:) Operator 將 inout 引用 Publisher 會打破引用循環。此外,它會在內部自動處理 Subscription 的內存管理,這樣我們就可以省略 store(in: &subscriptions)

assertNoFailure(_:file:line:)

當我們在開發過程確認 Publisher 以失敗事件完成時,assertNoFailure Operator 非常有用。它不會阻止上游發出失敗事件。但是,如果它檢測到錯誤,它會因錯誤而崩潰:

example("assertNoFailure") {
  Just("Hello")
    .setFailureType(to: MyError.self)
    .assertNoFailure()
    .sink(receiveValue: { print("Got value: ($0) ")}) 
    .store(in: &subscriptions)
}

我們使用 Just 創建一個“可靠”的 Publisher 并將其錯誤類型設置為 MyError。如果 Publisher 以錯誤事件完成,則使用 assertNoFailure 以崩潰。這會將 Publisher 的失敗類型轉回 Never。使用 sink 打印出任何接收到的值。請注意,由于 assertNoFailure 將失敗類型設置回 Never,因此 sink(receiveValue:) 重載可以直接使用。

運行 Playground,它可以正常工作:

--- assertNoFailure ---
Got value: Hello 

setFailureType 之后,添加以下行:

.tryMap { _ in throw MyError.ohNo }

一旦 Hello 被推送到下游,使用 tryMap 拋出錯誤。再次運行 Playground:

Playground execution failed:
error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0).
...
frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...

由于 Publisher 發出失敗事件,playground 會 crash。 在某種程度上,我們可以將 assertNoFailure() 視為代碼的保護機制。 雖然我們不應該在生產環境中使用它,但在開發過程中提前發現問題非常有用。

處理錯誤

try* Operator

Combine 提供了一個區分可能引發錯誤和可能不會引發錯誤的 Operator 的方法:try 前綴。

注意:Combine 中所有以 try 為前綴的 Operator 在遇到錯誤時的行為相同。我們將只在本章中嘗試使用 tryMap Operator。

example("tryMap") {
    enum NameError: Error {
        case tooShort(String)
        case unknown
    }
    
    ["Aaaa", "Bbbbb", "Cccccc"]
        .publisher
        .map { value in
            return value.count
        }
        .sink(
            receiveCompletion: { print("Completed with ($0)") },
            receiveValue: { print("Got value: ($0)") }
        )
}

在上面的示例中,我們定義一個 NameError 錯誤枚舉。創建發布三個字符串的 Publisher。將每個字符串映射到它的長度。運行示例并查看控制臺輸出:

--- tryMap ---
Got value: 4
Got value: 5
Got value: 6
Completed with finished

將上面示例中的 map 替換為以下內容:

.tryMap { value -> Int in
    let length = value.count
    guard length >= 5 else {
        throw NameError.tooShort(value)
    }
    return value.count
}

我們檢查字符串的長度是否大于等于 5。否則,我們會拋出錯誤:

--- tryMap ---
Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))

映射錯誤

maptryMap 之間的區別不僅僅是后者允許拋出錯誤。 map 繼承了現有的失敗類型并且只操作 Publisher 的值,但 tryMap 沒有——它實際上將錯誤類型擦除為普通的 Swift 錯誤。 與帶有 try 前綴的所有 Operator 都是如此。

example("map vs tryMap") {
  enum NameError: Error {
    case tooShort(String)
    case unknown
  }
  Just("Hello")
    .setFailureType(to: NameError.self)
    .map { $0 + " World!" }
    .sink(
      receiveCompletion: { completion in
        switch completion {
        case .finished:
          print("Done!")
        case .failure(.tooShort(let name)):
          print("(name) is too short!")
        case .failure(.unknown):
          print("An unknown name error occurred")
        }
      },
      receiveValue: { print("Got value ($0)") }
    )
    .store(in: &subscriptions)
}

我們定義一個用于此示例的 NameError。創建一個只發出字符串 HelloJust。使用 setFailureType 設置失敗類型為 NameError。使用 map 將另一個字符串附加。最后,使用 sinkreceiveCompletionNameError 的每個情況打印出適當的消息。運行 Playground:

--- map vs tryMap ---
Got value Hello World!
Done!

Completion 的失敗類型是 NameError,這正是我們想要的。 setFailureType 允許我們專門針對 NameError 進行處理,例如 failure(.tooShort(let name))

map 更改為 tryMap

.tryMap { throw NameError.tooShort($0) }

我們會立即注意到 Playground 不再編譯。 再次點擊 completion

tryMap 刪除了我們的類型錯誤并將其替換為通用 Swift.Error 類型。即使我們實際上并沒有從 tryMap 中拋出錯誤,也會發生這種情況。

原因很簡單:Swift 還不支持類型化 throws,盡管自 2015 年以來 Swift Evolution 中一直在討論這個主題。這意味著當我們使用帶有 try 前綴的 Operator 時,我們的錯誤類型將總是被抹去到最常見的父類:Swift.Error

一種方法是將通用錯誤手動轉換為特定的錯誤類型,但這不是最理想的。它打破了嚴格類型錯誤的整個目的。幸運的是,Combine 為這個問題提供了一個很好的解決方案,稱為 mapError

在調用 tryMap 之后,添加以下行:

.mapError { $0 as? NameError ?? .unknown }

mapError 接收上游 Publisher 拋出的任何錯誤,并將其映射到我們想要的任何錯誤。在這種情況下,我們可以利用它將錯誤轉換回 NameError。這會將 Failure 恢復為所需要的類型,并將我們的 Publisher 轉回 Publisher<String, NameError>。構建并運行 Playground,最終可以按預期編譯和工作:

--- map vs tryMap ---
Hello is too short!

捕獲錯誤并重試

很多時候,當我們請求資源或執行某些計算時,失敗可能是由于網絡不穩定或其他資源不可用而導致的一次性 事件。

在這些情況下,我們通常會編寫一個機制來重試不同的工作,跟蹤嘗試次數,并處理如果所有嘗試都失敗的情況。Combine 讓這一切變得非常簡單。

retry Operator 接受一個數字。如果 Publisher 失敗,它將重新訂閱上游并重試至我們指定的次數。如果所有重試都失敗,它將錯誤推送到下游,就像沒有 retry Operator 一樣:

example("Catching and retrying") {
    enum MyError: Error {
        case network
    }
    var service1 = PassthroughSubject<Int, MyError>()    service1.send(completion: .failure(.network))
  
    service1
        .handleEvents(
            receiveSubscription: { _ in print("Trying ...") },
            receiveCompletion: {
                guard case .failure(let error) = $0 else { return }
                print("Got error: (error)")
            }
        )
        .retry(3)
        .sink(
            receiveCompletion: { print("($0)") },
            receiveValue: { number in
                print("Got Number: (number)")
            }
        )
        .store(in: &subscriptions)
}

我們有一個 service1,它發出了失敗事件。因此,訂閱 service1 肯定會獲得失敗事件。我們嘗試三次,并通過 handleEvents 打印訂閱和完成:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)

運行 Playerground,我們會看到有四次 Trying。初始 Trying,加上由 retry Operator 觸發的三次重試。 由于 service1 不斷失敗,因此 Operator 會耗盡所有重試嘗試并將錯誤推送到 sink

調整代碼:

example("Catching and retrying") {
    enum MyError: Error {
        case network
    }
    var service1 = PassthroughSubject<Int, MyError>()
    service1.send(completion: .failure(.network))
    
    service1
        .handleEvents(
            receiveSubscription: { _ in print("Trying ...") },
            receiveCompletion: {
                guard case .failure(let error) = $0 else { return }
                print("Got error: (error)")
            }
        )
        .retry(3)
        .replaceError(with: 1)
        .sink(
            receiveCompletion: { print("($0)") },
            receiveValue: { number in
                print("Got Number: (number)")
            }
        )
        .store(in: &subscriptions)
}

service1 重試后,若還是失敗,我們將通過 replaceError 將失敗替換為 1:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 1
finished

或者,我們可以使用 catch 捕獲 service1 的失敗,并為下游提供另一個 Publisher:

example("Catching and retrying") {
    enum MyError: Error {
        case network
    }
    var service1 = PassthroughSubject<Int, MyError>()
    service1.send(completion: .failure(.network))
    var service2 = PassthroughSubject<Int, MyError>()
    
    service1
        .handleEvents(
            receiveSubscription: { _ in print("Trying ...") },
            receiveCompletion: {
                guard case .failure(let error) = $0 else { return }
                print("Got error: (error)")
            }
        )
        .retry(3)
        .catch { error in
            return service2
        }
        .sink(
            receiveCompletion: { print("($0)") },
            receiveValue: { number in
                print("Got Number: (number)")
            }
        )
        .store(in: &subscriptions)
    
    service2.send(2)
    service2.send(completion: .finished)
}

此時,下游將獲得到 service2 發出的值 2 和完成事件:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 2
finished

cheduler

我們已經遇到了一些將 Scheduler 作為參數的 Operator。大多數情況下,我們會簡單地使用 DispatchQueue.main,因為它方便、易于理解。除了 DispatchQueue.main,我們肯定已經使用了全局并發隊列,或創建一個串行調度隊列來運行操作。

但是為什么 Combine 需要一個新的類似概念呢?我們接著將了解為什么會出現 Scheduler 的概念,將探索 Combine 如何使異步事件和操作更易于使用,當然,我們還會試使用 Combine 提供的所有 Scheduler。

Scheduler 簡介

根據 Apple 的文檔,Scheduler 是一種定義何時及如何執行閉包的協議。Scheduler 提供上下文以盡快或在將來的某個事件執行未來的操作。該操作就是協議本身中定義的閉包。閉包也可以隱藏 Publisher 在特定 Scheduler 上執行的某些值的傳遞。

我們會注意到此定義有意避免對線程的任何引用,這是因為具體的實現是在 Scheduler 協議中,提供的“上下文”中的。因此,我們的代碼將在哪個線程上執行取決于選擇的 Scheduler。

記住這個重要的概念:Scheduler 不等于線程。我們將在后面詳細了解這對每個 Scheduler 意味著什么。讓我們從事件流的角度來看 Scheduler 的概念:

我們在上圖中看到的內容:

  • 在主 (UI) 線程上發生用戶操作,如按鈕按下;
  • 它會觸發一些工作在 Background Scheduler 上進行處理;
  • 要顯示的最終數據在主線程上傳遞給 Subscriber,Subscriber 可以更新 UI。

我們可以看到 Scheduler 的概念深深植根于前臺/后臺執行的概念。此外,根據我們選擇的實現,工作可以串行化或并行化。

因此,要全面了解 Scheduler,需要查看哪些類符合 Scheduler 協議。首先,我們需要了解與 Scheduler 相關的兩個重要 Operator。

Scheduler Operator

Combine 提供了兩個基本的 Operator 來使用 Scheduler:

subscribe(on:)subscribe(on:options:) 在指定的 Scheduler 上創建 Subscription(開始工作);

receive(on:)receive(on:options:) 在指定的 Scheduler 上傳遞值。

此外,以下 Operator 將 Scheduler 和 Scheduler options 作為參數:

debounce(for:scheduler:options:)

delay(for:tolerance:scheduler:options:)

measureInterval(using:options:)

throttle(for:scheduler:latest:)

timeout(_:scheduler:options:customError:)

subscribe(on:) 和 receive(on:)

在我們訂閱它之前,Publisher 是一個無生命的實體。但是當我們訂閱 Publisher 時會發生什么?有幾個步驟:

  • Publiser receive Subscriber 并創建 Subscription;
  • Subscriber receive Subscription 并從 Publiser 請求值(虛線);
  • Publiser 開始工作(通過 Subscription);
  • Publiser 發出值(通過 Subscription);
  • Operator 轉換值;
  • Subscriber 收到最終值。

當代碼訂閱 Publiser 時,步驟一、二和三通常發生在當前線程上。 但是當我們使用 subscribe(on:) Operator 時,所有這些操作都在我們指定的 Scheduler 上運行。

我們可能希望 Publiser 在后臺執行一些昂貴的計算以避免阻塞主線程。 執行此操作的簡單方法是使用 subscribe(on:)。以下是偽代碼:

let queue = DispatchQueue(label: "serial queue")
let subscription = publisher
  .subscribe(on: queue)
  .sink { value in ...

如果我們收到值后,想更新一些 UI 怎么辦?我們可以在閉包中執行類似 DispatchQueue.main.async { ... } 的操作,從主線程執行 UI 更新。有一種更有效的方法可以使用 Combine 的 receive(on:):

let subscription = publisher
  .subscribe(on: queue)
  .receive(on: DispatchQueue.main)
  .sink { value in ...

即使計算工作正常并從后臺線程發出結果,我們現在也可以保證始終在主隊列上接收值。這是安全地執行 UI 更新所需要的。

Scheduler 實現

Apple 提供了幾種 Scheduler 協議的具體實現:

  • ImmediateScheduler:一個簡單的 Scheduler,它立即在當前線程上執行代碼,這是默認的執行上下文,除非使用 subscribe(on:)receive(on:) 或任何其他將 Scheduler 作為參數的 Operator 進行修改。
  • RunLoop:綁定到 Foundation 的 Thread 對象。
  • DispatchQueue:可以是串行的或并發的。
  • OperationQueue:規范工作項執行的隊列。

這里省略了 TestScheduler,是一個虛擬的、模擬的 Scheduler,它是任何響應式編程框架測試時不可或缺的一部分。

ImmediateScheduler

在 Playground 中新增代碼:

example("ImmediateScheduler") { 
    let source = Timer
      .publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .scan(0) { counter, _ in counter + 1 }
    let publisher = source
        .receive(on: ImmediateScheduler.shared)
        .eraseToAnyPublisher()
    publisher.sink(receiveValue: { _ in
        print(Thread.current)
    })
    .store(in: &amp;subscriptions)
}

運行 Playground,我們會看到 Publisher 發出的每個值,都是在 MainThread 上:

--- ImmediateScheduler ---
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}

當前線程是主線程, ImmediateScheduler 立即在當前線程上調度。當我們在 .receive(on: ImmediateScheduler.shared) 前添加一行:

.receive(on: DispatchQueue.global())

執行 Playground,我們將在不同的線程收到值:

--- ImmediateScheduler ---
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}

ImmediateScheduler options 由于大多數 Operator 在其參數中接受 Scheduler,我們還可以找到一個接受 SchedulerOptions 值的參數。在 ImmediateScheduler 的情況下,此類型被定義為 Never,因此在使用 ImmediateScheduler 時,我們永遠不應該為 Operator 的 options 參數傳遞值。

ImmediateScheduler 的陷阱 關于 ImmediateScheduler 的一件事是它是即時的。我們無法使用 Scheduler 協議的任何 schedule(after:) 變體,因為我們需要指定的 SchedulerTimeType 沒有初始化方法,對于 ImmediateScheduler 無意義。

RunLoop scheduler

RunLoop 早于 DispatchQueue,它是一種在線程級別管理輸入源的方法。主線程有一個關聯的 RunLoop,我們還可以通過從當前線程調用 RunLoop.current 為任何線程獲取一個 RunLoop。

在 Playground 中添加此代碼:

example("RunLoop") { 
    let source = Timer
      .publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .scan(0) { counter, _ in counter + 1 }
    let publisher = source
        .receive(on: DispatchQueue.global())
        .handleEvents(receiveOutput: { _ in
            print("DispatchQueue.global: \(Thread.current)")
        })
        .receive(on: RunLoop.current)
        .handleEvents(receiveOutput: { _ in
            print("RunLoop.current: \(Thread.current)")
        })
        .eraseToAnyPublisher()
    publisher.sink(receiveValue: { _ in
    })
    .store(in: &amp;subscriptions)
}

當前 RunLoop.current 就是主線程的 RunLoop。執行 Playground:

--- RunLoop ---
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}
DispatchQueue.global: &lt;NSThread: 0x12a71cd20&gt;{number = 3, name = (null)}
RunLoop.current: &lt;_NSMainThread: 0x12a705760&gt;{number = 1, name = main}

每發出一個值,都通過一個全局并發隊列的線程,然后在主線程上繼續。

RunLoop OptionsImmediateScheduler 一樣,RunLoop 不提供 SchedulerOptions 參數。

RunLoop 陷阱 RunLoop 的使用應僅限于主線程的 RunLoop,以及我們在需要時控制的 Foundation 線程中可用的 RunLoop。要避免的一個是在 DispatchQueue 上執行的代碼中使用 RunLoop.current。這是因為 DispatchQueue 線程可能是短暫的,這使得它們幾乎不可能依賴 RunLoop。

DispatchQueue Scheduler

DispatchQueue 符合 Scheduler 協議,并且完全可用于所有將 Scheduler 作為參數的 Operator。Dispatch 框架是 Foundation 的一個強大組件,它允許我們通過向系統管理的調度隊列提交工作來在多核硬件上同時執行代碼。DispatchQueue 可以是串行的(默認)或并發的。串行隊列按順序執行你提供給它的所有工作項。并發隊列將并行啟動多個工作項,以最大限度地提高 CPU 使用率:

  • 串行隊列通常用于保證某些操作不重疊。因此,如果所有操作都發生在同一個隊列中,他們可以使用共享資源而無需加鎖。
  • 并發隊列將同時執行盡可能多的操作。因此,它更適合純計算。

我們一直使用的最熟悉的隊列是 DispatchQueue.main。它直接映射到主線程,在這個隊列上執行的所有操作都可以自由地更新用戶界面。 當然,UI 更新只能在主線程進行。所有其他隊列,無論是串行的還是并發的,都在系統管理的線程池中執行它們的代碼。這意味著我們永遠不應該對隊列中運行的代碼中的當前線程做出任何假設。尤其不應使用 RunLoop.current 來安排工作,因為 DispatchQueue 管理其線程的方式有不同。

所有調度隊列共享同一個線程池,執行的串行隊列將使用該池中的任何可用線程。一個直接的結果是,來自同一隊列的兩個連續工作項可能使用不同的線程,但仍可以按順序執行。這是一個重要的區別:當使用 subscribe(on:)receive(on:) 或任何其他有 Scheduler 參數的 Operator 時,我們永遠不應假設線程每次都是相同的。

在 Playground 中添加代碼:

example("DispatchQueue") { 
    let source = PassthroughSubject<Void, Never>()
    let sourceQueue = DispatchQueue.main
    let subscription = sourceQueue.schedule(after: sourceQueue.now,
                                            interval: .seconds(1)) {
        source.send()
    }
    .store(in: &subscriptions)    let serialQueue = DispatchQueue(label: "Serial queue")
    source
        .handleEvents(receiveOutput: { _ in
            print("\(Thread.current)")
        })
        .receive(on: serialQueue)
        .handleEvents(receiveOutput: { _ in
            print("\(Thread.current)")
        })
        .sink(receiveValue: { _ in
        })
        .store(in: &subscriptions)
}

Timer 在主隊列 sourceQueue 上觸發并通過 source 發送 Void 值。接著在串行隊列 serialQueue 上接收值:

--- DispatchQueue ---
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x128025cd0>{number = 2, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x117904d90>{number = 5, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}

將 sourceQueue 也改為 DispatchQueue(label: "Serial queue"),也將在全局并發隊列上發出值:

--- DispatchQueue ---
<NSThread: 0x137e275b0>{number = 6, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x127e0f400>{number = 4, name = (null)}
<NSThread: 0x137e275b0>{number = 6, name = (null)}

DispatchQueue Options DispatchQueue 是唯一提供一組 Options 的 Scheduler,當 Operator 需要 SchedulerOptions 參數時,我們可以傳遞這些 Options。主要圍繞 QoS(服務質量)值,獨立于 DispatchQueue 上已設置的值。例如:

.receive(
  on: serialQueue,
  options: DispatchQueue.SchedulerOptions(qos: .userInteractive)
)

我們將 DispatchQueue.SchedulerOptions 的實例傳遞.userInteractive。在實際開發中使用這些 Options 有助于操作系統決定在同時有許多隊列忙碌的情況下首先安排哪個任務。

OperationQueue Scheduler

由于 OperationQueue 在內部使用 Dispatch,因此在表面上幾乎沒有區別:

example("OperationQueue") { 
    let queue = OperationQueue()
    let subscription = (1...10).publisher
        .receive(on: queue)
        .print()
        .sink { value in
            print("Received \(value)")
        }
        .store(in: &amp;subscriptions)
}

創建一個簡單的 Publisher 發出 1 到 10 之間的數字,然后打印該值,執行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1
receive value: (8)
Received 8
receive value: (9)
Received 9
receive value: (6)
Received 6
receive value: (3)
Received 3
receive value: (5)
Received 5
receive finished
receive value: (10)
receive value: (4)
receive value: (7)
receive value: (2)

按順序發出但無序到達!我們可以更改打印行以顯示當前線程:

print("Received \(value) on thread \(Thread.current)")

再次執行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)}
receive value: (1)
Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)}
receive finished
receive value: (2)
receive value: (9)
receive value: (8)
receive value: (6)

每個值都是在不同的線程上接收的!如果我們查看有關 OperationQueue 的文檔,有一條關于線程的說明,OperationQueue 使用 Dispatch 框架(因此是 DispatchQueue)來執行操作。這意味著它不保證它會為每個交付的值使用相同的底層線程。

此外,每個 OperationQueue 中都有一個參數可以解釋一切:它是 maxConcurrentOperationCount。它默認為系統定義的數字,允許操作隊列同時執行大量操作。由于 Publisher 幾乎在同一時間發出所有值,它們被 Dispatch 的并發隊列分派到多個線程。

對代碼進行一些修改:

queue.maxConcurrentOperationCount = 1

再次執行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (2)
Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (4)
Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (6)
Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (7)
Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (8)
Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (9)
Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive finished

這一次,我們將獲得真正的順序執行——將 maxConcurrentOperationCount 設置為 1 相當于使用串行隊列。

OperationQueue Options OperationQueue 沒有可用的 SchedulerOptions。它實際上是 RunLoop.SchedulerOptions 類型,本身沒有提供任何 Options。

OperationQueue 陷阱 我們剛剛看到 OperationQueue 默認并發執行操作,我們需要非常清楚這一點,因為它可能會給我們帶來麻煩。當我們的 Publisher 發出值時都有大量工作要執行時,它可能是一個很好的工具。我們可以通過調整 maxConcurrentOperationCount 參數來控制負載。

內容參考

  • Combine | Apple Developer Documentation;
  • 來自 Kodeco 的書籍《Combine: Asynchronous Programming with Swift》;
  • 對上述 Kodeco 書籍的漢語自譯版 《Combine: Asynchronous Programming with Swift》整理與補充。

原文鏈接:https://juejin.cn/post/7180990074408927292

欄目分類
最近更新