日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

定時任務和延時任務詳解

作者:望京小哥 更新時間: 2022-07-22 編程語言

場景

1、訂單成功后,在30分鐘內沒有支付,自動取消訂單

2、外賣平臺發送訂餐通知,下單成功后60s給用戶推送短信。

3、如果訂單一直處于某一個未完結狀態時,及時處理關單,并退還庫存

4、淘寶新建商戶一個月內還沒上傳商品信息,將凍結商鋪等

解決方案

1、DelayQueue 延時隊列

代碼dome

package com.study.base.delayMessage;

import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueue_JDKVesion {

    // data  prepare 
    static class DalayData implements Delayed{

        private long times;

        public DalayData(long times){
            this.times = times;
        }

        /**
         * 需實現最低維度為:TimeUnit.NANOSECONDS 級別的剩余時間
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return times - System.nanoTime();
        }

        @Override
        public int compareTo(Delayed o) {
            DalayData od = (DalayData) o;
            return this.times - od.times < -1L ? -1 : this.times - od.times == 0 ? 0 : 1;
        }
    }

    // test
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DalayData> delayQueue = new DelayQueue();
        delayQueue.add(new DalayData(System.nanoTime()));
        delayQueue.add(new DalayData(System.nanoTime() + 5L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 10L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 15L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 20L * 1000 * 1000 * 1000));
        delayQueue.add(new DalayData(System.nanoTime() + 25L * 1000 * 1000 * 1000));

        while (true){
            DalayData dalayData = delayQueue.take();
            System.out.println(DateFormatUtils.format(new Date(), DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT.getPattern()));
        }
    }




}

流程分析

準備數據列隊數據 implements Delayed

(列隊為優先級列表,到期的放前面),核心方法:delayQueue.take() 阻塞式獲取數據

  • 若隊列沒有元素,進入條件隊列Condition,wait方法,線程park掛起

  • 若隊列有元素,獲取隊列peek首個數據,若此個元素時間<=0,poll處理,若>0,加入條件隊列Condition,休息awaitNanos(delay)

  • 若此時新增數據是最小的元素,條件隊列Condition喚醒第一個線程,來嘗試獲取數據

流程圖

image-20220721192513171

優缺點

總體看,沒啥缺點,每個節點,環節,都無所謂的性能浪費

場景限定:jvm中,可能數據流失

2、定時任務 + mysql

優缺點

解決單jvm問題

性能不高

適合小數據量

定時任務 + redis(zset)

優缺點

Queue放在公共節點上,解決單jvm問題

若數據不多,浪費節點對queue空拉取

時效誤差,定時任務間隔

相對(定時任務 + mysql)原理一樣,體現redis 和 mysql 特性和性能問題

3、netty 時間輪延遲隊列

代碼dome

@Test
    @org.junit.jupiter.api.Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
    public void testTimerOverflowWheelLength() throws InterruptedException {
        final HashedWheelTimer timer = new HashedWheelTimer(
            Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32);
        final CountDownLatch latch = new CountDownLatch(3);

        timer.newTimeout(new TimerTask() {
            @Override
            public void run(final Timeout timeout) throws Exception {
                timer.newTimeout(this, 100, TimeUnit.MILLISECONDS);
                latch.countDown();
            }
        }, 100, TimeUnit.MILLISECONDS);

        latch.await();
        assertFalse(timer.stop().isEmpty());
    }

流程分析

創建HashedWheelTimer,核心4個元素(任務thread工廠,間隔時間,間隔時間單位,數組長度)

addTimeOut(添加任務),(若這是第一個任務,HashedWheelTimer還未啟動,任務啟動,懶加載的方式啟動)

HashedWheelTimer 任務啟動后,有個work線程,一直輪詢時間輪的數據,獲取到expire任務放到線程池處理

流程圖

image-20220721192341309

優缺點

和jdk 的DelayQueue 比較類似,差別不大,設計思路和netty處理reactor網絡模型主從模式思路比較類似

  • 數據結構: 數據 + 鏈表 vs 完全二叉樹 (查詢,新增,效率要高點)

  • 獲取任務的方式:固定時間輪詢到下個節點 vs 完全準時,無時差 (沒有jdkDelayQueue 效率高,有數據執行誤差,即間隔時間)

3、rocketMQ官方 延遲消息

代碼dome

public class ScheduledMessageProducer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
         // Shutdown producer after use.
         producer.shutdown();
     }
        
 }

流程分析

定時消息(延遲隊列)是指消息發送到broker后,不會立即被消費,等待特定時間投遞給真正的topic。 broker有配置項messageDelayLevel,默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。可以配置自定義messageDelayLevel。注意,messageDelayLevel是broker的屬性,不屬于某個topic。發消息時,設置delayLevel等級即可:msg.setDelayLevel(level)。level有以下三種情況:

  • level == 0,消息為非延遲消息

  • 1<=level<=maxLevel,消息延遲特定時間,例如level==1,延遲1s

  • level > maxLevel,則level== maxLevel,例如level==20,延遲2h

定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個queue只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

需要注意的是,定時消息會在第一次寫入和調度寫入真實topic時都會計數,因此發送數量、tps都會變高。

細節待更新。。。

3、中間件延遲消息【neptune】

RocketMQ 遷移,延遲消息升級指南

neptune是基于rocketmq的延遲消息服務,主要為解決rocketmq無法發送任意時間段的消息。

  1. 支持任意時間(看硬盤情況。默認90天)、精確到秒的的延遲消息。

  2. grpc的外部API,可用于直接存儲延遲消息,主要用于發送rocketmq失敗后異步的重試. 消息查詢等。

  3. 存儲基于rocksdb

  4. 支持HA,主背復制,主背切換

  5. 監控指標直接report到influxdb和log中,采用micrometer,可以很方便到report到別的時序數據庫中

推薦使用RocketMQ的延時隊列,功能強大,但是也要看具體場景

原文鏈接:https://blog.csdn.net/leige07112033/article/details/125919093

欄目分類
最近更新