網站首頁 編程語言 正文
前言
golang實現定時任務很簡單,只須要簡單幾步代碼即可以完成,最近在做了幾個定時任務,想研究一下它內部是怎么實現的,所以將源碼過了一遍,記錄和分享在此。需要的朋友可以參考以下內容,希望對大家有幫助。
關于go cron是如何使用的可以參考之前的文章:一文帶你入門Go語言中定時任務庫Cron的使用
Demo示例
package main import ( "fmt" "github.com/robfig/cron/v3" ) func main() { // 創建一個默認的cron對象 c := cron.New() //添加執行任務 c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) //開始執行任務 c.Start() select {} //阻塞 }
通過上面的示例,可以發現, cron 最常用的幾個函數:
- New(): 實例化一個 cron 對象。
- Cron.AddFunc(): 向 Cron 對象中添加一個作業,接受兩個參數,第一個是 cron 表達式,第二個是一個無參無返回值的函數(作業)。
- Cron.Stop(): 停止調度,Stop 之后不會再有未執行的作業被喚醒,但已經開始執行的作業不會受影響。
源碼實現
在了解其整體邏輯的實現過程前,先了解兩個重要的結構體Entry
和Cron
:
位置在/robfig/cron/cron.go
。
結構體 Cron 和 Entry
Cron
主要負責維護所有的任務數據,調用相關的func時間指定,可以啟動、停止任務等;Entry是對添加到 Cron
中的任務的封裝,每個 Entry
有一個 ID
,除此之外,Entry
里保存了這個任務上次運行的時間和下次運行的時間。具體代碼實現如下:
// Entry 數據結構,每一個被調度實體一個 type Entry struct { // 唯一id,用于查詢和刪除 ID EntryID // 本Entry的調度時間,不是絕對時間,在生成entry時會計算出來 Schedule Schedule // 本entry下次需要執行的絕對時間,會一直被更新 // 被封裝的含義是Job可以多層嵌套,可以實現基于需要執行Job的額外處理 // 比如抓取Job異常、如果Job沒有返回下一個時間點的Job是還是繼續執行還是delay Next time.Time // 上一次被執行時間,主要用來查詢 Prev time.Time // WrappedJob 是真實執行的Job實體 WrappedJob Job // Job 主要給用戶查詢 Job Job } // Cron保持任意數量的任務的軌道,調用相關的func時間表指定。它可以被啟動,停止,可運行的同時進行檢查。 type Cron struct { entries []*Entry // 保存了所有加入到 Cron 的任務 // chain 用來定義entry里的warppedJob使用什么邏輯(e.g. skipIfLastRunning) // 即一個cron里所有entry只有一個封裝邏輯 chain Chain stop chan struct{} // 停止整個cron的channel add chan *Entry // 增加一個entry的channel remove chan EntryID // 移除一個entry的channel snapshot chan chan []Entry // 獲取entry整體快照的channel running bool // 代表是否已經在執行,是cron為使用者提供的動態修改entry的接口準備的 logger Logger // 封裝golang的log包 runningMu sync.Mutex // 用來修改運行中的cron數據,比如增加entry,移除entry location *time.Location // 地理位置 parser ScheduleParser // 對時間格式的解析,為interface, 可以定制自己的時間規則。 nextID EntryID // entry的全局ID,新增一個entry就加1 jobWaiter sync.WaitGroup // run job時會進行add(1), job 結束會done(),stop整個cron,以此保證所有job都能退出 }
New()實現
cron.go
中的New()
方法用來創建并返回一個Cron
對象指針,其實現如下:
func New(opts ...Option) *Cron { c := &Cron{ entries: nil, chain: NewChain(), add: make(chan *Entry), stop: make(chan struct{}), snapshot: make(chan chan []Entry), remove: make(chan EntryID), running: false, runningMu: sync.Mutex{}, logger: DefaultLogger, location: time.Local, parser: standardParser, } for _, opt := range opts { opt(c) } return c }
AddFunc()實現
AddFunc()
用于向Corn
中添加一個任務,AddFunc()中將func
包裝成 Job
類型然后調用AddJob()
,AddFunc()
相較于 AddJob()
幫用戶省去了包裝成 Job 類型的一步,在 AddJob()
中,調用了 standardParser.Parse()
將 cron
表達式解釋成了 schedule
類型,最終,他們調用了 Schedule()
方法;其代碼實現如下:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { return c.AddJob(spec, FuncJob(cmd)) //包裝成job類型然后調用AddJob()方法 } func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { schedule, err := c.parser.Parse(spec) //將cron表達式解析成schedule類型 if err != nil { return 0, err } return c.Schedule(schedule, cmd), nil //調用Schedule() } func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { c.runningMu.Lock() //為了保證線程安全,加鎖 defer c.runningMu.Unlock() c.nextID++ //下一EntryID entry := &Entry{ ID: c.nextID, Schedule: schedule, WrappedJob: c.chain.Then(cmd), Job: cmd, } // Cron是否處于運行狀態 if !c.running { c.entries = append(c.entries, entry) // 追加到entries列表中 } else { c.add <- entry // 發送到Cron的add chan } return entry.ID }
Schedule()
這個方法負責創建 Entry
結構體,并把它追加到 Cron
的 entries
列表中,如果 Cron
已經處于運行狀態,會將這個創建好的 entry
發送到 Cron
的 add chan
中,在run()
中會處理這種情況。
Start()實現
Start()
用于開始執行 Cron
,其代碼實現如下:
func (c *Cron) Start() { c.runningMu.Lock() // 獲取鎖 defer c.runningMu.Unlock() if c.running { return } c.running = true // 將 c.running 置為 true 表示 cron 已經在運行中了 go c.run() //開啟一個 goroutine 執行 c.run() }
通過上面的代碼,可以看到主要干了這么幾件事:
- 獲取鎖,保證線程安全。
- 判斷
cron
是否已經在運行中,如果是則直接返回,否則將c.running
置為true
表示cron
已經在運行中了。 - 開啟一個
goroutine
執行c.run()
。
Run()實現
Run()
是整個cron
的一個核心,它負責處理cron
開始執行后的大部分事情, run
中會一直輪循c.entries
中的entry
, 如果一個entry
允許執行了,就會開啟單獨的goroutine
去執行這個任務。
// run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { c.logger.Info("start") // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } for { // Determine the next entry to run. // 將定時任務執行時間進行排序,最近最早執行的放在前面 sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { // 生成一個定時器,距離最近的任務時間到時 觸發定時器的channel,發送通知 timer = time.NewTimer(c.entries[0].Next.Sub(now)) } for { select { // 定時時間到了,執行定時任務,并設置下次執行的時刻 case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now //對每個定時任務嘗試執行 for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } //新增的定時任務添加到 任務列表中 case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) //獲取 當前所有定時任務(快照) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue //停止定時任務,timer停止即可完成此功能 case <-c.stop: timer.Stop() c.logger.Info("stop") return //刪除某個定時任務 case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) } break } } }
Stop()實現
Stop()
用來停止Cron
的運行,但已經在執行中的作業是不會被打斷的,也就是從執行 Stop()
之后,不會再有新的任務被調度:
func (c *Cron) Stop() context.Context { c.runningMu.Lock() defer c.runningMu.Unlock() if c.running { c.stop <- struct{}{} // 會發出一個 stop 信號 c.running = false } ctx, cancel := context.WithCancel(context.Background()) go func() { // 等待所有已經在執行的任務執行完畢 c.jobWaiter.Wait() // 會發出一個 cancelCtx.Done() 信號 cancel() }() return ctx }
Remove()實現
Remove() 用于移除一個任務:
func (c *Cron) Remove(id EntryID) { c.runningMu.Lock() defer c.runningMu.Unlock() if c.running { c.remove <- id // 會發出一個 remove 信號 } else { c.removeEntry(id) } } func (c *Cron) removeEntry(id EntryID) { var entries []*Entry for _, e := range c.entries { if e.ID != id { entries = append(entries, e) } } c.entries = entries }
小結
到此這篇關于Golang Cron 定時任務的內部實現的文章就介紹到這了, 其中重點如下:
在Go Cron
內部維護了兩個結構體Cron
和Entry
,用于維護任務數據,cron.Start()
執行后,cron
的后臺程序c.Run()
就開始執行了,Run()
是整個cron
的一個核心,它負責處理cron
開始執行后的大部分事情, run
中會一直輪循c.entries
中的entry
, 每個entry都包含自己下一次執行的絕對時間,如果一個entry
允許執行了,就會開啟單獨的goroutine
去執行這個任務。
原文鏈接:https://juejin.cn/post/7155769581255000101
相關推薦
- 2022-10-17 React報錯信息之Expected?an?assignment?or?function?call?
- 2022-08-15 python?time模塊時間戳?與?結構化時間詳解_python
- 2022-10-20 C++淺析虛函數使用方法_C 語言
- 2022-06-17 C語言詳解函數與指針的使用_C 語言
- 2022-09-27 使用Python?matplotlib繪制簡單的柱形圖、折線圖和直線圖_python
- 2023-07-05 React解決setState異步帶來的多次修改合一和修改后立即使用沒有變化問題
- 2022-10-05 redis?哨兵集群搭建的實現_Redis
- 2022-10-12 Docker安裝RabbitMQ的超詳細步驟_docker
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支