再談AbstractQueuedSynchronizer:獨占模式

關于AbstractQueuedSynchronizer

JDK1.5之后引入了并發包java.util.concurrent,大大提高了Java程序的并發性能。關于java.util.concurrent包我總結如下:

  • AbstractQueuedSynchronizer是并發類諸如ReentrantLock、CountDownLatch、Semphore的核心

  • CAS算法是AbstractQueuedSynchronizer的核心

  • 可以說AbstractQueuedSynchronizer是并發類的重中之重。其實之前在ReentrantLock實現原理深入探究一文中已經有結合ReentrantLock詳細解讀過AbstractQueuedSynchronizer,但限于當時水平原因,回看一年半前的此文,感覺對于AbstractQueuedSynchronizer的解讀理解還不夠深,因此這里更新一篇文章,再次解讀AbstractQueuedSynchronizer的數據結構即相關源碼實現,本文基于JDK1.7版本。

     

    AbstactQueuedSynchronizer的基本數據結構

    AbstractQueuedSynchronizer的基本數據結構為Node,關于Node,JDK作者寫了詳細的注釋,這里我大致總結幾點:

    1. AbstractQueuedSynchronizer的等待隊列是CLH隊列的變種,CLH隊列通常用于自旋鎖,AbstractQueuedSynchronizer的等待隊列用于阻塞同步器

    2. 每個節點中持有一個名為"status"的字段用于是否一條線程應當阻塞的追蹤,但是status字段并不保證加鎖

    3. 一條線程如果它處于隊列的頭,那么他會嘗試去acquire,但是成為頭并不保證成功,它只是有權利去競爭

    4. 要進入隊列,你只需要自動將它拼接在隊列尾部即可;要從隊列中移除,你只需要設置header字段

    下面我用一張表格總結一下Node中持有哪些變量且每個變量的含義:

    關于SIGNAL、CANCELLED、CONDITION、PROPAGATE四個狀態,JDK源碼的注釋中同樣有了詳細的解讀,再用一張表格總結一下:

     

    AbstractQueuedSynchronizer供子類實現的方法

    AbstractQueuedSynchzonizer是基于模板模式的實現,不過它的模板模式寫法有點特別,整個類中沒有任何一個abstract的抽象方法,取而代之的是,需要子類去實現的那些方法通過一個方法體拋出UnsupportedOperationException異常來讓子類知道。

    AbstractQueuedSynchronizer類中一共有五處方法供子類實現,用表格總結一下:

    這里的acquire不好翻譯,所以就直接原詞放上來了,因為acquire是一個動詞,后面并沒有帶賓語,因此不知道具體acquire的是什么。按照我個人理解,acquire的意思應當是根據狀態字段state去獲取一個執行當前動作的資格。

    比如ReentrantLock的lock()方法最終會調用acquire方法,那么:

    1. 線程1去lock(),執行acquire,發現state=0,因此有資格執行lock()的動作,將state設置為1,返回true

    2. 線程2去lock(),執行acquire,發現state=1,因此沒有資格執行lock()的動作,返回false

    這種理解我認為應當是比較準確的。

     

    獨占模式acquire實現流程

    有了上面的這些基礎,我們看一下獨占式acquire的實現流程,主要是在線程acquire失敗后,是如何構建數據結構的,先看理論,之后再用一個例子畫圖說明。

    看一下AbstractQuueuedSynchronizer的acquire方法實現流程,acquire方法是用于獨占模式下進行操作的:

     1 public final void acquire(int arg) {
     2     if (!tryAcquire(arg) &&
     3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     4         selfInterrupt();
     5 }

     

    tryAcquire方法前面說過了,是子類實現的一個方法,如果tryAcquire返回的是true(成功),即表明當前線程獲得了一個執行當前動作的資格,自然也就不需要構建數據結構進行阻塞等待。

    如果tryAcquire方法返回的是false,那么當前線程沒有獲得執行當前動作的資格,接著執行"acquireQueued(addWaiter(Node.EXCLUSIVE), arg))"這句代碼,這句話很明顯,它是由兩步構成的:

    1. addWaiter,添加一個等待者

    2. acquireQueued,嘗試從等待隊列中去獲取執行一次acquire動作

    分別看一下每一步做了什么。

     

    addWaiter

    先看第一步,addWaiter做了什么,從傳入的參數Node.EXCLUSIVE我們知道這是獨占模式的:

     1 private Node addWaiter(Node mode) {
     2     Node node = new Node(Thread.currentThread(), mode);
     3     // Try the fast path of enq; backup to full enq on failure
     4     Node prev = tail;
     5     if (prev != null) {
     6         node.prev = prev;
     7         if (compareAndSetTail(prev, node)) {
     8             prev.next = node;
     9             return node;
    10         }
    11     }
    12     enq(node);
    13     return node;
    14 }

     

    首先看第4行~第11行的代碼,獲得當前數據結構中的尾節點,如果有尾節點,那么先獲取這個節點認為它是前驅節點prev,然后:

    1. 新生成的Node的前驅節點指向prev

    2. 并發下只有一條線程可以通過CAS算法讓自己的Node成為尾節點,此時將此prev的next指向該線程對應的Node

    因此在數據結構中有節點的情況下,所有新增節點都是作為尾節點插入數據結構。從注釋上來看,這段邏輯的存在的意義是以最短路徑O(1)的效果完成快速入隊,以最大化減小開銷。

    假如當前節點沒有被設置為尾節點,那么執行enq方法:

     1 private Node enq(final Node node) {
     2     for (;;) {
     3         Node t = tail;
     4         if (t == null) { // Must initialize
     5             if (compareAndSetHead(new Node()))
     6                 tail = head;
     7         } else {
     8             node.prev = t;
     9             if (compareAndSetTail(t, node)) {
    10                 t.next = node;
    11                 return t;
    12             }
    13         }
    14     }
    15 }

     

    這段代碼的邏輯為:

    1. 如果尾節點為空,即當前數據結構中沒有節點,那么new一個不帶任何狀態的Node作為頭節點

    2. 如果尾節點不為空,那么并發下使用CAS算法將當前Node追加成為尾節點,由于是一個for(;;)循環,因此所有沒有成功acquire的Node最終都會被追加到數據結構中

    看完了代碼,用一張圖表示一下AbstractQueuedSynchronizer的整體數據結構(比較簡單,就不自己畫了,網上隨便找了一張圖):

     

    acquireQueued

    隊列構建好了,下一步就是在必要的時候從隊列里面拿出一個Node了,這就是acquireQueued方法,顧名思義,從隊列里面acquire??聰耡cquireQueued方法的實現:

     1 final boolean acquireQueued(final Node node, int arg) {
     2     boolean failed = true;
     3     try {
     4         boolean interrupted = false;
     5         for (;;) {
     6             final Node p = node.prevecessor();
     7             if (p == head && tryAcquire(arg)) {
     8                 setHead(node);
     9                 p.next = null; // help GC
    10                 failed = false;
    11                 return interrupted;
    12             }
    13             if (shouldParkAfterFailedAcquire(p, node) &&
    14                 parkAndCheckInterrupt())
    15                 interrupted = true;
    16         }
    17     } finally {
    18         if (failed)
    19             cancelAcquire(node);
    20     }
    21 }

     

    這段代碼描述了幾件事:

    1. 從第6行的代碼獲取節點的前驅節點p,第7行的代碼判斷p是前驅節點并tryAcquire我們知道,只有當前第一個持有Thread的節點才會嘗試acquire,如果節點acquire成功,那么setHead方法,將當前節點作為head、將當前節點中的thread設置為null、將當前節點的prev設置為null,這保證了數據結構中頭結點永遠是一個不帶Thread的空節點

    2. 如果當前節點不是前驅節點或者tryAcquire失敗,那么執行第13行~第15行的代碼,做了兩步操作,首先判斷在acquie失敗后是否應該park,其次park并檢查中斷狀態

    看一下第一步shouldParkAfterFailedAcquire代碼做了什么:

     1 private static boolean shouldParkAfterFailedAcquire(Node prev, Node node) {
     2     int ws = prev.waitStatus;
     3     if (ws == Node.SIGNAL)
     4         /*
     5          * This node has already set status asking a release
     6          * to signal it, so it can safely park.
     7          */
     8         return true;
     9     if (ws > 0) {
    10         /*
    11          * prevecessor was cancelled. Skip over prevecessors and
    12          * indicate retry.
    13          */
    14         do {
    15             node.prev = prev = prev.prev;
    16         } while (prev.waitStatus > 0);
    17         prev.next = node;
    18     } else {
    19         /*
    20          * waitStatus must be 0 or PROPAGATE.  Indicate that we
    21          * need a signal, but don't park yet.  Caller will need to
    22          * retry to make sure it cannot acquire before parking.
    23          */
    24         compareAndSetWaitStatus(prev, ws, Node.SIGNAL);
    25     }
    26     return false;
    27 }

     

    這里每個節點判斷它前驅節點的狀態,如果:

    1. 它的前驅節點是SIGNAL狀態的,返回true,表示當前節點應當park

    2. 它的前驅節點的waitStatus>0,相當于CANCELLED(因為狀態值里面只有CANCELLED是大于0的),那么CANCELLED的節點作廢,當前節點不斷向前找并重新連接為雙向隊列,直到找到一個前驅節點waitStats不是CANCELLED的為止

    3. 它的前驅節點不是SIGNAL狀態且waitStatus<=0,此時執行第24行代碼,利用CAS機制,如果waitStatus的前驅節點是0那么更新為SIGNAL狀態

    如果判斷判斷應當park,那么parkAndCheckInterrupt方法:

     1 private final boolean parkAndCheckInterrupt() {
     2     LockSupport.park(this);
     3     return Thread.interrupted();
     4 }

     

    利用LockSupport的park方法讓當前線程阻塞。

     

    獨占模式release流程

    上面整理了獨占模式的acquire流程,看到了等待的Node是如何構建成一個數據結構的,下面看一下釋放的時候做了什么,release方法的實現為:

    1 public final boolean release(int arg) {
    2     if (tryRelease(arg)) {
    3         Node h = head;
    4         if (h != null && h.waitStatus != 0)
    5             unparkSuccessor(h);
    6         return true;
    7     }
    8     return false;
    9 }

     

    tryRelease同樣是子類去實現的,表示當前動作我執行完了,要釋放我執行當前動作的資格,講這個資格讓給其它線程,然后tryRelease釋放成功,獲取到head節點,如果head節點的waitStatus不為0的話,執行unparkSuccessor方法,顧名思義unparkSuccessor意為unpark頭結點的繼承者,方法實現為:

     1 private void unparkSuccessor(Node node) {
     2         /*
     3          * If status is negative (i.e., possibly needing signal) try
     4          * to clear in anticipation of signalling.  It is OK if this
     5          * fails or if status is changed by waiting thread.
     6          */
     7         int ws = node.waitStatus;
     8         if (ws < 0)
     9             compareAndSetWaitStatus(node, ws, 0);
    10 
    11         /*
    12          * Thread to unpark is held in successor, which is normally
    13          * just the next node.  But if cancelled or apparently null,
    14          * traverse backwards from tail to find the actual
    15          * non-cancelled successor.
    16          */
    17         Node s = node.next;
    18         if (s == null || s.waitStatus > 0) {
    19             s = null;
    20             for (Node t = tail; t != null && t != node; t = t.prev)
    21                 if (t.waitStatus <= 0)
    22                     s = t;
    23         }
    24         if (s != null)
    25             LockSupport.unpark(s.thread);
    26 }

     

    這段代碼比較好理解,整理一下流程:

    1. 頭節點的waitStatus<0,將頭節點的waitStatus設置為0

    2. 拿到頭節點的下一個節點s,如果s==null或者s的waitStatus>0(被取消了),那么從隊列尾巴開始向前尋找一個waitStatus<=0的節點作為后繼要喚醒的節點

    最后,如果拿到了一個不等于null的節點s,就利用LockSupport的unpark方法讓它取消阻塞。

     

    實戰舉例:數據結構構建

    上面的例子講解地過于理論,下面利用ReentrantLock舉個例子,但是這里不講ReentrantLock實現原理,只是利用ReentrantLock研究AbstractQueuedSynchronizer的acquire和release。示例代碼為:

     1 /**
     2  * @author 五月的倉頡//www.cnblogs.com/xrq730/p/7056614.html
     3  */
     4 public class AbstractQueuedSynchronizerTest {
     5 
     6     @Test
     7     public void testAbstractQueuedSynchronizer() {
     8         Lock lock = new ReentrantLock();
     9         
    10         Runnable runnable0 = new ReentrantLockThread(lock);
    11         Thread thread0 = new Thread(runnable0);
    12         thread0.setName("線程0");
    13         
    14         Runnable runnable1 = new ReentrantLockThread(lock);
    15         Thread thread1 = new Thread(runnable1);
    16         thread1.setName("線程1");
    17         
    18         Runnable runnable2 = new ReentrantLockThread(lock);
    19         Thread thread2 = new Thread(runnable2);
    20         thread2.setName("線程2");
    21         
    22         thread0.start();
    23         thread1.start();
    24         thread2.start();
    25         
    26         for (;;);
    27     }
    28     
    29     private class ReentrantLockThread implements Runnable {
    30         
    31         private Lock lock;
    32         
    33         public ReentrantLockThread(Lock lock) {
    34             this.lock = lock;
    35         }
    36         
    37         @Override
    38         public void run() {
    39             try {
    40                 lock.lock();
    41                 for (;;);
    42             } finally {
    43                 lock.unlock();
    44             }
    45         }
    46         
    47     }
    48     
    49 }

     

    全部是死循環,相當于第一條線程(線程0)acquire成功之后,后兩條線程(線程1、線程2)阻塞,下面的代碼就不考慮后兩條線程誰先誰后的問題,就一條線程(線程1)流程執行到底、另一條線程(線程2)流程執行到底這么分析了。

    這里再把addWaiter和enq兩個方法源碼貼一下:

     1 private Node addWaiter(Node mode) {
     2     Node node = new Node(Thread.currentThread(), mode);
     3     // Try the fast path of enq; backup to full enq on failure
     4     Node prev = tail;
     5     if (prev != null) {
     6         node.prev = prev;
     7         if (compareAndSetTail(prev, node)) {
     8             prev.next = node;
     9             return node;
    10         }
    11     }
    12     enq(node);
    13     return node;
    14 }

     

     1 private Node enq(final Node node) {
     2     for (;;) {
     3         Node t = tail;
     4         if (t == null) { // Must initialize
     5             if (compareAndSetHead(new Node()))
     6                 tail = head;
     7         } else {
     8             node.prev = t;
     9             if (compareAndSetTail(t, node)) {
    10                 t.next = node;
    11                 return t;
    12             }
    13         }
    14     }
    15 }

     

    首先第一個acquire失敗的線程1,由于此時整個數據結構中么沒有任何數據,因此addWaiter方法第4行中拿到的prev=tail為空,執行enq方法,首先第3行獲取tail,第4行判斷到tail是null,因此頭結點new一個Node出來通過CAS算法設置為數據結構的head,tail同樣也是這個Node,此時數據結構為:

    為了方便描述,prev和next,我給每個Node隨便加了一個地址。接著繼續enq,因為enq內是一個死循環,所以繼續第3行獲取tail,new了一個空的Node之后tail就有了,執行else判斷,通過第8行~第10行代碼將當前線程對應的Node追加到數據結構尾部,那么當前構建的數據結構為:

    這樣,線程1對應的Node被加入數據結構,成為數據結構的tail,而數據結構的head是一個什么都沒有的空Node。

    接著線程2也acquire失敗了,線程2既然acquire失敗,那也要準備被加入數據結構中,繼續先執行addWaiter方法,由于此時已經有了tail,因此不需要執行enq方法,可以直接將當前Node添加到數據結構尾部,那么當前構建的數據結構為:

    至此,兩個阻塞的線程構建的三個Node已經全部歸位。

     

    實戰舉例:線程阻塞

    上述流程只是描述了構建數據結構的過程,并沒有描述線程1、線程2阻塞的流程,因此接著繼續用實際例子看一下線程1、線程2如何阻塞。貼一下acquireQueued、shouldParkAfterFailedAcquire兩個方法源碼:

     1 final boolean acquireQueued(final Node node, int arg) {
     2     boolean failed = true;
     3     try {
     4         boolean interrupted = false;
     5         for (;;) {
     6             final Node p = node.prevecessor();
     7             if (p == head && tryAcquire(arg)) {
     8                 setHead(node);
     9                 p.next = null; // help GC
    10                 failed = false;
    11                 return interrupted;
    12             }
    13             if (shouldParkAfterFailedAcquire(p, node) &&
    14                 parkAndCheckInterrupt())
    15                 interrupted = true;
    16         }
    17     } finally {
    18         if (failed)
    19             cancelAcquire(node);
    20     }
    21 }

     

     1 private static boolean shouldParkAfterFailedAcquire(Node prev, Node node) {
     2     int ws = prev.waitStatus;
     3     if (ws == Node.SIGNAL)
     4         /*
     5          * This node has already set status asking a release
     6          * to signal it, so it can safely park.
     7          */
     8         return true;
     9     if (ws > 0) {
    10         /*
    11          * prevecessor was cancelled. Skip over prevecessors and
    12          * indicate retry.
    13          */
    14         do {
    15             node.prev = prev = prev.prev;
    16         } while (prev.waitStatus > 0);
    17         prev.next = node;
    18     } else {
    19         /*
    20          * waitStatus must be 0 or PROPAGATE.  Indicate that we
    21          * need a signal, but don't park yet.  Caller will need to
    22          * retry to make sure it cannot acquire before parking.
    23          */
    24         compareAndSetWaitStatus(prev, ws, Node.SIGNAL);
    25     }
    26     return false;
    27 }

     

    首先是線程1,它的前驅節點是head節點,在它tryAcquire成功的情況下,執行第8行~第11行的代碼。做幾件事情:

    1. head為線程1對應的Node

    2. 線程1對應的Node的thread置空

    3. 線程1對應的Node的prev置空

    4. 原head的next置空,這樣原head中的prev、next、thread都為空,對象內沒有引用指向其他地方,GC可以認為這個Node是垃圾,對這個Node進行回收,注釋"Help GC"就是這個意思

    5. failed=false表示沒有失敗

    因此,如果線程1執行tryAcquire成功,那么數據結構將變為:

    從上述流程可以總結到:只有前驅節點為head的節點會嘗試tryAcquire,其余都不會,結合后面的release選繼承者的方式,保證了先acquire失敗的線程會優先從阻塞狀態中解除去重新acquire。這是一種公平的acquire方式,因為它遵循"先到先得"原則,但是我們可以動動手腳讓這種公平變為非公平,比如ReentrantLock默認的費公平模式,這個留在后面說。

    那如果線程1執行tryAcquire失敗,那么要執行shouldParkAfterFailedAcquire方法了,shouldParkAfterFailedAcquire拿線程1的前驅節點也就是head節點的waitStatus做了一個判斷,因為waitStatus=0,因此執行第18行~第20行的邏輯,將head的waitStatus設置為SIGNAL即-1,然后方法返回false,數據結構變為:

    看到這里就一個變化:head的waitStatus從0變成了-1。既然shouldParkAfterFailedAcquire返回false,acquireQueued的第13行~第14行的判斷自然不通過,繼續走for(;;)循環,如果tryAcquire失敗顯然又來到了shouldParkAfterFailedAcquire方法,此時線程1對應的Node的前驅節點head節點的waitStatus已經變為了SIGNAL即-1,因此執行第4行~第8行的代碼,直接返回true出去。

    shouldParkAfterFailedAcquire返回true,parkAndCheckInterrupt直接調用LockSupport的park方法:

     1 private final boolean parkAndCheckInterrupt() {
     2     LockSupport.park(this);
     3     return Thread.interrupted();
     4 }

     

    至此線程1阻塞,線程2阻塞的流程與線程1阻塞的流程相同,可以自己分析一下。

    另外再提一個問題,不知道大家會不會想:

    1. 為什么線程1對應的Node構建完畢不直接調用LockSupport的park方法進行阻塞?

    2. 為什么不直接把head的waitStatus直接設置為Signal而要從0設置為Signal?

    我認為這是AbstractQueuedSynchronizer開發人員做了類似自旋的操作。因為很多時候獲取acquire進行操作的時間很短,阻塞會引起上下文的切換,而很短時間就從阻塞狀態解除,這樣相對會比較耗費性能。

    因此我們看到線程1自構建完畢Node加入數據結構到阻塞,一共嘗試了兩次tryAcquire,如果其中有一次成功,那么線程1就沒有必要被阻塞,提升了性能。

    來源:itnose

上一篇: IO【轉換流,打印流,序列化】

下一篇: 打造獨立數據庫訪問的中間服務

分享到: 更多
pk10赛车冠军大小计划 奔驰团队pk10全天计划 新时时彩 北京pk10哪种最稳 彩世界北京pk手机版 k10必中冠军计划 三肖八码 飞艇计划网页版全天 扑克牌21点游戏下载 重庆时时51计划 全天pk10最稳计划 pk10双面盘 时时彩买单双技巧有规律吗 现金提现的棋牌游戏 重庆时时开奖结果记录 北京pk10官方软件下载