網站首頁 編程語言 正文
1. Kafka介紹
1.1 Kafka是什么
kafka使用scala開發,支持多語言客戶端(c++、java、python、go等)
Kafka最先由LinkedIn公司開發,之后成為Apache的頂級項目。
Kafka是一個分布式的、分區化、可復制提交的日志服務
LinkedIn使用Kafka實現了公司不同應用程序之間的松耦和,那么作為一個可擴展、高可靠的消息系統 支持高Throughput的應用
scale out:無需停機即可擴展機器
持久化:通過將數據持久化到硬盤以及replication防止數據丟失
支持online和offline的場景
1.2 Kafka的特點
Kafka是分布式的,其所有的構件borker(服務端集群)、producer(消息生產)、consumer(消息消費者)都可以是分布式的。
在消息的生產時可以使用一個標識topic來區分,且可以進行分區;每一個分區都是一個順序的、不可變的消息隊列, 并且可以持續的添加。
同時為發布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡
1.3 常用的場景
監控:主機通過Kafka發送與系統和應用程序健康相關的指標,然后這些信息會被收集和處理從而創建監控儀表盤并發送警告。
消息隊列: 應用程度使用Kafka作為傳統的消息系統實現標準的隊列和消息的發布—訂閱,例如搜索和內容提要(Content Feed)。比起大多數的消息系統來說,Kafka有更好的吞吐量,內置的分區,冗余及容錯性,這讓Kafka成為了一個很好的大規模消息處理應用的解決方案。消息系統 一般吞吐量相對較低,但是需要更小的端到端延時,并嘗嘗依賴于Kafka提供的強大的持久性保障。在這個領域,Kafka足以媲美傳統消息系統,如ActiveMR或RabbitMQ
站點的用戶活動追蹤: 為了更好地理解用戶行為,改善用戶體驗,將用戶查看了哪個頁面、點擊了哪些內容等信息發送到每個數據中心的Kafka集群上,并通過Hadoop進行分析、生成日常報告。
流處理:保存收集流數據,以提供之后對接的Storm或其他流式計算框架進行處理。很多用戶會將那些從原始topic來的數據進行 階段性處理,匯總,擴充或者以其他的方式轉換到新的topic下再繼續后面的處理。例如一個文章推薦的處理流程,可能是先從RSS數據源中抓取文章的內 容,然后將其丟入一個叫做“文章”的topic中;后續操作可能是需要對這個內容進行清理,比如回復正常數據或者刪除重復數據,最后再將內容匹配的結果返 還給用戶。這就在一個獨立的topic之外,產生了一系列的實時數據處理的流程。
日志聚合:使用Kafka代替日志聚合(log aggregation)。日志聚合一般來說是從服務器上收集日志文件,然后放到一個集中的位置(文件服務器或HDFS)進行處理。然而Kafka忽略掉 文件的細節,將其更清晰地抽象成一個個日志或事件的消息流。這就讓Kafka處理過程延遲更低,更容易支持多數據源和分布式數據處理。比起以日志為中心的 系統比如Scribe或者Flume來說,Kafka提供同樣高效的性能和因為復制導致的更高的耐用性保證,以及更低的端到端延遲
持久性日志:Kafka可以為一種外部的持久性日志的分布式系統提供服務。這種日志可以在節點間備份數據,并為故障節點數據回復提供一種重新同步的機制。Kafka中日志壓縮功能為這種用法提供了條件。在這種用法中,Kafka類似于Apache BookKeeper項目。
1.4 Kafka中包含以下基礎概念
1.Topic(話題):Kafka中用于區分不同類別信息的類別名稱。由producer指定
2.Producer(生產者):將消息發布到Kafka特定的Topic的對象(過程)
3.Consumers(消費者):訂閱并處理特定的Topic中的消息的對象(過程)
4.Broker(Kafka服務集群):已發布的消息保存在一組服務器中,稱之為Kafka集群。集群中的每一個服務器都是一個代理(Broker). 消費者可以訂閱一個或多個話題,并從Broker拉數據,從而消費這些已發布的消息。
5.Partition(分區):Topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)
Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發布一些消息。
1.5 消息
消息由一個固定大小的報頭和可變長度但不透明的字節陣列負載。報頭包含格式版本和CRC32效驗和以檢測損壞或截斷
1.6 消息格式
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
bit 0 ~ 2 : Compression codec
0 : no compression
1 : gzip
2 : snappy
3 : lz4
bit 3 : Timestamp type
0 : create time
1 : log append time
bit 4 ~ 7 : reserved
4. (可選) 8 byte timestamp only if "magic" identifier is greater than 0
5. 4 byte key length, containing length K
6. K byte key
7. 4 byte payload length, containing length V
8. V byte payload
2. Kafka深層介紹
2.1 架構介紹
Producer:Producer即生產者,消息的產生者,是消息的?口。
kafka cluster:kafka集群,一臺或多臺服務?組成
- Broker:Broker是指部署了Kafka實例的服務?節點。每個服務?上有一個或多個kafka的實 例,我們姑且認為每個broker對應一臺服務?。每個kafka集群內的broker都有一個不重復的 編號,如圖中的broker-0、broker-1等……
- Topic:消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上 都可以創建多個topic。實際應用中通常是一個業務線建一個topic。
- Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞 吐量。同一個topic在不同的分區的數據是不重復的,partition的表現形式就是一個一個的?件夾!
- Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的 時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10 個,且副本的數量不能大于Broker的數量,follower和leader絕對是在不同的機器,同一機 ?對同一個分區也只可能存放一個副本(包括自己)。
Consumer:消費者,即消息的消費方,是消息的出口。
- Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分 區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個 topic的不同分區的數據,這也是為了提高kafka的吞吐量!
2.2 ?作流程
我們看上?的架構圖中,producer就是生產者,是數據的入口。Producer在寫入數據的時候會把數據 寫入到leader中,不會直接將數據寫入follower!那leader怎么找呢?寫入的流程又是什么樣的呢?我 們看下圖:
1.?產者從Kafka集群獲取分區leader信息
2.?產者將消息發送給leader
3.leader將消息寫入本地磁盤
4.follower從leader拉取消息數據
5.follower將消息寫入本地磁盤后向leader發送ACK
6.leader收到所有的follower的ACK之后向生產者發送ACK
2.3 選擇partition的原則
那在kafka中,如果某個topic有多個partition,producer?怎么知道該將數據發往哪個partition呢? kafka中有幾個原則:
1.partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應的partition。
2.如果沒有指定partition,但是設置了數據的key,則會根據key的值hash出一個partition。
3.如果既沒指定partition,又沒有設置key,則會采用輪詢?式,即每次取一小段時間的數據寫入某partition,下一小段的時間寫入下一個partition
2.4 ACK應答機制
producer在向kafka寫入消息的時候,可以設置參數來確定是否確認kafka接收到數據,這個參數可設置 的值為 0,1,all
- 0代表producer往集群發送數據不需要等到集群的返回,不確保消息發送成功。安全性最低但是效 率最高。
- 1代表producer往集群發送數據只要leader應答就可以發送下一條,只確保leader發送成功。
- all代表producer往集群發送數據需要所有的follower都完成從leader的同步才會發送下一條,確保 leader發送成功和所有的副本都完成備份。安全性最?高,但是效率最低。
最后要注意的是,如果往不存在的topic寫數據,kafka會?動創建topic,partition和replication的數量 默認配置都是1。
2.5 Topic和數據?志
topic 是同?類別的消息記錄(record)的集合。在Kafka中,?個主題通常有多個訂閱者。對于每個 主題,Kafka集群維護了?個分區數據?志?件結構如下:
每個partition都是?個有序并且不可變的消息記錄集合。當新的數據寫?時,就被追加到partition的末 尾。在每個partition中,每條消息都會被分配?個順序的唯?標識,這個標識被稱為offset,即偏移 量。注意,Kafka只保證在同?個partition內部消息是有序的,在不同partition之間,并不能保證消息 有序。
Kafka可以配置?個保留期限,?來標識?志會在Kafka集群內保留多?時間。Kafka集群會保留在保留 期限內所有被發布的消息,不管這些消息是否被消費過。?如保留期限設置為兩天,那么數據被發布到 Kafka集群的兩天以內,所有的這些數據都可以被消費。當超過兩天,這些數據將會被清空,以便為后 續的數據騰出空間。由于Kafka會將數據進?持久化存儲(即寫?到硬盤上),所以保留的數據??可 以設置為?個?較?的值。
2.6 Partition結構
Partition在服務器上的表現形式就是?個?個的?件夾,每個partition的?件夾下?會有多組segment ?件,每組segment?件?包含 .index ?件、 .log ?件、 .timeindex ?件三個?件,其中 .log ? 件就是實際存儲message的地?,? .index 和 .timeindex ?件為索引?件,?于檢索消息。
2.7 消費數據
多個消費者實例可以組成?個消費者組,并??個標簽來標識這個消費者組。?個消費者組中的不同消 費者實例可以運?在不同的進程甚?不同的服務器上。
如果所有的消費者實例都在同?個消費者組中,那么消息記錄會被很好的均衡的發送到每個消費者實 例。
如果所有的消費者實例都在不同的消費者組,那么每?條消息記錄會被?播到每?個消費者實例。
舉個例?,如上圖所示?個兩個節點的Kafka集群上擁有?個四個partition(P0-P3)的topic。有兩個 消費者組都在消費這個topic中的數據,消費者組A有兩個消費者實例,消費者組B有四個消費者實例。 從圖中我們可以看到,在同?個消費者組中,每個消費者實例可以消費多個分區,但是每個分區最多只 能被消費者組中的?個實例消費。也就是說,如果有?個4個分區的主題,那么消費者組中最多只能有4 個消費者實例去消費,多出來的都不會被分配到分區。其實這也很好理解,如果允許兩個消費者實例同 時消費同?個分區,那么就?法記錄這個分區被這個消費者組消費的offset了。如果在消費者組中動態 的上線或下線消費者,那么Kafka集群會?動調整分區與消費者實例間的對應關系。
3. 操作Kafka
3.1 sarama
Go語言中連接kafka使用第三方庫: github.com/Shopify/sarama。
3.2 下載及安裝
go get github.com/Shopify/sarama
注意事項: sarama v1.20之后的版本加入了zstd壓縮算法,需要用到cgo,在Windows平臺編譯時會提示類似如下錯誤: github.com/DataDog/zstd exec: "gcc":executable file not found in %PATH% 所以在Windows平臺請使用v1.19版本的sarama。(如果不會版本控制請查看博客里面的go module章節)
3.3 連接kafka發送消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // 基于sarama第三方庫開發的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 發送完數據需要leader和follow都確認 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 // 構造一個消息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 連接kafka client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 發送消息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%v\n", pid, offset) }
3.4 連接kafka消費消息
package main import ( "fmt" "github.com/Shopify/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%v\n", err) return } partitionList, err := consumer.Partitions("web_log") // 根據topic取到所有的分區 if err != nil { fmt.Printf("fail to get list of partition:err%v\n", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍歷所有的分區 // 針對每個分區創建一個對應的分區消費者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() // 異步從每個分區消費信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } }
原文鏈接:https://juejin.cn/post/7173314677550612493
相關推薦
- 2022-06-27 nginx中配置使用proxy?protocol協議的全過程_nginx
- 2022-04-04 Python?Opencv實現圖片切割處理_python
- 2023-11-11 Flask 表單form.validate_on_submit()什么情況下會是false——解決辦
- 2022-07-02 Python列表1~n輸出步長為3的分組實例_python
- 2022-06-25 C#實現連接電子秤串口自動稱重_C#教程
- 2022-04-24 C語言字符函數中的isalnum()和iscntrl()你都知道嗎_C 語言
- 2022-07-21 vscode代碼保存時自動格式化
- 2022-09-30 Docker容器搭建本地私有倉庫詳情_docker
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支