首頁 > 軟體

Java並行之CAS原理詳解

2022-03-16 19:01:00

開端

在學習原始碼之前我們先從一個需求開始

需求

我們開發一個網站,需要對存取量進行統計,使用者每傳送一次請求,存取量+1.如何實現?我們模擬有100個人同時存取,並且每個人對咱們的網站發起10次請求,最後總存取次數應該是1000次

1.程式碼

package day03;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Description
 * User:
 * Date:
 * Time:
 */
public class Demo {
    //總存取量
    static int count = 0;
    //模擬存取的方法
    public static void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        count++;
    }
    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        int threadSize=100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i=0;i<threadSize;i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //每個使用者存取10次網站
                    try {
                        for (int j=0;j<10;j++) {
                            request();
                        }
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        //怎麼保證100個執行緒執行之後,執行後面的程式碼
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName()+"耗時:"+(endTime-startTime)+",count:"+count);
    }
}

我們多輸出幾次結果

main耗時:66,count:950

main耗時:67,count:928

發現每一次count都不相同,和我們期待的1000相差一點,這裡就牽扯到了並行問題,我們的count++在底層實際上由3步操作組成

  • 獲取count,各個執行緒寫入自己的工作記憶體
  • count執行+1操作
  • 將+1後的值寫回主記憶體中

這並不是一個執行緒安全的過程,如果有A、B兩個執行緒同時執行count++,同時執行到第一步,得到的count是一樣的,三步操作完成後,count只加1,導致count結果不正確

那麼怎麼解決這個問題呢?

我們可以考慮使用synchronized關鍵字和ReentrantLock對資源加鎖,保證並行的正確性,多執行緒的情況下,可以保證被鎖住的資源被序列存取

1.1修改後的程式碼

package day03;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Description
 * User:
 * Date:
 * Time:
 */
public class Demo02 {
    //總存取量
    static int count = 0;
    //模擬存取的方法
    public static synchronized void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
        count++;
    }
    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        int threadSize=100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i=0;i<threadSize;i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //每個使用者存取10次網站
                    try {
                        for (int j=0;j<10;j++) {
                            request();
                        }
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        //怎麼保證100個執行緒執行之後,執行後面的程式碼
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName()+"耗時:"+(endTime-startTime)+",count:"+count);
    }
}

執行結果

main耗時:5630,count:1000

可以看到,由於sychronized鎖住了整個方法,雖然結果正確,但因為執行緒執行方法均為序列執行,導致執行效率大大下降

那麼我們如何才能使程式執行無誤時,效率還不會降低呢?

縮小鎖的範圍,升級上述3步中第三步的實現

  • 獲取鎖
  • 獲取count最新的值,記作LV
  • 判斷LV是否等於A,如果相等,則將B的值賦值給count,並返回true,否則返回false
  • 釋放鎖

1.2程式碼改進:CAS模仿

package day03;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Description
 * User:
 * Date:
 * Time:
 */
public class Demo03 {
    //總存取量
    volatile static int count = 0;
    //模擬存取的方法
    public static void request() throws InterruptedException {
        //模擬耗時5毫秒
        TimeUnit.MILLISECONDS.sleep(5);
//        count++;
        int expectCount;
        while (!compareAndSwap(expectCount=getCount(),expectCount+1)){}
    }
    /**
     * @param expectCount 期待的值,比如最剛開始count=3
     * @param newCount 新值 count+1之後的值,4
     * @return
     */
    public static synchronized  boolean compareAndSwap(int expectCount,int newCount){
        if (getCount()==expectCount){
            count = newCount;
            return true;
        }
        return false;
    }
    public static int getCount(){return count;}
    public static void main(String[] args) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        int threadSize=100;
        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        for (int i=0;i<threadSize;i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //每個使用者存取10次網站
                    try {
                        for (int j=0;j<10;j++) {
                            request();
                        }
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
            thread.start();
        }
        //怎麼保證100個執行緒執行之後,執行後面的程式碼
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        System.out.println(Thread.currentThread().getName()+"耗時:"+(endTime-startTime)+",count:"+count);
    }
}

main耗時:67,count:1000

2.CAS分析

CAS全稱“CompareAndSwap”,中文翻譯過來為“比較並替換”

定義:

  • CAS操作包含三個運算元——記憶體位置(V)期望值(A)新值(B)。如果記憶體位置的值和期望值匹配,那麼處理器會自動將該位置值更新為新值。否則處理器不作任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。
  • CAS在一些特殊情況下僅返回CAS是否成功,而不提取當前值,CAS有效的說明了我認為位置V應該包含值A,如果包含該值,將B放到這個位置,否則不要更改該位置的值,只告訴我這個位置現在的值即可

2.1Java對CAS的支援

java中提供了對CAS操作的支援,具體在sun.misc.unsafe類中,宣告如下

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
  • 引數var1:表示要操作的物件
  • 引數var2:表示要操作屬性地址的偏移量
  • 引數var4:表示需要修改資料的期望的值
  • 引數var5:表示需要修改的新值

2.2CAS實現原理是什麼?

CAS通過呼叫JNI的程式碼實現,JNI:java native interface,允許java呼叫其他語言。而compareAndSwapxxx系列的方法就是藉助C語言來呼叫cpu底層指令實現的

以常用的Intel x86平臺為例,最終對映到cpu的指令為"cmpxchg",這是一個原子指令,cpu執行此命令時,實現比較並替換的操作

現代計算機動不動就上百核心,cmpxchg怎麼保證多核心下的執行緒安全?

系統底層在進行CAS操作的時候,會判斷當前系統是否為多核心繫統,如果是就給“匯流排”加鎖,只有一個執行緒會對匯流排加鎖成功,加鎖之後執行CAS操作,也就是說CAS的原子性是平臺級別的

2.3CAS存在的問題

2.3.1什麼是ABA問題?

CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,在CAS方法執行之前,被其他執行緒修改為B,然後又修改回了A,那麼CAS方法執行檢查的時候會發現它的值沒有發生變化,但是實際卻不是原來的A了,這就是CAS的ABA問題

可以看到上圖中執行緒A在真正更改A之前,A已經被其他執行緒修改為B然後又修改為A了

程式模擬ABA問題

package day04;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Description
 * User:
 * Date:
 * Time:
 */
public class Test01 {
    public static AtomicInteger a = new AtomicInteger();
    public static void main(String[] args) {
        Thread main = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"執行,a的值為:"+a.get());
                try {
                    int expect = a.get();
                    int update = expect+1;
                    //讓出cpu
                    Thread.sleep(1000);
                    boolean b = a.compareAndSet(expect, update);
                    System.out.println(Thread.currentThread().getName()+"CAS執行:"+b+",a的值為:"+a.get());
                }
                 catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"主執行緒");
//        main.start();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20);
                    a.incrementAndGet();
                    System.out.println(Thread.currentThread().getName()+"更改a的值為:"+a.get());
                    a.decrementAndGet();
                    System.out.println(Thread.currentThread().getName()+"更改a的值為:"+a.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"其他執行緒");
        main.start();
        thread1.start();
    }
}

主執行緒執行,a的值為:0
其他執行緒更改a的值為:1
其他執行緒更改a的值為:0
主執行緒CAS執行:true,a的值為:1

可以看到,在執行CAS之前,a被其他執行緒修改為1又修改為0,但是對執行CAS並沒有影響,因為它根本沒有察覺到其他執行緒對a的修改

2.3.2如何解決ABA問題

解決ABA問題最簡單的方案就是給值加一個修改版本號,每次值變化,都會修改它的版本號,CAS操作時都去對比此版本號

在java中的ABA解決方案(AtomicStampedReference

AtomicStampedReference主要包含一個物件參照及一個可以自動更新的整數stamp的pair物件來解決ABA問題

AtomicStampedReference原始碼

    /**
     * Atomically sets the value of both the reference and stamp
     * to the given update values if the
     * current reference is {@code ==} to the expected reference
     * and the current stamp is equal to the expected stamp.
     *
     * @param expectedReference the expected value of the reference 期待參照
     * @param newReference the new value for the reference          新值參照
     * @param expectedStamp the expected value of the stamp         期望參照的版本號
     * @param newStamp the new value for the stamp                  新值的版本號
     * @return {@code true} if successful
     */
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&//期望參照與當前參照保持一致
            expectedStamp == current.stamp &&//期望參照版本號與當前版本號保持一致
            ((newReference == current.reference &&//新值參照與當前參照一致並且新值版本號與當前版本號保持一致
              newStamp == current.stamp)
                    ||//如果上述版本號不一致,則通過casPair方法新建一個Pair物件,更新值和版本號,進行再次比較
             casPair(current, Pair.of(newReference, newStamp)));
    }
    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }

使用AtomicStampedReference解決ABA問題程式碼

package day04;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
 * Description
 * User:
 * Date:
 * Time:
 */
public class Test02 {
    public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1),1);
    public static void main(String[] args) {
        Thread main = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"執行,a的值為:"+a.getReference());
                try {
                    Integer expectReference = a.getReference();
                    Integer newReference = expectReference+1;
                    Integer expectStamp = a.getStamp();
                    Integer newStamp = expectStamp+1;
                    //讓出cpu
                    Thread.sleep(1000);
                    boolean b = a.compareAndSet(expectReference, newReference,expectStamp,newStamp);
                    System.out.println(Thread.currentThread().getName()+"CAS執行:"+b);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"主執行緒");
//        main.start();
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(20);
                    a.compareAndSet(a.getReference(),a.getReference()+1,a.getStamp(),a.getStamp()+1);
                    System.out.println(Thread.currentThread().getName()+"更改a的值為:"+a.getReference());
                    a.compareAndSet(a.getReference(),a.getReference()-1,a.getStamp(),a.getStamp()-1);
                    System.out.println(Thread.currentThread().getName()+"更改a的值為:"+a.getReference());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"其他執行緒");
        main.start();
        thread1.start();
    }
}

主執行緒執行,a的值為:1
其他執行緒更改a的值為:2
其他執行緒更改a的值為:1
主執行緒CAS執行:false

因為AtomicStampedReference執行CAS會去檢查版本號,版本號不一致則不會進行CAS,所以ABA問題成功解決

總結

本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!   


IT145.com E-mail:sddin#qq.com