Java并发编程系列:深入分析AQS原理_aqs 队列获得锁之后为什么执行thread.currentthread().interrupt()-程序员宅基地

技术标签: Java杂货铺  


AQS又称为队列同步器,它是用来构建锁或其他同步组件的基础框架,它是实现ReentrangLock、Semaphore等同步工具的基础。本文将会详细的阐述AQS实现的细节问题。

数据结构定义

AQS内部通过int类型的state控制锁的状态,当state=0时,则说明没有任何线程占有共享资源的锁,当state>0时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。同步队列为FIFO的双向队列,竞争失败的线程会被添加至队尾。

// 同步队列的头部
private transient volatile Node head;
// 同步队列的尾部
private transient volatile Node tail;
// 同步状态
private volatile int state;

Node节点的定义:

//标识线程的状态
volatile int waitStatus;
//等待队列的前驱节点
volatile Node prev;
//等待队列的后继节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
//条件队列的等待节点
Node nextWaiter;
//判断当前节点是否是共享节点
final boolean isShared() {
    
 return nextWaiter == SHARED;
}
  • 1 CANCELLED:该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直 处于CANCELLED,因此应该从队列中移除
  • -1 SIGNAL:表示该节点处于等待唤醒状态,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须唤醒其后继结点
  • -2 CONDITION:该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,重新进入阻塞状态
  • 0:新加入的节点

在锁的获取时,并不一定只有一个线程才能持有这个锁,所以此时有了独占模式和共享模式的区别,通过nextWaiter来区分。
还有一个点是公平锁和非公平锁,它是由子类来实现的。在ReentrantLock中有FairSync和NonFairSync来实现。

下面以ReentrantLock为例,解释锁的获取和释放流程。

获取锁

获取锁的流程为Lock.lock -> Sync.lock -> AQS.acquire -> Sync.tryAcquire -> AQS.addWaiter ->AQS.acquireQueued,我们按照这个流程逐步分析。TODO 流程图

# Lock.lock -> Sync.lock

获取锁可以通过ReentrantLock中的lock、lockInterruptibly、tryLock,此三个方法的意义在ReentrantLock的文章中已经详细阐述。

public void lock() {
    
    sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
    
    sync.acquireInterruptibly(1);
}
public boolean tryLock() {
    
    return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

可以看出内部都是通过sync来实现,抽象类Sync继承了AQS,并且Sync的实现类为FairSync和NonFairSync。因此调用根据构造方法实例化出的FairSync或着NonFairSync的lock方法:

// Fair
final void lock() {
    
    acquire(1);
}
// NonFair
final void lock() {
    
    // 以cas方式尝试将AQS中的state从0更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平锁和非公平锁的lock方法这里就有区别,非公共锁先通过CAS操作去竞争锁,然后再去执行AQS实现的acquire方法。
exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用锁的线程。

# AQS.acquire -> Sync.tryAcquire

继续跟进AQS

public final void acquire(int arg) {
    
    if (tryAcquire(arg){
    
        return;
    }
    if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg))) {
    
        selfInterrupt();
    }
}

首先执行tryAcquire方法,由具体的子类实现,不同的子类有不同的实现方式,如果失败,表示该线程获取锁失败,就调用addWaiter方法,将当前线程加入到等待队列中,然后返回当前线程的node节点。将node节点传递给acquireQueued方法,如果node节点的前驱节点是头结点,就再次尝试获取到锁,如果获取锁成功(成功返回的是false不会执行selfInterrupt方法),就将该节点设置为头结点,如果获取失败,就将当前节点的线程挂起。

下面看非公平锁的tryAcquire实现:

// NonFair
protected final boolean tryAcquire(int acquires) {
    
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    
        // state为0,说明当前锁未被任何线程持有
        if (compareAndSetState(0, acquires)) {
    // CAS设置state,如果成功,则设置锁的拥有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是重入得情况
    else if (current == getExclusiveOwnerThread()) {
    
        int nextc = c + acquires;
        setState(nextc);// 更改进入的次数
        return true;
    }
    return false;
}

这里公平锁和非公平锁的实现几乎相同,只是多了一个!hasQueuedPredecessors()判断条件,意思是当前同步队列中如果没有正在排队的线程,才会进行后续的步骤。

# addWaiter

如果执行到addWaiter,则说明前面的tryAcquire没有抢到锁,那么会将将节点加入到等待队列。这里需要注意前面提到独享锁和共享锁,
ReentrantLock属于独享锁,并且AQS通过Node节点也就是线程的封装来表示独享/共享,因此这里传入的Mode的参数为Node.EXCLUSIVE。

private Node addWaiter(Node mode) {
    
    // 将当前线程构造为等待节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
    // 如果尾节点部为空
        node.prev = pred;// 将当前节点添加至队列尾部
        if (compareAndSetTail(pred, node)) {
    
            pred.next = node;
            return node;
        }
    }
    // 如果尾节点为空,或者CAS操作失败,则通过死循环更新尾节点
    enq(node);
    return node;
}

enq方法没什么好说的,死循环+CAS,返回node的前驱节点

private Node enq(final Node node) {
    
    for (;;) {
    
        Node t = tail;
        if (t == null) {
     // 如果尾节点为空,那么初始化尾和头,头节点是一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
     // 
            node.prev = t;
            if (compareAndSetTail(t, node)) {
    
                t.next = node;
                return t;
            }
        }
    }
}
# acquireQueued

在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,
所以它会先进行自旋操作,尝试让该线程重新获取锁。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    
    boolean failed = true;
    try {
    
        boolean interrupted = false;
        for (;;) {
    
            // 得到前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是head节点并且tryAcquire获取到锁
            if (p == head && tryAcquire(arg)) {
    
                setHead(node); // 设置Head为节点当前节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果当前节点前驱节点不是head或者CAS设置失败,去挂起线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() )
                interrupted = true;
        }
    } finally {
    
    	// 正常情况下failed = false,cancelAcquire的作用是删除节点,
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire()方法的作用是判断当前结点的前驱结点是否为SIGNAL状态,如果是则返回true。
如果为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,直到寻找到非CANCELLED状态的结点。倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,以便在下轮循环中将其挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
    int ws = pred.waitStatus;// 前驱节点的状态
    if (ws == Node.SIGNAL)// 如果是等待唤醒状态,返回true,
        return true;
    if (ws > 0) {
     // >0 则为CNACLE状态,被取消了,需要将前驱节点移除
        do {
    
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    // 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        return Thread.interrupted();
}

parkAndCheckInterrupt()方法挂起当前线程,需要等待一个unpark()操作来唤醒它,调用interrupte方法可以中断,稳定后列表的状态为

  • 除了头节点,剩余节点都被阻塞,线程处于WAITING状态。
  • 除了尾节点,剩余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点。

到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作。一张图总结lock的流程:
lock

释放锁

下面继续看unLock的流程:

public final boolean release(int arg) {
    
   if (tryRelease(arg)) {
    
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

释放锁其实就两个步骤,1.释放锁,2.如果完全释放则唤醒等待的线程。先看释放锁:

protected final boolean tryRelease(int releases) {
    
    int c = getState() - releases;// 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    // 完全释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

完全释放表示ownerThread的所有重入操作均已结束,接着是唤醒后面的线程,注意这里并没有将head置为null,只是将ExclusiveOwnerThread和state初始化。

private void unparkSuccessor(Node node) {
           
    int ws = node.waitStatus;
    if (ws < 0) // 正常情况下为-1
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    // 如果下一个节点为空或者被取消,继续从尾节点开始找离头结点最近的
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)// 状态<0 的节点
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒下一个节点
}

当最近可用的节点被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,继续开始自旋。

Condition实现原理

在 https://blog.csdn.net/TheLudlows/article/details/76962006 中介绍了Condition的用法,类似于Object的wait和notify。

# await

Condition接口提供了await、signal方法,它的实现为AQS的内部类ConditionObject,在它的内部也有一个队列,称为等待队列,单向列表实现。AQS内部的队列叫做同步队列。同步队列主要用来保存阻塞的线程,而等待队列用来保存调用了await方法的线程。ConditionObject的成员变量如下:

 public class ConditionObject implements Condition, java.io.Serializable {
    
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
}

等待队列的元素和同步队列中的元素都是Node类型。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。

等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。

下面分析await方法的逻辑:

public final void await() throws InterruptedException {
    
    if (Thread.interrupted())
        throw new InterruptedException();
    // 构建为Node节点,并加入队尾
    Node node = addConditionWaiter();
    // 释放当前线程锁即释放同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    // 阻塞,直到收到信号或被中断
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 唤醒之后执行,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

添加到等待队列:

private Node addConditionWaiter() {
    
    Node t = lastWaiter;
    // lastWaiter初始化为null
    // 清除被唤醒的node
    if (t != null && t.waitStatus != Node.CONDITION) {
    
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 新建node,状态为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)// 初始化队列
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

尽管此处没有任何线程安全的保护,但实际使用时不会出现任何线程安全问题——因为条件队列的使用要求我们在调用await或signal时持有与该条件队列唯一相关的锁。共享锁中没有实现Lock接口,因此没有newCondition方法。

final int fullyRelease(Node node) {
    
    boolean failed = true;
    try {
    
        int savedState = getState();
        if (release(savedState)) {
    // 上节讲过的release,注意参数
            failed = false;
            return savedState;// 返回状态
        } else {
    
            throw new IllegalMonitorStateException();
        }
    } finally {
    
        if (failed)// 正常情况不会进入此分支
            node.waitStatus = Node.CANCELLED;
    }
}
# signal
public final void signal() {
    
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal()方法做了两件事,一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。二是将等待队列的头节点从等待队列中删除,同时将它加入到同步队列,意思是次线程可以去竞争锁了。

private void doSignal(Node first) {
    
   do {
    
   	   firstWaiter = first.nextWaiter;// 移除首节点
       if (firstWaiter == null)
           lastWaiter = null;
       first.nextWaiter = null; // 将旧的首节点next属性置为null
       
   } while (!transferForSignal(first) && (first = firstWaiter) != null);
}     

transferForSignal 将 first节点移出等待队列,通过时修改状态,加入同步队列,根据在同步队列中的前驱节点的状态和来决定是否唤醒等待阻塞的线程。

final boolean transferForSignal(Node node) {
    
	// 设置节点状态为初始状态
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);// 加入同步队列,得到前驱节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

其实这里不唤醒阻塞的线程也是可以的,因为此线程已经加入到同步队列中,同步队列中等待的线程是通过前驱节点的来唤醒的。但是这里为什么要多次一举?能够进入此分支说明前驱节点是CANCELLED状态,那么说明当前节点距离Head又进了一步,早些将此CANCELLED节点清除,因此将次线程唤醒,去竞争锁,同时删除无效的节点。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TheLudlows/article/details/88696874

智能推荐

Docker 快速上手学习入门教程_docker菜鸟教程-程序员宅基地

文章浏览阅读2.5w次,点赞6次,收藏50次。官方解释是,docker 容器是机器上的沙盒进程,它与主机上的所有其他进程隔离。所以容器只是操作系统中被隔离开来的一个进程,所谓的容器化,其实也只是对操作系统进行欺骗的一种语法糖。_docker菜鸟教程

电脑技巧:Windows系统原版纯净软件必备的两个网站_msdn我告诉你-程序员宅基地

文章浏览阅读5.7k次,点赞3次,收藏14次。该如何避免的,今天小编给大家推荐两个下载Windows系统官方软件的资源网站,可以杜绝软件捆绑等行为。该站提供了丰富的Windows官方技术资源,比较重要的有MSDN技术资源文档库、官方工具和资源、应用程序、开发人员工具(Visual Studio 、SQLServer等等)、系统镜像、设计人员工具等。总的来说,这两个都是非常优秀的Windows系统镜像资源站,提供了丰富的Windows系统镜像资源,并且保证了资源的纯净和安全性,有需要的朋友可以去了解一下。这个非常实用的资源网站的创建者是国内的一个网友。_msdn我告诉你

vue2封装对话框el-dialog组件_<el-dialog 封装成组件 vue2-程序员宅基地

文章浏览阅读1.2k次。vue2封装对话框el-dialog组件_

MFC 文本框换行_c++ mfc同一框内输入二行怎么换行-程序员宅基地

文章浏览阅读4.7k次,点赞5次,收藏6次。MFC 文本框换行 标签: it mfc 文本框1.将Multiline属性设置为True2.换行是使用"\r\n" (宽字符串为L"\r\n")3.如果需要编辑并且按Enter键换行,还要将 Want Return 设置为 True4.如果需要垂直滚动条的话将Vertical Scroll属性设置为True,需要水平滚动条的话将Horizontal Scroll属性设_c++ mfc同一框内输入二行怎么换行

redis-desktop-manager无法连接redis-server的解决方法_redis-server doesn't support auth command or ismis-程序员宅基地

文章浏览阅读832次。检查Linux是否是否开启所需端口,默认为6379,若未打开,将其开启:以root用户执行iptables -I INPUT -p tcp --dport 6379 -j ACCEPT如果还是未能解决,修改redis.conf,修改主机地址:bind 192.168.85.**;然后使用该配置文件,重新启动Redis服务./redis-server redis.conf..._redis-server doesn't support auth command or ismisconfigured. try

实验四 数据选择器及其应用-程序员宅基地

文章浏览阅读4.9k次。济大数电实验报告_数据选择器及其应用

随便推点

灰色预测模型matlab_MATLAB实战|基于灰色预测河南省社会消费品零售总额预测-程序员宅基地

文章浏览阅读236次。1研究内容消费在生产中占据十分重要的地位,是生产的最终目的和动力,是保持省内经济稳定快速发展的核心要素。预测河南省社会消费品零售总额,是进行宏观经济调控和消费体制改变创新的基础,是河南省内人民对美好的全面和谐社会的追求的要求,保持河南省经济稳定和可持续发展具有重要意义。本文建立灰色预测模型,利用MATLAB软件,预测出2019年~2023年河南省社会消费品零售总额预测值分别为21881...._灰色预测模型用什么软件

log4qt-程序员宅基地

文章浏览阅读1.2k次。12.4-在Qt中使用Log4Qt输出Log文件,看这一篇就足够了一、为啥要使用第三方Log库,而不用平台自带的Log库二、Log4j系列库的功能介绍与基本概念三、Log4Qt库的基本介绍四、将Log4qt组装成为一个单独模块五、使用配置文件的方式配置Log4Qt六、使用代码的方式配置Log4Qt七、在Qt工程中引入Log4Qt库模块的方法八、获取示例中的源代码一、为啥要使用第三方Log库,而不用平台自带的Log库首先要说明的是,在平时开发和调试中开发平台自带的“打印输出”已经足够了。但_log4qt

100种思维模型之全局观思维模型-67_计算机中对于全局观的-程序员宅基地

文章浏览阅读786次。全局观思维模型,一个教我们由点到线,由线到面,再由面到体,不断的放大格局去思考问题的思维模型。_计算机中对于全局观的

线程间控制之CountDownLatch和CyclicBarrier使用介绍_countdownluach于cyclicbarrier的用法-程序员宅基地

文章浏览阅读330次。一、CountDownLatch介绍CountDownLatch采用减法计算;是一个同步辅助工具类和CyclicBarrier类功能类似,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。二、CountDownLatch俩种应用场景: 场景一:所有线程在等待开始信号(startSignal.await()),主流程发出开始信号通知,既执行startSignal.countDown()方法后;所有线程才开始执行;每个线程执行完发出做完信号,既执行do..._countdownluach于cyclicbarrier的用法

自动化监控系统Prometheus&Grafana_-自动化监控系统prometheus&grafana实战-程序员宅基地

文章浏览阅读508次。Prometheus 算是一个全能型选手,原生支持容器监控,当然监控传统应用也不是吃干饭的,所以就是容器和非容器他都支持,所有的监控系统都具备这个流程,_-自动化监控系统prometheus&grafana实战

React 组件封装之 Search 搜索_react search-程序员宅基地

文章浏览阅读4.7k次。输入关键字,可以通过键盘的搜索按钮完成搜索功能。_react search