Contents

MIT 6.824 Raft Lab

这篇博客用来记录实现Raft Lab中遇到各种细节问题。

这里推荐Consensus: Bridging Theory and Practice,它包含了更加详尽的内容。

Introduction

共识(consensus)是fault-tolerant系统的基础问题。

我们一直误解的CAP理论,真实表达的是:在P存在的情况下,满足完美的CA是不存在的。而共识协议,大多数允许完美的CA,通过探知分区发生,进入显式的分区模式 以限制某些操作。并且,在恢复过程中 恢复数据一致性,补偿分区期间发生的错误。

Raft将复杂的共识问题分解为独立的子问题:

  • leader election:集群需要一个leader复制执行与协调共识算法
  • log replication:集群的leader接收client的log entries并replicate给集群里的其它followers
  • safety:保证共识算法的正确性,在election和replication时能够应对各种corner cases

Architecture

实现raft时,直观感受:这时一个并发多状态的程序,存在多处内部状态修改的trigger points。

比如raft内部timeout trigger的状态变化,比如raft需要接收外部RequestVoteRPC/AppendEntriesRPC然后做出状态变更。

我们无法完全预料到,你处理这个状态变更时是否满足前置条件,可能其它trigger point已经修改了内部状态,导致前置条件不满足了。也可能RPC请求乱序了,导致后发来的RPC被先处理,造成了raft状态问题。

比如,你在处理RequestVoteResp RPC时候,在T时刻got higher term导致降为follower,在T+1时刻处理got majority granted vote,但是当前状态已经变成了follower,从而导致非法的状态流转。

因此,基于这些问题,采用基于event-driven的方式,将raft独立设计成 朴素的状态机(channel单线程处理),从而保证状态流转前经过合法的校验。

基于event的raft架构图如下:https://raw.githubusercontent.com/Fedomn/misc-blog-assets/master/raft-my6824-flow.png

基于event的架构注意事项:

  • 好处:基于event的方式,可以配置给一个state只处理特点的event。比如一个leader收到higher term的RPC后,变为follower,这时即使它在收到了EventAppResp,它可以直接ignore,不做处理

同理,参考etcd的event架构,会发现类似的处理方式:

https://raw.githubusercontent.com/Fedomn/misc-blog-assets/master/etcd-raft-flow.png

Leader election

开始election,follower状态改为candidate将currentTem+1。Candidate需要执行:

  • vote for itself:注意计算majority时候,计数器初始为1
  • 并行发送RequestVote RPC给所有peers
  • 计算是否得到majority的granted votes

candidate state发生转换的时机有:

  • 作为任何state,收到更高term的RequestVoteRPC或AppendEntriesRPC,则 become follower
  • 作为candidate,收到RequestVoteRPC的reply 包含higher term,则 become follower
  • 作为candidate,在下一个election timeout时,还是candidate,代表上一次没有得到majority granted votes,则 become follower -> become candidate
  • 作为candidate,得到majority granted votes(计算时候包含它自己)后,则 become leader

注意事项:

  • 一个peer在一个term内只能vote给一个candidate (EventVote中判断args.Term == rf.currentTerm,来判断是否已经vote)
  • 一个candidate可能收到AppendEntries RPC,只要满足leader的termat least as large as candidate的term(EventApp中判断args.Term >= rf.currentTerm),这个candidate则会认可leader,自己降为follower
  • split vote情况:集群中同时出现多个candidate,则可能发生 no candidate得到majority vote。所以,一个random的election timeout可以一定层度的解决这个问题。但一旦出现edge case,整个cluster对外就不可用。因此,etcd-raft存在一个preVote阶段。当然,preVote很大目的为了解决rejoin导致集群重选leader问题(preVote成功才会增加term并成为candidate)

Safety:election restriction 如何保证

  • raft通过保证当选的leader,一定包含集群内的all potentially committed log entries。这样做简化了log replication的复杂度,因为log flow只会从leader -> followers,从而leader可以force overwrite follower的logs。(换个说法:新当选的leader如果包含了cluster里所有 潜在committed的logs,那就不用担心它会overwrite已经committed logs)

  • 实现的核心算法是:candidate的[]log需要满足,at least as up-to-date majority的[]log。up-to-date:比较lastLogTerm和lastLogIndex。term越高log越新;相同term时index越高log越新。

    我们进一步思考:term对于raft的意义是什么?由于我们讨论的consensus是基于非byzantine问题,不存在恶意节点。所以一旦更大的term出现在了[]log里,就代表集群里存在更新的leader。

实现细节:

  • 6.824中通过labrpc.dispatch里的reflect.Value.Call方法来模拟RPC通信,所以要求一个RPC的reply必须是同步的,因此在基于event的模型中,一个node收到RPC再send EventVote到状态机后,必须要hang住,等待处理完成。所以通过一个channel waitRequestVoteDone来实现。

    注意这里也有坑,一旦这个方法可能会被并发访问到,你就必须要考虑data race情况。比如RequestVote方法里的waitRequestVoteDone,同时2个RPC请求后,如果第二个先处理完,就signal给waitRequestVoteDone,就导致第一个RPC还未处理完成就返回,这时第一个labgob.NewEncoder就会read reply,而真实情况是第一个reply还在write,所以,这里出现了data race。导致的结果是:reply还未完全赋值完成后,就被读取了,可能是VoteGranted=默认值FALSE,term=默认值0,从而产生了各种异常情况。

    因此,waitRequestVoteDone设计成一个map chan,不同的client有不同的channel。

    但是,这里还会存在bug。就拿waitAppendEntriesDone举例,一个node由于RPC delay,接收到同一个leader的多个 AppendEntries请求,如果第一个请求处理较慢,第二个处理很快,第二个的就会先执行waitAppendEntriesDone<-done。但是第一个请求是先<-waitAppendEntriesDone,所以第一个会先response成功,从而导致了不完整的reply数据,比如reply中bool为默认值false。

    因此,waitRequestVoteDone的处理粒度无法满足乱序event,需要设计更加细粒度的wait chan,自然想到了在event中加入done chan来实现。即每一个event wait chan互相独立,互补影响。

  • 即使采用了event的方式,仍然可能存在并发情况,因为event方式通过channel实现,也就代表通过顺序<-channel来保证了串行处理避免data race。但并不是对raft内部状态的read和write都是通过event来的,只能说大部分操作可以通过event流转减少并发场景。

    第一类 比如:tickFunction 和 raft.step 属于一类,它们两之间并不需要锁,因为同一时刻只可能运行一个

    第二类 比如:startRequestVote和startAppendEntries中单独启动的goroutine中,如果需要读取raft state。当然也可以将,读取state的代码放到raft.step中,这样就不需要lock了,goroutine只负责RPC。

    第三类 比如:raft和client交互的地方,GetState和Start需要read内部state

    上面三类之间,存在着并发读写,因此必须通过锁机制避免data race。当然,仍然可以考虑通过channel怎么减少上面的类别

Log Replication

在leader election中胜出的就会调用becomeLeader,开始发送AppendEntriesRPC,注意初始化leader需要:

  • 初始化所有peer的nextIndex = commitIndex+1
  • 初始化所有peer的matchIndex = 0

leader的职责之一就是log replication,需要注意的点:

  • startAppendEntries方法:
    • 记得设置leader自己的nextIndex和matchIndex,为了后续计算commitIndex
    • 通过nextIndex决定发送给peer的 PrevLogIndex和PrevLogTerm
  • EventAppResp的handling方法:
    • 每当有RPC reply,都需要计算commitIndex。算法为:从lastLogIndex->commitIndex遍历idxN,来去看idxN在每个peer的matchIndex是否>=idxN && log[idxN].Term==currentTerm(这条是为了保证,leader只能commit自己的term的log),如果满足条件majority,就返回idxN作为commitIndex
    • 通过commitIndex和lastApplied来判断,是否应该要apply到rf.applyCh
  • 如果follower出现crash,run solowly,网络丢包问题,leader会根据nextIndex一直retry AppendEntries RPC直到所有follower存储了所有log entris
  • 一个log entry被认为是committed后,所有node就可以将其apply到raft之上的应用层FSM(BoltDB等持久化数据库)。也就是说index计算的流转为:nextIndex -> matchIndex -> commitIndex -> lastApplied
  • 一个log entry被认为是committed的标志:它被replicate到majority nodes,注意计数时要包含leader自己。由于consistency check的存在,一旦这个log entry被committed,它之前的log entries都会被认为committed。
  • 一旦follower收到leader的committedIndex,就会将自己uncommitted的log apply到自己的状态机里。从这里我们可以得出,所有node自己都保存着一个[]log,即使包含了uncommitted的log entries。通过consistency check来保证了,自己的[]log和leader一致。
  • leader的heartbeat就是空entris的AppendRPC,但是leaderCommittedIndex是真实的,也就是说follower的commit一个log是落后于leader一个RPC的。

Safety:commit entries from pervious terms 如何保证

  • leader在AppendEntries RPC后还未等到commit,自己就先crash了。这时,有些收到log entry的follower,在下次election中由于包含up-to-date的log,它会当选,进而继续执行上一次leader还未完成的replication。

    但是请注意,这条log entry现在是uncommitted状态,对client来说就是没成功。即使后面新当选的leader将它committed。

    所以,这里留一个问题,raft集群怎么和client交互,同步还是异步提交?需要保证线性一致性?

  • 即使通过了election restriction check,仍然可能出现一种问题。即新当选的leader无法断定这个follower里的previous term的entries是否committed,因为只有上一轮的leader才知道。如果这时新当选的leader采用了 force overwrite策略,则会导致上一轮leader的log丢失。如Figure3.7中的c情况。

    为了解决这个问题,leader一旦当选会发送一个no-op log entry来保证,leader之前的log都已经committed,同时也用来清洗了uncommitted的log(个人猜测),这样就避免了 时间先后顺序的 极端情况错误:

    leaderA的uncommitted的log1 -> leaderB的uncommitted的log2,如果leaderB不把leaderA的uncommitted的log clean掉,下一次leaderA在当选,就出现了log时间线顺序的不一致,即log1复制到了majority,但最终又会被term更高的log2给overwrite掉。也就是描述了Figure3.7中的情况。

实现细节:

  • 出现随机的"failed to reach agreement"在TestRejoin2B scenario里

    出现原因:在EventVote和EventApp处理逻辑里,即使reply=false了,也还是reset election timeout,导致时序上的错误,cluster没有election成功。

  • getLogEntry时出现"index out of range length"情况,

    出现原因:goroutine在发送的时候,其它goroutine修改了内部状态,导致并不是最开始trigger这个event时的状态

  • startAppendEntries里getEntry出现"index out of range [-1]“的panic

    出现原因:reply了一个default的AppendEntriesReply它的conflictIndex=0,从而导致nextIndex的时候0-1=-1。

    本质原因:startAppendEntries中不同的RPC response并不是按照发送顺序返回event的,从而导致之前term的response,在这个term才返回,造成了异常情况。

    解决方式:leader应该要识别出这种out-of-date的event,并丢弃它。通过RPC sequence来保证,每次raft发送RPC都会加1,如果小于这个sequence,则丢弃它。

out-of-date event问题:

继续延伸上面的out-of-date event问题,它会导致各种各样的乱序问题,造成状态不一致

出现乱序event的情况有:

  • 1.任何state的node都可能收到:EventVote/EventPreVote/EventApp/EventSnap
  • 2.作为非follower的node,会收到发出去的RPC response event:EventPreVoteResp/EventVoteResp/EventAppResp/EventSnapResp

处理乱序event的思路:

  • 对1来说:一旦处理了更高seq的event,就不会处理较低seq的event。处理代码如:
1
2
3
4
5
if args.Seq <= rf.recvRpcLatestSeq[e.From] {
    DRpcPrintf(rf.me, args.Seq, "%s %v<-%v AbortOldRPC argsSeq:%v <= recvRpcLatestSeq:%v, ignore it", e.Type, rf.me, args.CandidateId, args.Seq, rf.recvRpcLatestSeq[e.From])
    return nil
}
rf.setRecvRpcLatestSeq(e.From, args.Seq)
  • 对2来说:则不能像1一样,只要处理了更高seq的event,就不会处理较低seq的event,因为对一个leader的AppendEntries来说,完美的情况是,发送seq15的RPC后,就只处理seq15的response,这样在收到response后计算nextIndex来说,肯定是正确的。如果leader顺序发送了,seq13、seq14、seq15三个RPC,seq14第一个收到后,计算nextIndex=lastNextIndex+len(args.Entries)。

    注意,这里的seq13、seq14、seq15是raft.Start触发的3个请求,也就是说后一个比前一个 多一个log entry。假设,lastNextIndex=10,seq14的len(args.Entries)=3,seq15的len(args.Entries)=4。因此,收到seq14后,计算nextIndex=10+3=13。

    继续收到seq15,仍然满足约束(处理了更高seq的event,就不会处理较低seq的event),计算nextIndex=13+4=17,但正确的nextIndex=10+4=14才对。

    所以1的约束,在这里不能适用。最简单的解决方式是:只处理最新发出去的seq的event。处理代码如:

1
2
3
4
5
if args.Seq != rf.sendRpcLatestSeq[e.From].SendSeq {
    DRpcPrintf(rf.me, args.Seq, "AppendEntries %v->%v AbortOldRPC argsSeq:%v != sendRpcLatestSeq:%v, ignore it", e.From, e.To, args.Seq, rf.sendRpcLatestSeq[e.From].SendSeq)
    return nil
}
rf.sendRpcLatestSeq[e.From].RecvSeq = args.Seq

对2情况,我们再深入思考一下,是不是leader计算nextIndex的方式问题,如果leader的nextIndex通过follower返回给它,是不是可以避免上面的问题了呢。

按照这个思路,修改EventAppResp的处理逻辑:

1
2
3
4
5
6
7
if args.Seq <= rf.sendRpcLatestSeq[e.From].RecvSeq {
    DRpcPrintf(rf.me, args.Seq, "AppendEntries %v->%v AbortOldRPC argsSeq:%v <= recvRpcLatestSeq:%v, ignore it", e.From, e.To, args.Seq, rf.sendRpcLatestSeq[e.From].RecvSeq)
    return nil
}
// ...
rf.setNextIndexAndMatchIndexDirectly(e.From, reply.NextIndex)
// ...

将判断out-of-date逻辑从 != 转换为 <=,也就是说:从 只处理最新seq的event,到 一旦处理了更高seq的event,就不会处理较低seq的event。

同时为了解决上面计算nextIndex的问题,将nextIndex交给follower计算:NextIndex = args.PrevLogIndex + 1 + len(args.Entries)

注意:EventPreVoteResp/EventVoteResp/EventSnapResp这几个的判断out-of-date的逻辑,还是只处理最新seq的event,因为它们都必须要保证实时性。

Persisted state

raft持久化一些信息以保证可以安全的restart

  • currentTerm和voteFor:保证raft知道当前term已经vote过了,防止vote两次。currentTerm也防止了,如果有lower term的leader来,raft也能感知到。深入思考,term对raft来说是核心,它代表了谁在集群内更新。
  • log[]:注意这里的log也包含了uncommitted的log。相当于WAL一样,防止restart后uncommitted的log丢失。
  • 其它的volatile的状态,都可以在raft restart时计算出来

实现细节:

  • 出现随机的"failed to reach agreement"在TestFigure8Unreliable2C scenario里,也就是说10s内没有达到一个值的agreement,随机出现

    出现原因:由于没有preVote问题,rejoin的node具有更大的term,它就会打扰到leader,触发leader->follower,造成整个集群没有leader,也就无法reach agreement。

    解决方式:加入preVote,即StatePreCandidate状态。它的核心是:每次electionTimeout后,它的term不会加1,而preVoteRequest里的term是加1的。接收方判断请求是preVoteRequest,也不会将自身term进行赋值,只会进行term check和election restriction check。待得到majority后,become candidate再进行voteRequest,如果candidate刚好未got majority,则又会降为follower,重新开始preVote。

Log Compaction

compaction是个常见的问题,因为随着raftlog会不停变大,需要很长时间re-play on reboot或send to new server。

实现细节:

  • 改造startAppendEntries方法,同时支持AppendEntries RPC 和 InstallSnapshot RPC,区分的依据是:prevLogIndex < raft first log index

  • 实现EventSnap Handling:只判断term,不做consistency check,即follower无条件接收leader snapshot

    注意EventSnap Handling里asyncSnapshotCh,虽然是异步同步到applyCh里,而follower真正apply snapshot到应用层的方法在,CondInstallSnapshot里。

  • 实现EventSnapResp Handling:只判断term,如果reply true则set peer nextIndex = raft first log index + 1

  • 处理crash情况时候:目前我们只persist了currentTerm、votedFor、log这三个,但raft处理中还需要的核心变量如,commitIndex、lastApplied、nextIndex、matchIndex这些在初始化时,也需要计算出来,即在newRaft方法里。

    这里在加入snapshot后,存在一个有趣的地方。最初我们通过数组index来作为log的index,但有了lastIncludedIndex后,该怎么计算log index呢?

    简单的做法,将计算index的地方 加入lastIncludedIndex的考虑,这样也就需要把lastIncludedIndex一起持久化起来。

    也可以将log[0]作为lastIncludedIndex所在位置,logEntry加入index的概念,这样计算也会减少点bug,通过log将lastIncludedIndex持久化。

    注意前面引入的rpcSequence也需要在crash之前persist,防止crash后乱序RPC造成时序状态问题。

Client Interaction

建立在raft之上的kvservice,怎么实现一个分布式kv呢?

raft是一个共识协议,而一群机器 要想 组建成的分布式集群,就需要对client表现成一台机器(背后是一个fault-tolerant的集群)。

client interaction就是在描述,如何组织使用raft,达到一致性目的。

在client并发操作情况下,我们使用sequential specification来描述consistency model,这种模型称为 线性一致性 linearizability,它是一种strong consistency model

实现细节:

  • Clerk作为客户端:会按顺序servers,如果返回wrongLeader,则继续call下一个,直到返回成功。

  • KVServer作为raft之上的应用层:会调用raft.Start方法来写入op,op里面会包含kv + clientId + SeqNum 由于raft.Start是一个异步行为,要确定command是否真实commit,就需要通过raft.applyCh返回的msg来判断

    正常流程是:调用raft.Start后,会创建一个commandIndex对应的wait channel。另一个applier goroutine会等待raft.applyCh收到msg,并向commandIndex对应的wait channel发送信号。

    异常情况:KVServer还未reply client之前,发现已经lost leadership了(通过msg.CommandTerm > kv.currentTerm)。这也就代表不能reply给client。

  • 出现"history is not linearizable"错误,并且Porcupine给出了visualization的html,如下 https://raw.githubusercontent.com/Fedomn/misc-blog-assets/master/my6824-kvserver-porcupine-err.png

    尤其在有snapshot的情况下,更多地暴露出了raft core在lab2中没有发现的bug,尤其是线性一致性。这里记录两个:

    1. EventApp处理中,如果先snapshot后,prevLogIndex < firstLogIndex,则consistency check无法完成,这里简单处理,返回conflictIndex = firstLogIndex+1,让leader重新发送。

    2. lastApplied修改并发问题没有处理好,导致will apply时计算错误,lost一些entry到kvserver,从而产生了非线性问题。

    follower将要apply一个idx=797的entry到applyCh的时候,突然收到了leader的idx=800的InstallSnapshot RPC,并且InstallSnapshots idx=800的ApplyMsg 先于 idx=797的ApplyMsg 进入applyCh,从而kvserver会先install snapshot并set lastApplied=800,从而后面的idx=797的ApplyMsg再进入kvserver就会被忽略。

    但是在follower边,已经处理了InstallSnapshotRPC后,设置了lastApplied=800,但idx=797的ApplyMsg在发送完后,还有段代码: rf.lastApplied += deltaCount,即lastApplied=801,从而在follower下一次发送到applyCh的时候,就lost了idx=801的log entry了,从而产生了错误。

    root cause是:向rf.applyCh发送msg成功,并不代表lastApplied应该成功被修改。因为snapshoter里也会对它并发修改,因此需要考虑这种情况。所以rf.lastApplied = max(rf.lastApplied, commitIndex)

    举例来说:snapshot idx=800在前,apply idx=797在后,因此以最新的rf.lastApplied再重新设置。snapshot idx=700在前,apply idx=701在后,这是正常流程用当前apply的commitIndex来设置。

  • KVServer利用raft.persister来进行snapshot,其中包含了kv.lastApplied、kv.kvStore、kv.sessions 即使raft core突然crash了,在KVServer重启时,会重新读取上一次的snapshot并覆盖状态。

    而raft core在recover后,commitIndex、lastApplied、nextIndex、matchIndex都会从firstLogIndex开始从头开始,也就是说raft core会replay snapshot之后的所有log给KVServer,保证了正确性。

consistency model

在client interaction中,我们遇到了线性一致性模型,它是分布式系统中必不可少的组件。

分布式系统为了实现fault-tolerance,会建立在replicated state machine之上,一旦有了复制,必然会出现 不同节点机器 数据不一致的情况。但是我们清楚知道,故障任何时刻都会发生,所以分布式系统的目的之一,即使内部状态出现不一致,整个系统对外表现的仍然像一个单一的副本一样。实现这个特性的前提,就是一致性。

那什么才叫做一致性模型呢?它是一种直觉上的正确性。比如你向一个 single-value register里写入a后,再去读取register一定读取到的是a。这也是我们编程中根深蒂固的概念。

抽象来说:给定一些 operation 和 state 之间的rules (write a/read a),系统中的 operations history 始终遵循这些规则 (read一定会读取到 之前write的值)。我们将 这些 rules 称为 consistency model。

换句话说,consistency model是所有operations的history集合。比如,我们运行一个程序,它经历过这个集合中的每个operations,那么这个过程就是一致的。也就是说,这个operations history是可以预测的,也就满足了我们 直觉上的正确性。

然而,真实的分布式系统中,存在各种问题的delay (比如不同的process位于不同的data center),一个operation存在invocation time和completion time,即使invocation time满足我们直觉上的正确性,但是真实write/read成功的时间是completion time,它可能存在着模糊性。因为,我们的一致性模型必须允许这些情况发生。

linearizability

由上面所知,每个operation都存在invocation和completion time,operation生效的point在这之间。

operations的开始结束时间,只要有重叠,它们就是并发来的,并且真实完成时间 一定会在这之间,因此总会存在一些points,可以把这些operations串起来,以一个线性的顺序 依次invocation和completion(参考Porcupine正确的visualization),从而满足我们直觉上的正确性。我们称为 线性一致性。

如果满足线性一致性,突出的特点:一旦operation completion,任何人都能读取操作后的状态。也就是说,它存在一种物理时间的概念。

一旦operation completion,则这些changes对于,随后的其它participants是可见的。因此,线性一致性也防止了读取到旧数据。也就是说,它阻止了非单调读,即防止了:先读取到一个新的值,后读到了一个旧的值

总结下来:分布式系统中的 strong consistency = linearizability。它也是我们平时常说的 强一致性。即:

  • 一旦写入完成,随后的所有读操作(物理时间上的随后),都应该读到之前写入的值 或 读到更新的写入值
  • 一旦读到一个值,所有随后的读 都应该返回这个值 或 读到更新的写入值

但,如果系统无法满足 这些强约束,会怎么样呢?就有了以下其它consistency model

sequential consistency

顺序一致性

在operations存在重叠的 并发情况下,不去关心operation的真实completion先后时间,即使reorder也是可以接受的。

但是,它仍然保证了2个约束:

  • 单个节点的operations history,在全局operations history上来看,是符合直觉顺序。
  • 每个节点的operations history,都是全局一致的。

举例来说:

1
2
3
4
A: --W(x,1)--
B:  --W(x,2)--
C:                      -R(x,2)-   --R(x,1)-
D:                 -R(x,2)-      --R(x,1)--

例子中,可以找出一个全局的执行顺序: B W(x,2) -> A W(x,1) -> C R(x,2) -> D R(x,2) -> C R(x,1) -> D R(x,1)

尽管这个顺序从直观上看,不满足全局的线性一致性,但从全局上来看,仍然是顺序一致性的,即使是"错误的一致性”。

如果只将D中的两个R操作交换,则系统无法满足顺序一致性,因为无法找到一组全局顺序满足上面2个约束。即,D先读到1后读到2,和C读到的顺序不一致。

总结来说,顺序一致性 = linearizability - real-time-ordering。也就是说,线性一致性更加关心time,顺序一致性关心program order。

Multi-Raft

单个raft能够支撑的logs容量与并发请求有限,因此都会采用sharding的方式,也称partition。将一组kv分为不同的组,称为shard。比如tikv采用的range sharding方式。

每个raft-group负责一些shards,通过这种方式提高了性能,每个raft-group都可以并发操作,因此整体的throughput(puts and gets per unit time)都提高了。

像许多分布式系统一样,要想让多个raft-group协调在一起,需要一个中心的元数据服务,它的职责:

  • 管理 整个集群中servers的join/leave/move
  • 数据分片是在哪个raft-group中,即shard->gid->servers
  • 整个集群中的数据 均衡调度,自动迁移伸缩等

因此,lab4中的shardctrler就在负责元数据管理的工作。

而集群中的一组shardkv,就对应于lab2中的一组raft。它的职责:

  • 负责处理client发来的operation请求
  • 向shardctrler拉取集群中latest配置,即shard->gid->servers
  • 当sharding配置变化时候,向其它raft-group pull对应的shard数据

因此,lab4中的shardkv,真实物理分布可以参考tikv中的raft-group:

https://tikv.org/img/blog/building-distributed-storage-systems-on-raft/Raft%20group.png

其中的一组region就是一个raft-group,每个Node里的region就是一个raft实例,raft实例之间会通信从而达成一致。

而每个region之间,也会互相交流shard数据。

注意事项:

  • 出现随机的get(x) != expect(x)这种错误,并且发生情况在raft stop后,applyInsertShards之后,看起来像apply一个duplicated的msg,从而产生了get(x)="xabb” != expect(x)="xab” 这个分析了很久,并且重构了日志,让每个gid在一个logfile里。最后发现原因:raft重启后,会重新apply所有logs,这时config change,开始pull shards,待insertShards完成后。raft收到落后的command,导致又apply了duplicate的数据。

    解决方案,在applyInsertShards同时,将reply中的lastOperation来更新自己的session。因此,当下次applyMsg来的时候,就可以根据client的SequenceNum来判断 是否接受这个applyMsg,防止了duplicated的msg。

  • 出现了随机的程序hang住情况,几率比较小,通过pprof发现,goroutine中有400多个在gopark,也就是全都在挂起了,它的traces如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-----------+-------------------------------------------------------
       225   runtime.gopark
             runtime.goparkunlock (inline)
             sync.runtime_notifyListWait
             sync.(*Cond).Wait
             6.824/raft.(*Raft).asyncApplier
-----------+-------------------------------------------------------
       186   runtime.gopark
             runtime.chanrecv
             runtime.chanrecv1
             6.824/shardkv.(*ShardKV).applier
-----------+-------------------------------------------------------
        33   runtime.gopark
             runtime.chanrecv
             runtime.chanrecv1
             6.824/shardctrler.(*ShardCtrler).applier
-----------+-------------------------------------------------------
        17   runtime.gopark
             time.Sleep
             6.824/shardctrler.(*Clerk).command
             6.824/shardctrler.(*Clerk).Query (inline)
             6.824/shardkv.(*ShardKV).monitorConfiguration
-----------+-------------------------------------------------------
        12   runtime.gopark
             runtime.selectgo
             6.824/raft.(*Node).run
-----------+-------------------------------------------------------
        12   runtime.gopark
             runtime.selectgo
             6.824/raft.(*Raft).replicator
-----------+-------------------------------------------------------
        12   runtime.gopark
             runtime.selectgo
             6.824/raft.(*Raft).asyncSnapshoter
-----------+-------------------------------------------------------

因为我们的test是在一个shell里循环跑的,也就是说,当一个test完了,如果其中的goroutine没有全局释放完,就继续下一个test了,造成了goroutine泄漏。

而goroutine leak造成的现象就是,程序出现了假死状态,test整体卡住了,去观察日志发现3个gid的log,发现出现有的shard一直处在ShardPulling状态,即pullShardData没有处理。猜测某些goroutine没有被调度到,明明是基于信号的抢占式调度呀,是否可能出现了饥饿问题呢,即cpu密集型情况下,不会来回调度协成了,这还有待学习研究。

解决方式也很直接,加入go.uber.org/goleak在test完成后,进行goroutine leak检测,在可能泄漏的地方加上return的逻辑。

reference