網站首頁 編程語言 正文
通過memberlist庫實現gossip管理集群以及集群數據交互
概述
memberlist庫的簡單用法如下,注意下面使用for循環來執行?list.Join
?,原因是一開始各節點都沒有runing,直接執行?Join
?會出現連接拒絕的錯誤。
package main import ( "fmt" "github.com/hashicorp/memberlist" "time" ) func main() { /* Create the initial memberlist from a safe configuration. Please reference the godoc for other default config types. http://godoc.org/github.com/hashicorp/memberlist#Config */ list, err := memberlist.Create(memberlist.DefaultLocalConfig()) if err != nil { panic("Failed to create memberlist: " + err.Error()) } t := time.NewTicker(time.Second * 5) for { select { case <-t.C: // Join an existing cluster by specifying at least one known member. n, err := list.Join([]string{"192.168.80.129"}) if err != nil { fmt.Println("Failed to join cluster: " + err.Error()) continue } fmt.Println("member number is:", n) goto END } } END: for { select { case <-t.C: // Ask for members of the cluster for _, member := range list.Members() { fmt.Printf("Member: %s %s\n", member.Name, member.Addr) } } } // Continue doing whatever you need, memberlist will maintain membership // information in the background. Delegates can be used for receiving // events when members join or leave. }
memberlist的兩個主要接口如下:
Create:根據入參配置創建一個?
Memberlist
?,初始化階段?Memberlist
?僅包含本節點狀態。注意此時并不會連接到其他節點,執行成功之后就可以允許其他節點加入該memberlist。-
Join:使用已有的?
Memberlist
?來嘗試連接給定的主機,并與之同步狀態,以此來加入某個cluster。執行該操作可以讓其他節點了解到本節點的存在。最后返回成功建立連接的節點數以及錯誤信息,如果沒有與任何節點建立連接,則返回錯誤。注意當join一個cluster時,至少需要指定集群中的一個已知成員,后續會通過gossip同步整個集群的成員信息。
memberlist提供的功能主要分為兩塊:維護成員狀態(gossip)以及數據同步(boardcast、SendReliable)。下面看幾個相關接口。
接口
memberlist.Create
?的入參要求給出相應的?配置?信息,?DefaultLocalConfig()
?給出了通用的配置信息,但還需要實現相關接口來實現成員狀態的同步以及用戶數據的收發。注意下面有些接口是必選的,有些則可選:
type Config struct { // ... // Delegate and Events are delegates for receiving and providing // data to memberlist via callback mechanisms. For Delegate, see // the Delegate interface. For Events, see the EventDelegate interface. // // The DelegateProtocolMin/Max are used to guarantee protocol-compatibility // for any custom messages that the delegate might do (broadcasts, // local/remote state, etc.). If you don't set these, then the protocol // versions will just be zero, and version compliance won't be done. Delegate Delegate Events EventDelegate Conflict ConflictDelegate Merge MergeDelegate Ping PingDelegate Alive AliveDelegate //... }
memberlist使用如下?類型?的消息來同步集群狀態和處理用戶消息:
const ( pingMsg messageType = iota indirectPingMsg ackRespMsg suspectMsg aliveMsg deadMsg pushPullMsg compoundMsg userMsg // User mesg, not handled by us compressMsg encryptMsg nackRespMsg hasCrcMsg errMsg )
Delegate
如果要使用memberlist的gossip協議,則必須實現該接口。所有這些方法都必須是線程安全的。
type Delegate interface { // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. NodeMeta(limit int) []byte // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. // It can return a list of buffers to send. Each buffer should assume an // overhead as provided with a limit on the total byte size allowed. // The total byte size of the resulting data to send must not exceed // the limit. Care should be taken that this method does not block, // since doing so would block the entire UDP packet receive loop. GetBroadcasts(overhead, limit int) [][]byte // LocalState is used for a TCP Push/Pull. This is sent to // the remote side in addition to the membership information. Any // data can be sent here. See MergeRemoteState as well. The `join` // boolean indicates this is for a join instead of a push/pull. LocalState(join bool) []byte // MergeRemoteState is invoked after a TCP Push/Pull. This is the // state received from the remote side and is the result of the // remote side's LocalState call. The 'join' // boolean indicates this is for a join instead of a push/pull. MergeRemoteState(buf []byte, join bool) }
主要方法如下:
-
NotifyMsg:用于接收用戶消息(?
userMsg
?)。注意不能阻塞該方法,否則會阻塞整個UDP/TCP報文接收循環。此外由于數據可能在方法調用時被修改,因此應該事先拷貝數據。該方法用于接收通過UDP/TCP方式發送的用戶消息(?
userMsg
?):注意UDP方式并不是立即發送的,它會隨gossip周期性發送或在處理?
pingMsg
?等消息時發送從GetBroadcasts獲取到的用戶消息。//使用UDP方式將用戶消息傳輸到給定節點,消息大小受限于memberlist的UDPBufferSize配置。沒有使用gossip機制 func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error //與SendBestEffort機制相同,只不過一個指定了Node,一個指定了Node地址 func (m *Memberlist) SendToAddress(a Address, msg []byte) error //使用TCP方式將用戶消息傳輸到給定節點,消息沒有大小限制。沒有使用gossip機制 func (m *Memberlist) SendReliable(to *Node, msg []byte) error
GetBroadcasts:用于在gossip周期性調度或處理處理?
pingMsg
?等消息時攜帶用戶消息,因此并不是即時的。通常會把需要發送的消息通過?TransmitLimitedQueue.QueueBroadcast
?保存起來,然后在發送時通過?TransmitLimitedQueue.GetBroadcasts
?獲取需要發送的消息。見下面?TransmitLimitedQueue
?的描述。LocalState:用于TCP Push/Pull,用于向遠端發送除成員之外的信息(可以發送任意數據),用于定期同步成員狀態。參數?
join
?用于表示將該方法用于join階段,而非push/pull。MergeRemoteState:TCP Push/Pull之后調用,接收到遠端的狀態(即遠端調用LocalState的結果)。參數?
join
?用于表示將該方法用于join階段,而非push/pull。
定期(PushPullInterval)調用pushPull來隨機執行一次完整的狀態交互。但由于pushPull會與其他節點同步本節點的所有狀態,因此代價也比較大。
EventDelegate
僅用于接收成員的joining 和leaving通知,可以用于更新本地的成員狀態信息。
type EventDelegate interface { // NotifyJoin is invoked when a node is detected to have joined. // The Node argument must not be modified. NotifyJoin(*Node) // NotifyLeave is invoked when a node is detected to have left. // The Node argument must not be modified. NotifyLeave(*Node) // NotifyUpdate is invoked when a node is detected to have // updated, usually involving the meta data. The Node argument // must not be modified. NotifyUpdate(*Node) }
ChannelEventDelegate
?實現了簡單的?EventDelegate
?接口:
type ChannelEventDelegate struct { Ch chan<- NodeEvent }
ConflictDelegate
用于通知某個client在執行join時產生了命名沖突。通常是因為兩個client配置了相同的名稱,但使用了不同的地址。可以用于統計錯誤信息。
type ConflictDelegate interface { // NotifyConflict is invoked when a name conflict is detected NotifyConflict(existing, other *Node) }
MergeDelegate
在集群執行merge操作時調用。?NotifyMerge
?方法的參數?peers
?提供了對端成員信息。?可以不實現該接口。
type MergeDelegate interface { // NotifyMerge is invoked when a merge could take place. // Provides a list of the nodes known by the peer. If // the return value is non-nil, the merge is canceled. NotifyMerge(peers []*Node) error }
PingDelegate
用于通知觀察者完成一個ping消息(?pingMsg
?)要花費多長時間。可以在?NotifyPingComplete
?中(使用histogram)統計ping的執行時間。
type PingDelegate interface { // AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack AckPayload() []byte // NotifyPing is invoked when an ack for a ping is received NotifyPingComplete(other *Node, rtt time.Duration, payload []byte) }
AliveDelegate
當接收到?aliveMsg
?消息時調用的接口,可以用于添加日志和指標等信息。
type AliveDelegate interface { // NotifyAlive is invoked when a message about a live // node is received from the network. Returning a non-nil // error prevents the node from being considered a peer. NotifyAlive(peer *Node) error }
Broadcast
可以隨gossip將數據廣播到memberlist集群。
// Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. type Broadcast interface { // Invalidates checks if enqueuing the current broadcast // invalidates a previous broadcast Invalidates(b Broadcast) bool // Returns a byte form of the message Message() []byte // Finished is invoked when the message will no longer // be broadcast, either due to invalidation or to the // transmit limit being reached Finished() }
Broadcast
?接口通常作為?TransmitLimitedQueue.QueueBroadcast
?的入參:
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { q.queueBroadcast(b, 0) }
alertmanager中的實現如下:
type simpleBroadcast []byte func (b simpleBroadcast) Message() []byte { return []byte(b) } func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false } func (b simpleBroadcast) Finished()
TransmitLimitedQueue
TransmitLimitedQueue主要用于處理廣播消息。有兩個主要的方法:?QueueBroadcast
?和?GetBroadcasts
?,前者用于保存廣播消息,后者用于在發送的時候獲取需要廣播的消息。隨gossip周期性調度或在處理?pingMsg
?等消息時調用?GetBroadcasts
?方法。
// TransmitLimitedQueue is used to queue messages to broadcast to // the cluster (via gossip) but limits the number of transmits per // message. It also prioritizes messages with lower transmit counts // (hence newer messages). type TransmitLimitedQueue struct { // NumNodes returns the number of nodes in the cluster. This is // used to determine the retransmit count, which is calculated // based on the log of this. NumNodes func() int // RetransmitMult is the multiplier used to determine the maximum // number of retransmissions attempted. RetransmitMult int mu sync.Mutex tq *btree.BTree // stores *limitedBroadcast as btree.Item tm map[string]*limitedBroadcast idGen int64 }
小結
memberlist中的消息分為兩種,一種是內部用于同步集群狀態的消息,另一種是用戶消息。
GossipInterval
?周期性調度的有兩個方法:
-
gossip?:用于同步?
aliveMsg
?、?deadMsg
?、?suspectMsg
?消息 -
probe?:用于使用?
pingMsg
?消息探測節點狀態
// GossipInterval and GossipNodes are used to configure the gossip // behavior of memberlist. // // GossipInterval is the interval between sending messages that need // to be gossiped that haven't been able to piggyback on probing messages. // If this is set to zero, non-piggyback gossip is disabled. By lowering // this value (more frequent) gossip messages are propagated across // the cluster more quickly at the expense of increased bandwidth. // // GossipNodes is the number of random nodes to send gossip messages to // per GossipInterval. Increasing this number causes the gossip messages // to propagate across the cluster more quickly at the expense of // increased bandwidth. // // GossipToTheDeadTime is the interval after which a node has died that // we will still try to gossip to it. This gives it a chance to refute. GossipInterval time.Duration GossipNodes int GossipToTheDeadTime time.Duration
用戶消息又分為兩種:
- 周期性同步:
- 以?
PushPullInterval
?為周期,使用?Delegate.LocalState
?和?Delegate.MergeRemoteState
?以TCP方式同步用戶信息; - 使用?
Delegate.GetBroadcasts
?隨gossip發送用戶信息。
- 以?
- 主動發送:使用?
SendReliable
?等方法實現主動發送用戶消息。
alertmanager的處理
alertmanager通過兩種方式發送用戶消息,即UDP方式和TCP方式。在alertmanager中,當要發送的數據大于?MaxGossipPacketSize/2
?將采用TCP方式(?SendReliable
?方法),否則使用UDP方式(?Broadcast
?接口)。
func (c *Channel) Broadcast(b []byte) { b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b}) if err != nil { return } if OversizedMessage(b) { select { case c.msgc <- b: //從c.msgc 接收數據,并使用SendReliable發送 default: level.Debug(c.logger).Log("msg", "oversized gossip channel full") c.oversizeGossipMessageDroppedTotal.Inc() } } else { c.send(b) } } func OversizedMessage(b []byte) bool { return len(b) > MaxGossipPacketSize/2 }
demo
這里?實現了一個簡單的基于gossip管理集群信息,并通過TCP給集群成員發送信息的例子。
原文鏈接:https://www.cnblogs.com/charlieroro/p/16466547.html
相關推薦
- 2022-08-25 R語言實現KMeans聚類算法實例教程_R語言
- 2022-11-27 Python+decimal完成精度計算的示例詳解_python
- 2021-12-04 Qt下監測內存泄漏的方法_C 語言
- 2022-08-30 C語言深入探究sizeof與整型數據存儲及數據類型取值范圍_C 語言
- 2022-05-04 配置Spring.Net框架開發環境_實用技巧
- 2022-12-04 Android自定義View繪制貝塞爾曲線實現流程_Android
- 2022-05-09 如何利用python讀取圖片屬性信息_python
- 2022-11-23 Python字符串格式化實例講解_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支