免费成品网站模板下载,重庆招工招聘信息查询,为何用wdcp建立网站连不上ftp,自我介绍的网页目录
队列同步器#xff08;AQS#xff09;
独占锁示例
AQS之同步队列结构
解析AQS实现 队列同步器#xff08;AQS#xff09;
队列同步器AbstractQueuedSynchronizer#xff08;以下简称同步器#xff09;#xff0c;是用来构建锁或者其他同步组 件的基础框架AQS
独占锁示例
AQS之同步队列结构
解析AQS实现 队列同步器AQS
队列同步器AbstractQueuedSynchronizer以下简称同步器是用来构建锁或者其他同步组 件的基础框架它使用了一个int成员变量表示同步状态通过内置的FIFO队列来完成资源获 取线程的排队工作并发包的作者Doug Lea期望它能够成为实现大部分同步需求的基础。 同步器的主要使用方式是继承子类通过继承同步器并实现它的抽象方法来管理同步状态。 使用同步器提供的3 个方法getState()、setState(int newState)和compareAndSetState(int expect,int update)对同步状态进行更改因为它们能够保证状态的改变是安全的。
同步器是实现锁也可以是任意同步组件的关键在锁的实现中聚合同步器利用同步器实现锁的语义。可以这样理解二者之间的关系锁是面向使用者的它定义了使用者与锁交 互的接口比如可以允许两个线程并行访问隐藏了实现细节同步器面向的是锁的实现者 它简化了锁的实现方式屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
独占锁示例
Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的通过独占锁了解下队列同步器(AQS)。
public class Mutex implements Lock {
// 子类推荐被定义为自定义同步组件的静态内部类// 同步器自身没有实现任何同步接口它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用private static class Sync extends AbstractQueuedSynchronizer {// 是否处于独占状态protected boolean isHeldExclusively() {return getState() 1;}// 当状态为0的时候获取锁public boolean tryAcquire(int acquire) {if (compareAndSetState(0, 1)) {return true;}return false;}// 释放锁将状态设置为0protected boolean tryRelease(int release) {if (getState() 0) {throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}// 返回一个Condition每一个condition都包含了一个condition队列Condition newCondition() {return new ConditionObject();}}
// 通过Sync进行代理操作,实现Lock接口的APIprivate final Sync sync new Sync();
// 获取锁Overridepublic void lock() {sync.acquire(1);}
// 可中断地获取锁Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}
// 尝试非阻塞的获取锁Overridepublic boolean tryLock() {return sync.tryAcquire(1);}
/*超时的获取锁当前线程在以下3种情况下会返回1.当线程在超时时间获得了锁2.当线程在超时时间被中断3.超时时间结束返回false*/Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}
// 释放锁Overridepublic void unlock() {sync.release(1);}
/*获取等待通知组件该组件和当前的锁绑定当前线程只有获得了锁才能调用该组件的wait()方法而调用用当前线程将释放锁*/Overridepublic Condition newCondition() {return sync.newCondition();}
}
AQS之同步队列结构
同步器依赖内部的同步队列一个FIFO双向队列来完成同步状态的管理当前线程获取同步状态失败时同步器会将当前线程以及等待状态等信息构造成为一个节点Node并将其 加入同步队列同时会阻塞当前线程当同步状态释放时会把首节点中的线程唤醒使其再次尝试获取同步状态。
节点属性
属性类型与名称描述int waitStatus等待状态包含如下状态 CANCELLED值为1表示节点已取消通常是因为线程被中断或者等待超时而被取消。 SIGNAL值为-1表示后继节点需要被唤醒即当前节点的释放signal会通知后继节点继续尝试获取锁或资源。 CONDITION值为-2表示节点在等待队列中节点线程等待在Condition上当其他线程对Condition调用了signal()方法后该节点将会从等待队列中转移到同步队列中加入到对同步状态的获取中 PROPAGATE值为-3表示释放共享锁时需要向后继节点传播共享性质以确保后继节点可以被唤醒。这在CountDownLatch等场景中会使用到。 INITIAL值为0初始状态。Node prev前驱节点当节点加入同步队列时被设置尾部添加Node next后继节点Node nextWaiter等待队列中的后继节点。如果当前节点是共享的那么这个字段将是一个SHARED常量也就是说节点类型独占和共享和等待队列中的后继节点共用同一个字段Thread thread获取同步状态的线程
节点是构成同步队列的基础同步器拥有首节点(head)和尾结点(tail)没有成功获取同步状态的线程会成为节点加入该队列的尾部。 当一个线程成功地获取了同步状态或者锁其他线程将无法获取到同步状态转而被构造成为节点并加入到同步队列中而这个加入队列的过程必须要保证线程安全因此同步器提供了一个基于CAS的设置尾节点的方法compareAndSetTail(Node expect,Node update)它需要传递当前线程“认为”的尾节点和当前节点只有设置成功后当前节点才正式与之前的尾节点建立关联。 首节点是获取同步状态成功的节点首节点的线程在释放同步状态时将会唤醒后继节点而后继节点将会在获取同步状态成功时将自己设置为首节点设置首节点是通过获取同步状态成功的线程来完成的由于只有一个线程能够成功获取到同步状态因此设置头节点的方法并不需要使用CAS来保证它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可 解析AQS实现
以ReentrantLock的非公平锁为例看看lock的实现。 ReentrantLock.lock()—获取锁的入口 public void lock() {sync.lock();} sync 实际上是一个抽象的静态内部类它继承了 AQS 来实现重入锁的逻辑。 Sync 有两个具体的实现类分别是 NofairSync表示可以存在抢占锁的功能也就是说不管当前队列上是否存在其他 线程等待新线程都有机会抢占锁 FailSync: 表示所有线程严格按照 FIFO 来获取锁。 ReentrantLock的无参构造函数默认创建的是非公平锁。 public ReentrantLock() {sync new NonfairSync();} NonfairSync.lock()—获取同步状态/锁。 static final class NonfairSync extends Sync {private static final long serialVersionUID 7316153563782823691L;
/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}} 非公平锁的特点抢占锁的逻辑不管有没有线程排队上来先CAS抢占一下。 CAS成功表示成功获得锁。 CAS失败调用获取独占锁acquire()走锁竞争逻辑。 AQS.acquire(1)—尝试获取独占锁or加入同步队列自旋获取锁。 public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();} 通过 tryAcquire 尝试获取独占锁如果成功返回 true失败返回 false 如果 tryAcquire 失败则会通过 addWaiter 方法将当前线程封装成 Node 添加 到 AQS 队列尾部。 acquireQueued()将 Node 作为参数通过自旋去尝试获取锁。 NonfairSync.tryAcquire(1)—尝试获取独占锁 protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);} 它是重写 AQS 类中的 tryAcquire 方法 ReentrantLock.nofairTryAcquire(1)—尝试获取独占锁 final boolean nonfairTryAcquire(int acquires) {final Thread current Thread.currentThread(); // 获取当前执行的线程int c getState(); // 获取state的值if (c 0) { // 表示无锁状态if (compareAndSetState(0, acquires)) { // CAS替换state的值case成功表示获取锁成功setExclusiveOwnerThread(current); // 保存当前获得锁的线程下次再来的时候不用尝试竞争锁return true;}}else if (current getExclusiveOwnerThread()) { // 如果同一线程竞争锁直接增加重入次数int nextc c acquires;if (nextc 0) // overflowthrow new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;} 获取当前线程判断当前的锁的状态 如果 state0 表示当前是无锁状态通过 cas 更新 state 状态的值 当前线程是属于重入则增加重入次数 AQS.addWaiter(Node.EXCLUSIVE) —线程构造成节点加入同步队列 (static final Node EXCLUSIVE null;) private Node addWaiter(Node mode) {Node node new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred tail;if (pred ! null) {node.prev pred;if (compareAndSetTail(pred, node)) {pred.next node;return node;}}enq(node);return node;} 当 tryAcquire 方法获取锁失败以后则会先调用 addWaiter 将当前线程封装成 Node. 入参 mode 表示当前节点的状态传递的参数是 Node.EXCLUSIVE表示独占状 态。意味着重入锁用到了 AQS 的独占锁功能 将当前线程封装成 Node 当前链表中的 tail 节点是否为空如果不为空则通过 cas 操作把当前线程的 node 添加到 AQS 队列 如果为空或者 cas 失败调用 enq 将节点添加到 AQS 队列 enq(node)—通过自旋把当前节点加入到队列中 private Node enq(final Node node) {for (;;) {Node t tail;if (t null) { // Must initializeif (compareAndSetHead(new Node()))tail head;} else {node.prev t;if (compareAndSetTail(t, node)) {t.next node;return t;}}}} 图解分析 8.AQS.acquireQueued(node, 1)—把node加入到链表去争抢锁 获取当前节点的 prev 节点 如果 prev 节点为 head 节点那么它就有资格去争抢锁调用 tryAcquire 抢占锁 抢占锁成功以后把获得锁的节点设置为 head并且移除原来的初始化 head 节点 如果获得锁失败则根据 waitStatus 决定是否需要挂起线程 最后通过 cancelAcquire 取消获得锁的操作 final boolean acquireQueued(final Node node, int arg) {boolean failed true;try {boolean interrupted false;for (;;) {final Node p node.predecessor(); // 获取当前节点的prev节点if (p head tryAcquire(arg)) { // 如果是head节点说明有资格去争抢锁setHead(node); // 获取锁成功也就是ThreadA已经释放了锁然后设置head为ThreadB获得执行权限p.next null; // help GCfailed false;return interrupted;}// ThreadA可能还没释放锁使得ThreadB在执行tryAcquire返回falseif (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt())interrupted true;}} finally {if (failed)cancelAcquire(node);}} 9.shouldParkAfterFailedAcquire—竞争锁失败后应该挂起 这个方法的主要作用是通过 Node 的状态来判断ThreadA 竞争锁失败以后是 否应该被挂起。 如果 ThreadA 的 pred 节点状态为 SIGNAL那就表示可以放心挂起当前线程 通过循环扫描链表把 CANCELLED 状态的节点移除 修改 pred 节点的状态为 SIGNAL,返回 false. 返回 false 时也就是不需要挂起返回 true则需要调用 parkAndCheckInterrupt 挂起当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // pred是前置节点int ws pred.waitStatus; // 前置节点的waitStatusif (ws Node.SIGNAL) // 如果前置节点为 SIGNAL意味着只需要等待其前置节点的线程被释放return true;if (ws 0) { // ws大于 0意味着prev节点取消了排队直接移除这个节点就行do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 利用cas设置prev节点的状态为SIGNAL(-1)}return false;} 图解分析 waitStatus -1SIGNAL值为-1表示后继节点需要被唤醒即当前节点的释放会通知后继节点继续尝试获取锁或资源。 10.parkAndCheckInterrupt Thread.interrupted返回当前线程是否被其他线程触发过中断请求也就是thread.interrupt(); 如果有触发过中断请求那么这个方法会返回当前的中断标识 true并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true意味着在acquire方法中会执行 selfInterrupt()。 private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); // 1.中断 2.复位} selfInterrupt 标识如果当前线程在 acquireQueued 中被中断过则需要产生一 个中断请求原因是线程在调用 acquireQueued 方法的时候是不会响应中断请求的。 11.ReentrantLock.unlock()—锁释放 public void unlock() {sync.release(1);} public final boolean release(int arg) {if (tryRelease(arg)) { // 释放锁成功Node h head; // 获取aqs中的head节点if (h ! null h.waitStatus ! 0) // 如果head节点不为空且状态0.调用unparkSuccessor(h)唤醒后续节点unparkSuccessor(h);return true;}return false;} 12.ReentrantLock.tryRelease()—设置锁状态 这个方法可以认为是一个设置锁状态的操作通过将 state 状态减掉传入的参数值 参数是 1如果结果状态为 0就将排它锁的 Owner 设置为 null以使得其它 的线程有机会进行执行。 在排它锁中加锁的时候状态会增加 1当然可以自己修改这个值在解锁的时 候减掉 1同一个锁在可以重入后可能会被叠加为 2、3、4 这些值只有 unlock() 的次数与 lock()的次数对应才会将 Owner 线程设置为空而且也只有这种情况下才会返回 true. protected final boolean tryRelease(int releases) {int c getState() - releases;if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;if (c 0) {free true;setExclusiveOwnerThread(null);}setState(c);return free;} 13.AQS.unparkSuccessor()—唤醒后续节点 private void unparkSuccessor(Node node) {int ws node.waitStatus; // 获取head节点的状态if (ws 0)compareAndSetWaitStatus(node, ws, 0); // 设置head节点状态为0Node s node.next; // 得到head节点的下一个节点//如果下一个节点为 null 或者 status0 表示 cancelled 状态.if (s null || s.waitStatus 0) { s null;//通过从尾部节点开始扫描找到距离head最近的一个waitStatus0 的节点for (Node t tail; t ! null t ! node; t t.prev)if (t.waitStatus 0)s t;}if (s ! null) // next节点不为空直接唤醒这个线程即可LockSupport.unpark(s.thread);}