<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
memberlist庫的簡單用法如下,注意下面使用for迴圈來執行 list.Join
,原因是一開始各節點都沒有runing,直接執行 Join
會出現連線拒絕的錯誤。
package main import ( "fmt" "github.com/hashicorp/memberlist" "time" ) func main() { /* Create the initial memberlist from a safe configuration. Please reference the godoc for other default config types. http://godoc.org/github.com/hashicorp/memberlist#Config */ list, err := memberlist.Create(memberlist.DefaultLocalConfig()) if err != nil { panic("Failed to create memberlist: " + err.Error()) } t := time.NewTicker(time.Second * 5) for { select { case <-t.C: // Join an existing cluster by specifying at least one known member. n, err := list.Join([]string{"192.168.80.129"}) if err != nil { fmt.Println("Failed to join cluster: " + err.Error()) continue } fmt.Println("member number is:", n) goto END } } END: for { select { case <-t.C: // Ask for members of the cluster for _, member := range list.Members() { fmt.Printf("Member: %s %sn", member.Name, member.Addr) } } } // Continue doing whatever you need, memberlist will maintain membership // information in the background. Delegates can be used for receiving // events when members join or leave. }
memberlist的兩個主要介面如下:
Create:根據入參設定建立一個 Memberlist
,初始化階段 Memberlist
僅包含本節點狀態。注意此時並不會連線到其他節點,執行成功之後就可以允許其他節點加入該memberlist。
Join:使用已有的 Memberlist
來嘗試連線給定的主機,並與之同步狀態,以此來加入某個cluster。執行該操作可以讓其他節點了解到本節點的存在。最後返回成功建立連線的節點數以及錯誤資訊,如果沒有與任何節點建立連線,則返回錯誤。
注意當join一個cluster時,至少需要指定叢集中的一個已知成員,後續會通過gossip同步整個叢集的成員資訊。
memberlist提供的功能主要分為兩塊:維護成員狀態(gossip)以及資料同步(boardcast、SendReliable)。下面看幾個相關介面。
memberlist.Create
的入參要求給出相應的 設定 資訊, DefaultLocalConfig()
給出了通用的設定資訊,但還需要實現相關介面來實現成員狀態的同步以及使用者資料的收發。注意下面有些介面是必選的,有些則可選:
type Config struct { // ... // Delegate and Events are delegates for receiving and providing // data to memberlist via callback mechanisms. For Delegate, see // the Delegate interface. For Events, see the EventDelegate interface. // // The DelegateProtocolMin/Max are used to guarantee protocol-compatibility // for any custom messages that the delegate might do (broadcasts, // local/remote state, etc.). If you don't set these, then the protocol // versions will just be zero, and version compliance won't be done. Delegate Delegate Events EventDelegate Conflict ConflictDelegate Merge MergeDelegate Ping PingDelegate Alive AliveDelegate //... }
memberlist使用如下 型別 的訊息來同步叢集狀態和處理使用者訊息:
const ( pingMsg messageType = iota indirectPingMsg ackRespMsg suspectMsg aliveMsg deadMsg pushPullMsg compoundMsg userMsg // User mesg, not handled by us compressMsg encryptMsg nackRespMsg hasCrcMsg errMsg )
如果要使用memberlist的gossip協定,則必須實現該介面。所有這些方法都必須是執行緒安全的。
type Delegate interface { // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. NodeMeta(limit int) []byte // NotifyMsg is called when a user-data message is received. // Care should be taken that this method does not block, since doing // so would block the entire UDP packet receive loop. Additionally, the byte // slice may be modified after the call returns, so it should be copied if needed NotifyMsg([]byte) // GetBroadcasts is called when user data messages can be broadcast. // It can return a list of buffers to send. Each buffer should assume an // overhead as provided with a limit on the total byte size allowed. // The total byte size of the resulting data to send must not exceed // the limit. Care should be taken that this method does not block, // since doing so would block the entire UDP packet receive loop. GetBroadcasts(overhead, limit int) [][]byte // LocalState is used for a TCP Push/Pull. This is sent to // the remote side in addition to the membership information. Any // data can be sent here. See MergeRemoteState as well. The `join` // boolean indicates this is for a join instead of a push/pull. LocalState(join bool) []byte // MergeRemoteState is invoked after a TCP Push/Pull. This is the // state received from the remote side and is the result of the // remote side's LocalState call. The 'join' // boolean indicates this is for a join instead of a push/pull. MergeRemoteState(buf []byte, join bool) }
主要方法如下:
NotifyMsg:用於接收使用者訊息( userMsg
)。注意不能阻塞該方法,否則會阻塞整個UDP/TCP報文接收回圈。此外由於資料可能在方法呼叫時被修改,因此應該事先拷貝資料。
該方法用於接收通過UDP/TCP方式傳送的使用者訊息( userMsg
):
注意UDP方式並不是立即傳送的,它會隨gossip週期性傳送或在處理 pingMsg
等訊息時傳送從GetBroadcasts獲取到的使用者訊息。
//使用UDP方式將使用者訊息傳輸到給定節點,訊息大小受限於memberlist的UDPBufferSize設定。沒有使用gossip機制 func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error //與SendBestEffort機制相同,只不過一個指定了Node,一個指定了Node地址 func (m *Memberlist) SendToAddress(a Address, msg []byte) error //使用TCP方式將使用者訊息傳輸到給定節點,訊息沒有大小限制。沒有使用gossip機制 func (m *Memberlist) SendReliable(to *Node, msg []byte) error
GetBroadcasts:用於在gossip週期性排程或處理處理 pingMsg
等訊息時攜帶使用者訊息,因此並不是即時的。通常會把需要傳送的訊息通過 TransmitLimitedQueue.QueueBroadcast
儲存起來,然後在傳送時通過 TransmitLimitedQueue.GetBroadcasts
獲取需要傳送的訊息。見下面 TransmitLimitedQueue
的描述。
LocalState:用於TCP Push/Pull,用於向遠端傳送除成員之外的資訊(可以傳送任意資料),用於定期同步成員狀態。引數 join
用於表示將該方法用於join階段,而非push/pull。
MergeRemoteState:TCP Push/Pull之後呼叫,接收到遠端的狀態(即遠端呼叫LocalState的結果)。引數 join
用於表示將該方法用於join階段,而非push/pull。
定期(PushPullInterval)呼叫pushPull來隨機執行一次完整的狀態互動。但由於pushPull會與其他節點同步本節點的所有狀態,因此代價也比較大。
僅用於接收成員的joining 和leaving通知,可以用於更新原生的成員狀態資訊。
type EventDelegate interface { // NotifyJoin is invoked when a node is detected to have joined. // The Node argument must not be modified. NotifyJoin(*Node) // NotifyLeave is invoked when a node is detected to have left. // The Node argument must not be modified. NotifyLeave(*Node) // NotifyUpdate is invoked when a node is detected to have // updated, usually involving the meta data. The Node argument // must not be modified. NotifyUpdate(*Node) }
ChannelEventDelegate
實現了簡單的 EventDelegate
介面:
type ChannelEventDelegate struct { Ch chan<- NodeEvent }
用於通知某個client在執行join時產生了命名衝突。通常是因為兩個client設定了相同的名稱,但使用了不同的地址。可以用於統計錯誤資訊。
type ConflictDelegate interface { // NotifyConflict is invoked when a name conflict is detected NotifyConflict(existing, other *Node) }
在叢集執行merge操作時呼叫。 NotifyMerge
方法的引數 peers
提供了對端成員資訊。 可以不實現該介面。
type MergeDelegate interface { // NotifyMerge is invoked when a merge could take place. // Provides a list of the nodes known by the peer. If // the return value is non-nil, the merge is canceled. NotifyMerge(peers []*Node) error }
用於通知觀察者完成一個ping訊息( pingMsg
)要花費多長時間。可以在 NotifyPingComplete
中(使用histogram)統計ping的執行時間。
type PingDelegate interface { // AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack AckPayload() []byte // NotifyPing is invoked when an ack for a ping is received NotifyPingComplete(other *Node, rtt time.Duration, payload []byte) }
當接收到 aliveMsg
訊息時呼叫的介面,可以用於新增紀錄檔和指標等資訊。
type AliveDelegate interface { // NotifyAlive is invoked when a message about a live // node is received from the network. Returning a non-nil // error prevents the node from being considered a peer. NotifyAlive(peer *Node) error }
可以隨gossip將資料廣播到memberlist叢集。
// Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. type Broadcast interface { // Invalidates checks if enqueuing the current broadcast // invalidates a previous broadcast Invalidates(b Broadcast) bool // Returns a byte form of the message Message() []byte // Finished is invoked when the message will no longer // be broadcast, either due to invalidation or to the // transmit limit being reached Finished() }
Broadcast
介面通常作為 TransmitLimitedQueue.QueueBroadcast
的入參:
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { q.queueBroadcast(b, 0) }
alertmanager中的實現如下:
type simpleBroadcast []byte func (b simpleBroadcast) Message() []byte { return []byte(b) } func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false } func (b simpleBroadcast) Finished()
TransmitLimitedQueue主要用於處理廣播訊息。有兩個主要的方法: QueueBroadcast
和 GetBroadcasts
,前者用於儲存廣播訊息,後者用於在傳送的時候獲取需要廣播的訊息。隨gossip週期性排程或在處理 pingMsg
等訊息時呼叫 GetBroadcasts
方法。
// TransmitLimitedQueue is used to queue messages to broadcast to // the cluster (via gossip) but limits the number of transmits per // message. It also prioritizes messages with lower transmit counts // (hence newer messages). type TransmitLimitedQueue struct { // NumNodes returns the number of nodes in the cluster. This is // used to determine the retransmit count, which is calculated // based on the log of this. NumNodes func() int // RetransmitMult is the multiplier used to determine the maximum // number of retransmissions attempted. RetransmitMult int mu sync.Mutex tq *btree.BTree // stores *limitedBroadcast as btree.Item tm map[string]*limitedBroadcast idGen int64 }
memberlist中的訊息分為兩種,一種是內部用於同步叢集狀態的訊息,另一種是使用者訊息。
GossipInterval
週期性排程的有兩個方法:
// GossipInterval and GossipNodes are used to configure the gossip // behavior of memberlist. // // GossipInterval is the interval between sending messages that need // to be gossiped that haven't been able to piggyback on probing messages. // If this is set to zero, non-piggyback gossip is disabled. By lowering // this value (more frequent) gossip messages are propagated across // the cluster more quickly at the expense of increased bandwidth. // // GossipNodes is the number of random nodes to send gossip messages to // per GossipInterval. Increasing this number causes the gossip messages // to propagate across the cluster more quickly at the expense of // increased bandwidth. // // GossipToTheDeadTime is the interval after which a node has died that // we will still try to gossip to it. This gives it a chance to refute. GossipInterval time.Duration GossipNodes int GossipToTheDeadTime time.Duration
使用者訊息又分為兩種:
PushPullInterval
為週期,使用 Delegate.LocalState
和 Delegate.MergeRemoteState
以TCP方式同步使用者資訊;Delegate.GetBroadcasts
隨gossip傳送使用者資訊。SendReliable
等方法實現主動傳送使用者訊息。alertmanager通過兩種方式傳送使用者訊息,即UDP方式和TCP方式。在alertmanager中,當要傳送的資料大於 MaxGossipPacketSize/2
將採用TCP方式( SendReliable
方法),否則使用UDP方式( Broadcast
介面)。
func (c *Channel) Broadcast(b []byte) { b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b}) if err != nil { return } if OversizedMessage(b) { select { case c.msgc <- b: //從c.msgc 接收資料,並使用SendReliable傳送 default: level.Debug(c.logger).Log("msg", "oversized gossip channel full") c.oversizeGossipMessageDroppedTotal.Inc() } } else { c.send(b) } } func OversizedMessage(b []byte) bool { return len(b) > MaxGossipPacketSize/2 }
這裡 實現了一個簡單的基於gossip管理叢集資訊,並通過TCP給叢整合員傳送資訊的例子。
到此這篇關於通過memberlist庫實現gossip管理叢集以及叢集資料互動的文章就介紹到這了,更多相關memberlist庫gossip叢集內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45