網站首頁 編程語言 正文
前言
扇出/扇入模式是更高級 API 集成的主要內容。這些應用程序并不總是表現出相同的可用性或性能特征。
扇出是從電子工程中借用的一個術語,它描述了輸入的邏輯門連接到另一個輸出門的數量。輸出需要提供足夠的電流來驅動所有連接的輸入。在事務處理系統中,用來描述為了服務一個輸入請求而需要做的請求總數。
扇入是指為邏輯單元的輸入方程提供輸入信號的最大數量。扇入是定義單個邏輯門可以接受的最大數字輸入數量的術語。大多數晶體管-晶體管邏輯 (TTL) 門有一個或兩個輸入,盡管有些有兩個以上。典型的邏輯門具有 1 或 2 的扇入。
扇入/扇出服務
我們舉一個現實世界的例子,一個電子商務網站將自己與一個第三方支付網關整合在一起。 這里,網站使用支付網關的 API 來彈出支付屏幕并輸入安全證書。同時,網站可能會調用另一個稱為分析的 API 來記錄支付的嘗試。這種將一個請求分叉成多個請求的過程被稱為 fan-out 扇出。在現實世界中,一個客戶請求可能涉及許多扇出服務。
另一個例子是 MapReduce。Map 是一個扇入的操作,而 Reduce 是一個扇出的 操作。一個服務器可以將一個信息扇出到下一組服務(API),并忽略結果。或者可以等到這些服務器的所有響應都返回。如 如下圖所示,一個傳入的請求被服務器復用為轉換成兩個傳出的請求:
扇入 fan-in 是一種操作,即兩個或更多傳入的請求會聚成一個請求。這種情況下,API如何聚合來自多個后端服務的結果,并將結果即時返回給客戶。
例如,想想一個酒店價格聚合器或航班票務聚合器,它從不同的數據提供者那里獲取關于多個酒店或航班的請求信息并顯示出來。
下圖顯示了扇出操作是如何結合多個請求并準備一個最終的響應,由客戶端消費的。
客戶端也可以是一個服務器,為更多的客戶提供服務。如上圖所示,左側的服務器正在收集來自酒店 A、酒店 B 和 航空公司供應商 A,并為不同的客戶準備另一個響應。
因此,扇入和扇出操作并不總是完全相互獨立的。大多數情況下,它將是一個混合場景,扇入和扇出操作都是相互配合的。
請記住,對下一組服務器的扇出操作可以是異步的。也是如此。對于扇入請求來說,這可能不是真的。扇入操作有時被稱為 API 調用。
Go 語言實現扇入/扇出模式
Fan-out:多個 goroutine 從同一個通道讀取數據,直到該通道關閉。OUT 是一種張開的模式,所以又被稱為扇出,可以用來分發任務。
Fan-in:1 個 goroutine 從多個通道讀取數據,直到這些通道關閉。IN 是一種收斂的模式,所以又被稱為扇入,用來收集處理的結果。
package main import ( "context" "log" "sync" "time" ) // Task 包含任務編號及任務所需時長 type Task struct { Number int Cost time.Duration } // task channel 生成器 func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task { taskCh := make(chan Task) go func() { defer close(taskCh) for _, task := range taskList { select { case <-ctx.Done(): return case taskCh <- task: } } }() return taskCh } // doTask 處理并返回已處理的任務編號作為通道的函數 func doTask(ctx context.Context, taskCh <-chan Task) <-chan int { doneTaskCh := make(chan int) go func() { defer close(doneTaskCh) for task := range taskCh { select { case <-ctx.Done(): return default: log.Printf("do task number: %d\n", task.Number) // task 任務處理 // 根據任務耗時休眠 time.Sleep(task.Cost) doneTaskCh <- task.Number // 已處理任務的編號放入通道 } } }() return doneTaskCh } // `fan-in` 意味著將多個數據流復用或合并成一個流。 // merge 函數接收參數傳遞的多個通道 “taskChs”,并返回單個通道 “<-chan int” func merge(ctx context.Context, taskChs []<-chan int) <-chan int { var wg sync.WaitGroup mergedTaskCh := make(chan int) mergeTask := func(taskCh <-chan int) { defer wg.Done() for t := range taskCh { select { case <-ctx.Done(): return case mergedTaskCh <- t: } } } wg.Add(len(taskChs)) for _, taskCh := range taskChs { go mergeTask(taskCh) } // 等待所有任務處理完畢 go func() { wg.Wait() close(mergedTaskCh) }() return mergedTaskCh } func main() { start := time.Now() // 使用 context 來防止 goroutine 泄漏,即使在處理過程中被中斷 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // taskList 定義每個任務及其成本 taskList := []Task{ Task{1, 1 * time.Second}, Task{2, 7 * time.Second}, Task{3, 2 * time.Second}, Task{4, 3 * time.Second}, Task{5, 5 * time.Second}, Task{6, 3 * time.Second}, } // taskChannelGerenator 是一個函數,它接收一個 taskList 并將其轉換為 Task 類型的通道 // 執行結果(int slice channel)存儲在 worker 中 // 由于 doTask 的結果是一個通道,被分給了多個 worker,這就對應了 fan-out 處理 taskCh := taskChannelGerenator(ctx, taskList) numWorkers := 4 workers := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { workers[i] = doTask(ctx, taskCh) // doTask 處理并返回已處理的任務編號作為通道的函數 } count := 0 for d := range merge(ctx, workers) { // merge 從中讀取已處理的任務編號 count++ log.Printf("done task number: %d\n", d) } log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds()) }
參考鏈接:
Fan-in/fan-out of services
Understanding the Fan-Out/Fan-In API Integration Pattern
原文鏈接:https://juejin.cn/post/7128037479587790855
相關推薦
- 2022-05-22 Nginx服務安裝及軟件升級_nginx
- 2022-08-27 一文了解Go語言中的函數與方法的用法_Golang
- 2023-07-07 Spring整合Junit單元測試
- 2022-10-23 使用React組件編寫溫度顯示器_React
- 2022-04-25 C#使用Npoi導出Excel并合并行列_C#教程
- 2024-02-29 UNI-APP中點擊事件多重響應問題的解決,list列表項item和列表項item中按鈕的點擊事件沖
- 2022-09-20 C#?Winform實現復制文件顯示進度_C#教程
- 2023-12-10 該方法僅能傳入 lambda 表達式產生的合成類
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支