網站首頁 編程語言 正文
場景
1、訂單成功后,在30分鐘內沒有支付,自動取消訂單
2、外賣平臺發送訂餐通知,下單成功后60s給用戶推送短信。
3、如果訂單一直處于某一個未完結狀態時,及時處理關單,并退還庫存
4、淘寶新建商戶一個月內還沒上傳商品信息,將凍結商鋪等
解決方案
1、DelayQueue 延時隊列
代碼dome
package com.study.base.delayMessage;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueue_JDKVesion {
// data prepare
static class DalayData implements Delayed{
private long times;
public DalayData(long times){
this.times = times;
}
/**
* 需實現最低維度為:TimeUnit.NANOSECONDS 級別的剩余時間
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return times - System.nanoTime();
}
@Override
public int compareTo(Delayed o) {
DalayData od = (DalayData) o;
return this.times - od.times < -1L ? -1 : this.times - od.times == 0 ? 0 : 1;
}
}
// test
public static void main(String[] args) throws InterruptedException {
DelayQueue<DalayData> delayQueue = new DelayQueue();
delayQueue.add(new DalayData(System.nanoTime()));
delayQueue.add(new DalayData(System.nanoTime() + 5L * 1000 * 1000 * 1000));
delayQueue.add(new DalayData(System.nanoTime() + 10L * 1000 * 1000 * 1000));
delayQueue.add(new DalayData(System.nanoTime() + 15L * 1000 * 1000 * 1000));
delayQueue.add(new DalayData(System.nanoTime() + 20L * 1000 * 1000 * 1000));
delayQueue.add(new DalayData(System.nanoTime() + 25L * 1000 * 1000 * 1000));
while (true){
DalayData dalayData = delayQueue.take();
System.out.println(DateFormatUtils.format(new Date(), DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT.getPattern()));
}
}
}
流程分析
準備數據列隊數據 implements Delayed
(列隊為優先級列表,到期的放前面),核心方法:delayQueue.take() 阻塞式獲取數據
-
若隊列沒有元素,進入條件隊列Condition,wait方法,線程park掛起
-
若隊列有元素,獲取隊列peek首個數據,若此個元素時間<=0,poll處理,若>0,加入條件隊列Condition,休息awaitNanos(delay)
-
若此時新增數據是最小的元素,條件隊列Condition喚醒第一個線程,來嘗試獲取數據
流程圖
優缺點
總體看,沒啥缺點,每個節點,環節,都無所謂的性能浪費
場景限定:jvm中,可能數據流失
2、定時任務 + mysql
優缺點
解決單jvm問題
性能不高
適合小數據量
定時任務 + redis(zset)
優缺點
Queue放在公共節點上,解決單jvm問題
若數據不多,浪費節點對queue空拉取
時效誤差,定時任務間隔
相對(定時任務 + mysql)原理一樣,體現redis 和 mysql 特性和性能問題
3、netty 時間輪延遲隊列
代碼dome
@Test
@org.junit.jupiter.api.Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
public void testTimerOverflowWheelLength() throws InterruptedException {
final HashedWheelTimer timer = new HashedWheelTimer(
Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32);
final CountDownLatch latch = new CountDownLatch(3);
timer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
timer.newTimeout(this, 100, TimeUnit.MILLISECONDS);
latch.countDown();
}
}, 100, TimeUnit.MILLISECONDS);
latch.await();
assertFalse(timer.stop().isEmpty());
}
流程分析
創建HashedWheelTimer,核心4個元素(任務thread工廠,間隔時間,間隔時間單位,數組長度)
addTimeOut(添加任務),(若這是第一個任務,HashedWheelTimer還未啟動,任務啟動,懶加載的方式啟動)
HashedWheelTimer 任務啟動后,有個work線程,一直輪詢時間輪的數據,獲取到expire任務放到線程池處理
流程圖
優缺點
和jdk 的DelayQueue 比較類似,差別不大,設計思路和netty處理reactor網絡模型主從模式思路比較類似
-
數據結構: 數據 + 鏈表 vs 完全二叉樹 (查詢,新增,效率要高點)
-
獲取任務的方式:固定時間輪詢到下個節點 vs 完全準時,無時差 (沒有jdkDelayQueue 效率高,有數據執行誤差,即間隔時間)
3、rocketMQ官方 延遲消息
代碼dome
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
流程分析
定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。 broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個topic。發消息時,設置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:
-
level == 0,消息為非延遲消息
-
1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s
-
level > maxLevel,則level== maxLevel,例如level==20,延遲2h
定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
需要注意的是,定時消息會在第一次寫入和調度寫入真實topic時都會計數,因此發送數量、tps都會變高。
細節待更新。。。
3、中間件延遲消息【neptune】
RocketMQ 遷移,延遲消息升級指南
neptune是基于rocketmq的延遲消息服務,主要為解決rocketmq無法發送任意時間段的消息。
-
支持任意時間(看硬盤情況。默認90天)、精確到秒的的延遲消息。
-
grpc的外部API,可用于直接存儲延遲消息,主要用于發送rocketmq失敗后異步的重試. 消息查詢等。
-
存儲基于rocksdb
-
支持HA,主背復制,主背切換
-
監控指標直接report到influxdb和log中,采用micrometer,可以很方便到report到別的時序數據庫中
推薦使用RocketMQ的延時隊列,功能強大,但是也要看具體場景
原文鏈接:https://blog.csdn.net/leige07112033/article/details/125919093
相關推薦
- 2024-01-11 org.apache.commons.collections.MapUtils Map集合工具類
- 2022-08-29 Python常見異常處理總結_python
- 2022-07-10 python日志管理loguru模塊實操
- 2022-04-07 一篇文章帶你學習Python3的高階函數_python
- 2022-09-14 Android?實現卡片堆疊錢包管理動畫效果_Android
- 2022-09-06 關于react+antd樣式不生效問題的解決方式_React
- 2022-06-28 python神經網絡使用Keras構建RNN訓練_python
- 2022-07-11 Android?ListView列表優化的方法詳解_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支