并发测试的分类
安全性测试(不发生任何错误的行为)
指标
不变性条件
后验条件
活跃度测试(某个良好的行为终究会发生)
指标
吞吐量:指一组并发任务中已经完成任务所占的比例
响应性:指请求从出发到完成之间的时间(也称延迟)
可伸缩性:指在增加更多资源(CPU)的情况下,吞吐量的提升情况
测试用例
public class BoundedBuffer<E> {
private final Semaphore availableItems, availableSpaces;
private final E[] items;
private int putPosition = 0, takePosition = 0;
public BoundedBuffer(int capacity) {
availableItems = new Semaphore(0);
availableSpaces = new Semaphore(capacity);
items = (E[]) new Object[capacity];
}
public boolean isEmpty() {
return availableItems.availablePermits() == 0;
}
public boolean isFull() {
return availableSpaces.availablePermits() == 0;
}
public void put(E x) throws InterruptedException {
availableSpaces.acquire();
doInsert(x);
availableItems.release();
}
public E take() throws InterruptedException {
availableItems.acquire();
E item = doExtract();
availableSpaces.release();
return item;
}
private synchronized void doInsert(E x) {
int i = putPosition;
items[i] = x;
putPosition = (++i == items.length) ? 0 : i;
}
private synchronized E doExtract() {
int i = takePosition;
E x = items[i];
items[i] = null;
takePosition = (++i == items.length) ? 0 : i;
return x;
}
}
正确性测试
不变性条件:元素个数与剩余空间之和为数组大小
后验条件:数组为空,take操作阻塞;数组为满,put操作阻塞
单元测试
public class BoundedBufferTest extends TestCase {
public void testIsEmptyWhenConstructed() {
BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
assertTrue(bb.isEmpty());
assertFalse(bb.isFull());
}
public void testIsFullAfterPuts() throws InterruptedException {
BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
for (int i = 0; i < 10; i++)
bb.put(i);
assertTrue(bb.isFull());
assertFalse(bb.isEmpty());
}
}
对阻塞操作的测试
public void testTakeBlocksWhenEmpty() {
final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
Thread taker = new Thread() {
public void run() {
try {
int unused = bb.take();
fail();
} catch (InterruptedException success) { }
}};
try {
taker.start();
Thread.sleep(1);
taker.interrupt();
taker.join(10);
assertFalse(taker.isAlive());
} catch (Exception unexpected) {
fail();
}
}
安全性测试
在构建对并发类的安全性测试中,需要解决的关键问题在于,要找出那些容易检查的属性,这些属性在发生错误的情况下极有可能失败,同时又不会使得错误检查代码人为地限制并发性。理想的情况是,在测试属性中不需要任何同步机制。
public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors
.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final BoundedBuffer<Integer> bb;
protected final int nTrials, nPairs;
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); // sample parameters
pool.shutdown();
}
public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new BoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await();
barrier.await();
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
class Producer implements Runnable {
public void run() {
try {
barrier.await();
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
public void run() {
try {
barrier.await();
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
}
testLeak方法将多个大型对象插入到一个有界缓存中,然后将它们移除。第2个堆快照中的内存用量应该与第1个堆快照中的内存用量基本相同。然而,doExtract如果忘记将返回元素的引用置为空(items[i] = null),那么在两次快照中报告的内存用量将明显不同。(这是为数不多几种需要显式地将变量置空的情况之一。大多数情况下,这种做法不仅不会带来帮助,甚至还会带来负面作用。)
使用回调
class TestingThreadFactory implements ThreadFactory {
public final AtomicInteger numCreated = new AtomicInteger();
private final ThreadFactory factory
= Executors.defaultThreadFactory();
public Thread newThread(Runnable r) {
numCreated.incrementAndGet();
return factory.newThread(r);
}
}
public class TestThreadPool extends TestCase {
private final TestingThreadFactory threadFactory = new TestingThreadFactory();
public void testPoolExpansion() throws InterruptedException {
int MAX_SIZE = 10;
ExecutorService exec = Executors.newFixedThreadPool(MAX_SIZE);
for (int i = 0; i < 10 * MAX_SIZE; i++)
exec.execute(new Runnable() {
public void run() {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
for (int i = 0;
i < 20 && threadFactory.numCreated.get() < MAX_SIZE;
i++)
Thread.sleep(100);
assertEquals(threadFactory.numCreated.get(), MAX_SIZE);
exec.shutdownNow();
}
}
产生更多的交替操作
public synchronized void transferCredits(Account from,
Account to,
int amount) {
from.setBalance(from.getBalance() - amount);
if (random.nextInt(1000) > THRESHOLD)
Thread.yield();//切换到另一线程
to.setBalance(to.getBalance() + amount);
}
性能测试
性能测试通常是功能测试的延伸。事实上,在性能测试中应该包含一些基本的功能测试,从而确保不会对错误地代码进行性能测试。 虽然在性能测试与功能测试之间肯定会存在重叠之处,但它们的目标是不同的。性能测试将衡量典型测试用例中的端到端性能。通常,要获得一组合理地使用场景并不容易,理想情况下,在测试中应该反映被测试对象在应用程序中的实际用法。
在PutTakeTest中增加计时功能
public class TimedPutTakeTest extends PutTakeTest {
private BarrierTimer timer = new BarrierTimer();
public TimedPutTakeTest(int cap, int pairs, int trials) {
super(cap, pairs, trials);
barrier = new CyclicBarrier(nPairs * 2 + 1, timer);
}
public void test() {
try {
timer.clear();
for (int i = 0; i < nPairs; i++) {
pool.execute(new PutTakeTest.Producer());
pool.execute(new PutTakeTest.Consumer());
}
barrier.await();//等待所有线程都准备好后开始往下执行
barrier.await();//等待所有线都执行完后开始往下执行
//每个元素完成处理所需要的时间
long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);
System.out.print("Throughput: " + nsPerItem + " ns/item");
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws Exception {
int tpt = 100000; // 每对线程(生产-消费)需处理的元素个数
//测试缓存容量分别为1、10、100、1000的情况
for (int cap = 1; cap <= 1000; cap *= 10) {
System.out.println("Capacity: " + cap);
//测试工作线程数1、2、4、8、16、32、64、128的情况
for (int pairs = 1; pairs <= 128; pairs *= 2) {
TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);
System.out.print("Pairs: " + pairs + "\t");
//测试两次
t.test();//第一次
System.out.print("\t");
Thread.sleep(1000);
t.test();//第二次
System.out.println();
Thread.sleep(1000);
}
}
PutTakeTest.pool.shutdown();
}
//关卡动作,在最后一个线程达到后执行。在该测试中会执行两次:
//一次是执行任务前,二是所有任务都执行完后
static class BarrierTimer implements Runnable {
private boolean started;//是否是第一次执行关卡活动
private long startTime, endTime;
public synchronized void run() {
long t = System.nanoTime();
if (!started) {//第一次关卡活动走该分支
started = true;
startTime = t;
} else
//第二次关卡活动走该分支
endTime = t;
}
public synchronized void clear() {
started = false;
}
public synchronized long getTime() {//任务所耗时间
return endTime - startTime;
}
}
}
多种算法的比较
BoundedBuffer性能不高的主要原因:put和take操作分别都有多个操作可能遇到竞争——获取一个信号量,获取一个锁、释放信号量 在测试的过程中发现LinkedBlockingQueue的伸缩性好于ArrayBlockingQueue,这主要是因为链接队列的put和take操作允许有比基于数组的队列更好的并发访问,好的链接队列算法允许队列的头和尾彼此独立地更新。LinkedBlockingQueue中好的并发算法抵消了创建节点元素的开销,那么这种算法通常具有更高的可伸缩性。这似乎与传统性能调优相违背
响应性衡量
所谓的公平信号量是获得锁的顺序与线程启动顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。
缓存过小,将导致非常多的上下文切换次数,这即是在非公平模式中也会导致很低的吞吐量。
因此,***除非线程由于密集的同步需求而被持续地阻塞,否则非公平的信号量通常能实现更好的吞吐量,而公平的信号量则实现更低的变动性。***因为这些结果之间的差异非常大,所以Semaphore要求客户选择针对哪个特性进行优化。
避免性能测试的陷阱
垃圾回收
由于执行时序无法预测,因此增长测试时间,可以引发多次垃圾回收,从而得到更精确的结果。
动态编译
由于编译的执行时机无法预测,增长程序运行时间,这样编译过程以及解释执行总是总运行时间的很小一部分。还可以,放弃程序预热阶段,只测试编译后的程序。
对代码路径的不真实采样
不真实的竞争程度
无用代码的消除
其他的测试方法
代码审查
静态分析工具
不一致的同步
调用Thread.run
未被释放的锁
空的同步块
双重检查加锁
在构造函数中启动另一个线程
通知错误
条件等待中的错误
对Lock和Condition的误用
在休眠或者等待的同时持有一个锁
自旋循环
面向方面的测试技术
AOP