網站首頁 編程語言 正文
前言
在數據結構中,隊列遵循著FIFO(先進先出)的規則。在此基礎上,人們引申出了“優先級隊列”的概念。
優先級隊列,是帶有優先級屬性的隊列,所有的隊列元素按照優先級進行排序,消費者會先對優先級高的隊列元素進行處理。
優先級隊列的使用場景也是非常多的。比如,作業調度系統,當一個作業完成后,需要從剩下的作業中取出優先級最高的作業進行處理。又比如,一個商城的用戶分為普通用戶和vip用戶,vip用戶更容易搶到那些秒殺商品。
在本文中,我將和大家一起探討,golang優先級隊列的一種實現方案。
你可以收獲
- golang切片特性
- golang map特性
- golang并發場景下的解決方案
- golang優先級隊列的實現思路
正文
內容脈絡
為了讓大家腦海里有個大致的輪廓,我先把正文的大綱展示出來。
基礎知識
在正式開始“優先級隊列”這個話題之前,我們首先要明確以下的一些golang特性。
-
切片的特性
- 元素的有序性
- 非線程安全
-
map的特性
- 元素的無序性
- 非線程安全
-
并發場景下的解決方案
- 互斥鎖:可以對非線程安全的數據結構創建臨界區,一般用于同步場景;
- 管道:可以對非線程安全的數據結構進行異步處理
實現思路
既然,我們了解了golang的一些特性,那么,我們接下來就要明確,如何去實現優先級隊列了。
我們都知道,無論是哪一種隊列,必然是存在生產者和消費者兩個部分,對于優先級隊列來說,更是如此。因此,咱們的實現思路,也將從這兩個部分來談。
1、生產者
對于生產者來說,他只需要推送一個任務及其優先級過來,咱們就得根據優先級處理他的任務。
由于,我們不大好判斷,到底會有多少種不同的優先級傳過來,也無法確定,每種優先級下有多少個任務要處理,所以,我們可以考慮使用map來存儲優先級隊列。其中key為優先級,value為屬于該優先級下的任務隊列(即管道) 。
2、消費者
對于消費者來說,他需要獲取優先級最高的任務進行消費。
但是,如果只按照上面所說的map來存儲優先級隊列的話,我們是沒法找到優先級最高的任務隊列的,因為map的元素是無序的。那么,我們怎么處理這個問題呢?
我們都知道,在golang的數據結構里,切片的元素是具有有序性的。那么,我們只需要將所有的優先級按從小到大的方式,存儲在一個切片里,就可以了。等到消費的時候,我們可以先從切片中,取出最大的優先級,然后再根據這個key去優先級隊列的map中查詢,是不是就可以了?
目標規劃
想好了實現思路之后,我們就得對接下來的代碼實現做一個規劃了。
-
數據結構
- 存儲優先級隊列的map
- 存儲優先級的切片
- 互斥鎖
- 其他......
-
生產者
- 添加任務到優先級隊列
-
消費者
- 從優先級隊列獲取任務
步步為營
1、數據流
(1)調用NewPriorityQueue() ,初始化優先級隊列對象。
(2)初始化優先級隊列map。
(3)開啟協程,監聽一個接收推送任務的全局管道pushChan。
(4)用戶調用Push() ,推送的任務進入pushChan。
(5)推送的任務被加到優先級隊列中。
(6)消費者從優先級隊列中獲取優先級最高的一個任務。
(7)消費者執行任務。
2、數據結構
(1)優先級隊列對象
type PriorityQueue struct { mLock sync.Mutex // 互斥鎖,queues和priorities并發操作時使用 queues map[int]chan *task // 優先級隊列map pushChan chan *task // 推送任務管道 priorities []int // 記錄優先級的切片(優先級從小到大排列) }
(2)任務對象
type task struct { priority int // 任務的優先級 f func() // 任務的執行函數 }
3、初始化優先級隊列對象
func NewPriorityQueue() *PriorityQueue { pq := &PriorityQueue{ queues: make(map[int]chan *task), // 初始化優先級隊列map pushChan: make(chan *task, 100), } return pq }
當然,在這個過程中,我們需要對pushChan進行監聽。如果有任務推送過來,咱們得處理。
func (pq *PriorityQueue) listenPushChan() { for { select { case taskEle := <-pq.pushChan: // TODO 這里接收到推送的任務,并且準備處理 } } }
將這個監聽函數放到NewPriorityQueue()中:
func NewPriorityQueue() *PriorityQueue { pq := &PriorityQueue{ queues: make(map[int]chan *task), pushChan: make(chan *task, 100), } // 監聽pushChan go pq.listenPushChan() return pq }
4、生產者推送任務
生產者推送任務的時候,我們只需要將任務放到pushChan中:
func (pq *PriorityQueue) Push(f func(), priority int) { pq.pushChan <- &task{ f: f, priority: priority, } }
5、將推送任務加到優先級隊列中
這一步就比較關鍵了。我們前面談到,優先級隊列最核心的數據結構有兩個:優先級隊列map和優先級切片。因此,推送任務添加到優先級隊列的操作,咱們得分兩種情況來看:
(1)之前已經推過相同優先級的任務
這種情況非常簡單,咱們其實只要操作優先級隊列map就可以了。
func (pq *PriorityQueue) listenPushChan() { for { select { case taskEle := <-pq.pushChan: priority := taskEle.priority pq.mLock.Lock() if v, ok := pq.queues[priority]; ok { pq.mLock.Unlock() // 之前推送過相同優先級的任務 // 將推送的任務塞到對應優先級的隊列中 v <- taskEle continue } // todo 之前未推過相同優先級任務的處理... } } }
(2)之前未推過相同優先級的任務
這種情況會稍微復雜一些。我們不僅要將新的優先級插入到優先級切片正確的位置,而且要將任務添加到對應優先級的隊列。
1)將新的優先級插入到優先級切片中
a. 首先,咱們得尋找新優先級在切片中的插入位置。這里,咱們用了二分法。
// 通過二分法尋找新優先級的切片插入位置 func (pq *PriorityQueue) getNewPriorityInsertIndex(priority int, leftIndex, rightIndex int) (index int) { if len(pq.priorities) == 0 { // 如果當前優先級切片沒有元素,則插入的index就是0 return 0 } length := rightIndex - leftIndex if pq.priorities[leftIndex] >= priority { // 如果當前切片中最小的元素都超過了插入的優先級,則插入位置應該是最左邊 return leftIndex } if pq.priorities[rightIndex] <= priority { // 如果當前切片中最大的元素都沒超過插入的優先級,則插入位置應該是最右邊 return rightIndex + 1 } if length == 1 && pq.priorities[leftIndex] < priority && pq.priorities[rightIndex] >= priority { // 如果插入的優先級剛好在僅有的兩個優先級之間,則中間的位置就是插入位置 return leftIndex + 1 } middleVal := pq.priorities[leftIndex+length/2] // 這里用二分法遞歸的方式,一直尋找正確的插入位置 if priority <= middleVal { return pq.getNewPriorityInsertIndex(priority, leftIndex, leftIndex+length/2) } else { return pq.getNewPriorityInsertIndex(priority, leftIndex+length/2, rightIndex) } }
b. 找到插入位置之后,我們才要插入。在這個過程中,插入位置右側的元素全部都要向右邊移動一位。
// index右側元素均需要向后移動一個單位 func (pq *PriorityQueue) moveNextPriorities(index, priority int) { pq.priorities = append(pq.priorities, 0) copy(pq.priorities[index+1:], pq.priorities[index:]) pq.priorities[index] = priority }
這樣,我們就成功地將新的優先級插入了切片。
2)將推送任務放入優先級隊列map也就順理成章。
// 創建一個新優先級管道 pq.queues[priority] = make(chan *task, 10000) // 將任務塞到新的優先級管道中 pq.queues[priority] <- taskEle
因此,listenPushChan()的代碼如下:
func (pq *PriorityQueue) listenPushChan() { for { select { case taskEle := <-pq.pushChan: priority := taskEle.priority pq.mLock.Lock() if v, ok := pq.queues[priority]; ok { pq.mLock.Unlock() // 將推送的任務塞到對應優先級的隊列中 v <- taskEle continue } // 如果這是一個新的優先級,則需要插入優先級切片,并且新建一個優先級的queue // 通過二分法尋找新優先級的切片插入位置 index := pq.getNewPriorityInsertIndex(priority, 0, len(pq.priorities)-1) // index右側元素均需要向后移動一個單位 pq.moveNextPriorities(index, priority) // 創建一個新優先級隊列 pq.queues[priority] = make(chan *task, 10000) // 將任務塞到新的優先級隊列中 pq.queues[priority] <- taskEle pq.mLock.Unlock() } } }
完成了生產者部分之后,接下來我們看看消費者。
6、消費者消費隊列
這里分成兩個步驟,首先咱們得拿到最高優先級隊列的任務,然后再去執行任務。代碼如下:
// 消費者輪詢獲取最高優先級的任務 func (pq *PriorityQueue) Consume() { for { task := pq.Pop() if task == nil { // 未獲取到任務,則繼續輪詢 continue } // 獲取到了任務,就執行任務 task.f() } } // 取出最高優先級隊列中的一個任務 func (pq *PriorityQueue) Pop() *task { pq.mLock.Lock() defer pq.mLock.Unlock() for i := len(pq.priorities) - 1; i >= 0; i-- { if len(pq.queues[pq.priorities[i]]) == 0 { // 如果當前優先級的隊列沒有任務,則看低一級優先級的隊列中有沒有任務 continue } // 如果當前優先級的隊列里有任務,則取出一個任務。 return <-pq.queues[pq.priorities[i]] } // 如果所有隊列都沒有任務,則返回null return nil }
7、完整代碼
這樣,咱們的優先級隊列就實現了。下面,我們將完整代碼展示。
pq.go
package priority_queue import ( "sync" ) type PriorityQueue struct { mLock sync.Mutex // 互斥鎖,queues和priorities并發操作時使用 queues map[int]chan *task // 優先級隊列map pushChan chan *task // 推送任務管道 priorities []int // 記錄優先級的切片(優先級從小到大排列) } type task struct { priority int // 任務的優先級 f func() // 任務的執行函數 } func NewPriorityQueue() *PriorityQueue { pq := &PriorityQueue{ queues: make(map[int]chan *task), pushChan: make(chan *task, 100), } go pq.listenPushChan() return pq } func (pq *PriorityQueue) listenPushChan() { for { select { case taskEle := <-pq.pushChan: priority := taskEle.priority pq.mLock.Lock() if v, ok := pq.queues[priority]; ok { pq.mLock.Unlock() // 將推送的任務塞到對應優先級的隊列中 v <- taskEle continue } // 如果這是一個新的優先級,則需要插入優先級切片,并且新建一個優先級的queue // 通過二分法尋找新優先級的切片插入位置 index := pq.getNewPriorityInsertIndex(priority, 0, len(pq.priorities)-1) // index右側元素均需要向后移動一個單位 pq.moveNextPriorities(index, priority) // 創建一個新優先級隊列 pq.queues[priority] = make(chan *task, 10000) // 將任務塞到新的優先級隊列中 pq.queues[priority] <- taskEle pq.mLock.Unlock() } } } // 插入work func (pq *PriorityQueue) Push(f func(), priority int) { pq.pushChan <- &task{ f: f, priority: priority, } } // index右側元素均需要向后移動一個單位 func (pq *PriorityQueue) moveNextPriorities(index, priority int) { pq.priorities = append(pq.priorities, 0) copy(pq.priorities[index+1:], pq.priorities[index:]) pq.priorities[index] = priority } // 通過二分法尋找新優先級的切片插入位置 func (pq *PriorityQueue) getNewPriorityInsertIndex(priority int, leftIndex, rightIndex int) (index int) { if len(pq.priorities) == 0 { // 如果當前優先級切片沒有元素,則插入的index就是0 return 0 } length := rightIndex - leftIndex if pq.priorities[leftIndex] >= priority { // 如果當前切片中最小的元素都超過了插入的優先級,則插入位置應該是最左邊 return leftIndex } if pq.priorities[rightIndex] <= priority { // 如果當前切片中最大的元素都沒超過插入的優先級,則插入位置應該是最右邊 return rightIndex + 1 } if length == 1 && pq.priorities[leftIndex] < priority && pq.priorities[rightIndex] >= priority { // 如果插入的優先級剛好在僅有的兩個優先級之間,則中間的位置就是插入位置 return leftIndex + 1 } middleVal := pq.priorities[leftIndex+length/2] // 這里用二分法遞歸的方式,一直尋找正確的插入位置 if priority <= middleVal { return pq.getNewPriorityInsertIndex(priority, leftIndex, leftIndex+length/2) } else { return pq.getNewPriorityInsertIndex(priority, leftIndex+length/2, rightIndex) } } // 取出最高優先級隊列中的一個任務 func (pq *PriorityQueue) Pop() *task { pq.mLock.Lock() defer pq.mLock.Unlock() for i := len(pq.priorities) - 1; i >= 0; i-- { if len(pq.queues[pq.priorities[i]]) == 0 { // 如果當前優先級的隊列沒有任務,則看低一級優先級的隊列中有沒有任務 continue } // 如果當前優先級的隊列里有任務,則取出一個任務。 return <-pq.queues[pq.priorities[i]] } // 如果所有隊列都沒有任務,則返回null return nil } // 消費者輪詢獲取最高優先級的任務 func (pq *PriorityQueue) Consume() { for { task := pq.Pop() if task == nil { // 未獲取到任務,則繼續輪詢 continue } // 獲取到了任務,就執行任務 task.f() } }
測試代碼pq_test.go:
package priority_queue import ( "fmt" "math/rand" "testing" "time" ) func TestQueue(t *testing.T) { defer func() { if err := recover(); err != nil { fmt.Println(err) } }() pq := NewPriorityQueue() rand.Seed(time.Now().Unix()) // 我們在這里,隨機生成一些優先級任務 for i := 0; i < 100; i++ { a := rand.Intn(10) go func(i int) { pq.Push(func() { fmt.Println("推送任務的編號為:", i) fmt.Println("推送的任務優先級為:", a) fmt.Println("============") }, a) }(i) } // 這里會阻塞,消費者會輪詢查詢任務隊列 pq.Consume() }
發散思維
上面的方案的確是實現了優先級隊列,但是,有一種極端情況:如果消費者的消費速度遠遠小于生產者的生產速度,并且高優先級的任務被不斷插入,這樣,低優先級的任務就會有“餓死”的風險。
對于這種情況,我們在消費的時候,可以考慮給每一個優先級隊列分配一個權重,高優先級的隊列有更大的概率被消費,低優先級的概率相對較小。感興趣的朋友們,可以自己去實現一下。
小結
本文和大家討論了優先級隊列在golang中的一種實現方案,里面應用到了切片、map、互斥鎖、管道等諸多golang特性,可以說是一個非常典型的案例。其實,優先級隊列在實際的業務場景中使用廣泛,其實現方式也不止一種,我們需要根據實際的需求,選擇最優解。
原文鏈接:https://juejin.cn/post/7180632889489358906
相關推薦
- 2023-07-13 el-table實現多選及反選
- 2023-03-20 使用Redis緩存時高效的批量刪除的幾種方案_Redis
- 2023-12-10 記錄一次導出Excel報表的錯誤
- 2023-01-05 Kotlin?協程與掛起函數及suspend關鍵字深入理解_Android
- 2022-08-16 Golang輕量級IoC容器安裝使用示例_Golang
- 2023-01-02 Pytes正確的配置使用日志功能_python
- 2022-06-10 C語言中sizeof函數踩過的坑總結_C 語言
- 2022-07-03 C#列表List<T>、HashSet和只讀集合介紹_C#教程
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支