日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

并發編程之CAS和Atomic原子操作類

作者:bingtanghulu_6 更新時間: 2022-05-11 編程語言

目錄

1. 線程安全問題?

1.1 CAS工具類代碼?

2. 什么是CAS?

2. CAS的java實現

2.1??創建一個工廠類獲取unsafe對象

2.2 測試unsafe的三個方法

3.ABA問題

3.1 什么是ABA問題?

?3.1 ABA解決方案

4.Atomic原子操作類詳解

?4.1?AtomicInteger

4.2?AtomicIntegerArray?

4.3?AtomicReference

4.4?AtomicIntegerFieldUpdater

4.5?LongAdder

4.6?LongAccumulator?


1. 線程安全問題?

?sum++不能保證原子性,volatile只能保證可見性和有序性,為了解決線程安全問題,有以下三種解決方案:

1. synchronized

2. reentrantLock

3. CAS

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class testCASTest {

    private volatile static int sum = 0;

    private static Object obj = "";

    private static ReentrantLock lock = new ReentrantLock();

    private static CASLock casLock = new CASLock();

    private static AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {

        //模擬10個線程計算10000的數,出現計算不等于100000的場景
//        for(int i =0;i<10;i++){
//            new Thread(()->{
//                for(int j =0;j<10000;j++){
//                    sum++;
//                }
//            }).start();
//        }

        //1.第一種方法,使用synchronized解決問題
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//                synchronized (obj){
//                    for(int j =0;j<10000;j++){
//                        sum++;
//                    }
//                }
//            });
//            thread.start();
//        }

        //第二種方法,使用ReenTrantLock解決問題
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//
//                lock.lock();
//                try{
//                    for(int j =0;j<10000;j++){
//                        sum++;
//                    }
//                }finally {
//                    lock.unlock();
//                }
//            });
//            thread.start();
//        }

        //第三種方法,使用CAS空轉方式
//        for(int i =0;i<10;i++){
//            Thread thread = new Thread(()->{
//
//                //CAS空轉的方式
//                for(;;){
//                    //當state為0時并且做一次CAS賦值操作時可以處理業務邏輯
//                    if(casLock.getState() == 0 && casLock.cas()){
//                        try{
//                            for(int j =0;j<10000;j++){
//                                sum++;
//                            }
//                        }finally {
//                            casLock.setState(0);
//                        }
//                        break;
//                    }
//                }
//            });
//            thread.start();
//        }
        System.out.println(sum);
        //第四種方法,使用CAS自帶的工具類atomicInteger
        for(int i =0;i<10;i++){
            Thread thread = new Thread(()->{
                for(int j =0;j<10000;j++){
                    atomicInteger.incrementAndGet();
                }
            });
            thread.start();
        }

        //休眠3秒,模擬業務操作,等待10個線程計算完畢再打印最終值
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(atomicInteger.get());
    }
}

1.1 CAS工具類代碼?

?CAS工具類代碼如下:

import sun.misc.Unsafe;

public class CASLock {

    //加鎖標記
    private volatile int state;
    private static final Unsafe UNSAFE;
    private static final long OFFSET;

    static {
        try {
            UNSAFE = UnsafeFactory.getUnsafe();
            OFFSET = UnsafeFactory.getFieldOffset(
                    UNSAFE, CASLock.class, "state");
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    public boolean cas() {
        return UNSAFE.compareAndSwapInt(this, OFFSET, 0, 1);
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

}

import java.lang.reflect.Field;

import sun.misc.Unsafe;

public class UnsafeFactory {

    /**
     * 獲取 Unsafe 對象
     * @return
     */
    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 獲取字段的內存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */
    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }


}

?

2. 什么是CAS?

CAS是一組原子操作通過跟某一時刻內存中的值進行比較,如果相等就賦予新值。

cas硬件上實現了并發三大特性的原子性,通過自旋鎖的機制實現了自增操作。

CAS的三個問題:

????????1.自旋時間過長造成CPU開銷過大

????????2.只能對一個共享變量進行原子操作。

????????3.ABA問題(基本用不到,在CAS更新以后中間可能有線程把值改回原來的值比較過后仍然可以更改,造成數據紊亂)

//偽代碼
if(value==expectValue){
    value == newValue;
}
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    static AtomicInteger sum = new AtomicInteger(0);

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 原子自增  CAS
                    sum.incrementAndGet();
                    //count++;

                }
            });
            thread.start();
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sum.get());

    }

}

?

2. CAS的java實現

cas中提供了Unsafe類的實現。常用的幾種操作不同類型的方法如下截圖。

compareAndSwapObject compareAndSwapInt compareAndSwapLong

2.1??創建一個工廠類獲取unsafe對象

import java.lang.reflect.Field;

import sun.misc.Unsafe;

//獲取unsafe對象和獲取字段內存偏移量
public class UnsafeFactory {

    /**
     * 獲取 Unsafe 對象
     * @return
     */
    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 獲取字段的內存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */
    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }


}

2.2 測試unsafe的三個方法

import *.UnsafeFactory;
import sun.misc.Unsafe;


public class CASTest {

    public static void main(String[] args) {
        Entity entity = new Entity();

        Unsafe unsafe = UnsafeFactory.getUnsafe();

        //計算偏移量對象頭8+對象指針4
        long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
        System.out.println(offset);
        boolean successful;

        // 4個參數分別是:對象實例、字段的內存偏移量、字段期望值、字段更新值
        successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
        System.out.println(successful + "\t" + entity.x);

    }


}


class Entity{
    int x;
}

3.ABA問題

3.1 什么是ABA問題?

ABA問題就是CAS獲取內存值進行比較再進行賦值的過程存在時間差,在這個時間差內可能有線程將其更改為初始值而不被下一個線程察覺,仍然會更改成功。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import lombok.extern.slf4j.Slf4j;

//ABA問題復現
@Slf4j
public class ABATest {

    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(1);

        new Thread(()->{
            int value = atomicInteger.get();
            log.debug("Thread1 read value: " + value);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);

            // Thread1通過CAS修改value值為3
            if (atomicInteger.compareAndSet(value, 3)) {
                log.debug("Thread1 update from " + value + " to 3");
            } else {
                log.debug("Thread1 update fail!");
            }
        },"Thread1").start();

        new Thread(()->{
            int value = atomicInteger.get();
            log.debug("Thread2 read value: " + value);
            // Thread2通過CAS修改value值為2
            if (atomicInteger.compareAndSet(value, 2)) {
                log.debug("Thread2 update from " + value + " to 2");

                // do something
                value = atomicInteger.get();
                log.debug("Thread2 read value: " + value);
                // Thread2通過CAS修改value值為1
                if (atomicInteger.compareAndSet(value, 1)) {
                    log.debug("Thread2 update from " + value + " to 1");
                }
            }
        },"Thread2").start();
    }
}

?3.1 ABA解決方案

import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;

import lombok.extern.slf4j.Slf4j;


@Slf4j
public class AtomicStampedReferenceTest {

    public static void main(String[] args) {
        // 定義AtomicStampedReference    Pair.reference值為1, Pair.stamp為1, stamp是版本
        AtomicStampedReference atomicStampedReference = new AtomicStampedReference(1,1);

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int) atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            log.debug("Thread1 read value: " + value + ", stamp: " + stamp);

            // 阻塞1s
            LockSupport.parkNanos(1000000000L);
            // Thread1通過CAS修改value值為3   stamp是版本,每次修改可以通過+1保證版本唯一性
            if (atomicStampedReference.compareAndSet(value, 3,stamp,stamp+1)) {
                log.debug("Thread1 update from " + value + " to 3");
            } else {
                log.debug("Thread1 update fail!");
            }
        },"Thread1").start();

        new Thread(()->{
            int[] stampHolder = new int[1];
            int value = (int)atomicStampedReference.get(stampHolder);
            int stamp = stampHolder[0];
            log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
            // Thread2通過CAS修改value值為2
            if (atomicStampedReference.compareAndSet(value, 2,stamp,stamp+1)) {
                log.debug("Thread2 update from " + value + " to 2");

                // do something

                value = (int) atomicStampedReference.get(stampHolder);
                stamp = stampHolder[0];
                log.debug("Thread2 read value: " + value+ ", stamp: " + stamp);
                // Thread2通過CAS修改value值為1
                if (atomicStampedReference.compareAndSet(value, 1,stamp,stamp+1)) {
                    log.debug("Thread2 update from " + value + " to 1");
                }
            }
        },"Thread2").start();
    }
}

4.Atomic原子操作類詳解

atomic是java提供的java.util.concurrent.atomic包下的用于解決線程問題的操作類。我們常用的一般都是synchronized關鍵字,但是用此包下面的類會更加高效。

在java.util.concurrent.atomic包里提供了一組原子操作類:

基本類型:AtomicInteger、AtomicLong、AtomicBoolean;

引用類型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;

數組類型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

對象屬性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

原子類型累加器(jdk1.8增加的類):DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

?4.1?AtomicInteger

常用方法總結:

// getAndIncrement() 以原子的方式將實例中的原值加1,返回的是自增前的舊值;

//getAndSet(int newValue):將實例中的值更新為新值,并返回舊值;

//incrementAndGet() :以原子的方式將實例中的原值進行加1操作,并返回最終相加后的結果;

//addAndGet(int delta) :以原子方式將輸入的數值與實例中原本的值相加,并返回最后的結果;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    static AtomicInteger sum = new AtomicInteger(0);

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 原子自增  CAS
                    sum.incrementAndGet();
                    //count++;

                }
            });
            thread.start();
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sum.get());

    }

}

4.2?AtomicIntegerArray?

//addAndGet(int i, int delta):以原子更新的方式將數組中索引為i的元素與輸入值相加;

//getAndIncrement(int i):以原子更新的方式將數組中索引為i的元素自增加1;

//compareAndSet(int i, int expect, int update):將數組中索引為i的位置的元素進行更新

import java.util.concurrent.atomic.AtomicIntegerArray;


public class AtomicIntegerArrayTest {

    static int[] value = new int[]{ 1, 2, 3, 4, 5 };
    static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(value);


    public static void main(String[] args) throws InterruptedException {

        //設置索引0的元素為100
        atomicIntegerArray.set(0, 100);
        System.out.println(atomicIntegerArray.get(0));
        //以原子更新的方式將數組中索引為1的元素與輸入值相加
        atomicIntegerArray.getAndAdd(1,5);

        System.out.println(atomicIntegerArray);
    }
}

4.3?AtomicReference

AtomicReference作用是對普通對象的封裝,它可以保證你在修改對象引用時的線程安全性。

import java.util.concurrent.atomic.AtomicReference;

import lombok.AllArgsConstructor;
import lombok.Data;

public class AtomicReferenceTest {

    public static void main( String[] args ) {
        User user1 = new User("張三", 23);
        User user2 = new User("李四", 25);
        User user3 = new User("王五", 20);

        //初始化為 user1
        AtomicReference atomicReference = new AtomicReference<>();
        atomicReference.set(user1);

        //把 user2 賦給 atomicReference
        atomicReference.compareAndSet(user1, user2);
        System.out.println(atomicReference.get());

        //把 user3 賦給 atomicReference
        atomicReference.compareAndSet(user1, user3);
        System.out.println(atomicReference.get());

    }

}


@Data
@AllArgsConstructor
class User {
    private String name;
    private Integer age;
}

4.4?AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater可以線程安全地更新對象中的整型變量。

對于AtomicIntegerFieldUpdater 的使用稍微有一些限制和約束,約束如下:

(1)字段必須是volatile類型的,在線程之間共享變量時保證立即可見.eg:volatile int value = 3

(2)字段的描述類型(修飾符public/protected/default/private)與調用者與操作對象字段的關系一致。也就是說調用者能夠直接操作對象字段,那么就可以反射進行原子操作。但是對于父類的字段,子類是不能直接操作的,盡管子類可以訪問父類的字段。

(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。

(4)只能是可修改變量,不能使final變量,因為final的語義就是不可修改。實際上final的語義和volatile是有沖突的,這兩個關鍵字不能同時存在。

(5)對于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;


public class AtomicIntegerFieldUpdaterTest {


    public static class Candidate {
        //字段必須是volatile類型
        volatile int score = 0;

        AtomicInteger score2 = new AtomicInteger();
    }

    public static final AtomicIntegerFieldUpdater scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

        final Candidate candidate = new Candidate();

        Thread[] t = new Thread[10000];
        for (int i = 0; i < 10000; i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        candidate.score2.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate);
                        realScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            t[i].join();
        }
        System.out.println("AtomicIntegerFieldUpdater Score=" + candidate.score);
        System.out.println("AtomicInteger Score=" + candidate.score2.get());
        System.out.println("realScore=" + realScore.get());

    }
}

4.5?LongAdder

為了解決atomicInteger等自旋失敗的問題,創建了longadder等類,原理是通過對atomicInteger的數據進行分散槽位的形式,如果要獲取真實的值進行相加操作即可。

在低并發情況下使用atomicInteger即可。在大量并發情況下使用longadder等類。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderTest {

    public static void main(String[] args) {
        testAtomicLongVSLongAdder(10, 10000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(10, 200000);
        System.out.println("==================");
        testAtomicLongVSLongAdder(100, 200000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times) {
        try {
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            long end = System.currentTimeMillis() - start;
            System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
            System.out.println("結果>>>>>>LongAdder方式增加計數" + (threadCount * times) + "次,共計耗時:" + end);

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            long end2 = System.currentTimeMillis() - start2;
            System.out.println("條件>>>>>>線程數:" + threadCount + ", 單線程操作計數" + times);
            System.out.println("結果>>>>>>AtomicLong方式增加計數" + (threadCount * times) + "次,共計耗時:" + end2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        atomicLong.incrementAndGet();
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }
        countDownLatch.await();
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        LongAdder longAdder = new LongAdder();
        for (int i = 0; i < threadCount; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        longAdder.add(1);
                    }
                    countDownLatch.countDown();
                }
            }, "my-thread" + i).start();
        }

        countDownLatch.await();
    }
}

4.6?LongAccumulator?

LongAccumulator跟longadder原理相似,但是比longadder厲害的一點是可以實現對入參的任意操作。longadder只能實現加減操作。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

public class LongAccumulatorTest {

    public static void main(String[] args) throws InterruptedException {
        // 累加 x+y
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);

        ExecutorService executor = Executors.newFixedThreadPool(8);
        // 1到9累加
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));

        Thread.sleep(2000);
        System.out.println(accumulator.getThenReset());

    }
}

原文鏈接:https://blog.csdn.net/qq_21575929/article/details/122849263

欄目分類
最近更新