日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

GoLang日志監控系統實現_Golang

作者:上后左愛 ? 更新時間: 2023-01-14 編程語言

日志監控系統

Nginx(日志文件) -> log_process (實時讀取解析寫入) -> influxdb(存儲) ->grafana(前端日志展示器)

influxdb 屬于GO語言編寫的開源的時序型數據,著力于高性能 查詢與存儲時序型數據,influxdb 廣泛的應用于存儲系統的監控數據,IOT行業的實時數據。

目前市面上流行 TSDB(時序型處理數據庫):influxDB, TimescaleDB, QuestDBinfluxDB 類似于NOSQL體驗,自動適合標記集模型的技術的數據集;TimescaleDB 與 postgreSQL 兼容, 更加適合物聯網數據,與PostgreSQL更好的兼容QuestDB: 支持InfluxDB內聯協議和PostgreSQL, 但是生態問題比較大

項目簡答介紹

本日志系統 DEMO,但是可以直接使用到生產環境上面,使用LOG_Process 讀取Nginx ./Access.log, 使用influxDB 進行存取

log_process -path ./access.log influxdsn http://127.0.0.1:8086@imooc@imoocpass@immoc@s

常見并發模型

  • 解決C10k 的問題 采用異步非阻塞的模型(Nginx, libevent, NodeJS)-- 問題 復雜度高 大量回調函數
  • 協程(Go,Erlang, lua): 協線性函數一樣寫代碼;理解根加輕量級別的線程
  • 程序并行執行 go foo() // 執行函數
  • mgs:= <- c 多個gorountine 需要進行通信
  • select 從多個channel 中讀取數據 ,多個 channel 隨機選擇一個進行消費?
  • 并發: 一個任務通過調度器讓任務看起來運行 屬于單核CPU(邏輯運行)對于IO密集型比較友好
  • 并行:任務真正的運行

在go 語言中 并發執行 ,使用三個不同 gorountine, 一個負責裝填,一個負責運輸,一個負責處理 ,讓程序并發的運行起來,讓任務更加的職責單一化 這種思想 也可以將 日志解析讀取,寫入模塊進行單獨小模塊,每個模塊讓使用gorountine ,通過channel 數據交互,至于這么多gorountine 是在一個CPU調度執行還是分配到多個CPU上進行執行 ,取決于系統.

go 語言有自己的調度器, go fun() 屬于一個獨立的工作單元,go的調度器,根據每個可用的物理處理器分配一個邏輯處理器,通過這個邏輯處理器對 獨立單元進行處理,
通過設置: runtime.GOMAXPROCS(1)//給調度器分配多小個具體的邏輯處理器
一臺服務器的 物理處理器越多 ,go 獲取到邏輯處理器也越多,導致器允許速度越快。 參考:傳送門

系統架構

日志解析的基本流程化的偽函數,如下的函數有兩個缺陷,解析介入和解析后輸出只能寫死,所以需要進行擴展,接口方式進行擴展

package main
import (
	"fmt"
	"strings"
	"time"
)
/**
* 日志解析系統分為: 解析,讀取,寫入
 */
type LogProcess struct {
	path        string      // 讀取文件路徑
	influxDBDsn string      // influx data source
	rc          chan string // read module to process
	wc          chan string // process to influx
}
// 返回函數使用 指針, 結構體很大 不需要進行拷貝 性能優化
func (l *LogProcess) ReadFromFile() {
	// 文件讀取模塊
	line := "message"
	l.rc <- line
}
func (l *LogProcess) Process() {
	// 文件解析模塊
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}
func (l *LogProcess) writeToInfluxDB() {
	fmt.Println(<-l.wc)
}
func main() {
	// lp 引用類型
	lp := &LogProcess{
		path:        "./tmp/access.log",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}
	// tree goroutine run
	go lp.ReadFromFile()
	go lp.Process()
	// 需要定義 chan 將 Process 數據 傳遞給 influxDB
	go lp.writeToInfluxDB()
	time.Sleep(2 * time.Second)
}

接口方式約束 輸入和輸出 進行優化

package main
import (
	"fmt"
	"strings"
	"time"
)
/**
* 日志解析系統分為: 解析,讀取,寫入
 */
type LogProcess struct {
	rc    chan string // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}
func (l *LogProcess) Process() {
	// 文件解析模塊
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}
type Writer interface {
	writer(wc chan string)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
	fmt.Println(<-wc)
}
type Read interface {
	read(rc chan string)
}
type ReadFromFile struct {
	path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan string) {
	// 讀取模塊
	line := "message"
	rc <- line
}
func main() {
	// lp 引用類型
	r := &ReadFromFile{
		path: "./tmp/access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通過接口方式 約束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通過參數注入方式
	time.Sleep(2 * time.Second)
}

讀取模塊具體實現

從上次讀取光標后開始逐行進行讀取,無需每次都全部文件讀取

package main
import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)
/**
* 日志解析系統分為: 解析,讀取,寫入
 */
type LogProcess struct {
	rc    chan []byte // read module to process
	wc    chan string // process to influx
	read  Read
	write Writer
}
func (l *LogProcess) Process() {
	// 文件解析模塊
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}
type Writer interface {
	writer(wc chan string)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
	// wc 通道另外一種讀取方式
	for x := range wc {
		fmt.Println(x)
	}
}
type Read interface {
	read(rc chan []byte)
}
type ReadFromFile struct {
	path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
	// 實時系統: 從文件末尾逐行進行讀取
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintln("open file error:%s", err.Error()))
	}
	// 文件末尾最開始進行讀取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// d讀取到文件末尾, 日志還沒有寫入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
		}
		rc <- line[:len(line)-1]
	}
}
func main() {
	// lp 引用類型
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// 通過接口方式 約束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通過參數注入方式
	time.Sleep(100 * time.Second)
}

日志解析模塊

  • 沖Read Chan 中讀取每一行數據
  • 正則方式提取所需要的監控數據
  • 將數據寫入到influxDB
package main
import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)
/**
* 日志解析系統分為: 解析,讀取,寫入
 */
type LogProcess struct {
	rc    chan []byte // read module to process
	wc    chan *Message // process to influx
	read  Read
	write Writer
}
//日志寫入結構體
type Message struct {
	TimeLocal time.Time
	BytesSent int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
	// 通過正則表達式進行解析數據
	r := regexp.MustCompile(`(\s*)`)
	loc, _ := time.LoadLocation("Asia/shanghai")
	// 文件解析模塊
	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			log.Println("FindStringSub match fail:", string(v))
			continue
		}
		message := &Message{
		}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location
		// 字符串類型轉換成int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[4])
		}
		message.BytesSent = atoi
		l.wc <- message
	}
}
type Writer interface {
	writer(wc chan *Message)
}
type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// wc 通道另外一種讀取方式
	for x := range wc {
		fmt.Println(x)
	}
}
type Read interface {
	read(rc chan []byte)
}
type ReadFromFile struct {
	path string // 讀取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
	// 實時系統: 從文件末尾逐行進行讀取
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}
	// 文件末尾最開始進行讀取
	f.Seek(0, 2)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// d讀取到文件末尾, 日志還沒有寫入
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		rc <- line[:len(line)-1]
	}
}
func main() {
	// lp 引用類型
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password"}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}
	// 通過接口方式 約束其功能
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	// 通過參數注入方式
	time.Sleep(100 * time.Second)
}

原文鏈接:https://blog.csdn.net/qq_27217017/article/details/125945920

欄目分類
最近更新