奄幂牛 发表于 2025-6-2 23:39:22

hashicorp/raft模块实现的raft集群存在节点跨集群身份冲突问题

问题场景描述

我通过模块github.com/hashicorp/raft使用golang实现了一个raft集群功能,发现如下场景中会遇到一个问题:
测试启动如下2个raft集群,集群名称,和集群node与IP地址如下,raft集群均通过BootstrapCluster方法初始化:
Cluster1 BootstrapCluster servers:
- node1: {raft.ServerID: c1-node1, raft.ServerAddress: 192.168.100.1:7000}
- node2: {raft.ServerID: c1-node2, raft.ServerAddress: 192.168.100.2:7000}
- node3: {raft.ServerID: c1-node3, raft.ServerAddress: 192.168.100.3:7000}

Cluster2 BootstrapCluster servers:
- node3: {raft.ServerID: c2-node3, raft.ServerAddress: 192.168.100.3:7000}
- node4: {raft.ServerID: c2-node4, raft.ServerAddress: 192.168.100.4:7000}
- node5: {raft.ServerID: c2-node5, raft.ServerAddress: 192.168.100.5:7000}

其中,"node3"的地址会存在2个集群中。

[*]"node1","node2"按照"Cluster1"启动:
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node1
sudo ./raft_svr -cluster 'c1-node1,127.0.0.1,800;c1-node2,127.0.0.2,800;c1-node3,127.0.0.3,800' -id c1-node2

[*]"node3","node4","node5"先按照"Cluster2"启动:
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node3
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node4
sudo ./raft_svr -cluster 'c2-node3,127.0.0.3,800;c2-node4,127.0.0.4,800;c2-node5,127.0.0.5,800' -id c2-node5
然后就会发现"node3"会在"Cluster1"和"Cluster2"之间来回切换,一会属于"Cluster1",一会属于"Cluster2".
INFO current state:Follower, leader address:127.0.0.5:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:53.330867 +0800 CST m=+169.779019126
INFO current state:Follower, leader address:127.0.0.1:800, servers:[{Suffrage:Voter ID:c2-node3 Address:127.0.0.3:800} {Suffrage:Voter ID:c2-node4 Address:127.0.0.4:800} {Suffrage:Voter ID:c2-node5 Address:127.0.0.5:800}], last contact:2025-05-14 15:35:54.308388 +0800 CST m=+170.756576126

我的代码如下:

package main

import (
        "flag"
        "fmt"
        "io"
        "net"
        "os"
        "strconv"
        "strings"
        "time"

        "github.com/hashicorp/raft"
        log "github.com/sirupsen/logrus"
)

type raftCluster struct {
        localRaftID   raft.ServerID
        servers         mapraft.ServerAddress // raftID : raftAddressPort
        raft            *raft.Raft
        electionTimeout time.Duration
}

func (r *raftCluster) Start() error {
        config := raft.DefaultConfig()
        config.HeartbeatTimeout = 2000 * time.Millisecond
        config.ElectionTimeout = 5000 * time.Millisecond
        config.CommitTimeout = 2000 * time.Millisecond
        config.LeaderLeaseTimeout = 2000 * time.Millisecond
        config.LocalID = r.localRaftID
        config.LogOutput = log.StandardLogger().Out

        r.electionTimeout = config.ElectionTimeout * time.Duration(len(r.servers)*2)

        localAddressPort := string(r.servers)
        tcpAddr, err := net.ResolveTCPAddr("tcp", localAddressPort)
        if err != nil {
                return fmt.Errorf("resolve tcp address %s, %v", localAddressPort, err)
        }
        transport, err := raft.NewTCPTransport(localAddressPort, tcpAddr, 2, 10*time.Second, log.StandardLogger().Out)
        if err != nil {
                return fmt.Errorf("fail to create tcp transport, localAddressPort:%s, tcpAddr:%v, %v",
                        localAddressPort, tcpAddr, err)
        }
        snapshots := raft.NewInmemSnapshotStore()
        logStore := raft.NewInmemStore()
        stableStore := raft.NewInmemStore()
        fm := NewFsm()
        r.raft, err = raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
        if err != nil {
                return fmt.Errorf("create raft error, %v", err)
        }

        var configuration raft.Configuration
        for sID, addr := range r.servers {
                server := raft.Server{
                        ID:      sID,
                        Address: addr,
                }
                configuration.Servers = append(configuration.Servers, server)
        }
        err = r.raft.BootstrapCluster(configuration).Error()
        if err != nil {
                return fmt.Errorf("raft bootstrap faild, conf:%v, %v", configuration, err)
        }
        log.Infof("bootstrap cluster as config: %v", configuration)

        return nil
}

func (r *raftCluster) checkLeaderState() {
        ticker := time.NewTicker(time.Second)
        for {
                select {
                case leader := <-r.raft.LeaderCh():
                        log.Infof("im leader:%v, state:%s, leader address:%s", leader, r.raft.State(), r.raft.Leader())

                case <-ticker.C:
                        verifyErr := r.raft.VerifyLeader().Error()
                        servers := r.raft.GetConfiguration().Configuration().Servers
                        switch verifyErr {
                        case nil:
                                log.Infof("im leader, servers:%v", servers)
                        case raft.ErrNotLeader:
                                // check cluster leader
                                log.Infof("current state:%v, servers:%+v, leader address:%v, last contact:%v",
                                        r.raft.State(), servers, r.raft.Leader(), r.raft.LastContact())
                        }
                }
        }
}

func main() {
        var (
                clusters = flag.String("cluster", "",
                        "cluster node address, fmt: ID,IP,Port;ID,IP,Port")
                clusterId = flag.String("id", "", "cluster id")
        )
        flag.Parse()

        if *clusterId == "" {
                log.Infof("cluster id messing")
                os.Exit(1)
        }

        servers := make(mapraft.ServerAddress)
        for _, cluster := range strings.Split(*clusters, ";") {
                info := strings.Split(cluster, ",")
                var (
                        nid   string
                        nip   net.IP
                        nport int
                        err   error
                )
                switch {
                case len(info) == 3:
                        nid = info
                        nip = net.ParseIP(info)
                        if nip == nil {
                                log.Infof("cluster %s ip %s parse failed", cluster, info)
                                os.Exit(1)
                        }
                        nport, err = strconv.Atoi(info)
                        if err != nil {
                                log.Infof("cluster %s port %s parse failed, %v", cluster, info, err)
                        }
                default:
                        log.Infof("cluster args value is bad format")
                        os.Exit(1)
                }
                log.Infof("cluster node id:%s, ip:%v, port:%d", nid, nip, nport)
                addr := net.TCPAddr{IP: nip, Port: nport}
                servers = raft.ServerAddress(addr.String())
        }

        r := raftCluster{
                localRaftID: raft.ServerID(*clusterId),
                servers:   servers,
        }
        err := r.Start()
        if err != nil {
                log.Infof("rafter cluster start failed, %v", err)
                os.Exit(1)
        }
        r.checkLeaderState()
}

// SimpleFsm: 实现一个简单的Fsm

type SimpleFsm struct {
        db database
}

func NewFsm() *SimpleFsm {
        fsm := &SimpleFsm{
                db: NewDatabase(),
        }
        return fsm
}

func (f *SimpleFsm) Apply(l *raft.Log) interface{} {
        return nil
}

func (f *SimpleFsm) Snapshot() (raft.FSMSnapshot, error) {
        return &f.db, nil
}

func (f *SimpleFsm) Restore(io.ReadCloser) error {
        return nil
}

type database struct{}

func NewDatabase() database {
        return database{}
}

func (d *database) Get(key string) string {
        return "not implemented"
}

func (d *database) Set(key, value string) {}

func (d *database) Persist(sink raft.SnapshotSink) error {
        _, _ = sink.Write([]byte{})
        _ = sink.Close()
        return nil
}

func (d *database) Release() {}

问题排除

重新编译运行后,我们看到node3始终保持在Cluster2中,并且可以看到如下日志
sudo ifconfig lo0 alias 127.0.0.2 up
sudo ifconfig lo0 alias 127.0.0.3 up
sudo ifconfig lo0 alias 127.0.0.4 up
sudo ifconfig lo0 alias 127.0.0.5 up

在Cluster1的leader日志中,我们可以看到该leader向node3发送心跳失败的日志:
raft: rejecting appendEntries request since node is not in configuration: from=c1-node1

提醒

注意,这个修改方法还没有得到官方的认可,可能会有其他潜在的影响,使用之前应该自我评估。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: hashicorp/raft模块实现的raft集群存在节点跨集群身份冲突问题