首页 文章资讯内容详情

Java并发编程--ReentrantReadWriteLock

2026-06-01 4 花语

本文内容纲要:

-概述 -使用 -实现原理 -参考资料

概述

ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。

读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有ReadWriteLock实现都必须保证writeLock操作的内存同步效果也要保持与相关readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。

ReentrantReadWriteLock支持以下功能:

1)支持公平和非公平的获取锁的方式;

2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;

3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;

4)读取锁和写入锁都支持锁获取期间的中断;

5)Condition支持。仅写入锁提供了一个Conditon实现;读取锁不支持Conditon,readLock().newCondition()会抛出UnsupportedOperationException。

使用

示例一:利用重入来执行升级缓存后的锁降级

1classCachedData{ 2Objectdata; 3volatilebooleancacheValid;//缓存是否有效 4ReentrantReadWriteLockrwl=newReentrantReadWriteLock(); 5 6voidprocessCachedData(){ 7rwl.readLock().lock();//获取读锁 8//如果缓存无效,更新cache;否则直接使用data 9if(!cacheValid){ 10//Mustreleasereadlockbeforeacquiringwritelock 11//获取写锁前须释放读锁 12rwl.readLock().unlock(); 13rwl.writeLock().lock(); 14//Recheckstatebecauseanotherthreadmighthaveacquired 15//writelockandchangedstatebeforewedid. 16if(!cacheValid){ 17data=... 18cacheValid=true; 19} 20//Downgradebyacquiringreadlockbeforereleasingwritelock 21//锁降级,在释放写锁前获取读锁 22rwl.readLock().lock(); 23rwl.writeLock().unlock();//Unlockwrite,stillholdread 24} 25 26use(data); 27rwl.readLock().unlock();//释放读锁 28} 29}

示例二:使用ReentrantReadWriteLock来提高Collection的并发性

通常在collection数据很多,读线程访问多于写线程并且entail操作的开销高于同步开销时尝试这么做。

1classRWDictionary{ 2privatefinalMap<String,Data>m=newTreeMap<String,Data>(); 3privatefinalReentrantReadWriteLockrwl=newReentrantReadWriteLock(); 4privatefinalLockr=rwl.readLock();//读锁 5privatefinalLockw=rwl.writeLock();//写锁 6 7publicDataget(Stringkey){ 8r.lock(); 9try{returnm.get(key);} 10finally{r.unlock();} 11} 12publicString[]allKeys(){ 13r.lock(); 14try{returnm.keySet().toArray();} 15finally{r.unlock();} 16} 17publicDataput(Stringkey,Datavalue){ 18w.lock(); 19try{returnm.put(key,value);} 20finally{w.unlock();} 21} 22publicvoidclear(){ 23w.lock(); 24try{m.clear();} 25finally{w.unlock();} 26} 27}

实现原理

ReentrantReadWriteLock也是基于AQS实现的,它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。

ReentrantReadWriteLock含有两把锁readerLock和writerLock,其中ReadLock和WriteLock都是内部类。

1/**Innerclassprovidingreadlock*/ 2privatefinalReentrantReadWriteLock.ReadLockreaderLock; 3/**Innerclassprovidingwritelock*/ 4privatefinalReentrantReadWriteLock.WriteLockwriterLock; 5/**Performsallsynchronizationmechanics*/ 6finalSyncsync; 写锁的获取与释放(WriteLock)

写锁是一个可重入的独占锁,使用AQS提供的独占式获取同步状态的策略。

(一)获取写锁

1//获取写锁 2publicvoidlock(){ 3sync.acquire(1); 4} 5 6//AQS实现的独占式获取同步状态方法 7publicfinalvoidacquire(intarg){ 8if(!tryAcquire(arg)&& 9acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) 10selfInterrupt(); 11} 12 13//自定义重写的tryAcquire方法 14protectedfinalbooleantryAcquire(intacquires){ 15/* 16*Walkthrough: 17*1.Ifreadcountnonzeroorwritecountnonzero 18*andownerisadifferentthread,fail. 19*2.Ifcountwouldsaturate,fail.(Thiscanonly 20*happenifcountisalreadynonzero.) 21*3.Otherwise,thisthreadiseligibleforlockif 22*itiseitherareentrantacquireor 23*queuepolicyallowsit.Ifso,updatestate 24*andsetowner. 25*/ 26Threadcurrent=Thread.currentThread(); 27intc=getState(); 28intw=exclusiveCount(c);//取同步状态state的低16位,写同步状态 29if(c!=0){ 30//(Note:ifc!=0andw==0thensharedcount!=0) 31//存在读锁或当前线程不是已获取写锁的线程,返回false 32if(w==0||current!=getExclusiveOwnerThread()) 33returnfalse; 34//判断同一线程获取写锁是否超过最大次数,支持可重入 35if(w+exclusiveCount(acquires)>MAX_COUNT)// 36thrownewError("Maximumlockcountexceeded"); 37//Reentrantacquire 38setState(c+acquires); 39returntrue; 40} 41//此时c=0,读锁和写锁都没有被获取 42if(writerShouldBlock()|| 43!compareAndSetState(c,c+acquires)) 44returnfalse; 45setExclusiveOwnerThread(current); 46returntrue; 47}

从源代码可以看出,获取写锁的步骤如下:

1)判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行2);否则执行5)。

2)判断同步状态state的低16位(w)是否为0。如果w=0,说明其他线程获取了读锁,返回false;如果w!=0,说明其他线程获取了写锁,执行步骤3)。

3)判断获取了写锁是否是当前线程,若不是返回false,否则执行4);

4)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程以获取写锁,更新是线程安全的),返回true。

5)此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,否则返回false。如果需要阻塞则返回false。

writerShouldBlock()表示当前线程是否应该被阻塞。NonfairSync和FairSync中有不同是实现。

1//FairSync中需要判断是否有前驱节点,如果有则返回false,否则返回true。遵循FIFO 2finalbooleanwriterShouldBlock(){ 3returnhasQueuedPredecessors(); 4} 5 6//NonfairSync中直接返回false,可插队。 7finalbooleanwriterShouldBlock(){ 8returnfalse;//writerscanalwaysbarge 9}

(二)释放写锁

1//写锁释放 2publicvoidunlock(){ 3sync.release(1); 4} 5 6//AQS提供独占式释放同步状态的方法 7publicfinalbooleanrelease(intarg){ 8if(tryRelease(arg)){ 9Nodeh=head; 10if(h!=null&&h.waitStatus!=0) 11unparkSuccessor(h); 12returntrue; 13} 14returnfalse; 15} 16 17//自定义重写的tryRelease方法 18protectedfinalbooleantryRelease(intreleases){ 19if(!isHeldExclusively()) 20thrownewIllegalMonitorStateException(); 21intnextc=getState()-releases;//同步状态减去releases 22//判断同步状态的低16位(写同步状态)是否为0,如果为0则返回true,否则返回false. 23//因为支持可重入 24booleanfree=exclusiveCount(nextc)==0; 25if(free) 26setExclusiveOwnerThread(null); 27setState(nextc);//以获取写锁,不需要其他同步措施,是线程安全的 28returnfree; 29} 读锁的获取与释放(ReadLock)

读锁是一个可重入的共享锁,采用AQS提供的共享式获取同步状态的策略。

(一)获取读锁

1publicvoidlock(){ 2sync.acquireShared(1); 3} 4 5//使用AQS提供的共享式获取同步状态的方法 6publicfinalvoidacquireShared(intarg){ 7if(tryAcquireShared(arg)<0) 8doAcquireShared(arg); 9} 10 11//自定义重写的tryAcquireShared方法,参数是unused,因为读锁的重入计数是内部维护的 12protectedfinalinttryAcquireShared(intunused){ 13/* 14*Walkthrough: 15*1.Ifwritelockheldbyanotherthread,fail. 16*2.Otherwise,thisthreadiseligiblefor 17*lockwrtstate,soaskifitshouldblock 18*becauseofqueuepolicy.Ifnot,try 19*tograntbyCASingstateandupdatingcount. 20*Notethatstepdoesnotcheckforreentrant 21*acquires,whichispostponedtofullversion 22*toavoidhavingtocheckholdcountin 23*themoretypicalnon-reentrantcase. 24*3.Ifstep2failseitherbecausethread 25*apparentlynoteligibleorCASfailsorcount 26*saturated,chaintoversionwithfullretryloop. 27*/ 28Threadcurrent=Thread.currentThread(); 29intc=getState(); 30//exclusiveCount(c)取低16位写锁。存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败。 31if(exclusiveCount(c)!=0&& 32getExclusiveOwnerThread()!=current) 33return-1; 34intr=sharedCount(c);//取高16位读锁, 35//readerShouldBlock()用来判断当前线程是否应该被阻塞 36if(!readerShouldBlock()&& 37r<MAX_COUNT&&//MAX_COUNT为获取读锁的最大数量,为16位的最大值 38compareAndSetState(c,c+SHARED_UNIT)){ 39//firstReader是不会放到readHolds里的,这样,在读锁只有一个的情况下,就避免了查找readHolds。 40if(r==0){//是firstReader,计数不会放入readHolds。 41firstReader=current; 42firstReaderHoldCount=1; 43}elseif(firstReader==current){//firstReader重入 44firstReaderHoldCount++; 45}else{ 46//非firstReader读锁重入计数更新 47HoldCounterrh=cachedHoldCounter;//读锁重入计数缓存,基于ThreadLocal实现 48if(rh==null||rh.tid!=current.getId()) 49cachedHoldCounter=rh=readHolds.get(); 50elseif(rh.count==0) 51readHolds.set(rh); 52rh.count++; 53} 54return1; 55} 56//第一次获取读锁失败,有两种情况: 57//1)没有写锁被占用时,尝试通过一次CAS去获取锁时,更新失败(说明有其他读锁在申请) 58//2)当前线程占有写锁,并且有其他写锁在当前线程的下一个节点等待获取写锁,除非当前线程的下一个节点被取消,否则fullTryAcquireShared也获取不到读锁 59returnfullTryAcquireShared(current); 60}

从源代码可以看出,获取读锁的大致步骤如下:

1)通过同步状态低16位判断,如果存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败;否则执行步骤2)。

2)通过readerShouldBlock判断当前线程是否应该被阻塞,如果不应该阻塞则尝试CAS同步状态;否则执行3)。

3)第一次获取读锁失败,通过fullTryAcquireShared再次尝试获取读锁。

readerShouldBlock方法用来判断当前线程是否应该被阻塞,NonfairSync和FairSync中有不同是实现。

1//FairSync中需要判断是否有前驱节点,如果有则返回false,否则返回true。遵循FIFO 2finalbooleanreaderShouldBlock(){ 3returnhasQueuedPredecessors(); 4} 5finalbooleanreaderShouldBlock(){ 6returnapparentlyFirstQueuedIsExclusive(); 7} 8//当head节点不为null且head节点的下一个节点s不为null且s是独占模式(写线程)且s的线程不为null时,返回true。 9//目的是不应该让写锁始终等待。作为一个启发式方法用于避免可能的写线程饥饿,这只是一种概率性的作用,因为如果有一个等待的写线程在其他尚未从队列中出队的读线程后面等待,那么新的读线程将不会被阻塞。 10finalbooleanapparentlyFirstQueuedIsExclusive(){ 11Nodeh,s; 12return(h=head)!=null&& 13(s=h.next)!=null&& 14!s.isShared()&& 15s.thread!=null; 16}

fullTryAcquireShared方法

1finalintfullTryAcquireShared(Threadcurrent){ 2/* 3*Thiscodeisinpartredundantwiththatin 4*tryAcquireSharedbutissimpleroverallbynot 5*complicatingtryAcquireSharedwithinteractionsbetween 6*retriesandlazilyreadingholdcounts. 7*/ 8HoldCounterrh=null; 9for(;;){ 10intc=getState(); 11//如果当前线程不是写锁的持有者,直接返回-1,结束尝试获取读锁,需要排队去申请读锁 12if(exclusiveCount(c)!=0){ 13if(getExclusiveOwnerThread()!=current) 14return-1; 15//elseweholdtheexclusivelock;blockinghere 16//wouldcausedeadlock. 17//如果需要阻塞,说明除了当前线程持有写锁外,还有其他线程已经排队在申请写锁,故,即使申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁。 18}elseif(readerShouldBlock()){ 19//Makesurewerenotacquiringreadlockreentrantly 20if(firstReader==current){ 21//如果当前线程是第一个获取了写锁,那其他线程无法申请写锁 22//assertfirstReaderHoldCount>0; 23}else{ 24//从readHolds中移除当前线程的持有数,然后返回-1,然后去排队获取读锁。 25if(rh==null){ 26rh=cachedHoldCounter; 27if(rh==null||rh.tid!=current.getId()){ 28rh=readHolds.get(); 29if(rh.count==0) 30readHolds.remove(); 31} 32} 33if(rh.count==0) 34return-1; 35} 36} 37if(sharedCount(c)==MAX_COUNT) 38thrownewError("Maximumlockcountexceeded"); 39if(compareAndSetState(c,c+SHARED_UNIT)){ 40//示成功获取读锁,后续就是更新readHolds等内部变量, 41if(sharedCount(c)==0){ 42firstReader=current; 43firstReaderHoldCount=1; 44}elseif(firstReader==current){ 45firstReaderHoldCount++; 46}else{ 47if(rh==null) 48rh=cachedHoldCounter; 49if(rh==null||rh.tid!=current.getId()) 50rh=readHolds.get(); 51elseif(rh.count==0) 52readHolds.set(rh); 53rh.count++; 54cachedHoldCounter=rh;//cacheforrelease 55} 56return1; 57} 58} 59}

(二)释放读锁

1publicvoidunlock(){ 2sync.releaseShared(1); 3} 4 5publicfinalbooleanreleaseShared(intarg){ 6if(tryReleaseShared(arg)){ 7doReleaseShared(); 8returntrue; 9} 10returnfalse; 11} 12 13protectedfinalbooleantryReleaseShared(intunused){ 14Threadcurrent=Thread.currentThread(); 15//更新计数 16if(firstReader==current){ 17//assertfirstReaderHoldCount>0; 18if(firstReaderHoldCount==1) 19firstReader=null; 20else 21firstReaderHoldCount--; 22}else{ 23HoldCounterrh=cachedHoldCounter; 24if(rh==null||rh.tid!=current.getId()) 25rh=readHolds.get(); 26intcount=rh.count; 27if(count<=1){ 28readHolds.remove(); 29if(count<=0) 30throwunmatchedUnlockException(); 31} 32--rh.count; 33} 34//自旋CAS,减去1<<16 35for(;;){ 36intc=getState(); 37intnextc=c-SHARED_UNIT; 38if(compareAndSetState(c,nextc)) 39//Releasingthereadlockhasnoeffectonreaders, 40//butitmayallowwaitingwriterstoproceedif 41//bothreadandwritelocksarenowfree. 42returnnextc==0; 43} 44}

参考资料

(java并发锁ReentrantReadWriteLock读写锁源码分析)http://blog.csdn.net/prestigeding/article/details/53286756

《Java并发编程的艺术》

本文内容总结:概述,使用,实现原理,参考资料,

原文链接:https://www.cnblogs.com/zaizhoumo/p/7782941.html