Java高性能编程之CAS与ABA及解决方法

Java高性能编程之CAS与ABA及解决方法

前言

如果喜欢暗色调的界面或者想换换界面,可以看看我在个人博客发布的 Java高性能编程之CAS与ABA及解决方法

CAS概念

CAS,全称Compare And Swap,比较与交换。

属于硬件级别的同步原语,从处理器层面提供了内存操作的原子性。

从概念上,我们可以得出三点。第一,CAS的运作方式(通过比较与交换实现)。第二,硬件层面支持,性能肯定不低(当然它也不是银弹)。第三,提供原子性,那么它的功能肯定是确保原子性,从而确保线程安全。

实际使用中,CAS操作需要输入两个数值,一个旧值A(期望操作前的值)和一个新值B, 在操作期间先将旧值A与实际内存中的值进行比较,如果没有发生变化,才将实际内存中的值交换成新值B ,如果发生了变化则不交换。

CAS应用场景

既然CAS的功能是提供原子性,那么从这个角度出发思考,如计数器,账户转账等。

那么提到计数器,就不得不提到JUC包下的atomic包了。其中提供了大量原子操作了,如Integer类型的值变化,Long类型的值变化,Boolean类型的值变化。

说到这里,某些人就要一句“球多麻袋”,Integer类型的值变化,不就一句代码嘛(如i = i + 1;),不就是原子操作嘛。即使有赋值操作,也可以写成(i++;),这样不就一个操作了嘛。当然学习过汇编或对计算机指令有一定了解的朋友可能就知道原因了。很多时候,我们在程序中的一段代码,编译到底层执行时,往往是多个语句(谁让CPU只能执行非常简单的操作呢)。如i++操作编译成Java指令后是以下四句:

  1. getfield
  2. iconst_1
  3. iadd
  4. putfield

具体的意思,我就不解读了(不是今天的重点),感兴趣的,可以百度或者@我。

话题回到JUC包下的atomic包,我之所以提它,就是因为其原子性的实现就是依靠CAS实现的。

AtomicInteger类:

/**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }

Unsafe类:

public final int getAndSetInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var4));

        return var5;
    }
public final native boolean compareAndSwapInt(Object var1,
         long var2, int var4, int var5);

通过上述三段源码,可以清楚看出,AtomicInteger中getAndSet这一原子方法是通过Unsafe中的原生方法compareAndSwapInt方法完成CAS机制,从而确保操作的原子性。

CAS还涉及到Java中锁的实现,这个也留到锁专题再细说,毕竟这次的主题是CAS,ABA及解决之道。

Why need CAS

那么为什么需要CAS呢,毕竟Java已经有了多种手段来保证线程安全的原子性问题,最广为人知的除了Atomic包(底层是CAS),就是synchronized锁了。

原因很简单,因为synchronized锁什么的太重了。这里所说的重,是指其消耗的系统资源较多(所以又称为重量级锁)。所以有着底层硬件支持的CAS才会那么受欢迎。当然CAS也有着自己的问题,这个后面会谈到。

CAS应用:

说得再多,不如来点实际代码,看看具体效果。

以下代码,包含四个类:一个主类,用于调用实现类,展示效果(注释中有执行结果)。三个实现类,分别展示了没有处理,使用Atomic包,使用CAS三种方式来多线陈增加全局计数器的效果。

AtomicityWithNoDeal

未做任何处理,通过100个子线程分别执行10000次计数器+1操作。

package tech.jarry.learning.netease;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class AtomicityWithNoDeal {
    
        private volatile int i = 0;
    
        private void add(){
            i++;
        }
    
        public void run() throws InterruptedException {
            for (int j = 0; j < 100; j++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int m = 0; m< 10000; m++){
                            add();
                        }
                        System.out.println(Thread.currentThread().getName()+" has run finished !");
                    }
                }).start();
            }
    
            Thread.sleep(2000);
            System.out.println("i: "+i);
        }
    }

AtomicityWithAtomic

进行Atomic包处理,通过100个子线程分别执行10000次计数器+1操作。

package tech.jarry.learning.netease;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class AtomicityWithAtomic {
    
        private AtomicInteger atomicInteger = new AtomicInteger(0);
    
        private void add(){
            atomicInteger.incrementAndGet();
        }
    
        public void run() throws InterruptedException {
    
            for (int j = 0; j < 100; j++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int m = 0; m< 10000; m++){
                            add();
                        }
                        System.out.println(Thread.currentThread().getName()+" has run finished !");
                    }
                }).start();
            }
    
            Thread.sleep(2000);
            System.out.println("atomicInteger: "+atomicInteger.get());
        }
    }

AtomicityWithCAS

进行手写的CAS处理,通过100个子线程分别执行10000次计数器+1操作。

package tech.jarry.learning.netease;
    
    import sun.misc.Unsafe;
    
    import java.lang.reflect.Field;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class AtomicityWithCAS {
    
        // 建立全局计数器,用于观察CAS原子性特点
        volatile int k = 0;
        // 定义Unsafe引用对象
        private static Unsafe unsafe = null;
        // 定义k的内存偏移量(可以理解为k在内存中地址,当然实际与C指针的内存地址是完全不同的)
        private static long valueOffset;
    
        static {
            try {
                // 利用反射获取Unsafe实例对象(正常途径是无法获取的)
                Field field = Unsafe.class.getDeclaredField("theUnsafe");
                field.setAccessible(true);
                // 由于unsafe是静态对象,所以传入null。想想也对,毕竟不同的实例对象的非静态对象当然是不同的,当然需要传入实例对象作为参数喽。
                // 另外吐槽一句,我查看这段资料的时候,发现百度第一页的各个博客,几乎都是一样的示例代码。。。
                unsafe = (Unsafe)field.get(null);
    
                // 获取当前对象中全局计数器k的内存地址偏移
                Field kField = AtomicityWithCAS.class.getDeclaredField("k");
                kField.setAccessible(true);
                valueOffset = unsafe.objectFieldOffset(kField);
    
            } catch (NoSuchFieldException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 执行全局计数器k+1的方法
         */
        private void add(){
            // 当CAS执行失败时,需要重新执行相关操作,直到执行成功。故CAS是一个自旋锁。
            while(true) {
                // 获取CAS操作所需的旧值
                int current = unsafe.getIntVolatile(this,valueOffset);
                // 进行CAS操作
                if (unsafe.compareAndSwapInt(this,valueOffset,current,current+1)){
                    // 执行成功,就跳出循环
                    break;
                }
            }
        }
    
        /**
         * 为了体现效果,这里开启了100个线程循环执行add()操作
         * @throws InterruptedException
         */
        public void run() throws InterruptedException {
            for (int j = 0; j < 100; j++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        // 每个线程执行10000次add()操作
                        for (int m = 0; m< 10000; m++){
                            add();
                        }
                        System.out.println(Thread.currentThread().getName()+" has run finished !");
                    }
                }).start();
            }
    
            // 当前线程休眠2s,确保所有子线程执行完毕
            Thread.sleep(2000);
            System.out.println("k: "+k);
        }
    }

Main主函数

主线程调用同一包下的AtomicityWithNoDeal,AtomicityWithAtomic,AtomicityWithCAS三个类,观察运行效果。

package tech.jarry.learning.netease;
    
    public class Main {
    
        public static void main(String[] args) throws InterruptedException {
    
    //      (new AtomicityWithNoDeal()).run();
            /**
             * 运行结果:
             * Thread-1 has run finished !
             * 。。。。。。(略98个线程)
             * Thread-83 has run finished !
             * i: 440239
             */
    
    //      (new AtomicityWithAtomic()).run();
            /**
             * 运行结果:
             * Thread-0 has run finished !
             * 。。。。。。(略98个线程)
             * Thread-69 has run finished !
             * atomicInteger: 1000000
             */
    
    //      (new AtomicityWithCAS()).run();
            /**
             * 运行结果:
             * Thread-1 has run finished !
             * 。。。。。。(略98个线程)
             * Thread-80 has run finished !
             * k: 1000000
             */
    
        }
    }

注释

其实上述代码中,重要的地方,我都写上了相关的注释。如果还有什么不清楚的地方,可以@我。

CAS缺点

CAS当然是有缺点的,否则就没Synchronized什么事情了。

  1. 从概念及代码示例中可以看出,当CAS操作执行失败时,会继续进入下一个循环执行,直到CAS操作执行成功,这种行为称为自旋。自旋的实现让所有线程都处于高频运行,争抢CPU执行时间的状态。如果操作长时间不成功,会带来很大的CPU资源消耗(所以Java有锁的粗化/升级)。
  2. CAS仅能针对单个变量进行操作,不能用于多个变量来实现原子操作。
  3. ABA问题。

正如,我之前所提到的,看待技术问题要找到其特性的最初来源。如第二点中CAS之所以不能支持多个变量的原子操作,是因为CAS操作的原子性来源于硬件的支撑,而硬件只支持单个变量的原子操作,故CAS只能针对单个变量的原子操作进行操作。而 有些文章或代码中提到通过CAS执行多个变量的原子操作,其实本质并不是针对多个变量,而是针对这些变量的集合或者总的对象的Reference操作的 。这有点抽象,举个栗子。我将通过CAS操作转变了某个数组的引用变量的指向,看起来我实现了整个数组内多个元素转变的原子操作。但实际是我通过改变当前引用变量的指向实现的,CAS的原子操作针对的是这个指向Reference。具体代码可以参照Atomic包中的AtomicIntegerArray与AtomicReference等实现。

至于第一点,细究起来有非常多的内容,如锁的粗化,自旋是否可以优化等。其实CAS的自旋操作实际是确保了一定有CAS操作在执行,但这是通过牺牲CPU实现的。举个栗子,为了能够监听硬件串口返回的消息,我通过while(true)来不断获取串口发送过来的数据,直到我获得了一个完整数据包。

话头收回来,让我们谈谈第三点-ABA问题。

ABA概念

ABA问题,说白了就是钻了CAS机制的空子。

为了更好地说明这个问题,我们设定两个线程,同时对变量i进行操作。

正常场景:

初始i=0;

线程-1(打算对i进行CAS操作)

线程-1:获取i的旧值-0;

线程-1:设定i的新值-2;

线程-1:对i进行CAS操作,旧值i=0符合实际内存中i现有的值,执行swap操作,i=2;

看似正常的场景:

线程-1(打算对i进行CAS操作)

线程-1:获取i的旧值-0;

线程-2:对i进行了CAS操作,将i改为10;

线程-1:设定i的新值-2;

线程-2:对i进行了CAS操作,将i重新改为0;

线程-1:对i进行CAS操作,旧值i=0符合实际内存中i现有的值,执行swap操作,i=2;

上述的两个场景中线程-1都完成了想要完成的CAS操作,区别就是其中线程2曾经进行过一些操作。

当然这里肯定有朋友要说,这对程序的结果没有任何的影响。是的,在现有的例子中确实对程序的运行结果毫无影响。

这里我举出两个大佬给出的非常经典的例子,分别是极简与复杂的代表。

极简:你从银行取出一箱子钱,放在了车上。结果你一个转头,小偷将装满钱的箱子拿走,并在原来的位置放了一个看起来一模一样,但装满废纸的箱子。你并没有发现这一切,拿着这个箱子开开心心地回家了。囧。

复杂: 通过单向链表展现ABA的潜在威胁 。由于例子比较复杂,我就不在这里赘述。感兴趣的朋友,可以看看。

其实这两个例子本质都是一样的,想表达的就是我们CAS操作的不是简简单单的数值,更有着其背后的深层信息(然后通过内存,链表,引用来证明观点)。

这里我要开始表达我的观点了:现有的大部分博客或者文章都解释得或多或少有一定问题。。只有部分大佬的博客提到了核心,但是为了说了核心,又举了很多例子(ABA问题的例子本来就不好举,例子大多容易被误解,后面会谈到),导致核心论点被忘却。然后又有很多人去借鉴,或者直接拿来这些例子,但是又不能很好地通过这些例子说明ABA,然后就通过自己的理解解释了一番(更好理解,但是却开始歪了),不断有人进入这个圈子,然后解释越来越歪。造成很多刚了解ABA的小白一脸茫然,看着那套看似正确的解释,就入坑(虽然这个坑影响不那么大,起码对于绝大部分人员都没太大影响。就像很多Java开发者不懂JMM,工作做得不也还可以嘛)了。。。

(这里插句题外话,那就是有关博客抄袭复制的问题。其实我不反对技术的之间的借鉴,毕竟重复造轮子是不可取的,只有有效的思想碰撞才可以产生推动力嘛。但是通过爬虫无脑爬取,或者直接复制粘贴全文,就真的有些过分了。之所以有这种感慨,是因为我现在有时候在百度查询一些资料,十多篇博客看下来,居然大部分都是一样的,太影响效率了)

当然,说话要讲道理的,不能只做“批评家”。就上述两个经典例子存在一个很大的问题,那就是 即使脱离了CAS,上述两个例子中存在的问题,还是存在 。另外一点佐证就是 很多时候,我们需要解决的就是简单的数值的CAS问题 ,这个数值不牵涉什么复杂的依赖关系。关于这点佐证的最有力说明就是 轻量级锁的CAS为什么不需要考虑ABA问题 (因为其根本就不涉及什么复杂依赖)。

话说回来,其实上述两个例子,以及我的两点说明,其实更倾向于表现ABA,距离ABA问题的本质,还差了那么一句画龙点睛的总结。

总结:ABA问题的本质就是由于对多线程下CAS流程控制的缺乏,而导致的信息缺失。表现出来的就是由于缺乏必要信息(小偷对箱子进行了操作),而产生了隐患。

如果你还是有些无法理解这个结论,那你还记得程序的一个重要原则-程序置于控制下。如果你都无法控制你的程序的行为,那么无疑,你的程序是有问题的。

ABA示例:

接下来通过银行非法洗钱的例子,来简单阐述由信息缺失,造成的问题。

ABATest

这是一个产生了ABA问题的示例。示例中银行无法发现客户账户上的非法洗钱行为。

package tech.jarry.learning.netease.casWithABA;
    
    import sun.misc.Unsafe;
    import tech.jarry.learning.netease.test.CounterUnsafe;
    
    import java.lang.reflect.Field;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class ABATest {
    
        volatile int k = 10;
        private static Unsafe unsafe = null;
        private static long valueOffset;
    
        static {
            try {
                Field field = Unsafe.class.getDeclaredField("theUnsafe");
                field.setAccessible(true);
                unsafe = (Unsafe)field.get(null);
    
                Field iField = CounterUnsafe.class.getDeclaredField("i");
                iField.setAccessible(true);
                valueOffset = unsafe.objectFieldOffset(iField);
            } catch (NoSuchFieldException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    
        private void transferOld() throws InterruptedException {
            System.out.println("开始转账(旧系统:存在ABA问题)");
            while(true) {
                int current = unsafe.getIntVolatile(this,valueOffset);
                System.out.println("由于CPU抢占问题,转账程序阻塞100ms(为了将可能出现的ABA问题,变成肯定出现)");
                Thread.sleep(100);
                if (unsafe.compareAndSwapInt(this,valueOffset,current,current+1)){
                    System.out.println("银行转账"+1+"元,成功。余额:"+k);
                    break;
                }
                System.err.println("警告:账户存在交易记录以外的资金流动");
            }
        }
    
        private void cleanMoneySub() {
            while(true) {
                int current = unsafe.getIntVolatile(this,valueOffset);
                if (unsafe.compareAndSwapInt(this,valueOffset,current,current-2)){
                    break;
                }
            }
            System.out.println("非法组织洗钱,盗走2元,余额:"+k);
        }
    
        private void cleanMoneyAdd(){
            while(true) {
                int current = unsafe.getIntVolatile(this,valueOffset);
                if (unsafe.compareAndSwapInt(this,valueOffset,current,current+2)){
                    break;
                }
            }
            System.out.println("非法组织洗钱,加入2元,余额:"+k);
        }
    
        public void oldSystemTransfer() throws InterruptedException {
            ABATest abaTest = new ABATest();
            System.out.println("账户余额:"+k);
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        abaTest.transferOld();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            Thread.sleep(20);
            abaTest.cleanMoneyAdd();
            Thread.sleep(20);
            abaTest.cleanMoneySub();
    
            Thread.sleep(200);
            System.out.println("银行卡原来余额为10,接收转账1元,故期望余额为11元。实际余额:"+abaTest.k);
        }
    
        public static void main(String[] args) throws InterruptedException {
            ABATest abaTest = new ABATest();
            abaTest.oldSystemTransfer();
            /**
             * 运行结果:
             * 账户余额:10
             * 开始转账(旧系统:存在ABA问题)
             * 由于CPU抢占问题,转账程序阻塞100ms(为了将可能出现的ABA问题,变成肯定出现)
             * 非法组织洗钱,加入2元,余额:12
             * 非法组织洗钱,盗走2元,余额:10
             * 银行转账1元,成功。余额:11
             * 银行卡原来余额为10,接收转账1元,故期望余额为11元。实际余额:11
             */
        }
    }

ABAResolveTest

这是一个修复了ABA问题的示例。示例中银行正常发现客户账户上的非法洗钱行为。

package tech.jarry.learning.netease.casWithABA;
    
    import java.util.concurrent.atomic.AtomicStampedReference;
    
    /**
     * @Description:
     * @Author: jarry
     */
    public class ABAResolveTest {
    
        private AtomicStampedReference<Integer> kReference = new AtomicStampedReference<>(10,0);
    
        private void transferNew() throws InterruptedException {
            System.out.println("开始转账(新系统:解决了ABA问题)");
            while(true) {
                Integer currentReference = kReference.getReference();
                int stamp = kReference.getStamp();
                System.out.println("由于CPU抢占问题,转账程序阻塞100ms");
                Thread.sleep(100);
                if (kReference.compareAndSet(currentReference,currentReference+1,stamp,stamp+1)){
                    System.out.println("银行转账"+1+"元,成功。余额:"+kReference.getReference());
                    break;
                }
                System.err.println("警告:账户存在交易记录以外的资金流动");
            }
        }
    
        private void cleanMoneySub(){
            int stamp = kReference.getStamp();
            kReference.set(kReference.getReference()+2,stamp+1);
            System.out.println("非法组织洗钱,盗走2元,余额:"+kReference.getStamp());
        }
    
        private void cleanMoneyAdd(){
            int stamp = kReference.getStamp();
            kReference.set(kReference.getReference()-2,stamp+1);
            System.out.println("非法组织洗钱,加入2元,余额:"+kReference.getStamp());
        }
    
        private void newSystemTransfer() throws InterruptedException {
            ABAResolveTest abaResolveTest = new ABAResolveTest();
            System.out.println("账户余额:"+abaResolveTest.kReference.getReference());
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        abaResolveTest.transferNew();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            Thread.sleep(20);
            abaResolveTest.cleanMoneyAdd();
            Thread.sleep(20);
            abaResolveTest.cleanMoneySub();
    
            Thread.sleep(200);
            System.out.println("银行卡原来余额为10,接收转账1元,故期望余额为11元。实际余额:"+abaResolveTest.kReference.getReference());
        }
    
        public static void main(String[] args) throws InterruptedException {
            ABAResolveTest abaResolveTest = new ABAResolveTest();
            abaResolveTest.newSystemTransfer();
            /**
             * 运行结果:
             * 账户余额:10
             * 开始转账(新系统:解决了ABA问题)
             * 由于CPU抢占问题,转账程序阻塞100ms
             * 非法组织洗钱,加入2元,余额:1
             * 非法组织洗钱,盗走2元,余额:2
             * 警告:账户存在交易记录以外的资金流动
             * 由于CPU抢占问题,转账程序阻塞100ms
             * 银行转账1元,成功。余额:11
             * 银行卡原来余额为10,接收转账1元,故期望余额为11元。实际余额:11
             */
        }
    }

上述的两个例子,也许不是最适合的,但确实阐述了我想要表达的想法。

话说回来,只有到自己写demo时,才能理解大佬写ABA的demo时内心的挣扎啊。囧

ABA问题的解决

ABA问题的解决,说白了就是通过引入版本号,从而解决ABA问题的造成的隐患。

用我的话说呢,就是通过引入版本号,了解到线程执行操作时,是否有别的线程做了类似ABA的事情,从而使得本线程的CAS操作重新执行。这里为什么重新执行,因为简单啊。当然,也可以如我那样打个输出或者注释什么的(可能会浪费系统资源)。不管怎么处理,起码这次我知道有这么个问题了。囧。

小结

至此,CAS机制,ABA问题及解决方案,都已经叙述完毕了。

核心总结:

ABA问题的本质就是由于对多线程下CAS流程控制的缺乏,而导致的信息缺失。表现出来的就是由于缺乏必要信息,而产生了隐患

该说的差不多都说了,简单回顾一下:

  • 凡事都有其利弊,往往弊端就是由于其优点带来的。如CAS的硬件支持。
  • 技术的学习,需要追寻技术特性的真正来源,才可以一步步走向架构师。
  • 学习,一方面需要寻求多方资料,另一方面也需要自己的理解与验证。
  • 遇到无法理解或者无法解读的事物时,就去寻找它的定义,它的原则。
我来评几句
登录后评论

已发表评论数()

相关站点

热门文章