網站首頁 編程語言 正文
在很多.net開發體系中開發者在面對調度作業需求的時候一般會選擇三方開源成熟的作業調度框架來滿足業務需求,比如Hangfire、Quartz.NET這樣的框架。但是有些時候可能我們只是需要一個簡易的延遲任務,這個時候引入這些框架就費力不討好了。
最簡單的粗暴的辦法當然是:
Task.Run(async () =>
{
//延遲xx毫秒
await Task.Delay(time);
//業務執行
});
當時作為一個開發者,有時候還是希望使用更優雅的、可復用的一體化方案,比如可以實現一個簡易的時間輪來完成基于內存的非核心重要業務的延遲調度。什么是時間輪呢,其實就是一個環形數組,每一個數組有一個插槽代表對應時刻的任務,數組的值是一個任務隊列,假設我們有一個基于60秒的延遲時間輪,也就是說我們的任務會在不超過60秒(超過的情況增加分鐘插槽,下面會講)的情況下執行,那么如何實現?下面我們將定義一段代碼來實現這個簡單的需求
話不多說,擼代碼,首先我們需要定義一個時間輪的Model類用于承載我們的延遲任務和任務處理器。簡單定義如下:
public class WheelTask<T>
{
public T Data { get; set; }
public Func<T, Task> Handle { get; set; }
}
定義很簡單,就是一個入參T代表要執行的任務所需要的入參,然后就是任務的具體處理器Handle。接著我們來定義時間輪本輪的核心代碼:
可以看到時間輪其實核心就兩個東西,一個是毫秒計時器,一個是數組插槽,這里數組插槽我們使用了字典來實現,key值分別對應0到59秒。每一個插槽的value對應一個任務隊列。當添加一個新任務的時候,輸入需要延遲的秒數,就會將任務插入到延遲多少秒對應的插槽內,當計時器啟動的時候,每一跳剛好1秒,那么就會對插槽計數+1,然后去尋找當前插槽是否有任務,有的話就會調用ExecuteTask執行該插槽下的所有任務。
public class TimeWheel<T>
{
int secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, 0, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
public async Task AddTaskAsync(int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddSeconds(second);
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
async void Callback(object o)
{
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
}
if (secondTaskQueue[secondSlot].Any())
await ExecuteTask();
}
async Task ExecuteTask()
{
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
}
接下來就是如果我需要大于60秒的情況如何處理呢。其實就是增加分鐘插槽數組,舉個例子我有一個任務需要2分40秒后執行,那么當我 插入到時間輪的時候我先插入到分鐘插槽,當計時器每過去60秒,分鐘插槽值+1,當分鐘插槽對應有任務的時候就將這些任務從分鐘插槽里彈出再入隊到秒插槽中,這樣一個任務會先進入插槽值=2(假設從0開始計算)的分鐘插槽,計時器運行120秒后分鐘值從0累加到2,2插槽的任務彈出到插槽值=40的秒插槽里,當計時器再運行40秒,剛好就可以執行這個延遲2分40秒的任務。話不多說,上代碼:
首先我們將任務WheelTask增加一個Second屬性,用于當任務從分鐘插槽彈出來時需要知道自己入隊哪個秒插槽
public class WheelTask<T>
{
...
public int Second { get; set; }
...
}
接著我們再重新定義時間輪的邏輯增加分鐘插槽值以及插槽隊列的部分
public class TimeWheel<T>
{
int minuteSlot, secondSlot = 0;
DateTime wheelTime { get { return new DateTime(1, 1, 1, 0, minuteSlot, secondSlot); } }
Dictionary<int, ConcurrentQueue<WheelTask<T>>> minuteTaskQueue, secondTaskQueue;
public void Start()
{
new Timer(Callback, null, 0, 1000);、
minuteTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
secondTaskQueue = new Dictionary<int, ConcurrentQueue<WheelTask<T>>>();
Enumerable.Range(0, 60).ToList().ForEach(x =>
{
minuteTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
secondTaskQueue.Add(x, new ConcurrentQueue<WheelTask<T>>());
});
}
...
}
同樣的在添加任務的AddTaskAsync函數中我們需要增加分鐘,代碼改為這樣,當大于1分鐘的任務會入隊到分鐘插槽中,小于1分鐘的會按原邏輯直接入隊到秒插槽中:
public async Task AddTaskAsync(int minute, int second, T data, Func<T, Task> handler)
{
var handTime = wheelTime.AddMinutes(minute).AddSeconds(second);
if (handTime.Minute != wheelTime.Minute)
minuteTaskQueue[handTime.Minute].Enqueue(new WheelTask<T>(handTime.Second, data, handler));
else
{
if (handTime.Second != wheelTime.Second)
secondTaskQueue[handTime.Second].Enqueue(new WheelTask<T>(data, handler));
else
await handler(data);
}
}
最后的部分就是計時器的callback以及任務執行的部分:
async void Callback(object o)
{
bool minuteExecuteTask = false;
if (secondSlot != 59)
secondSlot++;
else
{
secondSlot = 0;
minuteExecuteTask = true;
if (minuteSlot != 59)
minuteSlot++;
else
{
minuteSlot = 0;
}
}
if (minuteExecuteTask || secondTaskQueue[secondSlot].Any())
await ExecuteTask(minuteExecuteTask);
}
async Task ExecuteTask(bool minuteExecuteTask)
{
if (minuteExecuteTask)
while (minuteTaskQueue[minuteSlot].Any())
if (minuteTaskQueue[minuteSlot].TryDequeue(out WheelTask<T> task))
secondTaskQueue[task.Second].Enqueue(task);
if (secondTaskQueue[secondSlot].Any())
while (secondTaskQueue[secondSlot].Any())
if (secondTaskQueue[secondSlot].TryDequeue(out WheelTask<T> task))
await task.Handle(task.Data);
}
基本上基于分鐘+秒的時間輪延遲任務核心功能就這些了,聰明的你一定知道如何擴展增加小時,天,月份甚至年份的時間輪了。雖然從代碼邏輯上可以實現,但是大部分情況下我們使用時間輪僅僅是完成一些內存易失性的非核心的任務延遲調度,實現天,周,月年意義不是很大。所以基本上到小時就差不多了。再多就上作業系統來調度吧。
原文鏈接:https://www.cnblogs.com/gmmy/p/17015538.html
相關推薦
- 2022-10-15 Qt鍵盤事件實現圖片在窗口上下左右移動_C 語言
- 2022-05-04 使用Spring.Net框架實現多數據庫_實用技巧
- 2023-11-13 數據結構——鏈表(python版)
- 2022-08-21 Python?正則?re.compile?真的必需嗎_python
- 2023-07-05 實際開發中如何存儲密碼(md5加鹽bcrypt)golang
- 2022-02-02 css 波紋擴散效果
- 2022-07-23 .Net創建型設計模式之簡單工廠模式(Simple?Factory)_基礎應用
- 2022-01-29 composer global require “fxp/composer-asset-plugin
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支