网站建设岗位所需技能seo sem论坛
文章目录
- 1.状态依赖性的管理
- 1.1 示例:将前提条件的失败传递给调用者
- 1.2 示例:通过轮询与休眠来实现简单的阻塞
- 1.3 条件队列
- 2.使用条件队列
- 2.1 条件谓词
- 2.2 过早唤醒
- 2.3 丢失的信号
- 2.4 通知
- 2.5 示例:阀门类
- 2.6 子类的安全问题
- 2.7 入口协议与出口协议
- 3.显式的Condition对象
- 4.Synchronizer剖析
- 5.AbstractQueuedSynchronizer(AQS)
- 6.JUC同步器类中的AQS
- 6.1 ReentrantLock
- 6.2 Semaphore和CountDownLatch
- 6.3 FutureTask
- 6.4 ReentrantReadWriteLock
- 小结
1.状态依赖性的管理
针对缓存类的几种改造
public abstract class BaseBoundedBuffer<V>{private final V[] buf;private int tail;private int head;private int count;protected BaseBoundedBuffer(int capacity) {this.buf = (V[]) new Object[capacity];}protected synchronized final void doPUt(V v) {buf[tail] = v;if (++tail == buf.length) {tail = 0;}++ count;}protected synchronized final V doTake() {V v = buf[head];buf[head] = null;if (++ head == buf.length) {head = 0;}--count;return v;}public synchronized final boolean isFull() {return count == buf.length;}public synchronized final boolean isEmpty() {return count == 0;}
}
1.1 示例:将前提条件的失败传递给调用者
/*** 当不满足前提条件时,有界缓存不会执行相应的操作* 先检查,再运行*/
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V>{protected GrumpyBoundedBuffer(int capacity) {super(capacity);}public synchronized void put(V v) {if (isFull()) {throw new BufferOverflowException();}doPUt(v);}public synchronized V take() {if (isEmpty()) throw new BufferUnderflowException();return doTake();}
}
异常应该用于发生异常条件的情况中。缓存已满或缓存为空并不是一个异常条件,而且调用者必须捕获异常进一步处理,不太好。
/*** 对上面代码的引用,捕获异常后重试,称为自旋等待或忙等待* 不推荐*/public void test() {GrumpyBoundedBuffer<V> buffer = new GrumpyBoundedBuffer<>(20);while (true) {try {V item = buffer.take();// 对于item执行一些操作break;} catch (BufferUnderflowException e){try {Thread.sleep(2000);} catch (InterruptedException ex) {}}}}
1.2 示例:通过轮询与休眠来实现简单的阻塞
采用简单的轮询与休眠重试机制改造
/*** put如果缓存满了,会阻塞其他put操作,take同理* 每次循环休眠一段时间,减轻cpu压力* 休眠时间越少,响应性就越高,cpu压力也就越高,所以需要权衡*/
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V>{protected SleepyBoundedBuffer(int capacity) {super(capacity);}public void put(V v) throws InterruptedException {while (true) {synchronized (this) {if (!isFull()) {doPUt(v);return;}}Thread.sleep(3000);}}public V take() throws InterruptedException {while (true) {synchronized (this) {if (!isEmpty()) {return doTake();}}Thread.sleep(3000);}}
}
当轮询时,容易发生不必要的等待时间,例如刚进入轮询,锁释放了,此时还需要等休眠完才能持有锁。
1.3 条件队列
Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当贝挂起的线程醒来时,它将在返回之前从新获取锁。从直观上来理解,调用wait意味着“我要去休息了,但当发生特定的事情时唤醒我”,而调用通知方法就意味着“特定的事情发生了”。
public class BoundedBuffer <V> extends BaseBoundedBuffer<V>{protected BoundedBuffer(int capacity) {super(capacity);}//条件谓词: not-full (!isFull())//条件谓词: not-empty(!isEmpty())//阻塞,并直到: not-fullpublic synchronized void put(V v) throws InterruptedException {while (isFull()) {wait();}doPUt(v);notifyAll();}//阻塞,并直到: not-emptypublic synchronized V take() throws InterruptedException {while (isEmpty()) {wait();}V v = doTake();notifyAll();return v;}
}
推荐使用,在正式版本中,还应该包括限时版本的put和take,这样当阻塞操作不能在预计时间内完成时,可以因超时而返回。通过使用定时版本的Object.wait(),可以很容易实现这些方法。
2.使用条件队列
2.1 条件谓词
没有条件谓词,条件等待机制将无法发挥作用
有界缓存中,只有队列不为空时,take才能成功,反之阻塞。那么“队列不为空”就是它的条件谓词。
将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(调用wait和notify等方法所在的对象)必须是同一个对象。
每一次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。
2.2 过早唤醒
wait方法的返回并不一定意味着线程正在等待的条件谓词已经变成真了。
线程被唤醒到重新获取锁的这段时间,可能有其他线程已经修改了条件谓词
每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待。
条件等待的标准形式
void stateDependentMethod() throws InterruptedException {//必须通过一个锁来保护条件谓词synchronized (lock) {while (!conditionPredicate()) {lock.wait();//现在对象处于合适的状态}}}
使用条件等待时(例如Object.wait()或Condition.wait()):
- 通常都有一个条件谓词–包括一些对象状态测试,线程在执行前必须首先通过这些测试。
- 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
- 在一个循环中调用wait。
- 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
- 当调用wait、notify、或notifyAll等方法时,一定要持有与条件队列相关的锁。
- 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。
2.3 丢失的信号
活跃性故障包括死锁和活锁,另一种形式的活跃性故障是丢失的信号量。
丢失的信号是指:线程必须等待一个已为真的条件,但在开始等待之前没有检查条件谓词。
线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个线程去唤醒它。
如果像上面代码那样,循环中检查条件谓词,就不会发生丢失信号。
2.4 通知
上面当调用take时,如果缓存为空,将阻塞。当缓存变为非空时,为了使take解除阻塞,必须确保在每条使缓存变为非空的代码中都发出一个通知。上面put在成功放一个元素后,将调用notifyAll。take同理
每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。
无论是notify或notifyAll,都必须持有与条件队列对象相关联的锁。notify会在这个条件队列上等待的多个线程中选择一个来唤醒,notifyAll会唤醒所有在这个条件队列上的线程。
发出通知的线程应该尽快的释放锁,从而确保正在等待的线程尽可能快的解除阻塞。
多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用notify而不是notifyAll,将是一种危险的操作,因为单一的通知很容易导致类似于信号丢失的问题。
BoundedBuffer中,条件队列用于两个不同的条件谓词:“非空”和“非满”。如线程A正在等待条件谓词PA,线程B正在等待PB,假设PB变成真,此时线程C调用notify,选择了唤醒线程A,A的条件谓词依然为假,而本应该为真的线程B,还在等待被唤醒。
只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:
- 所有等待的线程类型都相同:只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作。
- 单进单出:在条件变量上的每次通知,最多只能唤醒一个线程来执行。
改进:
public synchronized void put(V v) throws InterruptedException {while (isFull()) {wait();}boolean wasEmpty = isEmpty();doPUt(v);if (wasEmpty) notifyAll();}
只有当put或take影响到这些状态转换时,才唤醒。
首先要保证程序正确运行,其次再考虑性能问题。
2.5 示例:阀门类
实现可重新关闭的阀门
public class ThreadGate {//条件谓词: opened-since(n) (isOpen || generation>n)private boolean isOpen;private int generation;public synchronized void close() {isOpen = false;}public synchronized void open() {++generation;isOpen = true;notifyAll();}//阻塞并直到:opened-since(generation on entity)public synchronized void await() throws InterruptedException {int arrivalGeneration = generation;/*** 当阀门打开时,有N个线程正在等待它,那么这些线程都应该被允许执行。* 然而如果阀门在打开后又非常快速的关闭了,并且await方法只检查isOpen,那么所有的线程可能都无法释放:当所有线程收到通知时* 将重新请求锁并推出wait,而此时阀门可能已经再次关闭了。* 每次阀门关闭时,递增一个“Generation”计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。*/while (!isOpen && arrivalGeneration == generation) {wait();}}
}
2.6 子类的安全问题
想要支持子类化,设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。
对于状态依赖的类,要么将其等待和通知完全想子类公开,要么完全阻止子类参与到等待和通知等过程中。
2.7 入口协议与出口协议
每个以来状态的操作,以及每个修改其他操作以来状态的操作,都应该定义一个入口协议和出口协议。
入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认他们是否使某个其他的条件谓词变为真,如果是则通知相关的条件队列。
3.显式的Condition对象
Condition是一种广义的内置条件队列。
内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,多个线程可能在同一个条件队列上等待不同的条件谓词(如BoundedBuffer中的put和take中的两个条件谓词),并且在最常见的加锁模式下公开条件队列对象。
如果想要编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。
一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。
要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。
Condition:在每个锁上可存在多个等待、条件等待可以是可中断的、基于时限的等待,以及公平或非公平的队列操作。
对于每个Lock,可以有任意数量的Condition对象。对于公平锁,线程会依照FIFO顺序从Condition.await中释放。
特别注意:在Condition对象中,与await、notify和notifyAll方法对应的分别是await、signal、signalAll。但是,Condition对Object进行了扩展,因而它也包含wait和notify方法。一定要确保使用正确的版本–await和signal。
public class ConditionBoundedBuffer <T>{protected final Lock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();private final T[] items = (T[]) new Object[20];private int tail ,head ,count;//阻塞直到notFullpublic void put(T x) throws InterruptedException {lock.lock();try {while (count == items.length) {notFull.await();}items[tail] = x;if (++ tail == items.length) {tail =0;}++ count;notEmpty.signal();} finally {lock.unlock();}}//阻塞直到 notEmptypublic T take() throws InterruptedException {lock.lock();try {while (count == 0) {notEmpty.await();}T x = items[head];items[head] = null;if (++head == items.length) {head = 0;}--count;notFull.signal();return x;} finally {lock.unlock();}}
}
4.Synchronizer剖析
ReentrantLock和Semaphore有很多共同点,也可以通过ReentrantLock来实现Semaphore相同的功能。
public class SemaphoreOnLock {private final Lock lock = new ReentrantLock();private final Condition permitsAvailable = lock.newCondition();private int permits;public SemaphoreOnLock(int initialPermits) {lock.lock();try {permits = initialPermits;} finally {lock.unlock();}}public void acquire() throws InterruptedException {lock.lock();try {while (permits < 0) {permitsAvailable.await();}--permits;} finally {lock.unlock();}}public void release() {lock.lock();try {++permits;permitsAvailable.signal();} finally {lock.unlock();}}
}
因为它们有一个共同的基类AbstractQueuedSynchronizer
(AQS),这个类也是其它许多同步类的基类。
AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。
5.AbstractQueuedSynchronizer(AQS)
基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。
- 获取操作:获取的是锁或者许可,调用者可能会一直等待直到同步器类处于可悲获取的状态。CounDownLatch获取操作意味着“等待并直到闭锁到达结束状态”;FutureTask意味着“等待并直到任务已经完成”。
- 释放操作:并不是一个可阻塞的操作,执行释放操作时,所有在请求时被阻塞的线程都会开始执行。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState以及compareAndSetState等proteced类型方法来进行操作。这个整数可以表示任意状态,如ReentrantLock用它来表示所有者线程已经重复获得该锁的次数,Semaphore表示剩余许可数量,FutureTask表示任务状态(尚未开始、正在运行、已经完成以及已取消)
AQS中获取操作和释放操作的标准形式
boolean acquire() throws InterruptedException {while (当前状态不允许获取操作) {if (需要阻塞获取请求) {如果当前线程不在队列中,则将其插入队列阻塞当前线程} else {返回失败}}可能更新同步器的状态如果线程位于队列中,则将其移出队列返回成功
}
void release() {更新同步器状态if (新的状态允许某个被阻塞的线程获取成功) {解除队列中一个或多个线程的阻塞状态}
}
如果某个同步器支持独占的获取操作,那么需要一些保护方法,包括 tryAcquire、tryRelease和isHeldExclusively等
对于支持共享获取的同步器,应该实现tryAcquireShared和tryReleaseShared等方法。
AQS中acquire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。
子类中可以根据获取操作和释放操作的语义使用getState、setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类“获取”或“释放”同步器的操作是否成功。例如,tryAcquireShared返回一个负值,表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值表示同步器通过非独占方式被获取。
AQS提供了一些机制来构造与同步器相关联的条件变量
一个简单的闭锁
public class OneShotLatch {private final Sync sync = new Sync();public void signal() {/*** public final boolean releaseShared(int arg) {* if (tryReleaseShared(arg)) {* doReleaseShared();* return true;* }* return false;* }*/sync.releaseShared(0);}public void await() throws InterruptedException {/*** 接着会执行Sync中的tryAcquireShared* public final void acquireSharedInterruptibly(int arg)* throws InterruptedException {* if (Thread.interrupted())* throw new InterruptedException();* if (tryAcquireShared(arg) < 0)* doAcquireSharedInterruptibly(arg);* }* 该方法处理失败的方式是吧这个线程放入等待线程队列中*/sync.acquireSharedInterruptibly(0);}private class Sync extends AbstractQueuedSynchronizer {@Overrideprotected int tryAcquireShared(int arg) {//如果闭锁是开的,那么这个操作将成功,否则失败return (getState() == 1) ? 1 : -1;}@Overrideprotected boolean tryReleaseShared(int arg) {//打开闭锁setState(1);//其他线程可以获得该闭锁return true;}}
}
JUC中的所有同步器类都没有直接扩展AQS,而是将他们的相应功能委托给私有的AQS来实现。
6.JUC同步器类中的AQS
6.1 ReentrantLock
只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively
ReentrantLock中非公平锁tryAcquire实现
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//如果锁未被持有,将尝试更新锁的状态以表示锁已经被持有if (c == 0) {//由于状态有可能在检查后被立即修改,因此tryAcquire使用CAS来原子地更新状态,便是这个锁已经被占有并确保状态在最后一次检查以后就没有被修改过。if (compareAndSetState(0, acquires)) {//维护了一个owner变量,只有当线程刚刚获得锁//或者正要释放锁时,才会修改这个变量。setExclusiveOwnerThread(current);return true;}}//区分操作是重入的还是竞争的//如果锁状态表明它已经被当前线程持有,那么获取计数会增加else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}//当前线程没有持有锁,失败return false;}
6.2 Semaphore和CountDownLatch
Semaphore将AQS的同步状态用于保存当前可用许可的数量。
final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();//计算剩余许可的数量int remaining = available - acquires;//如果还有剩余许可,通过CAS以原子方式来降低许可的计数。//当没有足够的许可,或CAS更新许可的计数以响应操作时,循环将终止if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();//增加许可计数int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
CountDownLatch和Semaphore的实现很类似:同步状态中保存的是当前的计数值。countDown方法调用release,导致计数值递减,并且当计数值为0时解除所有等待线程的阻塞。await调用acquire,当计数器为0时acquire将立即返回,否则将阻塞。
6.3 FutureTask
Future.get语义类似于闭锁的语义–如果发生了某个事件,那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
AQS同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。
FutureTask还保存额外的状态变量,用来保存计算结果或者抛出异常。
还维护了一个引用,指向正在执行计算任务的线程,如果任务取消,该线程就会中断。
6.4 ReentrantReadWriteLock
基于AQS实现的ReentrantReadWriteLock,单个AQS子类将同时管理读取加锁和写入加锁。
ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。
读取锁上的操作将使用共享的获取方法与释放方法,写入锁上的操作将使用独占的获取方法与释放方法。
AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。
当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。
小结
当现有库类不能提供足够的功能,可以使用内置条件队列、显式的Condition对象或者AQS来构建自己的同步器。内置条件队列与内置锁是紧密绑定在一起的。显式的Condition与显式的Lock也是紧密绑定一起的,并且与内置条件队列相比,还提供了一个扩展的功能集,包括每个锁对应于多个等待线程集,可中断或不可中断的条件等待,公平或非公平的队列操作,以及基于时限的等待。