Gossip协议

简介

在一个有界网络中,每个节点都随机地与其他节点通信,经过一番杂乱无章的通信,最终所有节点的状态都会达成一致。每个节点可能知道所有其他节点,也可能仅知道几个邻居节点,只要这些节可以通过网络连通,最终他们的状态都是一致的。 简述工作过程:
Gossip是由种子节点发起:

  1. 当一个种子节点有状态需要更新到网络中的其他节点时,它会随机的选择周围几个节点散播消息
  2. 收到消息的节点也会重复该过程,直至最终网络中所有的节点都收到了消息。
  3. 这个过程可能需要一定的时间,由于不能保证某个时刻所有节点都收到消息,但是理论上最终所有节点都会收到消息,因此它是一个最终一致性协议。

Gossip的一个实现

memberlist

hashicorp/memberlist是HashiCorp公司开源的Gossip库,这个库被consul(也是HashiCorp公司开源的)所引用。
以下为简单使用过程:

  1. 定义某个节点的地址

    func GossipConfig() *memberlist.Config {
    	c := memberlist.DefaultLANConfig()
    	c.HandoffQueueDepth = 16384
    
    	c.Name = “gossip-01”
    	c.BindPort = 8091
    	c.AdvertisePort = 8091
    	return c
    }
    
  2. 定义收到信息的处理类Delegate 需要实现一下接口:

    NodeMeta(limit int) []byte
    NotifyMsg([]byte)
    GetBroadcasts(overhead, limit int) [][]byte
    LocalState(join bool) []byte
    MergeRemoteState(buf []byte, join bool)
    

其中最重要的是NotifyMsg接口,是收到信息的回调函数:

使用memberlist创建gossip集群。

func initGossip() {
	gossipConfig := config.GossipConfig()
	gossipConfig.Delegate = l.delegate
	log.Infof("gossip bind port: %d", gossipConfig.BindPort)
	log.Infof("gossip advertise port: %d", gossipConfig.AdvertisePort)

	var err error
	gossiper, err = memberlist.Create(gossipConfig)
	if err != nil {
		log.Fatalf("create gossiper failed: %v", err)
	}

	initialGossipPeers := []string{"0.0.0.0:9081","0.0.0.0:9082"}
	if initialGossipPeers != nil && len(initialGossipPeers) > 0 {
		_, err = gossiper.Join(initialGossipPeers)
		if err != nil {
			log.Fatalf("failed to join cluster: %v", err.Error())
		}
	}
}

memberlist的节点状态

节点的状态有以下三种: - alive 节点状况良好 - suspect 对于PingMsg没有应答或应答超时,这个节点的状态是可疑的 - dead 节点已死亡

节点的状态转换有以下过程:

  • 如果节点B无法被对节点A发出的PingMsg(这里是作者自定义的UDP协议,不是ICMP)进行响应,或者响应超时,它会被节点A标为suspect, 如果suspect持续一段时间(或它收到足够多的其它节点关于B的SuspectMsg),节点A会在集群中广播SuspectMsg,告知集群中的其它节点,节点B很可疑
  • 如果B收到了针对它的SuspectMsg,这显然是对它的不利言论,B可以通过发送AliveMsg告知对方,那么在对方节点看来B的state从suspect变为alive
  • 如果一段时间内,B的状态仍然是suspect, 那么对节点A而言,B的状态会被置为dead
  • 如果节点B在down掉一段时间后,重新上线,它可以通过与种子节点的Gossip(push/pull) 重新被认为alive