本文是使用golang实现redis系列的第八篇,将介绍如何在分布式缓存中使用Try-Commit-Catch方式来解决分布式一致性问题。
godis集群的源码在Github:Godis/cluster
在上一篇文章中我们使用一致性hash算法将缓存中的key分散到不同的服务器节点中,从而实现了分布式缓存。随之而来的问题是:一条指令(比如MSET)可能需要多个节点同时执行,可能有些节点成功而另一部分节点失败。
对于使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证MSET操作要么全部成功要么全部失败。
于是问题来了DEL、MSET等命令所涉及的key可能分布在不同的节点中,在集群模式下实现这类涉及多个key的命令最简单的方式当然是For-Each遍历key并向它们所在的节点发送相应的操作指令。以MGET命令的实现为例:
funcMGet(cluster*Cluster,credis.Connection,args[][]byte)redis.Reply{ iflen(args)<2{ returnreply.MakeErrReply("ERRwrongnumberofargumentsformgetcommand") } //从参数列表中取出要读取的key keys:=make([]string,len(args)-1) fori:=1;i<len(args);i++{ keys[i-1]=string(args[i]) } resultMap:=make(map[string][]byte) //计算每个key所在的节点,并按照节点分组 groupMap:=cluster.groupBy(keys) //groupMap的类型为map[string][]string,key是节点的地址,value是keys中属于该节点的key列表 forpeer,group:=rangegroupMap{ //向每个节点发送mget指令,读取分布在它上面的key resp:=cluster.Relay(peer,c,makeArgs("MGET",group...)) ifreply.IsErrorReply(resp){ errReply:=resp.(reply.ErrorReply) returnreply.MakeErrReply(fmt.Sprintf("ERRduringget%soccurs:%v",group[0],errReply.Error())) } arrReply,_:=resp.(*reply.MultiBulkReply) //将每个节点上的结果merge到map中 fori,v:=rangearrReply.Args{ key:=group[i] resultMap[key]=v } } result:=make([][]byte,len(keys)) fori,k:=rangekeys{ result[i]=resultMap[k] } returnreply.MakeMultiBulkReply(result) } //计算key所属的节点,并按节点分组 func(cluster*Cluster)groupBy(keys[]string)map[string][]string{ result:=make(map[string][]string) for_,key:=rangekeys{ //使用一致性hash计算所属节点 peer:=cluster.peerPicker.Get(key) //将key加入到相应节点的分组中 group,ok:=result[peer] if!ok{ group=make([]string,0) } group=append(group,key) result[peer]=group } returnresult }那么MSET命令的实现能否如法炮制呢?答案是否定的。在上面的代码中我们注意到,在向各个节点发送指令时若某个节点读取失败则会直接退出整个MGET执行过程。
若在执行MSET指令时遇到部分节点失败或超时,则会出现部分key设置成功而另一份设置失败的情况。对于缓存使用者而言这种部分成功部分失败的情况非常难以处理,所以我们需要保证MSET操作要么全部成功要么全部失败。
两阶段提交(2-PhaseCommit,2PC)算法是解决我们遇到的一致性问题最简单的算法。在2PC算法中写操作被分为两个阶段来执行:
Prepare阶段
协调者向所有参与者发送事务内容,询问是否可以执行事务操作。在Godis中收到客户端MSET命令的节点是事务的协调者,所有持有相关key的节点都要参与事务。
各参与者锁定事务相关key防止被其它操作修改。各参与者写undolog准备在事务失败后进行回滚。
参与者回复协调者可以提交。若协调者收到所有参与者的YES回复,则准备进行事务提交。若有参与者回复NO或者超时,则准备回滚事务
Commit阶段
协调者向所有参与者发送提交请求 参与者正式提交事务,并在完成后释放相关key的锁。 参与者协调者回复ACK,协调者收到所有参与者的ACK后认为事务提交成功。Rollback阶段
在事务请求阶段若有参与者回复NO或者超时,协调者向所有参与者发出回滚请求 各参与者执行事务回滚,并在完成后释放相关资源。 参与者协调者回复ACK,协调者收到所有参与者的ACK后认为事务回滚成功。2PC是一种简单的一致性协议,它存在一些问题:
单点服务:若协调者突然崩溃则事务流程无法继续进行或者造成状态不一致 无法保证一致性:若协调者第二阶段发送提交请求时崩溃,可能部分参与者受到COMMIT请求提交了事务,而另一部分参与者未受到请求而放弃事务造成不一致现象。 阻塞:为了保证事务完成提交,各参与者在完成第一阶段事务执行后必须锁定相关资源直到正式提交,影响系统的吞吐量。首先我们定义事务的描述结构:
typeTransactionstruct{ idstring//事务ID,由snowflake算法生成 args[][]byte//命令参数 cluster*Cluster connredis.Connection keys[]string//事务中涉及的key undoLogmap[string][]byte//每个key在事务执行前的值,用于回滚事务 }先看事务参与者prepare阶段的操作:
//prepare命令的格式是:PrepareMSetTxIDkey1,key2... //TxID是事务ID,由协调者决定 funcPrepareMSet(cluster*Cluster,credis.Connection,args[][]byte)redis.Reply{ iflen(args)<3{ returnreply.MakeErrReply("ERRwrongnumberofargumentsforpreparemsetcommand") } txId:=string(args[1]) size:=(len(args)-2)/2 keys:=make([]string,size) fori:=0;i<size;i++{ keys[i]=string(args[2*i+2]) } txArgs:=[][]byte{ []byte("MSet"), }//actualargsforcluster.db txArgs=append(txArgs,args[2:]...) tx:=NewTransaction(cluster,c,txId,txArgs,keys)//创建新事务 cluster.transactions.Put(txId,tx)//存储到节点的事务列表中 err:=tx.prepare()//准备事务 iferr!=nil{ returnreply.MakeErrReply(err.Error()) } return&reply.OkReply{} }实际的准备操作在tx.prepare()中:
func(tx*Transaction)prepare()error{ tx.mu.Lock() defertx.mu.Unlock() //锁定涉及的key避免并发问题 tx.writeKeys,tx.readKeys=godis.GetRelatedKeys(tx.cmdLine) tx.lockKeys() //准备undoLog tx.undoLog=tx.cluster.db.GetUndoLogs(tx.dbIndex,tx.cmdLine) tx.status=preparedStatus taskKey:=genTaskKey(tx.id) //添加hook,回滚超时未提交的事务 timewheel.Delay(maxLockTime,taskKey,func(){ iftx.status==preparedStatus{ logger.Info("aborttransaction:"+tx.id) _=tx.rollback() } }) returnnil }看看协调者在做什么:
funcMSet(cluster*Cluster,credis.Connection,args[][]byte)redis.Reply{ //解析参数 argCount:=len(args)-1 ifargCount%2!=0||argCount<1{ returnreply.MakeErrReply("ERRwrongnumberofargumentsformsetcommand") } size:=argCount/2 keys:=make([]string,size) valueMap:=make(map[string]string) fori:=0;i<size;i++{ keys[i]=string(args[2*i+1]) valueMap[keys[i]]=string(args[2*i+2]) } //找到所属的节点 groupMap:=cluster.groupBy(keys) iflen(groupMap)==1{//dofast //若所有的key都在同一个节点直接执行,不使用较慢的2pc算法 forpeer:=rangegroupMap{ returncluster.Relay(peer,c,args) } } //开始准备阶段 varerrReplyredis.Reply txId:=cluster.idGenerator.NextId()//使用snowflake算法决定事务ID txIdStr:=strconv.FormatInt(txId,10) rollback:=false //向所有参与者发送prepare请求 forpeer,group:=rangegroupMap{ peerArgs:=[]string{txIdStr} for_,k:=rangegroup{ peerArgs=append(peerArgs,k,valueMap[k]) } varrespredis.Reply ifpeer==cluster.self{ resp=PrepareMSet(cluster,c,makeArgs("PrepareMSet",peerArgs...)) }else{ resp=cluster.Relay(peer,c,makeArgs("PrepareMSet",peerArgs...)) } ifreply.IsErrorReply(resp){ errReply=resp rollback=true break } } ifrollback{ //若prepare过程出错则执行回滚 RequestRollback(cluster,c,txId,groupMap) }else{ _,errReply=RequestCommit(cluster,c,txId,groupMap) rollback=errReply!=nil } if!rollback{ return&reply.OkReply{} } returnerrReply }事务参与者提交本地事务:
funcCommit(cluster*Cluster,credis.Connection,args[][]byte)redis.Reply{ iflen(args)!=2{ returnreply.MakeErrReply("ERRwrongnumberofargumentsforcommitcommand") } //读取事务信息 txId:=string(args[1]) raw,ok:=cluster.transactions.Get(txId) if!ok{ returnreply.MakeIntReply(0) } tx,_:=raw.(*Transaction) //在提交成功后解锁key deferfunc(){ cluster.db.UnLocks(tx.keys...) tx.status=CommitedStatus //cluster.transactions.Remove(tx.id)//cannotremove,mayrollbackaftercommit }() cmd:=strings.ToLower(string(tx.args[0])) varresultredis.Reply ifcmd=="del"{ result=CommitDel(cluster,c,tx) }elseifcmd=="mset"{ result=CommitMSet(cluster,c,tx) } //提交失败 ifreply.IsErrorReply(result){ err2:=tx.rollback() returnreply.MakeErrReply(fmt.Sprintf("erroccurswhenrollback:%v,originerr:%s",err2,result)) } returnresult } //执行操作 funcCommitMSet(cluster*Cluster,credis.Connection,tx*Transaction)redis.Reply{ size:=len(tx.args)/2 keys:=make([]string,size) values:=make([][]byte,size) fori:=0;i<size;i++{ keys[i]=string(tx.args[2*i+1]) values[i]=tx.args[2*i+2] } fori,key:=rangekeys{ value:=values[i] cluster.db.Put(key,&db.DataEntity{Data:value}) } cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args)) return&reply.OkReply{} }协调者的逻辑也很简单:
funcRequestCommit(cluster*Cluster,credis.Connection,txIdint64,peersmap[string][]string)([]redis.Reply,reply.ErrorReply){ varerrReplyreply.ErrorReply txIdStr:=strconv.FormatInt(txId,10) respList:=make([]redis.Reply,0,len(peers)) forpeer:=rangepeers{ varrespredis.Reply ifpeer==cluster.self{ resp=Commit(cluster,c,makeArgs("commit",txIdStr)) }else{ resp=cluster.Relay(peer,c,makeArgs("commit",txIdStr)) } ifreply.IsErrorReply(resp){ errReply=resp.(reply.ErrorReply) break } respList=append(respList,resp) } iferrReply!=nil{ RequestRollback(cluster,c,txId,peers) returnnil,errReply } returnrespList,nil }回滚本地事务:
funcRollback(cluster*Cluster,credis.Connection,args[][]byte)redis.Reply{ iflen(args)!=2{ returnreply.MakeErrReply("ERRwrongnumberofargumentsforrollbackcommand") } txId:=string(args[1]) raw,ok:=cluster.transactions.Get(txId) if!ok{ returnreply.MakeIntReply(0) } tx,_:=raw.(*Transaction) err:=tx.rollback() iferr!=nil{ returnreply.MakeErrReply(err.Error()) } returnreply.MakeIntReply(1) } func(tx*Transaction)rollback()error{ forkey,blob:=rangetx.undoLog{ iflen(blob)>0{ entity:=&db.DataEntity{} err:=gob.UnMarshal(blob,entity)//反序列化事务前的快照 iferr!=nil{ returnerr } tx.cluster.db.Put(key,entity)//写入事务前的数据 }else{ tx.cluster.db.Remove(key)//若事务开始之前key不存在则将其删除 } } iftx.status!=CommitedStatus{ tx.cluster.db.UnLocks(tx.keys...) } tx.status=RollbackedStatus returnnil }协调者的逻辑与commit类似:
funcRequestRollback(cluster*Cluster,credis.Connection,txIdint64,peersmap[string][]string){ txIdStr:=strconv.FormatInt(txId,10) forpeer:=rangepeers{ ifpeer==cluster.self{ Rollback(cluster,c,makeArgs("rollback",txIdStr)) }else{ cluster.Relay(peer,c,makeArgs("rollback",txIdStr)) } } }本文内容总结:MSET命令在集群模式下的问题,两阶段提交,Prepare阶段,Commit阶段,Rollback,
原文链接:https://www.cnblogs.com/Finley/p/14079108.html