Java是一門面向對象編程語言,可以編寫桌面應用程序、Web應用程序、分布式系統和嵌入式系統應用程序。
ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支持并發操作,所以要復雜一些。
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。注意,行文中,我很多地方用了“槽”來代表一個 segment。
簡單理解就是,ConcurrentHashMap 是一個 Segment 數組,Segment 通過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是線程安全的,也就實現了全局的線程安全。
concurrencyLevel:并行級別、并發數、Segment 數,怎么翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支持 16 個線程并發寫,只要它們的操作分別分布在不同的 Segment 上。這個值可以在初始化的時候設置為其他值,但是一旦初始化以后,它是不可以擴容的。
再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證線程安全,所以處理起來要麻煩些。
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
loadFactor:負載因子,之前我們說了,Segment 數組不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。
publicConcurrentHashMap(intinitialCapacity,floatloadFactor,intconcurrencyLevel){if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)thrownewIllegalArgumentException();if(concurrencyLevel>MAX_SEGMENTS)concurrencyLevel=MAX_SEGMENTS;//Findpower-of-twosizesbestmatchingargumentsintsshift=0;intssize=1;//計算并行級別ssize,因為要保持并行級別是2的n次方while(ssize<concurrencyLevel){++sshift;ssize<<=1;}//我們這里先不要那么燒腦,用默認值,concurrencyLevel為16,sshift為4//那么計算出segmentShift為28,segmentMask為15,后面會用到這兩個值this.segmentShift=32-sshift;this.segmentMask=ssize-1;if(initialCapacity>MAXIMUM_CAPACITY)initialCapacity=MAXIMUM_CAPACITY;//initialCapacity是設置整個map初始的大小,//這里根據initialCapacity計算Segment數組中每個位置可以分到的大小//如initialCapacity為64,那么每個Segment或稱之為"槽"可以分到4個intc=initialCapacity/ssize;if(c*ssize<initialCapacity)++c;//默認MIN_SEGMENT_TABLE_CAPACITY是2,這個值也是有講究的,因為這樣的話,對于具體的槽上,//插入一個元素不至于擴容,插入第二個的時候才會擴容intcap=MIN_SEGMENT_TABLE_CAPACITY;while(cap<c)cap<<=1;//創建Segment數組,//并創建數組的第一個元素segment[0]Segment<K,V>s0=newSegment<K,V>(loadFactor,(int)(cap*loadFactor),(HashEntry<K,V>[])newHashEntry[cap]);Segment<K,V>[]ss=(Segment<K,V>[])newSegment[ssize];//往數組寫入segment[0]UNSAFE.putOrderedObject(ss,SBASE,s0);//orderedwriteofsegments[0]this.segments=ss;}
初始化完成,我們得到了一個 Segment 數組。
我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那么初始化完成后:
Segment 數組長度為 16,不可以擴容
Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
這里初始化了 segment[0],其他位置還是 null,至于為什么要初始化 segment[0],后面的代碼會介紹
當前 segmentShift 的值為 32 - 4 = 28,segmentMask 為 16 - 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
我們先看 put 的主流程,對于其中的一些關鍵細節操作,后面會進行詳細介紹。
publicVput(Kkey,Vvalue){Segment<K,V>s;if(value==null)thrownewNullPointerException();//1.計算key的hash值inthash=hash(key);//2.根據hash值找到Segment數組中的位置j//hash是32位,無符號右移segmentShift(28)位,剩下高4位,//然后和segmentMask(15)做一次與操作,也就是說j是hash值的高4位,也就是槽的數組下標intj=(hash>>>segmentShift)&segmentMask;//剛剛說了,初始化的時候初始化了segment[0],但是其他位置還是null,//ensureSegment(j)對segment[j]進行初始化if((s=(Segment<K,V>)UNSAFE.getObject//nonvolatile;recheck(segments,(j<<SSHIFT)+SBASE))==null)//inensureSegments=ensureSegment(j);//3.插入新值到槽s中returns.put(key,hash,value,false);}
第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之后就是 Segment 內部的 put 操作了。
Segment 內部是由 數組+鏈表 組成的。
finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){//在往該segment寫入前,需要先獲取該segment的獨占鎖//先看主流程,后面還會具體介紹這部分內容HashEntry<K,V>node=tryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{//這個是segment內部的數組HashEntry<K,V>[]tab=table;//再利用hash值,求應該放置的數組下標intindex=(tab.length-1)&hash;//first是數組該位置處的鏈表的表頭HashEntry<K,V>first=entryAt(tab,index);//下面這串for循環雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個鏈表這兩種情況for(HashEntry<K,V>e=first;;){if(e!=null){Kk;if((k=e.key)==key||(e.hash==hash&&key.equals(k))){oldValue=e.value;if(!onlyIfAbsent){//覆蓋舊值e.value=value;++modCount;}break;}//繼續順著鏈表走e=e.next;}else{//node到底是不是null,這個要看獲取鎖的過程,不過和這里都沒有關系。//如果不為null,那就直接將它設置為鏈表表頭;如果是null,初始化并設置為鏈表表頭。if(node!=null)node.setNext(first);elsenode=newHashEntry<K,V>(hash,key,value,first);intc=count+1;//如果超過了該segment的閾值,這個segment需要擴容if(c>threshold&&tab.length<MAXIMUM_CAPACITY)rehash(node);//擴容后面也會具體分析else//沒有達到閾值,將node放到數組tab的index位置,//其實就是將新的節點設置成原鏈表的表頭setEntryAt(tab,index,node);++modCount;count=c;oldValue=null;break;}}}finally{//解鎖unlock();}returnoldValue;}
整體流程還是比較簡單的,由于有獨占鎖的保護,所以 segment 內部的操作并不復雜。至于這里面的并發問題,我們稍后再進行介紹。
到這里 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對于其他槽來說,在插入第一個值的時候進行初始化。
這里需要考慮并發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
privateSegment<K,V>ensureSegment(intk){finalSegment<K,V>[]ss=this.segments;longu=(k<<SSHIFT)+SBASE;//rawoffsetSegment<K,V>seg;if((seg=(Segment<K,V>)UNSAFE.getObjectVolatile(ss,u))==null){//這里看到為什么之前要初始化segment[0]了,//使用當前segment[0]處的數組長度和負載因子來初始化segment[k]//為什么要用“當前”,因為segment[0]可能早就擴容過了Segment<K,V>proto=ss[0];intcap=proto.table.length;floatlf=proto.loadFactor;intthreshold=(int)(cap*lf);//初始化segment[k]內部的數組HashEntry<K,V>[]tab=(HashEntry<K,V>[])newHashEntry[cap];if((seg=(Segment<K,V>)UNSAFE.getObjectVolatile(ss,u))==null){//再次檢查一遍該槽是否被其他線程初始化了。Segment<K,V>s=newSegment<K,V>(lf,threshold,tab);//使用while循環,內部用CAS,當前線程成功設值或其他線程成功設值后,退出while((seg=(Segment<K,V>)UNSAFE.getObjectVolatile(ss,u))==null){if(UNSAFE.compareAndSwapObject(ss,u,null,seg=s))break;}}}returnseg;}
總的來說,ensureSegment(int k) 比較簡單,對于并發操作使用 CAS 進行控制。
我沒搞懂這里為什么要搞一個 while 循環,CAS 失敗不就代表有其他線程成功了嗎,為什么要再進行判斷?
感謝評論區的李子木,如果當前線程 CAS 失敗,這里的 while 循環是為了將 seg 賦值返回。
前面我們看到,在往某個 segment 中 put 的時候,首先會調用 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨占鎖,如果失敗,那么進入到 scanAndLockForPut 這個方法來獲取鎖。
下面我們來具體分析這個方法中是怎么控制加鎖的。
privateHashEntry<K,V>scanAndLockForPut(Kkey,inthash,Vvalue){HashEntry<K,V>first=entryForHash(this,hash);HashEntry<K,V>e=first;HashEntry<K,V>node=null;intretries=-1;//negativewhilelocatingnode//循環獲取鎖while(!tryLock()){HashEntry<K,V>f;//torecheckfirstbelowif(retries<0){if(e==null){if(node==null)//speculativelycreatenode//進到這里說明數組該位置的鏈表是空的,沒有任何元素//當然,進到這里的另一個原因是tryLock()失敗,所以該槽存在并發,不一定是該位置node=newHashEntry<K,V>(hash,key,value,null);retries=0;}elseif(key.equals(e.key))retries=0;else//順著鏈表往下走e=e.next;}//重試次數如果超過MAX_SCAN_RETRIES(單核1多核64),那么不搶了,進入到阻塞隊列等待鎖//lock()是阻塞方法,直到獲取鎖后返回elseif(++retries>MAX_SCAN_RETRIES){lock();break;}elseif((retries&1)==0&&//這個時候是有大問題了,那就是有新的元素進到了鏈表,成為了新的表頭//所以這邊的策略是,相當于重新走一遍這個scanAndLockForPut方法(f=entryForHash(this,hash))!=first){e=first=f;//re-traverseifentrychangedretries=-1;}}returnnode;}
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。
這個方法就是看似復雜,但是其實就是做了一件事,那就是獲取該 segment 的獨占鎖,如果需要的話順便實例化了一下 node。
重復一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry[] 進行擴容,擴容后,容量為原來的 2 倍。
首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那么先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。
該方法不需要考慮并發,因為到這里的時候,是持有該 segment 的獨占鎖的。
//方法參數上的node是這次擴容后,需要添加到新的數組中的數據。privatevoidrehash(HashEntry<K,V>node){HashEntry<K,V>[]oldTable=table;intoldCapacity=oldTable.length;//2倍intnewCapacity=oldCapacity<<1;threshold=(int)(newCapacity*loadFactor);//創建新數組HashEntry<K,V>[]newTable=(HashEntry<K,V>[])newHashEntry[newCapacity];//新的掩碼,如從16擴容到32,那么sizeMask為31,對應二進制‘000...00011111'intsizeMask=newCapacity-1;//遍歷原數組,老套路,將原數組位置i處的鏈表拆分到新數組位置i和i+oldCap兩個位置for(inti=0;i<oldCapacity;i++){//e是鏈表的第一個元素HashEntry<K,V>e=oldTable[i];if(e!=null){HashEntry<K,V>next=e.next;//計算應該放置在新數組中的位置,//假設原數組長度為16,e在oldTable[3]處,那么idx只可能是3或者是3+16=19intidx=e.hash&sizeMask;if(next==null)//該位置處只有一個元素,那比較好辦newTable[idx]=e;else{//Reuseconsecutivesequenceatsameslot//e是鏈表表頭HashEntry<K,V>lastRun=e;//idx是當前鏈表的頭結點e的新位置intlastIdx=idx;//下面這個for循環會找到一個lastRun節點,這個節點之后的所有元素是將要放到一起的for(HashEntry<K,V>last=next;last!=null;last=last.next){intk=last.hash&sizeMask;if(k!=lastIdx){lastIdx=k;lastRun=last;}}//將lastRun及其之后的所有節點組成的這個鏈表放到lastIdx這個位置newTable[lastIdx]=lastRun;//下面的操作是處理lastRun之前的節點,//這些節點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中for(HashEntry<K,V>p=e;p!=lastRun;p=p.next){Vv=p.value;inth=p.hash;intk=h&sizeMask;HashEntry<K,V>n=newTable[k];newTable[k]=newHashEntry<K,V>(h,p.key,v,n);}}}}//將新來的node放到新數組中剛剛的兩個鏈表之一的頭部intnodeIndex=node.hash&sizeMask;//addthenewnodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex]=node;table=newTable;}
這里的擴容比之前的 HashMap 要復雜一些,代碼難懂一點。上面有兩個挨著的 for 循環,第一個 for 有什么用呢?
仔細一看發現,如果沒有第一個 for 循環,也是可以工作的,但是,這個 for 循環下來,如果 lastRun 的后面還有比較多的節點,那么這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,后面的一串節點跟著 lastRun 走就是了,不需要做任何操作。
我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是鏈表的最后一個元素或者很靠后的元素,那么這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用默認的閾值,大約只有 1/6 的節點需要克隆。
相對于 put 來說,get 真的不要太簡單。
計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”
槽中也是一個數組,根據 hash 找到數組中具體的位置
到這里是鏈表了,順著鏈表進行查找即可
publicVget(Objectkey){Segment<K,V>s;//manuallyintegrateaccessmethodstoreduceoverheadHashEntry<K,V>[]tab;//1.hash值inth=hash(key);longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;//2.根據hash找到對應的segmentif((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab=s.table)!=null){//3.找到segment內部數組相應位置的鏈表,遍歷for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);e!=null;e=e.next){Kk;if((k=e.key)==key||(e.hash==h&&key.equals(k)))returne.value;}}returnnull;}
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮并發問題。
添加節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨占鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。
put 操作的線程安全性
初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。
添加節點到鏈表的操作是插入到表頭的,所以,如果這個時候 get 操作在鏈表遍歷的過程已經到了中間,是不會影響的。當然,另一個并發問題就是 get 操作在 put 之后,需要保證剛剛插入表頭的節點被讀取,這個依賴于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
擴容。擴容是新創建了數組,然后進行遷移數據,最后面將 newTable 設置給屬性 table。所以,如果 get 操作此時也在進行,那么也沒關系,如果 get 先行,那么就是在舊的 table 上做查詢操作;而 put 先行,那么 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
remove 操作的線程安全性
remove 操作我們沒有分析源碼,所以這里說的讀者感興趣的話還是需要到源碼中去求實一下的。
get 操作需要遍歷鏈表,但是 remove 操作會"破壞"鏈表。
如果 remove 破壞的節點 get 操作已經過去了,那么這里不存在任何問題。
如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那么需要將頭結點的 next 設置為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 并不能提供數組內部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的后繼節點接到前驅節點中,這里的并發保證就是 next 屬性是 volatile 的。
Java7 中實現的 ConcurrentHashMap 說實話還是比較復雜的,Java8 對 ConcurrentHashMap 進行了比較大的改動。建議讀者可以參考 Java8 中 HashMap 相對于 Java7 HashMap 的改動,對于 ConcurrentHashMap,Java8 也引入了紅黑樹。
說實話,Java8 ConcurrentHashMap 源碼真心不簡單,最難的在于擴容,數據遷移操作不容易看懂。
我們先用一個示意圖來描述下其結構:
結構上和 Java8 的 HashMap 基本上一樣,不過它要保證線程安全性,所以在源碼上確實要復雜一些。
//這構造函數里,什么都不干publicConcurrentHashMap(){}
publicConcurrentHashMap(intinitialCapacity){if(initialCapacity<0)thrownewIllegalArgumentException();intcap=((initialCapacity>=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(initialCapacity+(initialCapacity>>>1)+1));this.sizeCtl=cap;}
這個初始化方法有點意思,通過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 為 10,那么得到 sizeCtl 為 16,如果 initialCapacity 為 11,得到 sizeCtl 為 32。
sizeCtl 這個屬性使用的場景很多,不過只要跟著文章的思路來,就不會被它搞暈了。
如果你愛折騰,也可以看下另一個有三個參數的構造方法,這里我就不說了,大部分時候,我們會使用無參構造函數進行實例化,我們也按照這個思路來進行源碼分析吧。
仔細地一行一行代碼看下去:
publicVput(Kkey,Vvalue){returnputVal(key,value,false);}
finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){if(key==null||value==null)thrownewNullPointerException();//得到hash值inthash=spread(key.hashCode());//用于記錄相應鏈表的長度intbinCount=0;for(Node<K,V>[]tab=table;;){Node<K,V>f;intn,i,fh;//如果數組"空",進行數組初始化if(tab==null||(n=tab.length)==0)//初始化數組,后面會詳細介紹tab=initTable();//找該hash值對應的數組下標,得到第一個節點felseif((f=tabAt(tab,i=(n-1)&hash))==null){//如果數組該位置為空,//用一次CAS操作將這個新值放入其中即可,這個put操作差不多就結束了,可以拉到最后面了//如果CAS失敗,那就是有并發操作,進到下一個循環就好了if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))break;//nolockwhenaddingtoemptybin}//hash居然可以等于MOVED,這個需要到后面才能看明白,不過從名字上也能猜到,肯定是因為在擴容elseif((fh=f.hash)==MOVED)//幫助數據遷移,這個等到看完數據遷移部分的介紹后,再理解這個就很簡單了tab=helpTransfer(tab,f);else{//到這里就是說,f是該位置的頭結點,而且不為空VoldVal=null;//獲取數組該位置的頭結點的監視器鎖synchronized(f){if(tabAt(tab,i)==f){if(fh>=0){//頭結點的hash值大于0,說明是鏈表//用于累加,記錄鏈表的長度binCount=1;//遍歷鏈表for(Node<K,V>e=f;;++binCount){Kek;//如果發現了"相等"的key,判斷是否要進行值覆蓋,然后也就可以break了if(e.hash==hash&&((ek=e.key)==key||(ek!=null&&key.equals(ek)))){oldVal=e.val;if(!onlyIfAbsent)e.val=value;break;}//到了鏈表的最末端,將這個新值放到鏈表的最后面Node<K,V>pred=e;if((e=e.next)==null){pred.next=newNode<K,V>(hash,key,value,null);break;}}}elseif(finstanceofTreeBin){//紅黑樹Node<K,V>p;binCount=2;//調用紅黑樹的插值方法插入新節點if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,value))!=null){oldVal=p.val;if(!onlyIfAbsent)p.val=value;}}}}if(binCount!=0){//判斷是否要將鏈表轉換為紅黑樹,臨界值和HashMap一樣,也是8if(binCount>=TREEIFY_THRESHOLD)//這個方法和HashMap中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉換,//如果當前數組的長度小于64,那么會選擇進行數組擴容,而不是轉換為紅黑樹//具體源碼我們就不看了,擴容部分后面說treeifyBin(tab,i);if(oldVal!=null)returnoldVal;break;}}}//addCount(1L,binCount);returnnull;}
put 的主流程看完了,但是至少留下了幾個問題,第一個是初始化,第二個是擴容,第三個是幫助數據遷移,這些我們都會在后面進行一一介紹。
這個比較簡單,主要就是初始化一個合適大小的數組,然后會設置 sizeCtl。
初始化方法中的并發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。
privatefinalNode<K,V>[]initTable(){Node<K,V>[]tab;intsc;while((tab=table)==null||tab.length==0){//初始化的"功勞"被其他線程"搶去"了if((sc=sizeCtl)<0)Thread.yield();//lostinitializationrace;justspin//CAS一下,將sizeCtl設置為-1,代表搶到了鎖elseif(U.compareAndSwapInt(this,SIZECTL,sc,-1)){try{if((tab=table)==null||tab.length==0){//DEFAULT_CAPACITY默認初始容量是16intn=(sc>0)?sc:DEFAULT_CAPACITY;//初始化數組,長度為16或初始化時提供的長度Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];//將這個數組賦值給table,table是volatile的table=tab=nt;//如果n為16的話,那么這里sc=12//其實就是0.75*nsc=n-(n>>>2);}}finally{//設置sizeCtl為sc,我們就當是12吧sizeCtl=sc;}break;}}returntab;}
前面我們在 put 源碼分析也說過,treeifyBin 不一定就會進行紅黑樹轉換,也可能是僅僅做數組擴容。我們還是進行源碼分析吧。
privatefinalvoidtreeifyBin(Node<K,V>[]tab,intindex){Node<K,V>b;intn,sc;if(tab!=null){//MIN_TREEIFY_CAPACITY為64//所以,如果數組長度小于64的時候,其實也就是32或者16或者更小的時候,會進行數組擴容if((n=tab.length)<MIN_TREEIFY_CAPACITY)//后面我們再詳細分析這個方法tryPresize(n<<1);//b是頭結點elseif((b=tabAt(tab,index))!=null&&b.hash>=0){//加鎖synchronized(b){if(tabAt(tab,index)==b){//下面就是遍歷鏈表,建立一顆紅黑樹TreeNode<K,V>hd=null,tl=null;for(Node<K,V>e=b;e!=null;e=e.next){TreeNode<K,V>p=newTreeNode<K,V>(e.hash,e.key,e.val,null,null);if((p.prev=tl)==null)hd=p;elsetl.next=p;tl=p;}//將紅黑樹設置到數組相應位置中setTabAt(tab,index,newTreeBin<K,V>(hd));}}}}}
如果說 Java8 ConcurrentHashMap 的源碼不簡單,那么說的就是擴容操作和遷移操作。
這個方法要完完全全看懂還需要看之后的 transfer 方法,讀者應該提前知道這點。
這里的擴容也是做翻倍擴容的,擴容后數組容量為原來的 2 倍。
//首先要說明的是,方法參數size傳進來的時候就已經翻了倍了privatefinalvoidtryPresize(intsize){//c:size的1.5倍,再加1,再往上取最近的2的n次方。intc=(size>=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(size+(size>>>1)+1);intsc;while((sc=sizeCtl)>=0){Node<K,V>[]tab=table;intn;//這個if分支和之前說的初始化數組的代碼基本上是一樣的,在這里,我們可以不用管這塊代碼if(tab==null||(n=tab.length)==0){n=(sc>c)?sc:c;if(U.compareAndSwapInt(this,SIZECTL,sc,-1)){try{if(table==tab){@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=nt;sc=n-(n>>>2);//0.75*n}}finally{sizeCtl=sc;}}}elseif(c<=sc||n>=MAXIMUM_CAPACITY)break;elseif(tab==table){//我沒看懂rs的真正含義是什么,不過也關系不大intrs=resizeStamp(n);if(sc<0){Node<K,V>[]nt;if((sc>>>RESIZE_STAMP_SHIFT)!=rs||sc==rs+1||sc==rs+MAX_RESIZERS||(nt=nextTable)==null||transferIndex<=0)break;//2.用CAS將sizeCtl加1,然后執行transfer方法//此時nextTab不為nullif(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//1.將sizeCtl設置為(rs<<RESIZE_STAMP_SHIFT)+2)//我是沒看懂這個值真正的意義是什么?不過可以計算出來的是,結果是一個比較大的負數//調用transfer方法,此時nextTab參數為nullelseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2))transfer(tab,null);}}}
這個方法的核心在于 sizeCtl 值的操作,首先將其設置為一個負數,然后執行 transfer(tab, null),再下一個循環將 sizeCtl 加 1,并執行 transfer(tab, nt),之后可能是繼續 sizeCtl 加 1,并執行 transfer(tab, nt)。
所以,可能的操作就是執行 1 次 transfer(tab, null) + 多次 transfer(tab, nt),這里怎么結束循環的需要看完 transfer 源碼才清楚。
下面這個方法很點長,將原來的 tab 數組的元素遷移到新的 nextTab 數組中。
雖然我們之前說的 tryPresize 方法中多次調用 transfer 不涉及多線程,但是這個 transfer 方法可以在其他地方被調用,典型地,我們之前在說 put 方法的時候就說過了,請往上看 put 方法,是不是有個地方調用了 helpTransfer 方法,helpTransfer 方法會調用 transfer 方法的。
此方法支持多線程執行,外圍調用此方法的時候,會保證第一個發起數據遷移的線程,nextTab 參數為 null,之后再調用此方法的時候,nextTab 不會為 null。
閱讀源碼之前,先要理解并發操作的機制。原數組長度為 n,所以我們有 n 個遷移任務,讓每個線程每次負責一個小任務是最簡單的,每做完一個任務再檢測是否有其他沒做完的任務,幫助遷移就可以了,而 Doug Lea 使用了一個 stride,簡單理解就是步長,每個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務。所以,我們就需要一個全局的調度者來安排哪個線程執行哪幾個任務,這個就是屬性 transferIndex 的作用。
第一個發起數據遷移的線程會將 transferIndex 指向原數組最后的位置,然后從后往前的 stride 個任務屬于第一個線程,然后將 transferIndex 指向新的位置,再往前的 stride 個任務屬于第二個線程,依此類推。當然,這里說的第二個線程不是真的一定指代了第二個線程,也可以是同一個線程,這個讀者應該能理解吧。其實就是將一個大的遷移任務分為了一個個任務包。
privatefinalvoidtransfer(Node<K,V>[]tab,Node<K,V>[]nextTab){intn=tab.length,stride;//stride在單核下直接等于n,多核模式下為(n>>>3)/NCPU,最小值是16//stride可以理解為”步長“,有n個位置是需要進行遷移的,//將這n個任務分為多個任務包,每個任務包有stride個任務if((stride=(NCPU>1)?(n>>>3)/NCPU:n)<MIN_TRANSFER_STRIDE)stride=MIN_TRANSFER_STRIDE;//subpiderange//如果nextTab為null,先進行一次初始化//前面我們說了,外圍會保證第一個發起遷移的線程調用此方法時,參數nextTab為null//之后參與遷移的線程調用此方法時,nextTab不會為nullif(nextTab==null){try{//容量翻倍Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n<<1];nextTab=nt;}catch(Throwableex){//trytocopewithOOMEsizeCtl=Integer.MAX_VALUE;return;}//nextTable是ConcurrentHashMap中的屬性nextTable=nextTab;//transferIndex也是ConcurrentHashMap的屬性,用于控制遷移的位置transferIndex=n;}intnextn=nextTab.length;//ForwardingNode翻譯過來就是正在被遷移的Node//這個構造方法會生成一個Node,key、value和next都為null,關鍵是hash為MOVED//后面我們會看到,原數組中位置i處的節點完成遷移工作后,//就會將位置i處設置為這個ForwardingNode,用來告訴其他線程該位置已經處理過了//所以它其實相當于是一個標志。ForwardingNode<K,V>fwd=newForwardingNode<K,V>(nextTab);//advance指的是做完了一個位置的遷移工作,可以準備做下一個位置的了booleanadvance=true;booleanfinishing=false;//toensuresweepbeforecommittingnextTab/**下面這個for循環,最難理解的在前面,而要看懂它們,應該先看懂后面的,然后再倒回來看**///i是位置索引,bound是邊界,注意是從后往前for(inti=0,bound=0;;){Node<K,V>f;intfh;//下面這個while真的是不好理解//advance為true表示可以進行下一個位置的遷移了//簡單理解結局:i指向了transferIndex,bound指向了transferIndex-stridewhile(advance){intnextIndex,nextBound;if(--i>=bound||finishing)advance=false;//將transferIndex值賦給nextIndex//這里transferIndex一旦小于等于0,說明原數組的所有位置都有相應的線程去處理了elseif((nextIndex=transferIndex)<=0){i=-1;advance=false;}elseif(U.compareAndSwapInt(this,TRANSFERINDEX,nextIndex,nextBound=(nextIndex>stride?nextIndex-stride:0))){//看括號中的代碼,nextBound是這次遷移任務的邊界,注意,是從后往前bound=nextBound;i=nextIndex-1;advance=false;}}if(i<0||i>=n||i+n>=nextn){intsc;if(finishing){//所有的遷移操作已經完成nextTable=null;//將新的nextTab賦值給table屬性,完成遷移table=nextTab;//重新計算sizeCtl:n是原數組長度,所以sizeCtl得出的值將是新數組長度的0.75倍sizeCtl=(n<<1)-(n>>>1);return;}//之前我們說過,sizeCtl在遷移前會設置為(rs<<RESIZE_STAMP_SHIFT)+2//然后,每有一個線程參與遷移就會將sizeCtl加1,//這里使用CAS操作對sizeCtl進行減1,代表做完了屬于自己的任務if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){//任務結束,方法退出if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return;//到這里,說明(sc-2)==resizeStamp(n)<<RESIZE_STAMP_SHIFT,//也就是說,所有的遷移任務都做完了,也就會進入到上面的if(finishing){}分支了finishing=advance=true;i=n;//recheckbeforecommit}}//如果位置i處是空的,沒有任何節點,那么放入剛剛初始化的ForwardingNode”空節點“elseif((f=tabAt(tab,i))==null)advance=casTabAt(tab,i,null,fwd);//該位置處是一個ForwardingNode,代表該位置已經遷移過了elseif((fh=f.hash)==MOVED)advance=true;//alreadyprocessedelse{//對數組該位置處的結點加鎖,開始處理數組該位置處的遷移工作synchronized(f){if(tabAt(tab,i)==f){Node<K,V>ln,hn;//頭結點的hash大于0,說明是鏈表的Node節點if(fh>=0){//下面這一塊和Java7中的ConcurrentHashMap遷移是差不多的,//需要將鏈表一分為二,//找到原鏈表中的lastRun,然后lastRun及其之后的節點是一起進行遷移的//lastRun之前的節點需要進行克隆,然后分到兩個鏈表中intrunBit=fh&n;Node<K,V>lastRun=f;for(Node<K,V>p=f.next;p!=null;p=p.next){intb=p.hash&n;if(b!=runBit){runBit=b;lastRun=p;}}if(runBit==0){ln=lastRun;hn=null;}else{hn=lastRun;ln=null;}for(Node<K,V>p=f;p!=lastRun;p=p.next){intph=p.hash;Kpk=p.key;Vpv=p.val;if((ph&n)==0)ln=newNode<K,V>(ph,pk,pv,ln);elsehn=newNode<K,V>(ph,pk,pv,hn);}//其中的一個鏈表放在新數組的位置isetTabAt(nextTab,i,ln);//另一個鏈表放在新數組的位置i+nsetTabAt(nextTab,i+n,hn);//將原數組該位置處設置為fwd,代表該位置已經處理完畢,//其他線程一旦看到該位置的hash值為MOVED,就不會進行遷移了setTabAt(tab,i,fwd);//advance設置為true,代表該位置已經遷移完畢advance=true;}elseif(finstanceofTreeBin){//紅黑樹的遷移TreeBin<K,V>t=(TreeBin<K,V>)f;TreeNode<K,V>lo=null,loTail=null;TreeNode<K,V>hi=null,hiTail=null;intlc=0,hc=0;for(Node<K,V>e=t.first;e!=null;e=e.next){inth=e.hash;TreeNode<K,V>p=newTreeNode<K,V>(h,e.key,e.val,null,null);if((h&n)==0){if((p.prev=loTail)==null)lo=p;elseloTail.next=p;loTail=p;++lc;}else{if((p.prev=hiTail)==null)hi=p;elsehiTail.next=p;hiTail=p;++hc;}}//如果一分為二后,節點數少于8,那么將紅黑樹轉換回鏈表ln=(lc<=UNTREEIFY_THRESHOLD)?untreeify(lo):(hc!=0)?newTreeBin<K,V>(lo):t;hn=(hc<=UNTREEIFY_THRESHOLD)?untreeify(hi):(lc!=0)?newTreeBin<K,V>(hi):t;//將ln放置在新數組的位置isetTabAt(nextTab,i,ln);//將hn放置在新數組的位置i+nsetTabAt(nextTab,i+n,hn);//將原數組該位置處設置為fwd,代表該位置已經處理完畢,//其他線程一旦看到該位置的hash值為MOVED,就不會進行遷移了setTabAt(tab,i,fwd);//advance設置為true,代表該位置已經遷移完畢advance=true;}}}}}}
說到底,transfer 這個方法并沒有實現所有的遷移任務,每次調用這個方法只實現了 transferIndex 往前 stride 個位置的遷移工作,其他的需要由外圍來控制。
這個時候,再回去仔細看 tryPresize 方法可能就會更加清晰一些了。
get 方法從來都是最簡單的,這里也不例外:
1.計算 hash 值
2.根據 hash 值找到數組對應位置: (n - 1) & h
3.根據該位置處結點性質進行相應查找
如果該位置為 null,那么直接返回 null 就可以了
如果該位置處的節點剛好就是我們需要的,返回該節點的值即可
如果該位置節點的 hash 值小于 0,說明正在擴容,或者是紅黑樹,后面我們再介紹 find 方法
如果以上 3 條都不滿足,那就是鏈表,進行遍歷比對即可
publicVget(Objectkey){Node<K,V>[]tab;Node<K,V>e,p;intn,eh;Kek;inth=spread(key.hashCode());if((tab=table)!=null&&(n=tab.length)>0&&(e=tabAt(tab,(n-1)&h))!=null){//判斷頭結點是否就是我們需要的節點if((eh=e.hash)==h){if((ek=e.key)==key||(ek!=null&&key.equals(ek)))returne.val;}//如果頭結點的hash小于0,說明正在擴容,或者該位置是紅黑樹elseif(eh<0)//參考ForwardingNode.find(inth,Objectk)和TreeBin.find(inth,Objectk)return(p=e.find(h,key))!=null?p.val:null;//遍歷鏈表while((e=e.next)!=null){if(e.hash==h&&((ek=e.key)==key||(ek!=null&&key.equals(ek))))returne.val;}}returnnull;}
上述就是小編為大家分享的Java8中ConcurrentHashMap的原理是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注本站行業資訊頻道。
本文由 貴州做網站公司 整理發布,部分圖文來源于互聯網,如有侵權,請聯系我們刪除,謝謝!
c語言中正確的字符常量是用一對單引號將一個字符括起表示合法的字符常量。例如‘a’。數值包括整型、浮點型。整型可用十進制,八進制,十六進制。八進制前面要加0,后面...
2022年天津專場考試原定于3月19日舉行,受疫情影響確定延期,但目前延期后的考試時間推遲。 符合報名條件的考生,須在規定時間登錄招考資訊網(www.zha...
:喜歡聽,樂意看。指很受歡迎?!巴卣官Y料”喜聞樂見:[ xǐ wén lè jiàn ]詳細解釋1. 【解釋】:喜歡聽,樂意看。指很受歡迎。2. 【示例】:這是...
手機銀行是什么意思?手機銀行就是指銀行用智能手機為承載體,使客戶在此終端上使用銀行服務的渠道模式。關于手機銀行怎么開通,正常情況下是需要先下載好銀行的手機官方軟件,然后打開銀行軟件,根據提示完善相關資料信息,最后點擊開通就可以啦。手機銀行開通程序是什么?首先打開手機的應用市場,下載相應的銀行軟件,例如,你需要開通建行的手機銀行,就下載建行的官方手機銀行軟件,以此類推。下載好軟件后,打開軟件,允許一...
經濟下行壓力大是什么意思?經濟下行壓力大的意思是指衡量經濟增長的數據指標在不斷的降低,且下降速度還在不斷加快。在發展中國家,肯定會存在機制、體制、結構不合理等一些問題。 但是經濟增長的速度下滑,更多的原因可能是因為外在原因所導致的,比如說短期影響經濟增長的三大因素。經濟下行會給企業和個人帶來哪些影響?第一、經濟下行會使得企業發展緩慢當經濟處于下行通道中,意味著社會的經濟規模增長放緩,對于企業來說,...
近日,網絡名人新友志收到7000萬雪花,邀請42位大牌明星舉辦豪華演唱會的消息成為網絡熱門話題。當然,這么大的手筆可以說是相當驚人了。畢竟這么大的一筆錢,就算是浙江衛視和湖南衛視也要認真考慮一下。最重要的是這么多明星聚集在一起,沒有硬實力是絕對不行的。當然,楚和辛酉之確實很有商業頭腦??梢哉f他們從這個活動中賺了不少錢。很多人也有疑惑。他們不是說有7000多萬嗎?你是怎么回到本身邊的?這里有一個門。...