網站首頁 編程語言 正文
前言
日志收集項目的準備中,本文主要講的是利用golang的tail
庫,監聽日志文件的變動,將日志信息發送到kafka中。
涉及的golang庫和可視化工具:
go-ini
,sarama
,tail
其中:
-
go-ini
:用于讀取配置文件,統一管理配置項,有利于后其的維護 -
sarama
:是一個go操作kafka的客戶端。目前我用于向kefka發送消息 -
tail
:類似于linux的tail命令了,讀取文件的后幾行。如果文件有追加數據,會檢測到。就是通過它來監聽日志文件
可視化工具:
offsetexplorer
:是kafka的可視化工具,這里用來查看消息是否投遞成功
工作的流程
- 加載配置,初始化
sarama
和kafka
。 - 起一個的協程,利用
tail
不斷去監聽日志文件的變化。 - 主協程中一直阻塞等待
tail
發送消息,兩者通過一個管道通訊。一旦主協程接收到新日志,組裝格式,然后發送到kafka中
環境準備
環境的話,確保zookeeper
和kafka
正常運行。因為還沒有使用sarama
讀取數據,使用offsetexplorer
來查看任務是否真的投遞成功了。
代碼分層
serve來存放寫tail
服務類和sarama
服務類,conf存放ini配置文件
main函數為程序入口
?
關鍵的代碼
main.go
main函數做的有:構建配置結構體,映射配置文件。調用和初始化tail
,srama
服務。
package main import ( "fmt" "sarama/serve" "github.com/go-ini/ini" ) type KafkaConfig struct { Address string `ini:"address"` ChannelSize int `ini:"chan_size"` } type TailConfig struct { Path string `ini:"path"` Filename string `ini:"fileName"` // 如果是結構體,則指明分區名 Children `ini:"tailfile.children"` } type Config struct { KafkaConfig `ini:"kafka"` TailConfig `ini:"tailfile"` } type Children struct { Name string `ini:"name"` } func main() { // 加載配置 var cfg = new(Config) err := ini.MapTo(cfg, "./conf/go-conf.ini") if err != nil { fmt.Print(err) } // 初始化kafka ks := &serve.KafukaServe{} // 啟動kafka消息監聽。異步 ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize)) // 關閉主協程時,關閉channel defer ks.Destruct() // 初始化tail ts := &serve.TailServe{} ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename) // 阻塞 ts.Listener(ks.MsgChan) }
kafka.go
有3個方法 :
-
InitKafka
,組裝配置項以及初始化接收消息的管道, -
Listener
,監聽管道消息,收到消息后,將消息組裝,發送到kafka -
Destruct
, 關閉管道
package serve import ( "fmt" "github.com/Shopify/sarama" ) type KafukaServe struct { MsgChan chan string //err error } func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) { // 讀取配置 config := sarama.NewConfig() // 1. 初始化生產者配置 config.Producer.RequiredAcks = sarama.WaitForAll // 選擇分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 成功交付的信息 config.Producer.Return.Successes = true ks.MsgChan = make(chan string, chanSize) go ks.Listener(addr, chanSize, config) } func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) { // 連接kafka var kafkaClient, _ = sarama.NewSyncProducer(addr, config) defer kafkaClient.Close() for { select { case content := <-ks.MsgChan: // msg := &sarama.ProducerMessage{ Topic: "weblog", Value: sarama.StringEncoder(content), } partition, offset, err := kafkaClient.SendMessage(msg) if err != nil { fmt.Println(err) } fmt.Println("分區,偏移量:") fmt.Println(partition, offset) fmt.Println("___") } } } func (ks *KafukaServe) Destruct() { close(ks.MsgChan) }
tail.go
主要包括了兩個方法:
-
TailInit
初始化,組裝tail
配置。Listener
-
Listener
,保存kafka
服務類初始化之后的管道。監聽日志文件,如果有新日志,就往管道里發送
package serve import ( "fmt" "github.com/hpcloud/tail" ) type TailServe struct { tails *tail.Tail } func (ts *TailServe) TailInit(filenName string) { config := tail.Config{ ReOpen: true, Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, MustExist: false, Poll: true, } // 打開文件開始讀取數據 ts.tails, _ = tail.TailFile(filenName, config) // if err != nil { // fmt.Println("tails %s failed,err:%v\n", filenName, err) // return nil, err // } fmt.Println("啟動," + filenName + "監聽") } func (ts *TailServe) Listener(MsgChan chan string) { for { msg, ok := <-ts.tails.Lines if !ok { // todo fmt.Println("數據接收失敗") return } fmt.Println(msg.Text) MsgChan <- msg.Text } } // 測試案例 func Demo() { filename := `E:\xx.log` config := tail.Config{ ReOpen: true, Follow: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, MustExist: false, Poll: true, } // 打開文件開始讀取數據 tails, err := tail.TailFile(filename, config) if err != nil { fmt.Println("tails %s failed,err:%v\n", filename, err) return } var ( msg *tail.Line ok bool ) fmt.Println("啟動") for { msg, ok = <-tails.Lines if !ok { fmt.Println("tails file close reopen,filename:$s\n", tails.Filename) } fmt.Println("msg:", msg.Text) } }
原文鏈接:https://juejin.cn/post/7086105702057377828
相關推薦
- 2022-06-01 解決IIS不識別PUT和DELETE請求_win服務器
- 2022-01-18 正則——16進制顏色
- 2022-09-19 Tomcat配置HTTPS訪問的實現步驟_Tomcat
- 2022-05-17 解決使用maven打jar包缺失依賴包問題
- 2022-09-09 python?獲取星期字符串的實例_python
- 2022-11-26 React常見跨窗口通信方式實例詳解_React
- 2022-08-18 python編寫第一個交互程序步驟示例教程_python
- 2022-12-10 Android入門之日歷選擇與時間選擇組件的使用_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支