網(wǎng)站首頁 編程語言 正文
在電商、支付等領(lǐng)域,往往會有這樣的場景,用戶下單后放棄支付了,那這筆訂單會在指定的時間段后進(jìn)行關(guān)閉操作,細(xì)心的你一定發(fā)現(xiàn)了像某寶、某東都有這樣的邏輯,而且時間很準(zhǔn)確,誤差在1s內(nèi);那他們是怎么實(shí)現(xiàn)的呢?
一般的做法有如下幾種
- 定時任務(wù)關(guān)閉訂單
- rocketmq延遲隊(duì)列
- rabbitmq死信隊(duì)列
- 時間輪算法
- redis過期監(jiān)聽
一、定時任務(wù)關(guān)閉訂單(最low)
一般情況下,最不推薦的方式就是關(guān)單方式就是定時任務(wù)方式,原因我們可以看下面的圖來說明
我們假設(shè),關(guān)單時間為下單后10分鐘,定時任務(wù)間隔也是10分鐘;通過上圖我們看出,如果在第1分鐘下單,在第20分鐘的時候才能被掃描到執(zhí)行關(guān)單操作,這樣誤差達(dá)到10分鐘,這在很多場景下是不可接受的,另外需要頻繁掃描主訂單號造成網(wǎng)絡(luò)IO和磁盤IO的消耗,對實(shí)時交易造成一定的沖擊,所以PASS
二、rocketmq延遲隊(duì)列方式
延遲消息 生產(chǎn)者把消息發(fā)送到消息服務(wù)器后,并不希望被立即消費(fèi),而是等待指定時間后才可以被消費(fèi)者消費(fèi),這類消息通常被稱為延遲消息。 在RocketMQ開源版本中,支持延遲消息,但是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。 消息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。
發(fā)送延遲消息(生產(chǎn)者)
/**
* 推送延遲消息
* @param topic
* @param body
* @param producerGroup
* @return boolean
*/
public boolean sendMessage(String topic, String body, String producerGroup)
{
try
{
Message recordMsg = new Message(topic, body.getBytes());
producer.setProducerGroup(producerGroup);
//設(shè)置消息延遲級別,我這里設(shè)置14,對應(yīng)就是延時10分鐘
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
recordMsg.setDelayTimeLevel(14);
// 發(fā)送消息到一個Broker
SendResult sendResult = producer.send(recordMsg);
// 通過sendResult返回消息是否成功送達(dá)
log.info("發(fā)送延遲消息結(jié)果:======sendResult:{}", sendResult);
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("發(fā)送時間:{}", format.format(new Date()));
return true;
}
catch (Exception e)
{
e.printStackTrace();
log.error("延遲消息隊(duì)列推送消息異常:{},推送內(nèi)容:{}", e.getMessage(), body);
}
return false;
}
消費(fèi)延遲消息(消費(fèi)者)
/**
* 接收延遲消息
*
* @param topic
* @param consumerGroup
* @param messageHandler
*/
public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
{
ThreadPoolUtil.execute(() ->
{
try
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup(consumerGroup);
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(address);
//設(shè)置消費(fèi)者拉取消息的策略,*表示消費(fèi)該topic下的所有消息,也可以指定tag進(jìn)行消息過濾
consumer.subscribe(topic, "*");
//消費(fèi)者端啟動消息監(jiān)聽,一旦生產(chǎn)者發(fā)送消息被監(jiān)聽到,就打印消息,和rabbitmq中的handlerDelivery類似
consumer.registerMessageListener(messageHandler);
consumer.start();
log.info("啟動延遲消息隊(duì)列監(jiān)聽成功:" + topic);
}
catch (MQClientException e)
{
log.error("啟動延遲消息隊(duì)列監(jiān)聽失敗:{}", e.getErrorMessage());
System.exit(1);
}
});
}
實(shí)現(xiàn)監(jiān)聽類,處理具體邏輯
/**
* 延遲消息監(jiān)聽
*
*/
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{
@Resource
private MQUtil mqUtil;
@Resource
private CourseOrderTimeoutHandler courseOrderTimeoutHandler;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
{
// 訂單超時監(jiān)聽
mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
}
}
/**
* 實(shí)現(xiàn)監(jiān)聽
*/
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list)
{
// 得到消息體
String body = new String(msg.getBody());
JSONObject userJson = JSONObject.parseObject(body);
TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);
// 處理具體的業(yè)務(wù)邏輯,,,,,
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("消費(fèi)時間:{}", format.format(new Date()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
這種方式相比定時任務(wù)好了很多,但是有一個致命的缺點(diǎn),就是延遲等級只有18種(商業(yè)版本支持自定義時間),如果我們想把關(guān)閉訂單時間設(shè)置在15分鐘該如何處理呢?顯然不夠靈活。
三、rabbitmq死信隊(duì)列的方式
Rabbitmq本身是沒有延遲隊(duì)列的,只能通過Rabbitmq本身隊(duì)列的特性來實(shí)現(xiàn),想要Rabbitmq實(shí)現(xiàn)延遲隊(duì)列,需要使用Rabbitmq的死信交換機(jī)(Exchange)和消息的存活時間TTL(Time To Live)
死信交換機(jī) 一個消息在滿足如下條件下,會進(jìn)死信交換機(jī),記住這里是交換機(jī)而不是隊(duì)列,一個交換機(jī)可以對應(yīng)很多隊(duì)列。
一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊(duì)列里,被其他消費(fèi)者使用。 上面的消息的TTL到了,消息過期了。
隊(duì)列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。 死信交換機(jī)就是普通的交換機(jī),只是因?yàn)槲覀儼堰^期的消息扔進(jìn)去,所以叫死信交換機(jī),并不是說死信交換機(jī)是某種特定的交換機(jī)
消息TTL(消息存活時間) 消息的TTL就是消息的存活時間。RabbitMQ可以對隊(duì)列和消息分別設(shè)置TTL。對隊(duì)列設(shè)置就是隊(duì)列沒有消費(fèi)者連著的保留時間,也可以對每一個單獨(dú)的消息做單獨(dú)的設(shè)置。超過了這個時間,我們認(rèn)為這個消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會取值較小的。所以一個消息如果被路由到不同的隊(duì)列中,這個消息死亡的時間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
可以通過設(shè)置消息的expiration字段或者x-message-ttl屬性來設(shè)置時間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個int類型的字符串:當(dāng)上面的消息扔到隊(duì)列中后,過了60秒,如果沒有被消費(fèi),它就死了。不會被消費(fèi)者消費(fèi)到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會被刪除和釋放,它會被統(tǒng)計到隊(duì)列的消息數(shù)中去
處理流程圖
創(chuàng)建交換機(jī)(Exchanges)和隊(duì)列(Queues)
創(chuàng)建死信交換機(jī)
如圖所示,就是創(chuàng)建一個普通的交換機(jī),這里為了方便區(qū)分,把交換機(jī)的名字取為:delay
創(chuàng)建自動過期消息隊(duì)列 這個隊(duì)列的主要作用是讓消息定時過期的,比如我們需要2小時候關(guān)閉訂單,我們就需要把消息放進(jìn)這個隊(duì)列里面,把消息過期時間設(shè)置為2小時
創(chuàng)建一個一個名為delay_queue1的自動過期的隊(duì)列,當(dāng)然圖片上面的參數(shù)并不會讓消息自動過期,因?yàn)槲覀儾]有設(shè)置x-message-ttl參數(shù),如果整個隊(duì)列的消息有消息都是相同的,可以設(shè)置,這里為了靈活,所以并沒有設(shè)置,另外兩個參數(shù)x-dead-letter-exchange代表消息過期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個道理,根據(jù)這個key將消息放入不同的隊(duì)列
創(chuàng)建消息處理隊(duì)列 這個隊(duì)列才是真正處理消息的隊(duì)列,所有進(jìn)入這個隊(duì)列的消息都會被處理
消息隊(duì)列的名字為delay_queue2 消息隊(duì)列綁定到交換機(jī) 進(jìn)入交換機(jī)詳情頁面,將創(chuàng)建的2個隊(duì)列(delayqueue1和delayqueue2)綁定到交換機(jī)上面
自動過期消息隊(duì)列的routing key 設(shè)置為delay 綁定delayqueue2
delayqueue2 的key要設(shè)置為創(chuàng)建自動過期的隊(duì)列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過期的時候就可以自動把消息放入delay_queue2這個隊(duì)列中了 綁定后的管理頁面如下圖:
當(dāng)然這個綁定也可以使用代碼來實(shí)現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺來操作 發(fā)送消息
String msg = "hello word";
MessageProperties messageProperties = newMessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
設(shè)置了讓消息6秒后過期 注意:因?yàn)橐屜⒆詣舆^期,所以一定不能設(shè)置delay_queue1的監(jiān)聽,不能讓這個隊(duì)列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過期了
接收消息 接收消息配置好delay_queue2的監(jiān)聽就好了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclassDelayQueue{
/** 消息交換機(jī)的名字*/
publicstaticfinalString EXCHANGE = "delay";
/** 隊(duì)列key1*/
publicstaticfinalString ROUTINGKEY1 = "delay";
/** 隊(duì)列key2*/
publicstaticfinalString ROUTINGKEY2 = "delay_key";
/**
* 配置鏈接信息
* @return
*/
@Bean
publicConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置
return connectionFactory;
}
/**
* 配置消息交換機(jī)
* 針對消費(fèi)者配置
FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
TopicExchange:多關(guān)鍵字匹配
*/
@Bean
publicDirectExchange defaultExchange() {
returnnewDirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息隊(duì)列2
* 針對消費(fèi)者配置
* @return
*/
@Bean
publicQueue queue() {
returnnewQueue("delay_queue2", true); //隊(duì)列持久
}
/**
* 將消息隊(duì)列2與交換機(jī)綁定
* 針對消費(fèi)者配置
* @return
*/
@Bean
@Autowired
publicBinding binding() {
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的監(jiān)聽,這個監(jiān)聽會接受消息隊(duì)列1的消息
* 針對消費(fèi)者配置
* @return
*/
@Bean
@Autowired
publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(newChannelAwareMessageListener() {
publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : "+ newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費(fèi)
}
});
return container;
}
}
這種方式可以自定義進(jìn)入死信隊(duì)列的時間;是不是很完美,但是有的小伙伴的情況是消息中間件就是rocketmq,公司也不可能會用商業(yè)版,怎么辦?那就進(jìn)入下一節(jié)
四、時間輪算法
(1)創(chuàng)建環(huán)形隊(duì)列,例如可以創(chuàng)建一個包含3600個slot的環(huán)形隊(duì)列(本質(zhì)是個數(shù)組)
(2)任務(wù)集合,環(huán)上每一個slot是一個Set 同時,啟動一個timer,這個timer每隔1s,在上述環(huán)形隊(duì)列中移動一格,有一個Current Index指針來標(biāo)識正在檢測的slot。
Task結(jié)構(gòu)中有兩個很重要的屬性: (1)Cycle-Num:當(dāng)Current Index第幾圈掃描到這個Slot時,執(zhí)行任務(wù) (2)訂單號,要關(guān)閉的訂單號(也可以是其他信息,比如:是一個基于某個訂單號的任務(wù))
假設(shè)當(dāng)前Current Index指向第0格,例如在3610秒之后,有一個訂單需要關(guān)閉,只需: (1)計算這個訂單應(yīng)該放在哪一個slot,當(dāng)我們計算的時候現(xiàn)在指向1,3610秒之后,應(yīng)該是第10格,所以這個Task應(yīng)該放在第10個slot的Set中 (2)計算這個Task的Cycle-Num,由于環(huán)形隊(duì)列是3600格(每秒移動一格,正好1小時),這個任務(wù)是3610秒后執(zhí)行,所以應(yīng)該繞3610/3600=1圈之后再執(zhí)行,于是Cycle-Num=1
Current Index不停的移動,每秒移動到一個新slot,這個slot中對應(yīng)的Set,每個Task看Cycle-Num是不是0: (1)如果不是0,說明還需要多移動幾圈,將Cycle-Num減1 (2)如果是0,說明馬上要執(zhí)行這個關(guān)單Task了,取出訂單號執(zhí)行關(guān)單(可以用單獨(dú)的線程來執(zhí)行Task),并把這個訂單信息從Set中刪除即可。 (1)無需再輪詢?nèi)坑唵危矢?(2)一個訂單,任務(wù)只執(zhí)行一次 (3)時效性好,精確到秒(控制timer移動頻率可以控制精度)
五、redis過期監(jiān)聽
1.修改redis.windows.conf配置文件中notify-keyspace-events的值 默認(rèn)配置notify-keyspace-events的值為 "" 修改為 notify-keyspace-events Ex 這樣便開啟了過期事件
2. 創(chuàng)建配置類RedisListenerConfig(配置RedisMessageListenerContainer這個Bean)
package com.zjt.shop.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisListenerConfig {
@Autowired
private RedisTemplate redisTemplate;
/**
* @return
*/
@Bean
public RedisTemplate redisTemplateInit() {
// key序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());
//val實(shí)例化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
3.繼承KeyExpirationEventMessageListener創(chuàng)建redis過期事件的監(jiān)聽類
package com.zjt.shop.common.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.zjt.shop.modules.order.service.OrderInfoService;
import com.zjt.shop.modules.product.entity.OrderInfoEntity;
import com.zjt.shop.modules.product.mapper.OrderInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Autowired
private OrderInfoMapper orderInfoMapper;
/**
* 針對redis數(shù)據(jù)失效事件,進(jìn)行數(shù)據(jù)處理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String key = message.toString();
//從失效key中篩選代表訂單失效的key
if (key != null && key.startsWith("order_")) {
//截取訂單號,查詢訂單,如果是未支付狀態(tài)則為-取消訂單
String orderNo = key.substring(6);
QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("order_no",orderNo);
OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);
if (orderInfo != null) {
if (orderInfo.getOrderState() == 0) { //待支付
orderInfo.setOrderState(4); //已取消
orderInfoMapper.updateById(orderInfo);
log.info("訂單號為【" + orderNo + "】超時未支付-自動修改為已取消狀態(tài)");
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("【修改支付訂單過期狀態(tài)異常】:" + e.getMessage());
}
}
}
4:測試 通過redis客戶端存一個有效時間為3s的訂單:
結(jié)果:
原文鏈接:https://juejin.cn/post/6987233263660040206
相關(guān)推薦
- 2023-10-30 springboot application 常用配置
- 2022-12-12 flutter?Bloc?實(shí)現(xiàn)原理示例解析_Android
- 2022-04-22 C#基于WinForm實(shí)現(xiàn)串口通訊_C#教程
- 2022-04-09 SpringBoot 安全漏洞之SQL注入解決方案
- 2022-06-11 sql?server排查死鎖優(yōu)化性能_MsSql
- 2022-03-12 Android導(dǎo)航欄功能項(xiàng)的顯示與屏蔽介紹_Android
- 2022-11-28 iOS中NSThread使用示例詳解_IOS
- 2022-07-17 oracle數(shù)據(jù)庫去除重復(fù)數(shù)據(jù)常用的方法總結(jié)_oracle
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支