0%

基于Raft的ShardedKV数据库实现

基于Raft的Sharded KV 数据库实现

项目来自于MIT6.824分布式系统的结课大作业,实现代码已上传 github仓库,该博客为项目的总体框架总结,省略了大量的实现细节和代码,细节总结可参考 MIT6.824实验总结

该项目实现目标为一个分布式容错的简单KV数据库,系统主要的功能点可以总结为:

  1. 提供包括put(key, value), append(key, value), get(key)的基本kv数据库功能
  2. 基于Raft共识算法的多服务器备份,实现一致性备份存储,实现了系统容错功能
  3. 基于Raft日志的WAL机制以及系统快照机制,允许系统在失效后通过日志重新执行、加载快照等,快速恢复数据
  4. 通过数据分片和多复制服务器组存储方式,实现了系统的高并发访问性能
  5. 支持存储服务器的动态配置,即可以动态的增加删除存储服务器

基本实现架构

如下图所示,系统按照标准的CS架构实现,其中Server端包括一个配置管理集群(Shard Manager)以及多个数据片存储管理集群(KV Server Group);Client端包括两种类型身份的Client:一种为发送KV数据操作请求的客户端(KV Client),一种为管理分片信息以及数据片存储集群的客户端(Shard Manage Client)

  • Shard Manager Server负责kv server group、数据分片以及分片到kv server映射信息等系统元数据的管理(类似于HDFS Master)
  • KV Server Group负责按照分片配置存储对应分片数据以及执行和响应KV客户端操作

其中Shard Manager和每个KV Server Group,通过多服务器备份的方式实现数据的可靠性,具体实现架构如下图所示(以KV Server Group为例):

  • 每个KV Server Group以及ShardManager内包括三个服务器实例,互为备份服务器
  • 通过基于Raft日志的WAL机制,保证不同副本之间的状态一致性以及错误恢复(KV Server中状态机为存储数据片、Shard Manager中状态机为系统shard元数据)

分片以及分片分配(sharding)

kv数据库中的每个key对相当于关系数据库中表中的条目,且value为单值,区别于关系型数据库条目由多个属性组成,采用Horiziontal Partitioning策略,基于hash的方法对数据进行分片,分片方式如下:

  • shardNum为配置的固定分片数量,确定后即不会改变
  • 确定分片后,根据KV Server Group数量将shard均匀分配到KV Server Gruop上,由Shard Manager维护映射关系

上述方法可以总结为:固定分片策略,动态分配方法相,优缺点为:

  • 优点:当KV Server Group配置改变时,涉及到数据迁移时,以shard为单位进行迁移,较于以key直接映射KV Serve Group(如下公式),降低了涉及到的数据迁移通信量
  • 缺点:根据key分布进行划分,当key分布不均有或者某些热点key访问量较高时,无法保证不同KV Serve Group之间的负载均衡

未来可以优化的点:

  • 采用复合划分Composite partitioning策略即:首先基于哈希方法划分,在根据key的分布规律和请求访问,进行基于列表划分(List Partitionning)的二次细粒度划分
  • 可以基于一致性哈希实现shard->KV Serve Group的映射管理,降低由于KV Serve Group的增加或者减少shard重分配导致的数据迁移

分片分配机制

Shard Manager作为的配置管理查询服务,支持动态增加/删除存储服务器,相关接口如下

  • Join(servers):批量增加存储服务器组
  • Leave(gruopIds):批量删除存储服务器组

当kvServerGroup配置发生改变时,Shard Manager会重新在剩余可用Gruop进行shard重分配,基本原则如下:

  • 保证shard在所有server Group上的均匀分配:一部分存储平均数个shard,一部分存储平均数+1个shard
  • 尽量减少shard迁移:重新计算平均数,存储shard大于平均数的group移动到小于平均数的group

重分配关键代码如下所示:

  • 根据group数量,计算平均值averagerShard和余数remindShard最终分配结果为remindShard个server存储averagerShard + 1个shard,其余group存储averagerShard个shard
  • 分为统计重分配shard和重新分配两个步骤,两步骤思路基本相同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//首先统计当前gruop数量
for key, _ := range newConfig.Groups {
groupShardNumMap[key] = 0
allGroupId = append(allGroupId, key)
}
//将shard超过average的日志重新分配
reallocateShards := make([]int, 0)
averageShard := len(newConfig.Shards) / len(allGroupId)
remindShard := len(newConfig.Shards) % len(allGroupId)
for shard, group := range newConfig.Shards {
_, ok := newConfig.Groups[group]
if !ok {
//shard所属gruop被删除的情况
reallocateShards = append(reallocateShards, shard)
} else {
numShard := groupShardNumMap[group]
//如何判断一个group是否超量(关键):管理shard数量 > averageShard 或者 管理shard数量 == averageShard 且此时可以管理averageShard + 1个shard的机会已经用尽
if numShard > averageShard || (numShard == averageShard && remindShard == 0) {
reallocateShards = append(reallocateShards, shard)
} else {
//如果当前group管理shard数量为averageShard,再分配一个shard,当前group管理了averageShard + 1个shard,则需要占用一个管理averageShard + 1的名额
if numShard == averageShard {
remindShard--
}
groupShardNumMap[group] += 1
}
}
}
//开始重新分配
//。。。。。。省略重新分配代码,和上部分差异不大

分片迁移实现

当系统发生Server Gruop的增加或者删除时,会触发shard在不同server之间的变更,虽然整体上思考较为复杂,但是从单个shard迁移的角度考虑,可以将变更过程定义为:多个同时进行的shard从一个server group 到 另一个server group的过程,如下图所示:

  • 对于每个server group来说,在每个配置变更期,要么有一定量移出的shard,要么有一定量等待移入的shard,要么shard不变

在shard迁移过程中,我们必须保证一下几点:

  1. shard不能丢失:需要迁出shard的server group只有在确保对方成功接收对应shard后,才能安全删除
  2. shard一旦迁出,不能再提供服务:server group在迁出shard后,不能再服务shard上的数据操作(client端可能为获取到最新配置,导致该问题的出现)
  3. 在配置切换过程中,系统能够继续提供服务

针对上述问题,为shard定义以下几个状态,迁入迁出过程可以通过状态变更实现:

  • Normal:默认正常状态(正常访问操作
  • WaitIn:等待迁入状态(无法访问操作)
  • In:已经迁入,但还向发送端确认状态(正常访问操作
  • out:等待迁出状态(无法访问操作)
  • Delelte:迁出完毕状态,可以进行垃圾回收(无法访问操作)

如下图以一个shard迁移过程中的状态变更为例,分析变更流程:

  • shard接收端读取到新配置后,创建空shard,并设置状态为 WaitIn
  • 不断地向发送端,拉取shard(由发送端推也一样,只是选择实现了拉),不断重试,直到获取到shard,修改状态: WaitIn -> In
  • 完成状态转换后,向发动端发送确认收到RPC,发动端修改shard状态: Out -> Delete
  • 接收端不断地发送确认收到RPC,直到RPC请求返回,携带有发送端已经将对应shard状态变更为Delete或者删除的信息后,停止发送,修改状态:In -> Normal
  • 当server Group的所有shard状态变为Normal或者Delete时,完成配置变更
  • 上述所有状态变更操作均首先提交到raft,在操作日志成功提交后,执行对应状态变更

上述设计思路的原因:

  1. 为什么需要确认消息,才能将对应shard状态变更Out -> Delete

    • 由于发送端是被拉取方,在接收端发送确认收到消息后,发送端才能保证shard已经发送到接收端且成功存储可以删除
  2. 为什么确认收到消息,返回需要携带发动端是否将对应shard状态变更为delete

    • 接受端发送确认消息的目的是为了通知发送端自己确认收到shard,接收端需要保证发送端收到并且成功记录的自己的确认消息,当发送端shard状态变为delete时,接收端可以确定自己的确认消息成功执行
    • 若不按照上述方式执行,可能存在第一次消息确认成功返回,接收端停止发送,但是发送端由于leader切换等,消息确认操作log未成功提交,导致发送端无限期等待接收端的确认消息

    • 上述设计来源于假设:即使请求成功返回,对应操作不一定成功执行,只有操作结果出现(raft保证操作结果不丢失),才能保证操作执行

  • 简单总结:发送端需要保证接收端接收到才能删除shard->接收端需要通知发送端自己收到了->接收端需要保证发送端知道了自己成功接收,才能停止通知

代码实现

在具体设计实现过程中,并未采用状态变更与通信绑定的操作,即一个线程执行了状态变更后,进行对应发送请求,具体考虑如下:

  • 通信可能失败,需要不断重试,状态变更线程不应等待通信,应该继续执行其他操作
  • 状态变更线程由于互斥需要,往往需要持有锁,由于通信的不确定性(延迟、失败),持有锁时进行RPC通信,可能导致系统性能大幅下降

综合考虑上述设计问题,采用了状态变更线程+周期性状态检测线程的思路

  • 状态变更线程:负责读取raft日志,根据日志中操作变更shard状态
  • 周期性状态检测线程:周期性遍历shard,根据shard状态按照上述交互图,发送消息

状态变更线程代码如下所示:

  • 只负责根据日志操作进行状态变更,不负责状态变更后的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (kv *ShardKV) processConfigOp(commandInterface interface{}) {
switch opCommand := commandInterface.(type) {
case ConfigOp:
//过滤重复的修改配置操作(因为从写入日志到日志提交存在时间差,可能重复提交日志)
if opCommand.Config.Num > kv.config.Num {
//省略根据配置信息修改shard状态代码
}
case InShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略添加shard(去重)
}
case DelShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略修改shard状态为out->delete(需要去重)
}
case AckShardOp:
if opCommand.ConfigNum == kv.config.Num {
//省略从in状态修改为normal状态(需要去重)
}
}
}

周期性检测线程代码如下所示:

  • 遍历所有shard,启动单独线程负责通信,主线程等待所有通信线程退出
  • 所有通信线程退出后,主线程遍历所有shard,判断是否退出配置切换状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (kv *ShardKV) updateShardState(updateFunc func(int, int, []string), chaeckStatus string) {
kv.mu.RLock()
_, isLeader := kv.rf.GetState()
//如果在配置
if kv.isConfiging() && isLeader {
var wg sync.WaitGroup
//遍历所有shard,根据状态执行对应操作(如:WaitIn状态发起拉取RPC请求,In状态发起确认RPC)
for shardId, shard := range kv.allShards {
if shard.ShardStatus == chaeckStatus {
wg.Add(1)
go func(shardId int, configNum int, allServers []string) {
defer wg.Done()
updateFunc(shardId, configNum, allServers)
}(shardId, kv.config.Num, kv.preConfig.Groups[kv.preConfig.Shards[shardId]])
}
}
//释放锁,并等待
kv.mu.RUnlock()
wg.Wait()
//上锁,判断当前状态是否可以退出配置状态
kv.mu.RLock()
completeFlag := true
for _, shard := range kv.allShards {
if shard.ShardStatus != ShardNormal && shard.ShardStatus != ShardDelete {
completeFlag = false
}
}
kv.mu.RUnlock()
if completeFlag {
kv.changeConfigState(false)
}
} else {
kv.mu.RUnlock()
}
}

由于后台存在多个周期性运行函数(状态检测、垃圾回收),抽取一个公用的周期循环方法:

1
2
3
4
5
6
7
8
9
10
11
func (kv *ShardKV) backRoutine(operation func(), interval int) {
for !kv.killed() {
//执行具体操作
operation()
time.Sleep(time.Duration(interval) * time.Millisecond)
}
}
//传入需要周期运行的方法
go kv.backRoutine(func() { kv.updateShardState(kv.callMigrateShard, ShardWaitIn) }, UpdateShardInterval)
//启动确认收到对应shard的线程
go kv.backRoutine(func() { kv.updateShardState(kv.callMigrateShardAck, ShardIn) }, UpdateShardInterval)

垃圾回收实现

根据分片迁移实现部分逻辑,仅仅需要回收状态为delete状态的shard,实现逻辑较为简单,采用周期性回收线程的方式,关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func (kv *ShardKV) garbageCollect() {
kv.mu.Lock()
for shardId, shard := range kv.allShards {
if shard.ShardStatus == ShardDelete {
//删除对应状态的shard
delete(kv.allShards, shardId)
}
}
kv.mu.Unlock()
}
//启动垃圾回收线程
go kv.backRoutine(kv.garbageCollect, GCInterval)

遇到的实现问题

1.同一个类型中的RPC请求/结构体中,由于条件不同请求参数不同,导致大量无用参数

在实现过程中遇到了一个操作请求对应多种不同操作的情况,不同操作需要携带不同的操作参数,如下代码所示:

  • 一种操作对应四种类型的操作,传输其他操作时需要占用其他三种操作参数的空间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ShardOp struct {
//公共参数
OpType string
ConfigNum int
//修改配置操作参数
Config shardctrler.Config
//添加shard操作参数
AddShard int
Shard ShardData
//删除操作参数
DelShard int
//确认shard操作参数
AckShard int //迁移进入shard的参数
}

针对以上情况,想出了三种解决方案

  1. 发送不特殊处理,接收端根据opType进行处理(不做处理),缺点是:多余参数占用空间

  2. 修改结构体,使用byte[]存储编码后的参数,发送端编码,接收端根据OpType进行解码,缺点:编解码浪费时间,发送接收端需要确定编码顺序

    1
    2
    3
    4
    5
    6
    7
    type ShardOp struct {
    //公共参数
    OpType string
    ConfigNum int
    //修改配置操作参数
    parameters []byte
    }
  3. 将结构体拆分,传输不同的结构体,在接收端基于golang反射进行操作,缺点:反射的运行效率较低,影响系统运行效率

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    type DelShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    DelShard int //迁移出删除shard的参数

    AddShard int //迁移进入shard的参数
    Shard ShardData //迁移进入shard的参数
    }
    type InShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    AddShard int //迁移进入shard的参数
    Shard ShardData //迁移进入shard的参数
    }
    type AckShardOp struct {
    ConfigNum int //非切换配置需要使用的去重参数
    AckShard int //迁移进入shard的参数
    }
    //接收端执行操作
    func (kv *ShardKV) processConfigOp(commandInterface interface{}) {
    switch opCommand := commandInterface.(type) {
    case ConfigOp:
    case InShardOp:
    case DelShardOp:
    case AckShardOp:
    }
    }

最后综合考虑采用第三种方法,虽然执行效率低,但是实现逻辑上更加清晰,相较于第一种方式减少了空间浪费,降低了网络通信代价

测试与总结

测试过程按照以下方式进行:

  • 执行测试脚本,测试200次,每次输出结果写入到文件中

    1
    for ((i = 0; i < 200; i++)); do echo $i; (go test) > ./res/$i; grep -nr "FAIL.*" res;  done
  • 执行完毕,统计通过数量

    1
    grep -nr "PASS" res |wc -l
  • 重复执行三轮,共计测试600次

测试结果为:

  • 测试所有轮次均通过
  • 其中一次的测试输出为:
总结

终于经过了一个多月的视频学习和接近一个月的实验实现,终于完成MIT6.824的学习,现在回看自己的收获可以总结为以下几点:

  1. 对于分布式系统概念以及涉及到的知识点,有了广泛但不一定深入的了解
  2. 掌握了基本golang开发和调试的能力,对于golang的特性和语法有了一定程度的理解
  3. 对于并发编程,RPC通信,线程和进程有了更深的理解