網站首頁 編程語言 正文
日志監控系統
Nginx(日志文件) -> log_process (實時讀取解析寫入) -> influxdb(存儲) ->grafana(前端日志展示器)
influxdb 屬于GO語言編寫的開源的時序型數據,著力于高性能 查詢與存儲時序型數據,influxdb 廣泛的應用于存儲系統的監控數據,IOT行業的實時數據。
目前市面上流行 TSDB(時序型處理數據庫):influxDB, TimescaleDB, QuestDBinfluxDB 類似于NOSQL體驗,自動適合標記集模型的技術的數據集;TimescaleDB 與 postgreSQL 兼容, 更加適合物聯網數據,與PostgreSQL更好的兼容QuestDB: 支持InfluxDB內聯協議和PostgreSQL, 但是生態問題比較大
項目簡答介紹
本日志系統 DEMO,但是可以直接使用到生產環境上面,使用LOG_Process 讀取Nginx ./Access.log, 使用influxDB 進行存取
log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s
常見并發模型
- 解決C10k 的問題 采用異步非阻塞的模型(Nginx, libevent, NodeJS)-- 問題 復雜度高 大量回調函數
- 協程(Go,Erlang, lua): 協線性函數一樣寫代碼;理解根加輕量級別的線程
- 程序并行執行 go foo() // 執行函數
- mgs:= <- c 多個gorountine 需要進行通信
- select 從多個channel 中讀取數據 ,多個 channel 隨機選擇一個進行消費?
- 并發: 一個任務通過調度器讓任務看起來運行 屬于單核CPU(邏輯運行)對于IO密集型比較友好
- 并行:任務真正的運行
在go 語言中 并發執行 ,使用三個不同 gorountine, 一個負責裝填,一個負責運輸,一個負責處理 ,讓程序并發的運行起來,讓任務更加的職責單一化 這種思想 也可以將 日志解析讀取,寫入模塊進行單獨小模塊,每個模塊讓使用gorountine ,通過channel 數據交互,至于這么多gorountine 是在一個CPU調度執行還是分配到多個CPU上進行執行 ,取決于系統.
go 語言有自己的調度器, go fun() 屬于一個獨立的工作單元,go的調度器,根據每個可用的物理處理器分配一個邏輯處理器,通過這個邏輯處理器對 獨立單元進行處理,
通過設置: runtime.GOMAXPROCS(1)//給調度器分配多小個具體的邏輯處理器
一臺服務器的 物理處理器越多 ,go 獲取到邏輯處理器也越多,導致器允許速度越快。 參考:傳送門
系統架構
日志解析的基本流程化的偽函數,如下的函數有兩個缺陷,解析介入和解析后輸出只能寫死,所以需要進行擴展,接口方式進行擴展
package main import ( "fmt" "strings" "time" ) /** * 日志解析系統分為: 解析,讀取,寫入 */ type LogProcess struct { path string // 讀取文件路徑 influxDBDsn string // influx data source rc chan string // read module to process wc chan string // process to influx } // 返回函數使用 指針, 結構體很大 不需要進行拷貝 性能優化 func (l *LogProcess) ReadFromFile() { // 文件讀取模塊 line := "message" l.rc <- line } func (l *LogProcess) Process() { // 文件解析模塊 data := <-l.rc l.wc <- strings.ToUpper(data) } func (l *LogProcess) writeToInfluxDB() { fmt.Println(<-l.wc) } func main() { // lp 引用類型 lp := &LogProcess{ path: "./tmp/access.log", influxDBDsn: "username&password...", rc: make(chan string), wc: make(chan string), } // tree goroutine run go lp.ReadFromFile() go lp.Process() // 需要定義 chan 將 Process 數據 傳遞給 influxDB go lp.writeToInfluxDB() time.Sleep(2 * time.Second) }
接口方式約束 輸入和輸出 進行優化
package main import ( "fmt" "strings" "time" ) /** * 日志解析系統分為: 解析,讀取,寫入 */ type LogProcess struct { rc chan string // read module to process wc chan string // process to influx read Read write Writer } func (l *LogProcess) Process() { // 文件解析模塊 data := <-l.rc l.wc <- strings.ToUpper(data) } type Writer interface { writer(wc chan string) } type WriteToInfluxDB struct { influxDBDsn string // influx data source } func (w *WriteToInfluxDB) writer(wc chan string) { fmt.Println(<-wc) } type Read interface { read(rc chan string) } type ReadFromFile struct { path string // 讀取文件 } func (r *ReadFromFile) read(rc chan string) { // 讀取模塊 line := "message" rc <- line } func main() { // lp 引用類型 r := &ReadFromFile{ path: "./tmp/access.log", } w := &WriteToInfluxDB{ influxDBDsn: "username&password"} lp := &LogProcess{ rc: make(chan string), wc: make(chan string), read: r, write: w, } // 通過接口方式 約束其功能 go lp.read.read(lp.rc) go lp.Process() go lp.write.writer(lp.wc) // 通過參數注入方式 time.Sleep(2 * time.Second) }
讀取模塊具體實現
從上次讀取光標后開始逐行進行讀取,無需每次都全部文件讀取
package main import ( "bufio" "fmt" "io" "os" "strings" "time" ) /** * 日志解析系統分為: 解析,讀取,寫入 */ type LogProcess struct { rc chan []byte // read module to process wc chan string // process to influx read Read write Writer } func (l *LogProcess) Process() { // 文件解析模塊 for v := range l.rc { l.wc <- strings.ToUpper(string(v)) } } type Writer interface { writer(wc chan string) } type WriteToInfluxDB struct { influxDBDsn string // influx data source } func (w *WriteToInfluxDB) writer(wc chan string) { // wc 通道另外一種讀取方式 for x := range wc { fmt.Println(x) } } type Read interface { read(rc chan []byte) } type ReadFromFile struct { path string // 讀取文件 } func (r *ReadFromFile) read(rc chan []byte) { // 實時系統: 從文件末尾逐行進行讀取 f, err := os.Open(r.path) if err != nil { panic(fmt.Sprintln("open file error:%s", err.Error())) } // 文件末尾最開始進行讀取 f.Seek(0, 2) rd := bufio.NewReader(f) for { line, err := rd.ReadBytes('\n') if err == io.EOF { // d讀取到文件末尾, 日志還沒有寫入 time.Sleep(500 * time.Millisecond) continue } else if err != nil { panic(fmt.Sprintln("ReadBytes error:%s", err.Error())) } rc <- line[:len(line)-1] } } func main() { // lp 引用類型 r := &ReadFromFile{ path: "H:\\code\\goprogarm\\src\\access.log", } w := &WriteToInfluxDB{ influxDBDsn: "username&password"} lp := &LogProcess{ rc: make(chan []byte), wc: make(chan string), read: r, write: w, } // 通過接口方式 約束其功能 go lp.read.read(lp.rc) go lp.Process() go lp.write.writer(lp.wc) // 通過參數注入方式 time.Sleep(100 * time.Second) }
日志解析模塊
- 沖Read Chan 中讀取每一行數據
- 正則方式提取所需要的監控數據
- 將數據寫入到influxDB
package main import ( "bufio" "fmt" "io" "log" "os" "regexp" "strconv" "time" ) /** * 日志解析系統分為: 解析,讀取,寫入 */ type LogProcess struct { rc chan []byte // read module to process wc chan *Message // process to influx read Read write Writer } //日志寫入結構體 type Message struct { TimeLocal time.Time BytesSent int Path, Method, Scheme, Status string UpstreamTime, RequestTime float64 } func (l *LogProcess) Process() { // 通過正則表達式進行解析數據 r := regexp.MustCompile(`(\s*)`) loc, _ := time.LoadLocation("Asia/shanghai") // 文件解析模塊 for v := range l.rc { ret := r.FindStringSubmatch(string(v)) if len(ret) != 13 { log.Println("FindStringSub match fail:", string(v)) continue } message := &Message{ } location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc) if err != nil { log.Println("ParseInLocation fail:", err.Error(), ret[4]) } message.TimeLocal = location // 字符串類型轉換成int atoi, err := strconv.Atoi(ret[8]) if err != nil { log.Println("strconv.Atoi fail:", err.Error(), ret[4]) } message.BytesSent = atoi l.wc <- message } } type Writer interface { writer(wc chan *Message) } type WriteToInfluxDB struct { influxDBDsn string // influx data source } func (w *WriteToInfluxDB) writer(wc chan *Message) { // wc 通道另外一種讀取方式 for x := range wc { fmt.Println(x) } } type Read interface { read(rc chan []byte) } type ReadFromFile struct { path string // 讀取文件 } func (r *ReadFromFile) read(rc chan []byte) { // 實時系統: 從文件末尾逐行進行讀取 f, err := os.Open(r.path) if err != nil { panic(fmt.Sprintf("open file error:%s\n", err.Error())) } // 文件末尾最開始進行讀取 f.Seek(0, 2) rd := bufio.NewReader(f) for { line, err := rd.ReadBytes('\n') if err == io.EOF { // d讀取到文件末尾, 日志還沒有寫入 time.Sleep(500 * time.Millisecond) continue } else if err != nil { panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error())) } rc <- line[:len(line)-1] } } func main() { // lp 引用類型 r := &ReadFromFile{ path: "H:\\code\\goprogarm\\src\\access.log", } w := &WriteToInfluxDB{ influxDBDsn: "username&password"} lp := &LogProcess{ rc: make(chan []byte), wc: make(chan *Message), read: r, write: w, } // 通過接口方式 約束其功能 go lp.read.read(lp.rc) go lp.Process() go lp.write.writer(lp.wc) // 通過參數注入方式 time.Sleep(100 * time.Second) }
原文鏈接:https://blog.csdn.net/qq_27217017/article/details/125945920
相關推薦
- 2022-08-01 Python3?中return和yield的區別_python
- 2022-03-19 淺談Go1.18中的泛型編程_Golang
- 2023-03-23 詳解python?ThreadPoolExecutor異常捕獲_python
- 2023-12-25 fiddler展示接口的響應時間
- 2022-06-25 pytorch中permute()函數用法補充說明(矩陣維度變化過程)_python
- 2024-07-15 arthas操作spring被代理目標對象命令速查
- 2022-06-26 ASP.NET?Core中間件會話狀態讀寫及生命周期示例_實用技巧
- 2022-07-22 服務器配置uWSGI+Nginx+Django
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支