基于Raft的Sharded KV 数据库实现
项目来自于MIT6.824分布式系统的结课大作业,实现代码已上传 github仓库,该博客为项目的总体框架总结,省略了大量的实现细节和代码,细节总结可参考 MIT6.824实验总结
该项目实现目标为一个分布式容错的简单KV数据库,系统主要的功能点可以总结为:
- 提供包括
put(key, value), append(key, value), get(key)
的基本kv数据库功能 - 基于Raft共识算法的多服务器备份,实现一致性备份存储,实现了系统容错功能
- 基于Raft日志的WAL机制以及系统快照机制,允许系统在失效后通过日志重新执行、加载快照等,快速恢复数据
- 通过数据分片和多复制服务器组存储方式,实现了系统的高并发访问性能
- 支持存储服务器的动态配置,即可以动态的增加删除存储服务器
基本实现架构
如下图所示,系统按照标准的CS架构实现,其中Server端包括一个配置管理集群(Shard Manager)以及多个数据片存储管理集群(KV Server Group);Client端包括两种类型身份的Client:一种为发送KV数据操作请求的客户端(KV Client),一种为管理分片信息以及数据片存储集群的客户端(Shard Manage Client)
- Shard Manager Server负责kv server group、数据分片以及分片到kv server映射信息等系统元数据的管理(类似于HDFS Master)
- KV Server Group负责按照分片配置存储对应分片数据以及执行和响应KV客户端操作
其中Shard Manager和每个KV Server Group,通过多服务器备份的方式实现数据的可靠性,具体实现架构如下图所示(以KV Server Group为例):
- 每个KV Server Group以及ShardManager内包括三个服务器实例,互为备份服务器
- 通过基于Raft日志的WAL机制,保证不同副本之间的状态一致性以及错误恢复(KV Server中状态机为存储数据片、Shard Manager中状态机为系统shard元数据)
分片以及分片分配(sharding)
kv数据库中的每个key对相当于关系数据库中表中的条目,且value为单值,区别于关系型数据库条目由多个属性组成,采用Horiziontal Partitioning策略,基于hash的方法对数据进行分片,分片方式如下:
- shardNum为配置的固定分片数量,确定后即不会改变
- 确定分片后,根据KV Server Group数量将shard均匀分配到KV Server Gruop上,由Shard Manager维护映射关系
上述方法可以总结为:固定分片策略,动态分配方法相,优缺点为:
- 优点:当KV Server Group配置改变时,涉及到数据迁移时,以shard为单位进行迁移,较于以key直接映射KV Serve Group(如下公式),降低了涉及到的数据迁移通信量
- 缺点:根据key分布进行划分,当key分布不均有或者某些热点key访问量较高时,无法保证不同KV Serve Group之间的负载均衡
未来可以优化的点:
- 采用复合划分Composite partitioning策略即:首先基于哈希方法划分,在根据key的分布规律和请求访问,进行基于列表划分(List Partitionning)的二次细粒度划分
- 可以基于一致性哈希实现shard->KV Serve Group的映射管理,降低由于KV Serve Group的增加或者减少shard重分配导致的数据迁移
分片分配机制
Shard Manager作为的配置管理查询服务,支持动态增加/删除存储服务器,相关接口如下
Join(servers)
:批量增加存储服务器组Leave(gruopIds)
:批量删除存储服务器组
当kvServerGroup配置发生改变时,Shard Manager会重新在剩余可用Gruop进行shard重分配,基本原则如下:
- 保证shard在所有server Group上的均匀分配:一部分存储平均数个shard,一部分存储平均数+1个shard
- 尽量减少shard迁移:重新计算平均数,存储shard大于平均数的group移动到小于平均数的group
重分配关键代码如下所示:
- 根据group数量,计算平均值
averagerShard
和余数remindShard
,最终分配结果为:remindShard
个server存储averagerShard + 1
个shard,其余group存储averagerShard
个shard - 分为统计重分配shard和重新分配两个步骤,两步骤思路基本相同
1 | //首先统计当前gruop数量 |
分片迁移实现
当系统发生Server Gruop的增加或者删除时,会触发shard在不同server之间的变更,虽然整体上思考较为复杂,但是从单个shard迁移的角度考虑,可以将变更过程定义为:多个同时进行的shard从一个server group 到 另一个server group的过程,如下图所示:
- 对于每个server group来说,在每个配置变更期,要么有一定量移出的shard,要么有一定量等待移入的shard,要么shard不变
在shard迁移过程中,我们必须保证一下几点:
- shard不能丢失:需要迁出shard的server group只有在确保对方成功接收对应shard后,才能安全删除
- shard一旦迁出,不能再提供服务:server group在迁出shard后,不能再服务shard上的数据操作(client端可能为获取到最新配置,导致该问题的出现)
- 在配置切换过程中,系统能够继续提供服务
针对上述问题,为shard定义以下几个状态,迁入迁出过程可以通过状态变更实现:
Normal
:默认正常状态(正常访问操作)WaitIn
:等待迁入状态(无法访问操作)In
:已经迁入,但还向发送端确认状态(正常访问操作)out
:等待迁出状态(无法访问操作)Delelte
:迁出完毕状态,可以进行垃圾回收(无法访问操作)
如下图以一个shard迁移过程中的状态变更为例,分析变更流程:
- shard接收端读取到新配置后,创建空shard,并设置状态为
WaitIn
- 不断地向发送端,拉取shard(由发送端推也一样,只是选择实现了拉),不断重试,直到获取到shard,修改状态:
WaitIn -> In
- 完成状态转换后,向发动端发送确认收到RPC,发动端修改shard状态:
Out -> Delete
- 接收端不断地发送确认收到RPC,直到RPC请求返回,携带有发送端已经将对应shard状态变更为
Delete
或者删除的信息后,停止发送,修改状态:In -> Normal
- 当server Group的所有shard状态变为
Normal
或者Delete
时,完成配置变更 - 上述所有状态变更操作均首先提交到raft,在操作日志成功提交后,执行对应状态变更
上述设计思路的原因:
为什么需要确认消息,才能将对应shard状态变更
Out -> Delete
- 由于发送端是被拉取方,在接收端发送确认收到消息后,发送端才能保证shard已经发送到接收端且成功存储可以删除
为什么确认收到消息,返回需要携带发动端是否将对应shard状态变更为delete
- 接受端发送确认消息的目的是为了通知发送端自己确认收到shard,接收端需要保证发送端收到并且成功记录的自己的确认消息,当发送端shard状态变为delete时,接收端可以确定自己的确认消息成功执行
若不按照上述方式执行,可能存在第一次消息确认成功返回,接收端停止发送,但是发送端由于leader切换等,消息确认操作log未成功提交,导致发送端无限期等待接收端的确认消息
上述设计来源于假设:即使请求成功返回,对应操作不一定成功执行,只有操作结果出现(raft保证操作结果不丢失),才能保证操作执行
- 简单总结:发送端需要保证接收端接收到才能删除shard->接收端需要通知发送端自己收到了->接收端需要保证发送端知道了自己成功接收,才能停止通知
代码实现
在具体设计实现过程中,并未采用状态变更与通信绑定的操作,即一个线程执行了状态变更后,进行对应发送请求,具体考虑如下:
- 通信可能失败,需要不断重试,状态变更线程不应等待通信,应该继续执行其他操作
- 状态变更线程由于互斥需要,往往需要持有锁,由于通信的不确定性(延迟、失败),持有锁时进行RPC通信,可能导致系统性能大幅下降
综合考虑上述设计问题,采用了状态变更线程+周期性状态检测线程的思路
- 状态变更线程:负责读取raft日志,根据日志中操作变更shard状态
- 周期性状态检测线程:周期性遍历shard,根据shard状态按照上述交互图,发送消息
状态变更线程代码如下所示:
- 只负责根据日志操作进行状态变更,不负责状态变更后的操作
1 | func (kv *ShardKV) processConfigOp(commandInterface interface{}) { |
周期性检测线程代码如下所示:
- 遍历所有shard,启动单独线程负责通信,主线程等待所有通信线程退出
- 所有通信线程退出后,主线程遍历所有shard,判断是否退出配置切换状态
1 | func (kv *ShardKV) updateShardState(updateFunc func(int, int, []string), chaeckStatus string) { |
由于后台存在多个周期性运行函数(状态检测、垃圾回收),抽取一个公用的周期循环方法:
1 | func (kv *ShardKV) backRoutine(operation func(), interval int) { |
垃圾回收实现
根据分片迁移实现部分逻辑,仅仅需要回收状态为delete状态的shard,实现逻辑较为简单,采用周期性回收线程的方式,关键代码如下:
1 | func (kv *ShardKV) garbageCollect() { |
遇到的实现问题
1.同一个类型中的RPC请求/结构体中,由于条件不同请求参数不同,导致大量无用参数
在实现过程中遇到了一个操作请求对应多种不同操作的情况,不同操作需要携带不同的操作参数,如下代码所示:
- 一种操作对应四种类型的操作,传输其他操作时需要占用其他三种操作参数的空间
1 | type ShardOp struct { |
针对以上情况,想出了三种解决方案
发送不特殊处理,接收端根据opType进行处理(不做处理),缺点是:多余参数占用空间
修改结构体,使用byte[]存储编码后的参数,发送端编码,接收端根据OpType进行解码,缺点:编解码浪费时间,发送接收端需要确定编码顺序
1
2
3
4
5
6
7type ShardOp struct {
//公共参数
OpType string
ConfigNum int
//修改配置操作参数
parameters []byte
}将结构体拆分,传输不同的结构体,在接收端基于golang反射进行操作,缺点:反射的运行效率较低,影响系统运行效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25type DelShardOp struct {
ConfigNum int //非切换配置需要使用的去重参数
DelShard int //迁移出删除shard的参数
AddShard int //迁移进入shard的参数
Shard ShardData //迁移进入shard的参数
}
type InShardOp struct {
ConfigNum int //非切换配置需要使用的去重参数
AddShard int //迁移进入shard的参数
Shard ShardData //迁移进入shard的参数
}
type AckShardOp struct {
ConfigNum int //非切换配置需要使用的去重参数
AckShard int //迁移进入shard的参数
}
//接收端执行操作
func (kv *ShardKV) processConfigOp(commandInterface interface{}) {
switch opCommand := commandInterface.(type) {
case ConfigOp:
case InShardOp:
case DelShardOp:
case AckShardOp:
}
}
最后综合考虑采用第三种方法,虽然执行效率低,但是实现逻辑上更加清晰,相较于第一种方式减少了空间浪费,降低了网络通信代价
测试与总结
测试过程按照以下方式进行:
执行测试脚本,测试200次,每次输出结果写入到文件中
1
for ((i = 0; i < 200; i++)); do echo $i; (go test) > ./res/$i; grep -nr "FAIL.*" res; done
执行完毕,统计通过数量
1
grep -nr "PASS" res |wc -l
重复执行三轮,共计测试600次
测试结果为:
- 测试所有轮次均通过
- 其中一次的测试输出为:
总结
终于经过了一个多月的视频学习和接近一个月的实验实现,终于完成MIT6.824的学习,现在回看自己的收获可以总结为以下几点:
- 对于分布式系统概念以及涉及到的知识点,有了广泛但不一定深入的了解
- 掌握了基本golang开发和调试的能力,对于golang的特性和语法有了一定程度的理解
- 对于并发编程,RPC通信,线程和进程有了更深的理解