AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,JUC中锁的底层是使用AQS实现的。

一、FIFO队列

AQS是一个FIFO(first input first output)队列
FIFO.jpg
线程获取资源发现该资源已经被其他线程持有时,将该线程添加至队列末尾。

二、State变量

AQS中存在一个被volatile修饰的int变量 state,该变量用于标识资源的占用与否。state为0时表示资源处于空闲状态,否则为占用状态。
state的访问方式:

  • int getState();
  • void setState(int newState);
  • boolean compareAndSetState(int expect, int update);

上述三种访问方式均为原子性操作。
源码如下:

private volatile int state;

// 具有内存读可见性语义
protected final int getState() {
    return state;
}

// 具有内存写可见性语义
protected final void setState(int newState) {
    state = newState;
}

// 具有内存读/写可见性语义
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

三、独占模式与共享方式

对于AQS来说,线程操作 state的方式分为独占模式和共享模式。

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
 }

1. EXCLUSIVE

AQS队列中的元素Node内部的EXCLUSIVE用来标记该线程是获取独占资源时被挂起后放入AQS队列中的。
使用独占模式获取的资源是与具体线程绑定的。

2. SHARED

AQS队列中的元素Node内部的EXCLUSIVE用来标记该线程是获取共享资源时被挂起后放入AQS队列中的。
使用共享模式获取的资源是只要当前资源还能满足线程的需要时,当前线程即可通过CAS方式进行获取。

四、获取锁与释放锁源码分析

1. acquire(int arg)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

通过源码可以发现获取锁分以下几步:

  1. 调用 tryAcquire(int arg)方法尝试获取锁;
  2. 如果第一步中的 tryAcquire(int arg)返回 false,则将当前节点添加至队列尾部并标记为独占模式;
  3. 调用 selfInterrupt()中断自己。

2. tryAcquire(int arg)

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

具体操作由AQS的子类去实现。

3. release(int arg)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

通过源码可以看出释放锁的操作分为以下几步:

  1. 调用 tryRelease(int arg)尝试释放锁;
  2. 如果 tryRelease(int arg)返回 true,且AQS队列中还存在其他等待获取锁的线程,则通过 unparkSuccessor(Node node)唤醒该线程,然后返回成功,否则直接返回成功。
  3. 如果 tryRelease(int arg)返回 false,直接返回失败。

4. tryRelease(int arg)

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

具体释放锁的操作由AQS的子类去实现。

5. acquireShared(int arg)

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

acquire(int arg)release(int arg)方法是针对独占模式的线程获取锁和释放锁操作的。
同样的,针对共享模式的线程获取锁的方法为 acquireShared(int arg)

通过源码可以发现共享模式下获取锁的过程分为以下几步:

  1. 调用 tryAcquireShared(int arg)方法尝试获取资源;
  2. 如果第一步中的返回值小于0,说明还有资源可供获取。然后调用 doAcquireShared(int arg)方法获取资源。

6. doAcquireShared(int arg)

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面提到共享模式下真正执行获取资源的操作是在 doAcquireShared(int arg)方法中实现的。
共享模式下执行获取资源的具体过程分为以下几步:

  1. 把当前节点添加至队列尾部,并将其标记为共享模式;
  2. 判断当前节点的前置节点是否为头节点;
  3. 如果第2步中的判断为 true,当前节点再次尝试获取资源(即此处进行自旋锁操作);
  4. 如果第3步中获取资源操作成功,且返回值大于0,说明还有多余的资源可供获取。此时调用 setHeadAndPropagate(Node node, int propagate)方法然后返回;
  5. 如果第2步中的判断为 false,调用 shouldParkAfterFailedAcquire(Node pred, Node node)方法;
  6. 如果第5步中返回 true,调用 parkAndCheckInterrupt()方法;

7. setHeadAndPropagate(Node node, int propagate)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

setHeadAndPropagate(Node node, int propagate)执行分为以下几步:

  1. 记录当前队列的头节点;
  2. 将当前节点设置为队列头节点;
  3. propagate > 0表示可以唤醒后续节点,waitStatus < 0表示 头节点的后续节点需要被唤醒;
  4. 如果当前节点的后一个节点是共享节点,执行 doReleaseShared()获取资源的操作。

8. shouldParkAfterFailedAcquire(Node pred, Node node)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

该方法是判断当前节点是否应该在获取资源失败后挂起。具体分为以下几步:

  1. 记录前置节点的 waitStatus
  2. 如果前置节点被设置为获取资源后通知后续节点继续获取资源,则当前节点获取资源失败后应该被挂起,即返回 true
  3. 如果前置节点被设置为取消获取资源,依次往前找节点,直到找到不是被设置为取消获取资源的节点,然后将当前节点设置为该节点的下一个节点。当前节点获取资源失败后不被挂起,即返回 false
  4. 如果前置节点设置为前两种状态之外的其他状态,通过CAS方法将前置节点设置为获取资源后通知后续节点继续获取资源(该操作可能会失败)。当前节点获取资源失败后不被挂起,即返回 false

9. parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

如果 shouldParkAfterFailedAcquire(Node pred, Node node)返回为 true,当前节点执行该方法。分为以下几步:

  1. 挂起当前节点;
  2. 检查当前节点是否被中断,并返回。

10. cancelAcquire(Node node)

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

关于作者:NekoChips
本文地址:https://chenyangjie.com.cn/articles/2020/01/24/1579867525855.html
版权声明:本篇所有文章仅用于学习和技术交流,本作品采用 BY-NC-SA 4.0 许可协议,如需转载请注明出处!
许可协议:知识共享许可协议