0%

MIT6.824实验总结

MIT6.824中共设计了四个实验,主要内容是一个分布式kv数据库。

  1. mapreduce分布式实现
  2. raft实现
  3. 基于raft的kv数据库实现
  4. Sharded KV数据库实现

Lab1:MapReduce分布式实现

实现架构按照论文中描述的实现方式:

  1. coordinator(master):一个协作服务器,负责任务的分配+任务运行状态的监控
  2. worker:多个worker,负责map和reduce任务的执行

整个系统的实现逻辑为(worker主动发送请求,coordinator被动响应请求):

  1. coordinator启动,初始化任务信息;同时多个worker启动,开始向coordinator发送rpc请求,请求分配任务
  2. coordinator根据“FIFO”原则,将map和任务分配给请求地worker,并记录任务状态和分配worker
  3. coordinator每收到一个worker完成任务的rpc请求,修改待完成任务数量,当map任务完成进入reduce阶段,当reduce任务完成,结束执行

coordinator主要实现逻辑即为map和reduce任务的分配,以map任务分配的关键代码为例:

  1. 首先判断是否存在未分配的map任务,若存在则进行任务分配

  2. 传入workerId,分配任务(记录workerId,修改任务数量,修改任务状态)

  3. 启动10秒的监控线程,休眠十秒后,如果此时任务状态不为已完成,认为worker执行任务出现问题,将任务状态重新修改为空闲待处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    if c.numMap != 0 {
    allocateNumber, allocateFile := c.AllocateMapJob(args.WorkerId)
    if allocateNumber != -1 {
    reply.JobType = JOBTYPEMAP
    reply.FileList = []string{allocateFile}
    reply.JobNumber = allocateNumber
    //启动线程,10秒种后若结果没有返回,将任务重置为未分配状态
    go func() {
    time.Sleep(10 * time.Second)
    if c.mapJobStatus[allocateFile][1] != JOBCOMPLETED {
    c.mapJobLocks[allocateFile].Lock()
    defer c.mapJobLocks[allocateFile].Unlock()
    if c.mapJobStatus[allocateFile][1] != JOBCOMPLETED {
    c.mapJobStatus[allocateFile][1] = JOBIDLE
    }
    }
    }()
    }
    return nil
    }

worker的主要实现逻辑为不断地向coordinator申请任务,关键代码为:

  1. 启动时首先向coordinator发送注册rpc请求,获得workerid

  2. 循环发送申请任务请求,终止条件为:收到任务结束标志或rpc请求失败

    • 根据获得任务类型(map/reduce),调用对应处理方法,返回处理结果
    • 当返回失败时(执行超时/rpc失败),删除任务输出结果
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {
    ok, workerId, reduceNumber := ReigsiterWorker()
    if ok {
    for taskEndFlag := false; !taskEndFlag; {
    ok, workInfo := CallForJob(workerId)
    if ok {
    if workInfo.IsOver {
    taskEndFlag = true
    } else {
    switch workInfo.JobType {
    case JOBTYPEMAP:
    ok, intermediate_file_names := doMapWork(workInfo.JobNumber, workInfo.FileList[0], reduceNumber, workerId, mapf)
    if ok {
    ok = CallForMapJobAccomplished(workInfo.FileList[0], workerId, intermediate_file_names)
    }
    if !ok {
    RemoveAllFiles(intermediate_file_names)
    }
    case JOBTYPEREDUCE:
    //省略...................
    default:
    log.Printf("none job recieved, waiting to next call\n")
    }
    }
    } else {
    log.Printf("failed to request a task, retry 2 seconds later")
    }
    time.Sleep(2 * time.Second)
    }
    }
    }

测试用例完全通过

总结

Lab1实现还是比较简单的,主要涉及的技术包括:

  1. 通过锁控制共享变量的访问
  2. 通过gorutine实现多线程并发编程
  3. 通过RPC进行进程间相互通信

Lab2:Raft共识算法实现

实验二主要任务是实现一个不包括成员切换功能的Raft共识算法,主要实现的功能部分如下

  1. Leader Election(选主):实现raft算法的选主功能
  2. Log Replication(日志复制):实现日志添加和多副本备份功能
  3. Persistence(持久化):按照raft论文中持久化要求,实现对应参数的持久化
  4. Snapshot/Log Compaction(快照):raft层实现日志压缩,从而实现上层应用的快照需求

功能实现

上述四个功能的实现主要参照论文中的figure2以及课程的相关资料,还有一部分存在疑问的地方也参考了其他人的实现思路,涉及到的参考均在文章末尾列出。

选主

选主首先要解决是raft结点的状态变更问题(即何时进入选举),按照论文中的思路如下(关键代码为ticker()函数):

  • 为避免选主的争抢问题,随机设置超时时间为250-400ms

  • 采用sleep的方式实现超时检测,而不是timer+事件处理的方式

  • 更新超时时间的时机为收到有效的“RPC”消息:
    1. 为某个 RequestVoteRPC(投票请求) 投出一票
    2. 收到 AppendEntryRPC(添加日志请求) 且该rpc是有效的
    3. 收到主节点的 InstallSnapshotRPC(更新快照请求)
1
2
3
4
5
6
7
8
9
10
11
12
13
func (rf *Raft) ticker() {
for !rf.killed() {
time.Sleep(time.Duration(rf.election_timeout-time.Now().UnixMilli()) * time.Millisecond)
rf.mu.Lock()
if !rf.killed() && rf.peer_status != STATUS_LEADER && rf.election_timeout<=time.Now().UnixMilli() {
if rf.election_timeout <= time.Now().UnixMilli() {
//省略:修改状态,发起选举
}
//省略。。。。。。。
}
rf.mu.Unlock()
}
}

第二个要解决的是leader端的选举判断逻辑(即如何进行选举),基本思路如下(关键代码为startElection()函数):

  1. 主进程为每个其他的raft结点启动一个发送线程,发送请求投票请求
  2. 主进程启动完毕之后,等待到“条件成熟”(主线程使用自旋锁,不断判断),判断是否成功选为leader
  3. 选为leader执行初始化操作,否则进入下一轮选举
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (rf *Raft) startElection(election_timeout int64) {
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
go func(aimServer int) {
//省略:发送投票请求,判断是否同意票
}(i)
}
}
//主线程不断判断是否满足条件(自旋锁)
for {
//所有发送线程退出/投票数达到要求
if int(finished_number) == len(rf.peers) || int(vote_number) > len(rf.peers)/2 {
break
}
rf.mu.Lock()
//超时或者状态变更
if time.Now().UnixMilli() >= election_timeout || rf.currentTerm != vote_requst.CandidateTerm {
rf.mu.Unlock()
break
}
rf.mu.Unlock()
//休息10毫秒
time.Sleep(time.Millisecond * 10)
}
//省略:判断是否能够成为leader,能够成为即转化状态,否则退出,等待下一轮选举
}
总结

主要涉及到的思路包括以下四点:

  1. 超时状态转换:采用sleep()+election_timeout的机制,不断有”事件“更新election_timeout时间点,检测线程不断的休眠到这个时间点,直到某次“起晚了”
  2. 选举判断:采用主线程判断+多个从线程(对应结点)发送的模式,主线程自旋等待条件满足,从线程执行完发送判断即退出
  3. 维持选主状态:通过定时发送heartbeat消息实现(与下部分重叠,在下部分阐述)
  4. 接收端判断投票逻辑:完全按照论文中实现

日志复制

日志发送

日志复制主要为leader结点的日志发送+follower结点的日志接收,实现之前我想到了两种思路:

  1. leader为每个follower设置一个发送线程
  2. leader采用广播形式采用单个发送线程同时向多个follower发送

逻辑上日志是采用广播形式,即leader每次发送日志会发送到所有的follower结点,另外heatbeart在此逻辑下同样是采用广播形式,然而论文中的如下描述与广播的逻辑相悖(单个结点发送失败不需要重新给其他结点法)

If followers crash or run slowly, or if network packets are lost, the leader retries Append- Entries RPCs indefinitely (even after it has responded to the client) until all followers eventually store all log entries.

当followers失效或者运行缓慢,导致发送失败,leader应该无限重试直到所有follower存储所有日志

仔细思考所谓的”retries Append- Entries RPCs indefinitely“会发现存在以下问题:

  1. 若在重试过程中leader需要发送另外一个日志,重试是否应该携带新日志,或者停止重试,重新发送,如果不停止一个,就会出现新旧消息同时发送的情况,然而如何停止无法实现
  2. 无限的重试导致heartbeat需要针对每个结点单独判断,单独发送

经过以上思考,最终确定采用广播的形式实现日志发送和heartbeat,基本思路如下:

  1. 统一发送日志和heartbeat使用一个广播接口,发送日志调用广播接口,heartbeat周期性调用广播接口
  2. 广播时每个follower根据nextIndex发送需要的log(不断地广播相当于实现了无限重试,只是将重试的逻辑转移到了下一次广播)
  3. 由于heartbeat的周期性发送,即使没有外部日志发送请求,其效果也相当于”无限重试”的效果

广播关键代码如下(broadcastAppendEntry()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rf *Raft) broadcastAppendEntry() {
//省略:判断状态函数
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
rf.mu.Lock()
//判断perlogIndex是否已经存储在日志中(小于伪头entryindex)
if rf.nextIndex[i]-1 < rf.log[0].Index {
//省略:发送快照方法
} else {
//根据结点缺失的日志情况,发送日志
args := AppendEntryArgs{LeaderId: rf.me, Entries: rf.log[rf.nextIndex[i]-rf.log[0].Index:], Term: rf.currentTerm,
PreLogIndex: rf.nextIndex[i] - 1, PreLogTerm: rf.log[rf.nextIndex[i]-rf.log[0].Index-1].Term, LeaderCommit: rf.commitIndex + rf.log[0].Index}
reply := AppendEntryReply{}
go rf.sendAppendEntry(i, &args, &reply)
}
rf.mu.Unlock()
}
}
//更新发送时间
rf.lastSendTime = time.Now().UnixMilli()
}

调用broadcastAppendEntry的时机有以下两种:

  • 每当leader接收到一个添加日志请求时,调用broadcastAppendEntry()

  • heartbeat采用类似election_timout的机制实现周期性调用broadcastAppendEntry()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rf *Raft) Start(command interface{}) (int, int, bool) {
//省略。。。。。
if rf.peer_status == STATUS_LEADER {
//省略。。。。。
go rf.broadcastAppendEntry(false)
}
//省略。。。。。
}
func (rf *Raft) heartsbeats() {
rf.lastSendTime = time.Now().UnixMilli() - IDLE_INTERVAL_TIME
for !rf.killed() {
//省略验证状态代码
if time.Now().UnixMilli()-rf.lastSendTime >= IDLE_INTERVAL_TIME {
rf.broadcastAppendEntry()
}
//休眠到超时(未触发,休息到触发,否则休息一个interval)
time.Sleep(time.Duration(math.Min(IDLE_INTERVAL_TIME, float64(IDLE_INTERVAL_TIME+rf.lastSendTime-time.Now().UnixMilli()))) * time.Millisecond)
}
}
日志提交

日志提交逻辑按照论文中的逻辑,其中实现思路为:

  1. 提交日志到应用:每个raft结点启动时,启动applyEntry线程,等待lastapplied < commitIndex,进行提交
  2. 推进commitIndex:当leader选举成功时,leader启动checkCommit线程,不断推进commitIndex
  • 两个同步方式均采用golang的条件变量:rf.applyCond 和 rf.leaderCond

关键代码如下为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//1.启动raft进程时,启动applyEntry()进程
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
//省略。。。。。。。。。。。。。。。。。。。。。
go rf.checkCommit()
go rf.applyEntry()
//省略。。。。。。。。。。。。。。。。。。。。。
}
//2.applyEntry等待条件变量
func (rf *Raft) applyEntry() {
for !rf.killed() {
rf.applyCond.L.Lock()
rf.applyCond.Wait()
rf.applyCond.L.Unlock()
//省略:提交代码
}
}
//3.当commitIndex修改时(),唤醒条件变量
// 1. follower收到appenEntries时,有可能修改commitIndex
// 2. leader checkCommit时,有可能修改commitIndex
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
//省略。。。。。。。。。。。。。。。。。。。。。。。。。
if rf.lastApplied < rf.commitIndex {
rf.applyCond.L.Lock()
rf.applyCond.Signal()
rf.applyCond.L.Unlock()
}
//checkCommit方法不需要等待条件变量,周期性检查
func (rf *Raft) checkCommit() {
for !rf.killed() {
//等待成为leader
rf.leaderCond.L.Lock()
rf.leaderCond.Wait()
rf.leaderCond.L.Unlock()
//开始周期性检查,是否能增加commitIndex
for !rf.killed() {
time.Sleep(LEADER_COMMIT_CHECK_INTERVAL * time.Millisecond)
if rf.lastApplied < rf.commitIndex {
rf.applyCond.L.Lock()
rf.applyCond.Signal()
rf.applyCond.L.Unlock()
}
rf.mu.Unlock()
}
}
}
总结

日志复制部分要实现的逻辑比较多,也比较复杂,难点主要在于设计好整个发送接收以及提交框架,具体的日志验证、nextIndex维护按照论文中的描述即可

  1. 日志发送+heartbeat:统一广播接口,heartbeat周期性调用,日志发送响应外部请求调用
  2. 日志提交:leader的checkCommit线程推进commitIndex,所有raft结点的applyEntry推进lastapplied
  3. 其他实现逻辑:严格按照论文逻辑实现

持久化

持久化思路比较简单,在任何修改涉及到持久化属性时,调用持久化方法即可:

  1. 修改term:任意RPC请求收到Term大于自己的Term响应时;收到任意RPC请求Term大于自己的Term时
  2. 修改log:AppenEntriesRPC涉及到修改自身log时;Leader被调用start()方法,添加日志时;快照、接收到installsnapshot时
  3. 修改voteFor:收到RequestVoteRPC,并同意投票时;超时切换为Candidate状态时

快照

快照的难点不在于快照本身,而是在于快照导致的log的index不等于log在LogEntries中的index,如下图所示,经过快照日志的日志压缩后,raft结点的LogEntries长度从7变为3,导致index为5、6、7的三个日志项在LogEntriesz中的index为1、2、3

针对以上问题以及raft的性质,进行以下设计:

  1. 本地用伪index,包括:leader维护的nextIndex和matchIndex、commitIndex和lastApplied
  2. 传输转化为真Index,包括:appenEntry请求和返回index,requestVote请求和返回Index,installSnpshot请求和返回Index

确定以上index设计思路后,修改部分原始代码:

  1. AppendEntries()中接收到leader的commitIndex(真index)确定commitIndex时,需要转化为logEntries中的index
  2. Snapshot()中接收到leader的commitIndex和lastApplied减去日志压缩的数量
  3. InstallSnapshot()中接收快照
    • 若commitIndex/lastApplied小于快照的LastIncludedIndex,应直接将commitIndex和lastApplied设置为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//1. 情况1
func (rf *Raft) AppendEntries(args *AppendEntryArgs, reply *AppendEntryReply) {
//省略。。。。。。。。
//更新commitIndex(涉及到index转换)
if args.LeaderCommit-rf.log[0].Index > rf.commitIndex {
rf.commitIndex = args.LeaderCommit - rf.log[0].Index
if rf.commitIndex > len(rf.log)-1 {
rf.commitIndex = len(rf.log) - 1
}
}
//省略。。。。。。。。
}
//2.情况2
func (rf *Raft) Snapshot(index int, snapshot []byte) {
//省略。。。。。。。。
rf.lastApplied -= index - rf.log[0].Index
rf.commitIndex -= index - rf.log[0].Index
//省略。。。。。。。。
//若为leader
if rf.peer_status == STATUS_LEADER {
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
//nextIndex移动
rf.nextIndex[i] -= index - rf.log[0].Index
rf.mathchIndex[i] = rf.nextIndex[i] - 1
}
}
}
}
func (rf *Raft) InstallSnapShot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {{
//省略。。。。。。。。
//如果lastApplied 或者 commitIndex 小于 args.LastIncludedIndex
if rf.commitIndex+rf.log[0].Index < args.LastIncludedIndex
rf.commitIndex = 0
rf.lastApplied = 0
}
if rf.lastApplied+rf.log[0].Index < args.LastIncludedIndex {
rf.lastApplied = 0
}
//省略。。。。。。。。
}

快照相关的方法在确定index的转化后,实现难度并不大,关键代码如下:

  • 在broadcastAppendEntry中增加判断,当prelogIndex指向日志在主节点中不存在时,发送InstallSnapshotRPC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rf *Raft) broadcastAppendEntry(isHeartbeat bool) {
//省略。。。。。。。。
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
//省略。。。。。。。。
//判断perlogIndex是否已经存储在日志中(小于伪头entryindex)
if rf.nextIndex[i] <= 0 {
//调用发送快照接口
args := InstallSnapshotArgs{Term: rf.currentTerm, LeaderId: rf.me,
LastIncludedIndex: rf.log[0].Index, LastIncludedTerm: rf.log[0].Term, Data: rf.persister.snapshot}
reply := InstallSnapshotReply{}
go rf.sendSnapShot(i, &args, &reply)

} else {
//省略发送日志代码
}
}
//更新发送时间
rf.lastSendTime = time.Now().UnixMilli()
}
总结

快照和持久化类似,代码的实现量并不大,关键在于修改历史代码使得兼容当前操作,这几个index的关系和转化折磨了我很久的时间,有时候debug很久才发现,不是逻辑问题,只是index没有考虑到的问题

遇到的实现问题

1. 不要在占有锁的时候进行通信

通信(RPC,管道等)前应该首先释放锁,因为通信是不可靠的,可能存在延迟返回导致长时间占有锁,系统停顿的问题,两种解决方案

  1. 先释放锁,再进行通信,或者通信完成再获取锁
  2. 启动单独的线程机型通信,主线程继续执行

情况1应用较为广泛,如下应用日志的关键代码:

  • 修改之前遇到了死锁bug:当前线程占有锁,向管道(applyCh)中写入,但是由于管道已满导致阻塞,测试代码中管道的消费者消费上一条消息调用Snapshot方法要获取锁,两者构成了占有且等待的条件,构成死锁(这个bug折磨死我了,测试几十次出现一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rf *Raft) applyEntry() {
//省略:。。。。。。。。。。。。。。。
for !rf.killed() {
rf.mu.Lock()
//省略:获取发送所需资源
rf.mu.Unlock()
//log.Printf("peer:%v try to applyEntry,释放锁", rf.me)
//再发送信息
for i := 0; i < len(applyEntries); i++ {
rf.applyCh <- ApplyMsg{CommandValid: true, SnapshotValid: false, Command: applyEntries[i].Command, CommandIndex: applyEntries[i].Index}
}
//log.Printf("peer:%v try to applyEntry,完成发送", rf.me)
}
}
2. 在收到比自己更新(Term更大)的请求/响应时,应立即修改状态,返回请求

上述机制保证了过期的raft结点不会落后太多,如果在某些地方少考虑了这一要求,会出现意想不到的bug

3. Leader只能提交自己任期内的日志(重点)

这一点在看论文的时候有点难理解,导致在实现过程中容易忘记这一点,如果不按照这一点实现,测试时会出现日志不一致的情况

4. nextIndex会回退,matchIndex不会回退

这一点结合课程guidance思考了很久才想到,导致了之前一直存在的bugappendEntries发送端当prelogIndex冲突时只用改nextIndex,不用改matchIndex

  • matchIndex指向已经成功写入log,nextIndex回退不可能小于等于matchIndex
  • 发送appendEnties是之所以会发生prelogIndex冲突,是由于Leader初始化时将nextIndex设置为自己的日志长度
5.日志复制请求由于网络问题会存在先发送后到达的情况

在做实验3时测试发现了这个bug,Leader向follower先发送的日志复制请求反而后到,导致nextIndex不是因为日志冲突回退,而是因为历史请求延迟返回错误回退,之前没有考虑到这个情况

  • 例如:连续发送两个prelogIndex=12的appendEntry请求,第一个请求返回成功推进,第二个请求延迟,此时Leader发送prelogIndex=19的appendEntry请求,成功将nextIndex推进到29后,第二个请求返回,又将nextIndex回退到19
  • 上述情况在不不使用快照的情况下不会影响正确性,只会影响系统性能;使用快照后,client在收到prelogIndex=19的appendEntry后第二个情况返回之前可能会进行快照,结果导致client认为日志提交到了29以后,将29之前的日志均快照压缩,Leader由于延迟请求将nextIndex回退到了19,下次发送perlogIndex=19的日志复制请求时,client端已经不包含perlogIndex=19的日志

针对以上历史请求定义为:存在比当前请求后发送,但是先被接收到/返回的请求。对此解决方案为:

  1. 在follower接收端过滤历史请求

    • 无法全部过滤,因为follower无法判断请求是否为历史的,只能通过第一个index过滤一部分
    • 执行历史请求在follower端是不影响正确性的,所以没有采用commitIndex等的更加复杂的判断逻辑
    1
    2
    3
    4
    //判断是否过时,如果过时直接返回
    if args.Term < rf.currentTerm || args.PreLogIndex < rf.log[0].Index {
    return
    }
  2. 在leader端发送请求返回处理时,过滤历史请求

    • 判断term和nextIndex是否为发送时的值,term不同说明发生了重新选举,nextIndex不同说明当前请求返回前,有其他后发出的请求返回,两种情况都应丢弃当前返回结果
    1
    2
    3
    if rf.currentTerm == args.Term && rf.nextIndex[server]-1 >= 0 && args.PreLogIndex == rf.log[rf.nextIndex[server]-1].Index {
    //......
    }
  • 能够直接过滤历史请求的原因,在于新请求的成功执行代表了老请求+新信息的共同成功执行,即新请求包括了老请求的所有信息

测试和总结

实现不保证不存在bug,在完成代码之后,运行了400次测试用例,全部pass,可以证明整体上的逻辑没有大问题,实验中我得到的收获有以下几点:

  1. 并发控制很好玩,就是死锁太磨人
  2. 解决问题的成就感是遭受折磨的最大回报(如果实验室的项目能给我带来这种成就感,我可能就不会骂他垃圾了,或者说我的水平不足以坚持到能够给我提供成就感的时候

Lab3:基于Raft的KV数据库实现

实验三的任务是实现一个基于Raft的多副本kv数据库,在能够保证Raft实现正确性的情况下,实现KV数据库不算太难,在实际的调试中,大部分问题来自于Raft之前实现的小bug,说明ab2的测试还是不够完善,经过修改和重新测试后,通过了lab2和lab3的所有测试。

实现架构

该KV数据库满足典型的客户端-服务器的CS实现架构,客户通过Client向发出读写请求,Server接收并执行Client请求,Raft负责实现副本之间操作顺序的共识,client/server/raft之间的具体结构以及交互关系如MIT6.824课程资料中的raft_diagram.pdf所示:

  1. 多个Client,每个Client内部请求逐个发送,即单个Client内不会出现操作并发的情况(这一点很重要,一定程度降低了实现难度)
  2. 多个KV Server,每个Server对应一个Raft peer,不同Server存储相同内容,互为备份,通过Raft实现一致状态保证

一个典型的写操作流程,可以定义为如下流程:

1
2
3
4
5
6
sequenceDiagram
Client->>Leader Server: 1.发送写请求
Leader Server->>Leader Raft: 2.生成操作日志,发送到Raft层
Leader Raft ->> Leader Raft: 3.在Raft节点群内备份日志,达到多数后提交日志
Leader Raft ->> Leader Server: 5.提交日之后,通知server应用操作,修改状态
Leader Server ->> Client: 5.返回操作执行结果

其中每个server对应一个raft peerLeader Raft对应Leader Server:

线性一致性和容错

读线性一致性

基于Raft的WAL机制下,写请求自然是满足线性一致性的,但是对于读请求,如果不进行特殊处理,可能会读到过期数据

  • 例如:当发生网络分区,Client发送请求到了旧Leader,此时新Leader已经执行了部分更新操作,导致旧Leader返回过期数据,导致不满足读写的线性一致性

  • 对于此问题,Raft论文中提出了在响应只读请求之前,与大部分raft peer交互同步,确实自己状态是否已经过期。

    Raft handles this by having the leader exchange heartbeat messages with a majority of the cluster before responding
    to read-only requests.

对于raft读操作线性一致性保证,存在其他的效率更高的实现方式(以后进行总结):

  1. Read Index
  2. Lease Read

同步的方法性能较差,但是实现起来较为简单,结合以上思路系统读写操作实现为:所有的读写操作均通过leader进行,在应用到本地状态机之前首先提交Raft日志,日志提交后才进行状态变更

容错

To achieve linearizability in Raft, servers must filter out duplicate requests. The basic idea is that servers save the results of client operations and use them to skip executing the same request multiple times. To implement this, each client is given a unique identifier, and clients assign unique serial numbers to every command. Each server’s state machine maintains a session for each client. The session tracks the latest serial number processed for the client, along with the associated response. If a server receives a command whose serial number has already been executed, it responds immediately without re-executing the request. - raft论文中容错思路

由于KV层状态机状态以及状态变更基于Raft容错,所以可认为是可靠的,此时需要解决的容错问题其实仅限于请求响应的丢失,即请求成功执行但是客户端未收到响应,针对此问题的容错方案为:

  1. Client在未收到正确的响应之前,不断重试发送请求
  2. Server需要对成功执行的请求进行缓存,应对Client的请求重发(Server并不知道一个成功的请求响应是否被Client收到,必须记录)

由于每个Client的请求串行执行,上述容错方案可以实现为:

  1. 每个Client为每个操作编号,一个操作可以通过 Client号+Operation号唯一标识
  2. Server为每个Client缓存最新操作的执行结果,接收到操作时通过缓存判断是否为已执行过操作

上述机制存在一个漏洞即某操作对应Raft日志成功提交,但是可能由于Raft共识达成过于缓慢,在应用到状态机之前,Client认为请求执行超时,重新发送请求,此时同一个操作在Raft中对应两条日志项,针对此中情况增加过滤:

  • 一条操作可以有多条日志,但是只有一条日志操作会应用到状态机上了,应用到状态机以后即缓存操作结果
  • 在应用日志时,通过缓存判断是否为已执行过操作

小总结:缓存+双重过滤实现了响应丢失的容错,同时避免了重复执行一个相同操作破坏线性一致性

另外由于实验中Client串行发送请求,导致Server只需要存储每个Client最新操作执行结果,如果Client能够并发发送操作请求,则缓存需要基于滑动窗口的方式(来自知乎回答),简单总结加深印象:

  • Server为每个Client缓存可能需要的请求结果窗口:[op_uncheck1,op_uncheck2,……op_latest]
  • Client请求会携带其确认已经接收最大操作号,导致请求结果窗口左边界推进
  • Client新请求导致请求结果窗口右边界推进

功能实现

功能实现部分主要分为两部分进行总结:

  1. KV Client 请求发送逻辑
  2. KV Server 请求处理逻辑

KV Client 请求发送逻辑

Client由于请求串行执行,请求处理逻辑较为简单,主要是请求结果处理以及请求初始,以PutAppend操作代码为例:

  • 初始化请求体是设置操作编号,操作执行成功后才进行操作编号自增
  • 请求server初始随机访问,按照轮询的方式寻找leader,直到操作成功执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := PutAppendArgs{ClientStamp: ck.clentStamp, OpStamp: ck.opStamp, Key: key, Value: value, Op: op}
for i := ck.leaderServer; ; i = (i + 1) % len(ck.servers) {
reply := PutAppendReply{}
ok := ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
if ok {
if reply.Err == ErrTimeOut {
} else if reply.Err == ErrWrongLeader {
} else {
if reply.Err == ErrNoKey {
break
}
//修改leaderServer
ck.leaderServer = i
break
}
}
}
//最后操作符加一
ck.opStamp++
}

Client在初始化随机设置clientId:

  • 采用实验代码中提供的nrand()函数,由于随机数范围较大,出现重叠的概率较小
  • 常用的分布式全局不重复ID生成方法为:DB自增、时间戳、snowflake算法(需要进一步学习)
1
2
3
4
5
6
7
8
9
10
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
ck.clentStamp = nrand() //随机生成当前client编号
ck.opStamp = 0 //操作编号初始化为0
bigx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(servers))))
ck.leaderServer = int(bigx.Int64()) //随机初始化当前leader,server
return ck
}

KV Server 处理逻辑

Server端在接收请求,应用请求变更、返回请求结果之前,首先写入Raft日志:

  • 基本流程:缓存去重->提交日志->等待chan通知 超时->返回结果
  • 其中接收到chan通知后,需要判断此时操作缓存中操作号是否为当前操作号
    • 原因:在我的实现中允许非leader向用户返回结果,即只要日志提交,且当前server有client在等待返回结果,即通知client
    • 如此设计会导致一个bug:如果一个日志提交后,旧leader返回结果给client后崩溃,新leader此时上线,client向新leader发送新请求,建立通知通道,此时新leader执行历史请求,向新请求通知通道通知,导致执行历史请求通知了新请求的返回
    • 如何排除此错误:请求返回处理程度接收到通知后,判断缓存中操作是否为历史操作结果,若为历史操作继续循环等待通知
  • 总结:是否返回操作结果由日志是否在超时时间内提交决定,与leader节点状态是否变化无关(此操作规避了leader变换导致的相同操作日志重复提交问题)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (kv *KVServer) processOperation(op Op) (Err, string) {
var resultError Err
resultValue := ""
kv.mu.RLock()
opRes, ok := kv.opResStore[op.ClientStamp]
if ok && opRes.OpStamp == op.OpStamp {
resultError, resultValue = opRes.Err, opRes.Value
kv.mu.RUnlock()
return resultError, resultValue
}
kv.mu.RUnlock()
kv.mu.Lock()
_, _, isLeader := kv.rf.Start(op)
if !isLeader {
kv.mu.Unlock()
resultError = ErrWrongLeader
return resultError, resultValue
}
notifyChan := make(chan int, 1)
kv.notifyChanStore[op.ClientStamp] = notifyChan
kv.mu.Unlock()
timeout := time.Now().UnixMilli() + MaxWaitTime
for time.Now().UnixMilli() < timeout {
select {
case opRes := <-notifyChan:
if opRes.OpStamp == op.OpStamp {
kv.mu.Lock()
kv.notifyChanStore[op.ClientStamp] = nil
kv.mu.Unlock()
resultError = opRes.Err
resultValue = opRes.Value
return resultError, resultValue
}
default:
//休眠10微妙
time.Sleep(10 * time.Millisecond)
}
}
resultError = ErrTimeOut
kv.mu.Lock()
kv.notifyChanStore[op.ClientStamp] = nil
kv.mu.Unlock()
return resultError, resultValue
}

Server在初始化时,启动一个读取提交信息的线程,负责在日志提交后将操作应用到状态机上:

  • log应用也需要过滤重复操作的原因:在一致性和容错部分描述过,即一条操作可能会有多条日志,但是只有一条日志操作会应用到状态机上
  • 进行快照时机:每次应用日志时,判断此时raft日志大小是否达到上限
  • 切换快照的时机:一旦接收到切换快照日志,即进行快照切换(2021版本实验需要用到CondInstallSnapshot函数,而2022版本里推荐不实现该函数,直接返回OK,其中涉及到的同步问题在raft层解决,见注意事项1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (kv *KVServer) applyCommitLog() {
for m := range kv.applyCh {
if m.CommandValid {
opCommand := m.Command.(Op)
kv.mu.Lock()
kv.lastAppliedIndex = m.CommandIndex
opRes, ok := kv.opResStore[opCommand.ClientStamp]
if !ok || opRes.OpStamp != opCommand.OpStamp {
notifyChan := kv.notifyChanStore[opCommand.ClientStamp]
kv.mu.Unlock()
if notifyChan != nil {
kv.notifyChanStore[opCommand.ClientStamp] = nil
notifyChan <- 1
}
}else{
kv.mu.Unlock()
}
if kv.maxraftstate != -1 && kv.rf.GetLogSize() > kv.maxraftstate {
kv.snapshotStatus()
}
} else if m.SnapshotValid {
kv.loadSnapshot(m.SnapshotIndex, m.Snapshot)
}
}
}

遇到的实现问题

1.InstallSnapshot引发的上层快照切换时机同步问题

做lab4在测试多并发时遇到了这个问题,发现自己并没有仔细思考CondInstallSnapshot这个函数存在的意义,简单的认为2022版本丢弃后,直接返回true不做其他处理即可,导致了kv层切换快照和应用日志的不同步引发的系统不满足线性一致性问题

切换日志同步问题:调用installSnapshot唤醒上层进行快照切换时,由于将快照切换消息发送到kv层时不占有锁,同时日志commit也在向kv层发送,两者存在争抢问题,即可能由于并发导致顺序出现错误,如下图所示例子

  1. 正确顺序为raft发送Snapshot:10日志后,发送后续Command:11和12日志
  2. 错误情况1:raft层截断日志之后,发送Snapshot:10日志之前,提交日志进程读取新日志,抢先发送Command:11日志
  3. 错误情况2:raft层截断日志之前,提交日志进程正在准备发送Command:9-10的日志,此时raft接收到InstallSnapshot,截断日志,抢在提交日志进程发送之前,发送Snapshot:10日志
1
2
3
4
5
6
7
8
9
10
graph LR
subgraph 错误顺序2:不需要日志发送
7[Snapshot:10] --> 8[/Command:9/]--> 9[/Command:10/]
end
subgraph 错误顺序1:未来日志提前发
4[/Command:11/] --> 5[Snapshot:10]--> 6[/Command:12/]
end
subgraph 正确顺序:
1[Snapshot:10] --> 2[/Command:11/]--> 3[/Command:12/]
end

切换日志同步问题并不一定会导致上层kv层出现状态不一致的问题,分情况讨论:

  • 错误情况1:会导致过期Snapshot和kv层跳过执行某些日志两种错误

    1. 过期snapshot问题:该问题通过在kv层比较最后应用日志index和快照index来过滤过期快照

    2. kv层跳过执行日志问题:按照上图,raft集群提交到了log:11,然而raft节点由于一定原因只执行到了log:8,此时raft leader向peer节点发送installsnapshot,然而发生了上图情况二,导致peer对应kv层直接从log:8跳到执行log:11,并且在1问题解决措施下,认为Snapshot:10过期并过滤,从而导致kv层漏执行log:9和log10,可能导致不一致问题

  • 错误情况2:导致kv层收到其认为执行过的历史日志

    • 通过比较日志应用index和raft层传入日志,过滤由于snapshot导致的“历史日志”

    • 例如上图:在Snapshot:10日志读取后,更新日志应用index到10,之后接收到Command:9-10均认为过期,直接丢弃

综上所属,错误1情况的跳过执行日志问题无法在kv层解决,需要在raft层避免情况1的出现,针对此设计一下同步思路:

  1. installSnapshot在释放锁之前,sendSnapshot标记+1,完成发送后sendSnapshot标记-1

    1
    2
    3
    4
    5
    6
    7
    8
    //写入applych
    rf.startSendingSnapshot()
    go func() {
    rf.applyCh <- ApplyMsg{CommandValid: false, SnapshotValid: true, Snapshot: args.Data,
    SnapshotIndex: args.LastIncludedIndex, SnapshotTerm: args.LastIncludedTerm}
    rf.finishSendingSnapshot()
    }()
    kv.mu.Unlock()
  2. 日志应用端,发送日志之前等待sendSnapshot为0

    1
    2
    3
    4
    5
    6
    for rf.isSendingSnapshot() {
    time.Sleep(5 * time.Millisecond)
    }
    for i := 0; i < len(applyEntries); i++ {
    rf.applyCh <- ApplyMsg{CommandValid: true, SnapshotValid: false, Command: applyEntries[i].Command, CommandIndex: applyEntries[i].Index}
    }
  3. 标记采用原子性操作的整数(没必要使用锁)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    func (rf *Raft) isSendingSnapshot() bool {
    z := atomic.LoadInt32(&rf.snapshotMsgSending)
    return z == 1
    }
    func (rf *Raft) startSendingSnapshot() {
    atomic.AddInt32(&rf.snapshotMsgSending, 1)
    }
    func (rf *Raft) finishSendingSnapshot() {
    atomic.AddInt32(&rf.snapshotMsgSending, -1)
    }

上述不基于锁实现的同步机制,保证了日志应用端在确定发送日志之前执行的InstallSnapshot消息均能够发送,问题是有可能后发生的InstallSnapshot会阻碍与其无关之前日志应用端日志发送,由于installsnapshot调用频率较低,且执行较快,所以该问题影响不大

测试和总结

由于Raft底层保证+单个Client串行执行操作,实现kv服务的难度并不大,主要难点在于请求去重和遗留bug处理,经过三天时间终于写完并且测试通过代码(测试一百轮)。

Lab4:基于Raft的Sharded KV 数据库实现

lab4主要是在原来的kvServer基础上,添加分片(Shard)机制,从而实现一个真正的分布式容错高性能KV数据库,实现过程中的主要难点在于Shard在不同replicate group之间的交互过程。首先系统主要的功能点可以总结为:

  1. 提供包括put(key, value), append(key, value), get(key)的基本kv数据库功能
  2. 基于Raft共识算法的多服务器备份,实现一致性备份存储,实现了系统容错功能
  3. 基于Raft日志的WAL机制以及系统快照机制,允许系统在失效后通过日志重新执行、加载快照等,快速恢复数据
  4. 通过数据分片和多复制服务器组存储方式,实现了系统的高并发访问性能
  5. 支持存储服务器的动态配置,即可以动态的增加删除存储服务器

基本实现架构

如下图所示,系统按照标准的CS架构实现,其中Server端包括一个配置管理集群(Shard Manager)以及多个数据片存储管理集群(KV Server Group);Client端包括两种类型身份的Client:一种为发送KV数据操作请求的客户端(KV Client),一种为管理分片信息以及数据片存储集群的客户端(Shard Manage Client)

  • Shard Manager Server负责kv server group、数据分片以及分片到kv server映射信息等系统元数据的管理(类似于HDFS Master)
  • KV Server Group负责按照分片配置存储对应分片数据以及执行和响应KV客户端操作

其中Shard Manager和每个KV Server Group,通过多服务器备份的方式实现数据的可靠性,具体实现架构如下图所示(以KV Server Group为例):

  • 每个KV Server Group以及ShardManager内包括三个服务器实例,互为备份服务器
  • 通过基于Raft日志的WAL机制,保证不同副本之间的状态一致性以及错误恢复(KV Server中状态机为存储数据片、Shard Manager中状态机为系统shard元数据)

分片以及分片分配(sharding)

kv数据库中的每个key对相当于关系数据库中表中的条目,且value为单值,区别于关系型数据库条目由多个属性组成,采用Horiziontal Partitioning策略,基于hash的方法对数据进行分片,分片方式如下:

  • shardNum为配置的固定分片数量,确定后即不会改变
  • 确定分片后,根据KV Server Group数量将shard均匀分配到KV Server Gruop上,由Shard Manager维护映射关系

上述方法可以总结为:固定分片策略,动态分配方法相,优缺点为:

  • 优点:当KV Server Group配置改变时,涉及到数据迁移时,以shard为单位进行迁移,较于以key直接映射KV Serve Group(如下公式),降低了涉及到的数据迁移通信量
  • 缺点:根据key分布进行划分,当key分布不均有或者某些热点key访问量较高时,无法保证不同KV Serve Group之间的负载均衡

未来可以优化的点:

  • 采用复合划分Composite partitioning策略即:首先基于哈希方法划分,在根据key的分布规律和请求访问,进行基于列表划分(List Partitionning)的二次细粒度划分
  • 可以基于一致性哈希实现shard->KV Serve Group的映射管理,降低由于KV Serve Group的增加或者减少shard重分配导致的数据迁移

分片分配机制

Shard Manager作为的配置管理查询服务,支持动态增加/删除存储服务器,相关接口如下

  • Join(servers):批量增加存储服务器组
  • Leave(gruopIds):批量删除存储服务器组

当kvServerGroup配置发生改变时,Shard Manager会重新在剩余可用Gruop进行shard重分配,基本原则如下:

  • 保证shard在所有server Group上的均匀分配:一部分存储平均数个shard,一部分存储平均数+1个shard
  • 尽量减少shard迁移:重新计算平均数,存储shard大于平均数的group移动到小于平均数的group

重分配关键代码如下所示:

  • 根据group数量,计算平均值averagerShard和余数remindShard最终分配结果为remindShard个server存储averagerShard + 1个shard,其余group存储averagerShard个shard
  • 分为统计重分配shard和重新分配两个步骤,两步骤思路基本相同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//首先统计当前gruop数量
for key, _ := range newConfig.Groups {
groupShardNumMap[key] = 0
allGroupId = append(allGroupId, key)
}
//将shard超过average的日志重新分配
reallocateShards := make([]int, 0)
averageShard := len(newConfig.Shards) / len(allGroupId)
remindShard := len(newConfig.Shards) % len(allGroupId)
for shard, group := range newConfig.Shards {
_, ok := newConfig.Groups[group]
if !ok {
//shard所属gruop被删除的情况
reallocateShards = append(reallocateShards, shard)
} else {
numShard := groupShardNumMap[group]
//如何判断一个group是否超量(关键):管理shard数量 > averageShard 或者 管理shard数量 == averageShard 且此时可以管理averageShard + 1个shard的机会已经用尽
if numShard > averageShard || (numShard == averageShard && remindShard == 0) {
reallocateShards = append(reallocateShards, shard)
} else {
//如果当前group管理shard数量为averageShard,再分配一个shard,当前group管理了averageShard + 1个shard,则需要占用一个管理averageShard + 1的名额
if numShard == averageShard {
remindShard--
}
groupShardNumMap[group] += 1
}
}
}
//开始重新分配
//。。。。。。省略重新分配代码,和上部分差异不大

分片迁移实现

当系统发生Server Gruop的增加或者删除时,会触发shard在不同server之间的变更,虽然整体上思考较为复杂,但是从单个shard迁移的角度考虑,可以将变更过程定义为:多个同时进行的shard从一个server group 到 另一个server group的过程,如下图所示:

  • 对于每个server group来说,在每个配置变更期,要么有一定量移出的shard,要么有一定量等待移入的shard,要么shard不变

在shard迁移过程中,我们必须保证一下几点:

  1. shard不能丢失:需要迁出shard的server group只有在确保对方成功接收对应shard后,才能安全删除
  2. shard一旦迁出,不能再提供服务:server group在迁出shard后,不能再服务shard上的数据操作(client端可能为获取到最新配置,导致该问题的出现)
  3. 在配置切换过程中,系统能够继续提供服务

针对上述问题,为shard定义以下几个状态,迁入迁出过程可以通过状态变更实现:

  • Normal:默认正常状态(正常访问操作
  • WaitIn:等待迁入状态(无法访问操作)
  • In:已经迁入,但还向发送端确认状态(正常访问操作
  • out:等待迁出状态(无法访问操作)
  • Delelte:迁出完毕状态,可以进行垃圾回收(无法访问操作)

如下图以一个shard迁移过程中的状态变更为例,分析变更流程:

  • shard接收端读取到新配置后,创建空shard,并设置状态为 WaitIn
  • 不断地向发送端,拉取shard(由发送端推也一样,只是选择实现了拉),不断重试,直到获取到shard,修改状态: WaitIn -> In
  • 完成状态转换后,向发动端发送确认收到RPC,发动端修改shard状态: Out -> Delete
  • 接收端不断地发送确认收到RPC,直到RPC请求返回,携带有发送端已经将对应shard状态变更为Delete或者删除的信息后,停止发送,修改状态:In -> Normal
  • 当server Group的所有shard状态变为Normal或者Delete时,完成配置变更
  • 上述所有状态变更操作均首先提交到raft,在操作日志成功提交后,执行对应状态变更

上述设计思路的原因:

  1. 为什么需要确认消息,才能将对应shard状态变更Out -> Delete

    • 由于发送端是被拉取方,在接收端发送确认收到消息后,发送端才能保证shard已经发送到接收端且成功存储可以删除
  2. 为什么确认收到消息,返回需要携带发动端是否将对应shard状态变更为delete

    • 接受端发送确认消息的目的是为了通知发送端自己确认收到shard,接收端需要保证发送端收到并且成功记录的自己的确认消息,当发送端shard状态变为delete时,接收端可以确定自己的确认消息成功执行
    • 若不按照上述方式执行,可能存在第一次消息确认成功返回,接收端停止发送,但是发送端由于leader切换等,消息确认操作log未成功提交,导致发送端无限期等待接收端的确认消息

    • 上述设计来源于假设:即使请求成功返回,对应操作不一定成功执行,只有操作结果出现(raft保证操作结果不丢失),才能保证操作执行

  • 简单总结:发送端需要保证接收端接收到才能删除shard->接收端需要通知发送端自己收到了->接收端需要保证接收端知道了自己成功接收,才能停止通知

代码实现

在具体设计实现过程中,并未采用状态变更与通信绑定的操作,即一个线程执行了状态变更后,进行对应发送请求,具体考虑如下:

  • 通信可能失败,需要不断重试,状态变更线程不应等待通信,应该继续执行其他操作
  • 状态变更线程由于互斥需要,往往需要持有锁,由于通信的不确定性(延迟、失败),持有锁时进行RPC通信,可能导致系统性能大幅下降

综合考虑上述设计问题,采用了状态变更线程+周期性状态检测线程的思路

  • 状态变更线程:负责读取raft日志,根据日志中操作变更shard状态
  • 周期性状态检测线程:周期性遍历shard,根据shard状态按照上述交互图,发送消息

状态变更线程代码如下所示:

  • 只负责根据日志操作进行状态变更,不负责状态变更后的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (kv *ShardKV) processConfigOp(commandInterface interface{}) {
switch opCommand := commandInterface.(type) {
case ConfigOp:
//过滤重复的修改配置操作(因为从写入日志到日志提交存在时间差,可能重复提交日志)
if opCommand.Config.Num > kv.config.Num {
//省略根据配置信息修改shard状态代码
}
case InShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略添加shard(去重)
}
case DelShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略修改shard状态为out->delete(需要去重)
}
case AckShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略从in状态修改为normal状态(需要去重)
}
}
}

周期性检测线程代码如下所示:

  • 遍历所有shard,启动单独线程负责通信,主线程等待所有通信线程退出
  • 所有通信线程推出后,主线程遍历所有shard,判断是否退出配置切换状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (kv *ShardKV) updateShardState(updateFunc func(int, int, []string), chaeckStatus string) {
kv.mu.RLock()
_, isLeader := kv.rf.GetState()
//如果在配置
if kv.isConfiging() && isLeader {
var wg sync.WaitGroup
//遍历所有shard,根据状态执行对应操作(如:WaitIn状态发起拉取RPC请求,In状态发起确认RPC)
for shardId, shard := range kv.allShards {
if shard.ShardStatus == chaeckStatus {
wg.Add(1)
go func(shardId int, configNum int, allServers []string) {
defer wg.Done()
updateFunc(shardId, configNum, allServers)
}(shardId, kv.config.Num, kv.preConfig.Groups[kv.preConfig.Shards[shardId]])
}
}
//释放锁,并等待
kv.mu.RUnlock()
wg.Wait()
//上锁,判断当前状态是否可以退出配置状态
kv.mu.RLock()
completeFlag := true
for _, shard := range kv.allShards {
if shard.ShardStatus != ShardNormal && shard.ShardStatus != ShardDelete {
completeFlag = false
}
}
kv.mu.RUnlock()
if completeFlag {
kv.changeConfigState(false)
}
} else {
kv.mu.RUnlock()
}
}

由于后台存在多个周期性运行函数(状态检测、垃圾回收),抽取一个公用的周期循环方法:

1
2
3
4
5
6
7
8
9
10
11
func (kv *ShardKV) backRoutine(operation func(), interval int) {
for !kv.killed() {
//执行具体操作
operation()
time.Sleep(time.Duration(interval) * time.Millisecond)
}
}
//传入需要周期运行的方法
go kv.backRoutine(func() { kv.updateShardState(kv.callMigrateShard, ShardWaitIn) }, UpdateShardInterval)
//启动确认收到对应shard的线程
go kv.backRoutine(func() { kv.updateShardState(kv.callMigrateShardAck, ShardIn) }, UpdateShardInterval)

垃圾回收实现

根据分片迁移实现部分逻辑,仅仅需要回收状态为delete状态的shard,实现逻辑较为简单,采用周期性回收线程的方式,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func (kv *ShardKV) garbageCollect() {
kv.mu.Lock()
for shardId, shard := range kv.allShards {
if shard.ShardStatus == ShardDelete {
//删除对应状态的shard
delete(kv.allShards, shardId)
}
}
kv.mu.Unlock()
}
//启动垃圾回收线程
go kv.backRoutine(kv.garbageCollect, GCInterval)

遇到的实现问题

1.同一个类型中的RPC请求/结构体中,由于条件不同请求参数不同,导致大量无用参数

在实现过程中遇到了一个操作请求对应多种不同操作的情况,不同操作需要携带不同的操作参数,如下代码所示:

  • 一种操作对应四种类型的操作,传输其他操作时需要占用其他三种操作参数的空间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ShardOp struct {
//公共参数
OpType string
ConfigNum int
//修改配置操作参数
Config shardctrler.Config
//添加shard操作参数
AddShard int
Shard ShardData
//删除操作参数
DelShard int
//确认shard操作参数
AckShard int //迁移进入shard的参数
}

针对以上情况,想出了三种解决方案

  1. 发送不特殊处理,接收端根据opType进行处理(不做处理),缺点是:多余参数占用空间

  2. 修改结构体,使用byte[]存储编码后的参数,发送端编码,接收端根据OpType进行解码,缺点:编解码浪费时间,发送接收端需要确定编码顺序

    1
    2
    3
    4
    5
    6
    7
    type ShardOp struct {
    //公共参数
    OpType string
    ConfigNum int
    //修改配置操作参数
    parameters []byte
    }
  3. 将结构体拆分,传输不同的结构体,在接收端基于golang反射进行操作,缺点:反射的运行效率较低,影响系统运行效率

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    type DelShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    DelShard int //迁移出删除shard的参数

    AddShard int //迁移进入shard的参数
    Shard ShardData //迁移进入shard的参数
    }
    type InShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    AddShard int //迁移进入shard的参数
    Shard ShardData //迁移进入shard的参数
    }
    type AckShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    AckShard int //迁移进入shard的参数
    }
    //接收端执行操作
    func (kv *ShardKV) processConfigOp(commandInterface interface{}) {
    switch opCommand := commandInterface.(type) {
    case ConfigOp:
    case InShardOp:
    case DelShardOp:
    case AckShardOp:
    }
    }

最后综合考虑采用第三种方法,虽然执行效率低,但是实现逻辑上更加清晰,相较于第一种方式减少了空间浪费,降低了网络通信代价

测试与总结

测试过程按照以下方式进行:

  • 执行测试脚本,测试200次,每次输出结果写入到文件中

    1
    for ((i = 0; i < 200; i++)); do echo $i; (go test) > ./res/$i; grep -nr "FAIL.*" res;  done
  • 执行完毕,统计通过数量

    1
    grep -nr "PASS" res |wc -l
  • 重复执行三轮,共计测试600次

测试结果为:

  • 测试所有轮次均通过
  • 其中一次的测试输出为:
总结

终于经过了一个多月的视频学习和接近一个月的实验实现,终于完成MIT6.824的学习,现在回看自己的收获可以总结为以下几点:

  1. 对于分布式系统概念以及涉及到的知识点,有了广泛但不一定深入的了解
  2. 掌握了基本golang开发和调试的能力,对于golang的特性和语法有了一定程度的理解
  3. 对于并发编程,RPC通信,线程和进程有了更深的理解

参考

在实现过程中,由于存在部分知识点理解不够透彻,漏看某些实验条件和实验约束,导致部分实验卡壳,实现过程中参考了部分其他实现方案,具体参考如下:

  1. 知乎:MIT6.824-2021 Lab4 : MultiRaft 主要参考了shard封装和状态的思路,并从博主其他博客中了解到了其他可用来帮助加深理解Raft等算法的资料
  2. 博客园:MIT6.824 spring21 Lab2D总结记录 根据博客中快照同步的讲解理解了为什么会需要快照同步,不进行快照同步可能带来的bug
  3. 知乎:MIT6.824_2021_labs 主要和他实现性能进行对比(因为只有他放了结果截图),基本所有lab实现测试时间小于他的水平(达到心理的满足

除此之外,参考其他的大多是golang相关的问题,四个实验从设计到实现基本上独立完成,参考较少,没有进行过代码copy。