Key/Value Server实现 KV Server Key/Value Server(键值对服务器)是一种专门用于存储和检索键值对数据的服务器系统。它是NoSQL数据库的一种常见形式,以其简单性、高性能和可扩展性而闻名。
每个客户端通过一个 Clerk 发送 RPC 请求与服务端进行交互,基础的API包括两种:
服务端通过内存中的 Map 记录所有键值对(key: value, version),key 和 value 为 string 类型,version 记录了 key 被写入的次数。
分布式锁 分布式锁是一种在分布式系统中协调多个进程/机器对共享资源进行互斥访问的机制。它的核心目标是确保在多个节点(可能位于不同机器上)并发访问时,同一时间只有一个客户端能持有锁,从而避免数据竞争和不一致问题。在许多分布式应用中,运行在不同机器上的客户端通过键值服务器来协调活动,例如 ZooKeeper 和 Etcd 通过条件式 Put 操作实现这种锁机制。
实现 在 rpc.go 中定义错误类型以及 RPC 通信所需的参数。
rpc.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 type Err string const ( OK = "OK" ErrNoKey = "ErrNoKey" ErrVersion = "ErrVersion" ErrMaybe = "ErrMaybe" ) type Tversion uint64 type PutArgs struct { Key string Value string Version Tversion } type PutReply struct { Err Err } type GetArgs struct { Key string } type GetReply struct { Value string Version Tversion Err Err }
服务端由包含一个 Map 存储键值对,一个互斥锁防止并发冲突。
server.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type KVEntry struct { Value string Version rpc.Tversion } type KVServer struct { mu sync.Mutex KVMap map [string ]*KVEntry } func MakeServer () *KVServer { kv := &KVServer{ KVMap: make (map [string ]*KVEntry) } return kv }
Get 方法返回键对应的值和版本号,当键不存在时返回键不存在的错误;Put 方法在请求的版本号与当前存储的版本号一致时更新值,若请求的键不存在且版本号为0则进行初始化操作。
server.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) { kv.mu.Lock() defer kv.mu.Unlock() entry, exist := kv.KVMap[args.Key] if exist { reply.Value = entry.Value reply.Version = entry.Version reply.Err = rpc.OK return } reply.Err = rpc.ErrNokey } func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) { kv.mu.Lock() defer kv.mu.Unlock() entry, exist := kv.KVMap[args.Key] if !exist { if args.Version > 0 { reply.Err = rpc.ErrNokey return } kv.KVMap[args.Key] = &KVEntry{ Value: args.Value, Version: args.Version + 1 , } reply.Err = rpc.OK return } if entry.Version == args.Version { entry.Value = args.Value entry.Version++ reply.Err = rpc.OK return } reply.Err = rpc.ErrVersion }
客户端通过 Clerk 向服务端发送 RPC 通信。 Get 方法获取一个键对应的当前值和版本号,当键不存在时(reply.Err == rpc.ErrNoKey)返回错误,若是其他类型的错误则不断重试;Put 方法会在版本号一致时更新值,但当网络不可靠时,可能会丢弃 RPC 请求/响应:
因此处理方案为:若重试的 Put 请求 收到 rpc.ErrVersion,客户端应向上层返回 rpc.ErrMaybe,表示请求可能已执行 ;而首次 Put 请求 直接收到 rpc.ErrVersion,则返回 rpc.ErrVersion 明确未执行。
client.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func (ck *Clerk) Get(key string ) (string , rpc.Tversion, rpc.Err) { args := rpc.GetArgs{Key: key} reply := rpc.GetReply{} for { ok := ck.clnt.Call(ck.server, "KVServer.Get" , &args, &reply) if ok { if reply.Err == rpc.ErrNoKey { return "" , 0 , rpc.ErrNoKey } return reply.Value, reply.Version, reply.Err } time.Sleep(time.Millisecond * 100 ) } } func (ck *Clerk) Put(key, value string , version rpc.Tversion) rpc.Err { args := rpc.PutArgs{ Key: key, Value: value, Version: version, } reply := rpc.PutReply{} firstAttempt := true for { ok := ck.clnt.Call(ck.server, "KVServer.Put" , &args, &reply) if ok { switch reply.Err { case rpc.ErrVersion: if firstAttempt { return rpc.ErrVersion } return rpc.ErrMaybe case rpc.ErrNoKey: return rpc.ErrNoKey default : return reply.Err } } firstAttempt = false time.Sleep(time.Millisecond * 100 ) } }
借助上述的 K/V Server 系统的接口(Get / Put)来实现一个基于 CAS(Compare-And-Swap)思想的锁,支持两个方法 Acquire(获取锁)和 Release(释放锁)。同一时间只能有一个客户端成功获取锁,其他客户端必须等待当前持有锁的客户端调用 Release 释放锁后,才能尝试获取。
为了确保锁的获取与释放的原子性,为每个锁生成一个唯一标识,通过特定的 key 将该标识存储在服务端。获取锁时调用 KVServer 中的 Get 方法,若返回的值不为空,则表明该锁已经被其他客户端占用;释放锁时先通过 Get 方法获取值和版本信息,验证该值是否和客户端的标识相等以保证只有锁的拥有者才能释放锁,同时通过版本号实现 CAS,调用 Put 方法将值更新为0表示释放锁。
lock.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 type Lock struct { ck kvtest.IKVClerk key string state string } func MakeLock (ck kvtest.IKVClerk, l string ) *Lock { lk := &Lock{ck: ck} lk.key = "lock:" + l lk.state = kvtest.RandValue(8 ) return lk } func (lk *Lock) Acquire() { for { value, version, err := lk.ck.Get(lk.key) if err == rpc.ErrNoKey || value == "" { err = lk.ck.Put(lk.key, lk.state, version) if err == rpc.ErrMaybe { if lk.checkLockHeld() { return } } if err == rpc.OK { return } } time.Sleep(time.Millisecond * 10 ) } } func (lk *Lock) checkLockHeld() bool { value, _, err := lk.ck.Get(lk.key) if err != rpc.OK { return false } return value == lk.state } func (lk *Lock) Release() { for { value, version, err := lk.ck.Get(lk.key) if err != rpc.OK || value != lk.state { return } err = lk.ck.Put(lk.key, "" , version) if err == rpc.OK { return } time.Sleep(time.Millisecond * 10 ) } }
注意:RPC不可靠 + 状态不明确 → 要用额外确认来补全语义。因此在 Acquire 的过程中,若 Get 返回的错误为 ErrMaybe,必须额外再调用一次 Get 来确认,否则客户端无法确认自己是否加锁成功。