温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

AbstractQueuedSynchroizer(AQS) 同步器详解详解

发布时间:2020-10-19 15:18:10 来源:网络 阅读:1221 作者:wx5c78c8b1dbb1b 栏目:编程语言

一、什么是同步器
同步器是用来构建锁或者其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,它能实现大部分的同步需求。
同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程的并行访问),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了状态管理、线程的排队、等待与唤醒等底层操作。锁和同步容器很好地隔离了使用者和实现者所需要关注的领域。
二、同步器的基本成员(介绍常用的类好方法)
Node 是AQS的内部类构成AQS队列的一种数据结构。

成员变量 作用
waitStatus 记录节点的等待状态。包括如下状态:① CANCELLED,值为1,由于同步队列中等待线程超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。② SIGNAL值为-1,后继节点的线程处于等待状态,而当前线程如果释放了同步状态或者取消,将会通知后继节点,使得后继节点得以运行。③ CONDITION值为-2,节点在等待队列中,节点等待在Condtion上,当其他线程对Condtion调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。④ PROPAGATE值为-3,表示下一次共享式同步状态将会无条件地被传播下去。⑤ INITAL,值为0,初始状态
SHARED = new Node() 表示共享式的node
EXCLUSIVE = null 独占式的node
Node prev Node的前节点
Node next Node的后节点
nextWaitert 等待队列的中node的下一个节点

ConditionObject是AQS的内部类构成类似Object的等待/通知机制。

成员/方法 作用
Node firstWaiter 等待队列的头节点
Node lastWaiter 等待队列的尾节点
await() 当前线程进入等待状态知道被通知或中断,当前线程进入运行状态且从await()返回的情况如下,包括:① 其它线程调用Interrupt()方法中断当前线程。② 如果当前线程从await()方法返回,那么表明该线程已经获取了Condtion对象锁对应的锁
awaitUninterruptibly() 当前线程进入等待直到被通知,该方法对中断不敏感
awaitNanos(long nanosTimeout)) 当前线程进入等待状态直到被通知、中断或者超时。返回值表示剩余的时间,如果在nanosTimeout纳秒之前被唤醒,那么返回就是(nanosTimeout-实际耗时);如果返回是0或者负数,那么可以认定已经超时了
awaitUntil(Date deadline) 当前线程进入等待状态直到被通知、中断或者某个时间。如果没有到指定时间就被通知,方法返回true,否则,表示超时,方法返回false
signal() 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关的锁
signalAll 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁

AQS主要成员

成员变量 作用
state 维护锁的一个变量(同步状态,很重要)① setState 。② getState。 ③ compareAndSetState。
Node head FIFO同步队列的头结点 。
Node tail FIFO同步队列的尾结点 。

AQS主要方法

方法名 作用
acquire() 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法会调用重写的tryAcquire(arg)方法(需要锁自己实现)
release(int arg) 独占式释放状态,如果释放状态成功,则会去唤醒头结点;释放状态调用tryRelease(arg)方法(需要自己实现)
acquireShared(int arg) 共享式获取同步状态,也就是说可以几个线程同时获取同步状态,如果当前线程未获取同步状态,将会进入同步队列。
releaseShared() 共享式释放状态,释放之后会唤醒头结点
acquireInterruptibly() 响应中断的独占式获取同步状态,当前线程未获取同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出中断异常,并返回
tryAcquireNanos() 在acquireInterruptibly()基础上增加超时限制,如果当前线程在超时时间内没有获取同步状态,那么将返回false,如果获取到了返回true
acquireSharedInterruptibly() 响应中断的共享式获取同步状态
tryAcquireSharedNanos() 在acquireSharedInterruptibly()的基础上增加超时限制

以上就是AQS的一些基本成员和方法,下面主要从现实的角度分析这些方法,理解这些方法的实现,能刚好的帮助我们去理解锁。
三、AQS的方法实现分析
1)、独占系列的方法
①、acquire()独占式获取同步状态,表示只会有一个线程获取,其它线程进入同步队列。
源代码如下:

// 获取锁的方法(独占模式)
    public final void acquire(int arg) {
        // tryAcquire(arg) 这个方法需要我们自己去实现,如果获取失败,
        // 调用addWaiter构造节点
        // acquireQueued
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我们可以看见acquire方法内部调用了tryAcquire(arg)方法,这个方法需要构造同步组件的类自己去实现,不过返回值已经被AQS定义好了,返回true代表获取同步状态成功,返回false代表失败,需要将线程构造节点加入同步队列,就是调用acquireQueued这个方法。
acquireQueued这个方法实际是先去调用了addWaiter方法。
addWaiter(),这个方法其实就是把当前节点加入到同步队列,加入成功才返回,其实队列初始化时会制造一个空的节点,然后在空的节点后面设置同步节点(可以理解为每次获取获取锁的那个线程其实就是头结点,它是一个空的节点,结合acquireQueued方法,每次获取锁之后,该节点就会升级为头节点,并且变成一个空节点。)

private Node addWaiter(Node mode) {
        // 构造一个节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 获取尾节点
        Node pred = tail;
        // 判断尾节点是否为空
        if (pred != null) {
            // 不为空,设置node节点的上一个节点为pred(也就是尾节点)
            node.prev = pred;
            // cas设置尾节点
            if (compareAndSetTail(pred, node)) {
                // 成功后,设置pred节点的next节点为node,返回
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

acquireQueued()方法,通过上面的addWaiter方法我们已经把这个节点加入同步队列,接下来需要处理这个节点。首先判断自己的前节点是否是头结点,自己是否获取到同步状态,如果满足,把自己设置尾头结点,返回,如果不是,进入shouldParkAfterFailedAcquire(详情见后面方法分析)方法主要作用是判断自己的前置节点是否是SIGNAL状态,是的话自己就可以阻塞自己了,调用parkAndCheckInterrupt(详情见后面方法分析)方法,直到被唤醒或者中断。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前一个节点
                final Node p = node.predecessor();
                // p是头结点,自己获取锁成功
                if (p == head && tryAcquire(arg)) {
                                // 设置自己为头节点,变成一个空节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

                // 找到前节点为signal,然后阻塞自己
                // 清理等待超时或者中断的节点
                // 尝试设置线程的状态为signal
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 出现异常
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法,这个方法主要做三件事,判断自己的前置节点是否是SIGNAL,返回true,就可以阻塞了,不是如果状态大于0,证明前面的节点被中断或者超时了,需要从队列清理了,不是大于0,就利用cas设置前置节点为SIGNAL,返回false。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取node前节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            // 如果pred节点释放了状态,会通知自己
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                // 大于0证明,前面的线程等待超时或者已经被中断,需要从节点中移除
                // 需要找到不大于0的那个节点
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 找到小于等于0的前节点,设置为SIGNAL
            // 这个地方ws值只会为PROPAGATE或者0
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt方法,这个方法需要前面的方法返回true才会执行,它会阻塞node的这个线程,返回线程的中断中断状态并清理Thread.interrupted(),所以独占式获取同步状态对中断不响应的。

private final boolean parkAndCheckInterrupt() {
        // 阻塞线程
        LockSupport.park(this);
        return Thread.interrupted();
    }

cancelAcquire方法,在finally块里面,出现异常就会执行这个方法,做一些处理当前node的操作。

// 异常后,finally里面执行的方法
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        // node是为节点,设置尾节点是pred
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            // 不是头结点和尾节点,前节点是SIGNAL
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

附上一张acquire独占式获取同步状态的流程图:
AbstractQueuedSynchroizer(AQS) 同步器详解详解

②、release()独占式释放同步状态,释放线程后唤醒节点。
源代码:
其中tryRelease也需要同步组件自己去实现,语义也被AQS所定义,true代表释放成功,false代表失败,如果为true,就需要决定是否去唤醒节点,首先获取同步队列的头节点,判断头结点不是空,证明有同步对别有节点才需要唤醒,判断头结点不是刚刚初始化,如果是刚刚初始化,就还没有阻塞,请参考acquire的acquireQueued处理节点的逻辑,都为true执行unparkSuccessor方法,false返回。

 public final boolean release(int arg) {
        // 释放锁
        if (tryRelease(arg)) {
            // 获取头结点
            Node h = head;
            // 头结点不为空,证明初始化了
            // 证明头结点不是刚刚创建
            // 那就可以去唤醒头结点或者它的后继节点
                        // 为0就证明没有其他节点了,不需要唤醒
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor()方法,唤醒节点,唤醒的node的后置节点,因为在获取同步状态是我们阻塞的也是后置节点,唤醒后置节点后,会去找到前节点,也就是当前的结点去获取同步状态,然后再把自己变成头结点。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
                // 没有这个节点或者超时或者被中断了,查找一个可以用的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
                // 证明有这个节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

独占式释放同步状态的流程图:
AbstractQueuedSynchroizer(AQS) 同步器详解详解
③、acquireInterruptibly()响应中断的独占式获取同步状态
可以看出如果线程中断立马返回异常,然后再去执行tryAcquire()获取同步状态,获取失败执行doAcquireInterruptibly方法。

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

doAcquireInterruptibly()方法,和acquire的acquireQueued的方法差不多,区别就是在parkAndCheckInterrupt这个方法如果返回true,就会抛异常InterruptedException,说明这个方法响应异常。

private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

④、tryAcquireNanos()带时间的获取同步状态,在时间内获取到,返回true,超时返回false,首先判断线程中断状态,为true就抛异常,为false就尝试获取同步状态tryAcquire,获取失败执行doAcquireNanos方法。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

doAcquireNanos()方法,其实这个都是在acquire方法上的改进,我们看看这个方法,首先算下时间也就是deadline,然后加入同步队列addWaiter方法,然后判断node的前节点是否为头结点,是就尝试获取同步状态,都为true就返回,为false就接着算下时间,判断node前节点是否为SIGNAL,也就是shouldParkAfterFailedAcquire这个方法,为true,线程阻塞计算的时间,然后true(等待阻塞时间到)和false都判断线程中断状态,中断就抛出异常,执行异常方法,不为true,继续循环,直到获取锁或者超时。

 private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2)、共享系列的方法
①、acquireShared共享式获取同步状态,获取失败就加入同步队列
AQS也把语义指定好了,返货负数证明没有了,就执行doAcquireShared方法

 public final void acquireShared(int arg) {
        // 返回负数就证明没有锁了,加入同步队列
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared()方法,首先构造节点加入队列addWaiter,然后获取node的前节点,判断node的前节点是否为头结点,如果是,获取资源的个数,如果资源大于等于0,调用setHeadAndPropagate方法,然后返回,不满足,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法和独占式一样。

private void doAcquireShared(int arg) {
        // 构建共享节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取node的前节点
                final Node p = node.predecessor();
                // 前节点是否是头节点
                if (p == head) {
                    // 获取锁的个数
                    int r = tryAcquireShared(arg);
                    // 大于等于0,获取锁成功
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 设置头结点,如果有多余资源接着唤醒
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate()方法,设置头结点,设置waitStatus为Propagate,为如果还有资源,唤醒后面的节点,调用doReleaseShared方法(这个方法会在共享式释放同步状态详解)

private void setHeadAndPropagate(Node node, int propagate) {
        // 头结点
        Node h = head; // Record old head for check below
        // 设置头结点为node
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        // 还有资源
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 当前节点的next节点是共享或者没有next节点
            if (s == null || s.isShared())
                // 唤醒后置节点
                doReleaseShared();
        }
    }

②、releaseShared()共享式释放状态
tryReleaseShared是需要同步组件自己去实现,释放成功调用doReleaseShared唤醒节点

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

doReleaseShared()方法,方法有些复杂,不好理解,我们主要来分析三个if的含义,第一个 if (ws == Node.SIGNAL) 表示当前node需要被唤醒,然后后面利用cas设置waitStatus为0,因为是共享模式可能有多个线程同时来释放同步状态,所以只能有一个释放成功,另外一个重试;第二个else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)),其实也是用来处理并发的,当第一次并发失败的线程第二次进入时,可能会看到ws等于0(因为成功的线程设置的),所以利用cas设置为PROPAGATE,表示传递,这里补充一下不管是0或者PROPAGATE,都会被唤醒的线程利用cas设置为SIGNAL(参考shouldParkAfterFailedAcquire方法);第三个(h == head)

  1. head.waitStatus的初始值必然为SIGNAL,因此在并发时,必然只有一个线程A能将等待状态由 SIGNAL CAS更新为 0,
    该线程A会唤醒其他线程B
  2. 被唤醒的线程B会首先执行setHead
    因此如果最后h!=head,说明新一轮的唤醒竞争已经开始,当前线程c已经觉察到,因此继续参与竞争,加快唤醒
    因此如果最后h==head,说明新一轮的唤醒竞争尚未开始,而被唤醒的线程B必然会开启新一轮的唤醒竞争,而当前线程c可以安心退出唤醒竞选

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            // 头结点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 表示后面节点需要唤醒
                if (ws == Node.SIGNAL) {
                    // 多线程控制并发 ,可能存在多个线程同时来修改
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果ws等于0,尝试把cas设置waitStatus为PROPAGATE,传递下去
                // 请联系shouldParkAfterFailedAcquire方法一起看
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果头结点没有发生变化,表示设置完成,退出循环
            // 如果发生变化,加入唤醒的过程(加速唤醒,可能存在多个线程在唤醒这些node,速度比一个接一个要快)
            if (h == head)                   // loop if head changed
                break;
        }
    }

    ③、acquireSharedInterruptibly()和tryAcquireSharedNanos()一个响应中断,一个响应中断支持添加获取的超时时间(参考独占模式的这些方法)
    3)、ConditionObject系列方法
    ①、await()方法,类似Object的await方法,阻塞线程释放锁。
    我们可以看见await的第一步是调用addConditionWaiter方法,它的作用是构建等待节点加入队列的尾部,使用的也是AQS的Node,队列里面顺便也会清理清除Node不为CONDITION的节点;第二步需要释放线程获取的同步状态fullyRelease方法;第三步:阻塞线程,找到线程中断时机,也就是调用signal方法的前后顺序;第四步:调用acquireQueued方法处理节点(阻塞还是其它);第五步:清理节点unlinkCancelledWaiters方法(清除Node不为CONDITION的节点);第六步:响应await语义,await阻塞线程时调用interrupt方法会抛异常reportInterruptAfterWait方法。

    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 加入等待队列,清除节点
            Node node = addConditionWaiter();
            // 释放状态
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                // 阻塞线程
                LockSupport.park(this);
                // 线程是被中断唤醒的
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 加入同步队列
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                // 清理节点
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                // 响应中断  await语义
                reportInterruptAfterWait(interruptMode);
        }

    addConditionWaiter方法每次都是加入队列的尾部

    private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
                private void unlinkCancelledWaiters() {
            // 获取第一个
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                // 获取第一个的下一个
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    // t需要断开连接
                    t.nextWaiter = null;
                    // 第一次trail = null
                    // firstWaiter = next
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

    isOnSyncQueue方法判断node是否在同步队列中

    final boolean isOnSyncQueue(Node node) {
        // 节点状态为CONDITION ,或者node.prev == null 等待节点没有前置节点
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 等待节点没有next节点
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        // 循环查找
        return findNodeFromTail(node);

    checkInterruptWhileWaiting 方法,判断是否是中断唤醒,这方法就是为了确认中断的时机是在signal的前面还是后面signal,因为需要响应中断

    private int checkInterruptWhileWaiting(Node node) {
            // 判断是否是线程中断唤醒
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }
                final boolean transferAfterCancelledWait(Node node) {
        // 设置成功表示在signal 执行之前
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // 设置成功表示在signal 执行之后
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    acquireQueued和unlinkCancelledWaiters方法前面都介绍过了,一个是加入同步队列,一个是清理节点,介绍下reportInterruptAfterWait方法,它是我为了响应线程Interrupt方法,interruptMode == THROW_IE只在在signal方法后调用Interrupt方法才满足,线程阻塞时调用Interrupt方法会抛异常,这是Object.await里面满足的,请参考checkInterruptWhileWaiting方法里面的transferAfterCancelledWait方法理解其实现。

    private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

    ②、awaitNanos(long nanosTimeout)和awaitUntil(Date deadline)都是提供了超时时间,和await方法类似,只是加入了时间机制。
    ③、awaitUninterruptibly不响应中断方法,发现里面都没有判断是都发生中断的标记,只有调用signal唤醒node,循环才会结束,然后调用acquireQueued处理这个节点(阻塞还是其它)

    public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

    ④、signal方法,唤醒第一个等待队列的node。

    public final void signal() {
            // 判断是否获取锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 初始化
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

    doSignal方法,首先把这个first节点和等待队列断开连接,然后把调用transferForSignal方法把节点从等待队列加入同步队列,唤醒节点的线程,然后被唤醒的线程就会在await方法里面执行acquireQueued这个方法。

    private void doSignal(Node first) {
            do {
                // 队列里面即将没有节点,所以首尾都要为null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 把first 断开连接
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }
                final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    ⑤、signalAll唤醒所有等待队列的节点加入同步队列,并且清空等待队列

    public final void signalAll() {
            // 判断是否获取锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 获取第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
                private void doSignalAll(Node first) {
            // 队列设置为null
            lastWaiter = firstWaiter = null;
            // 从首节点开始加入同步队列,知道队列为空
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

    四、总结

    我们学习AQS其实我觉得主要从三个方面,也就是本文的第三部分,从独占式获取和释放同步状态、共享式获取和释放同步状态和ConditionObject里面的等待/通知机制;这里在说一下独占式释放锁和共享式释放锁,独占式因为只会有一个线程获取同步状态,所以释放时也只会有一个,但是在共享这一块,我们在释放同步同步状态时可能会有多个线程同时来释放,可能出现并发的情况,理解doReleaseShared是理解共享式释放的重点;学习获取和释放同步状态,理解同步队列节点的变化是重点;学习等待/通知理解等待队列和同步队列的关系和节点的转换;只有学习好了AQS才能更好的学习后面JUC的那些锁。
    最后感慨下AQS里面的逻辑是真心有些绕,本人有些理解的可能有些不够。
    参考《Java 并发编程的艺术》

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI