網站首頁 編程語言 正文
操作Kafka
Kafka 是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據,具有高性能、持久化、多副本備份、橫向擴展等特點。本文介紹了如何使用 Go 語言發送和接收 kafka 消息。
sarama
Go 語言中連接 kafka 使用第三方庫:github.com/Shopify/sar…。
下載及安裝
go get github.com/Shopify/sarama
注意事項
sarama
v1.20 之后的版本加入了zstd
壓縮算法,需要用到 cgo,在 Windows 平臺編譯時會提示類似如下錯誤:
# github.com/DataDog/zstd exec: "gcc":executable file not found in %PATH%
所以在 Windows 平臺請使用 v1.19 版本的 sarama。
連接 kafka 發送消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // 基于sarama第三方庫開發的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 發送完數據需要leader和follow都確認 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 // 構造一個消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 連接kafka client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 發送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
連接 kafka 消費消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return } partitionList, err := consumer.Partitions("web_log") // 根據topic取到所有的分區 if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍歷所有的分區 // 針對每個分區創建一個對應的分區消費者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() // 異步從每個分區消費信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } }
操作Etcd
這里使用官方的etcd/clientv3包來連接etcd并進行相關操作。
安裝
go get go.etcd.io/etcd/clientv3
put和get操作
put
命令用來設置鍵值對數據,get
命令用來根據key獲取值。
package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) func main(){ cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"122.51.79.172:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%v\n", err) return } fmt.Println("connect to etcd success") defer cli.Close() // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = cli.Put(ctx, "coolops", "test") cancel() if err != nil { fmt.Printf("put to etcd failed, err:%v\n", err) return } // get ctx, cancel = context.WithTimeout(context.Background(), time.Second) resp, err := cli.Get(ctx, "coolops") cancel() if err != nil { fmt.Printf("get from etcd failed, err:%v\n", err) return } for _, ev := range resp.Kvs { fmt.Printf("%s:%s\n", ev.Key, ev.Value) } }
watch操作
watch
用來獲取未來更改的通知。
package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) func main(){ cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"122.51.79.172:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%v\n", err) return } fmt.Println("connect to etcd success") defer cli.Close() // watch 操作,返回的是一個通道 rch := cli.Watch(context.Background(), "coolops") // <-chan WatchResponse for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }
安裝報錯:
go: finding github.com/coreos/pkg latest # github.com/coreos/etcd/clientv3/balancer/resolver/endpoint E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:114:78: undefined: resolver.BuildOption E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\resolver\endpoint\endpoint.go:182:31: undefined: resolver.ResolveNowOption # github.com/coreos/etcd/clientv3/balancer/picker E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\err.go:37:44: undefined: balancer.PickOptions E:\DEV\Go\pkg\mod\github.com\coreos\etcd@v3.3.19+incompatible\clientv3\balancer\picker\roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
解決: 將go.mod里的prpc改為1.26.0版本
google.golang.org/grpc v1.26.0
原文鏈接:https://juejin.cn/post/7129376877449314335
相關推薦
- 2023-06-13 C語言中求余運算符的使用解讀_C 語言
- 2022-08-02 linux?shell文件轉碼iconv命令的使用_linux shell
- 2022-10-14 scikit-learn工具包中分類模型predict_proba、predict、decision
- 2022-10-08 Python使用xlrd和xlwt實現自動化操作Excel_python
- 2022-04-18 uniapp h5去掉默認的頂部導航
- 2023-04-04 Python筆記之Scipy.stats.norm函數使用解析_python
- 2022-04-20 Android實現環信修改頭像和昵稱_Android
- 2022-02-24 Kubernetes究竟是個容器應用程序還是集群操作系統,它這么復雜的原因出在哪?
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支