并发程序的测试

1,059 阅读7分钟

并发测试的分类

安全性测试(不发生任何错误的行为)

指标

不变性条件
后验条件

活跃度测试(某个良好的行为终究会发生)

指标

吞吐量:指一组并发任务中已经完成任务所占的比例

响应性:指请求从出发到完成之间的时间(也称延迟)

可伸缩性:指在增加更多资源(CPU)的情况下,吞吐量的提升情况

测试用例

    public class BoundedBuffer<E> {
       private final Semaphore availableItems, availableSpaces;
       private final E[] items;
       private int putPosition = 0, takePosition = 0;

       public BoundedBuffer(int capacity) {
              availableItems = new Semaphore(0);
              availableSpaces = new Semaphore(capacity);
              items = (E[]) new Object[capacity];
       }

       public boolean isEmpty() {
              return availableItems.availablePermits() == 0;
       }

       public boolean isFull() {
              return availableSpaces.availablePermits() == 0;
       }

       public void put(E x) throws InterruptedException {
              availableSpaces.acquire();
              doInsert(x);
              availableItems.release();
       }

       public E take() throws InterruptedException {
              availableItems.acquire();
              E item = doExtract();
              availableSpaces.release();
              return item;
       }

       private synchronized void doInsert(E x) {
              int i = putPosition;
              items[i] = x;
              putPosition = (++i == items.length) ? 0 : i;
       }

       private synchronized E doExtract() {
              int i = takePosition;
              E x = items[i];
              items[i] = null;
              takePosition = (++i == items.length) ? 0 : i;
              return x;
       }
    }

正确性测试

不变性条件:元素个数与剩余空间之和为数组大小

后验条件:数组为空,take操作阻塞;数组为满,put操作阻塞

单元测试

    public class BoundedBufferTest extends TestCase {
       public void testIsEmptyWhenConstructed() {
              BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
              assertTrue(bb.isEmpty());
              assertFalse(bb.isFull());
        }

       public void testIsFullAfterPuts() throws InterruptedException {
              BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
              for (int i = 0; i < 10; i++)
                     bb.put(i);
              assertTrue(bb.isFull());
              assertFalse(bb.isEmpty());
       }
    }

对阻塞操作的测试

public void testTakeBlocksWhenEmpty() {
    final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
    Thread taker = new Thread() {
        public void run() {
            try {
                int unused = bb.take();
                fail(); 
            } catch (InterruptedException  success) { }
        }};
    try {
        taker.start();
        Thread.sleep(1);
        taker.interrupt();
        taker.join(10);
        assertFalse(taker.isAlive());
    } catch (Exception unexpected) {
        fail();
    }
}

安全性测试

在构建对并发类的安全性测试中,需要解决的关键问题在于,要找出那些容易检查的属性,这些属性在发生错误的情况下极有可能失败,同时又不会使得错误检查代码人为地限制并发性。理想的情况是,在测试属性中不需要任何同步机制。

public class PutTakeTest extends TestCase {
      protected static final ExecutorService pool = Executors
                    .newCachedThreadPool();
      protected CyclicBarrier barrier;
      protected final BoundedBuffer<Integer> bb;
      protected final int nTrials, nPairs;
      protected final AtomicInteger putSum = new AtomicInteger(0);
      protected final AtomicInteger takeSum = new AtomicInteger(0);

      public static void main(String[] args) throws Exception {
             new PutTakeTest(10, 10, 100000).test(); // sample parameters
             pool.shutdown();
      }

      public PutTakeTest(int capacity, int npairs, int ntrials) {
             this.bb = new BoundedBuffer<Integer>(capacity);
             this.nTrials = ntrials;
             this.nPairs = npairs;
             this.barrier = new CyclicBarrier(npairs * 2 + 1);
      }

      void test() {
             try {
                    for (int i = 0; i < nPairs; i++) {
                           pool.execute(new Producer());
                           pool.execute(new Consumer());
                    }
                    barrier.await(); 
                    barrier.await(); 
                    assertEquals(putSum.get(), takeSum.get());
             } catch (Exception e) {
                    throw new RuntimeException(e);
             }
      }

      class Producer implements Runnable {
             public void run() {
                    try {
                           barrier.await();
                           int seed = (this.hashCode() ^ (int) System.nanoTime());
                           int sum = 0;
                           for (int i = nTrials; i > 0; --i) {
                                  bb.put(seed);
                                  sum += seed;
                                  seed = xorShift(seed);
                           }
                           putSum.getAndAdd(sum);
                           barrier.await();
                    } catch (Exception e) {
                           throw new RuntimeException(e);
                    }
             }
      }

      class Consumer implements Runnable {
             public void run() {
                    try {
                           barrier.await();
                           int sum = 0;
                           for (int i = nTrials; i > 0; --i) {
                                  sum += bb.take();
                           }
                           takeSum.getAndAdd(sum);
                           barrier.await();
                    } catch (Exception e) {
                           throw new RuntimeException(e);
                    }
             }
      }

      static int xorShift(int y) {
             y ^= (y << 6);
             y ^= (y >>> 21);
             y ^= (y << 7);
             return y;
      }
}

testLeak方法将多个大型对象插入到一个有界缓存中,然后将它们移除。第2个堆快照中的内存用量应该与第1个堆快照中的内存用量基本相同。然而,doExtract如果忘记将返回元素的引用置为空(items[i] = null),那么在两次快照中报告的内存用量将明显不同。(这是为数不多几种需要显式地将变量置空的情况之一。大多数情况下,这种做法不仅不会带来帮助,甚至还会带来负面作用。)

使用回调

class TestingThreadFactory implements ThreadFactory {
    public final AtomicInteger numCreated = new AtomicInteger();
    private final ThreadFactory factory
            = Executors.defaultThreadFactory();

    public Thread newThread(Runnable r) {
        numCreated.incrementAndGet();
        return factory.newThread(r);
    }
}
public class TestThreadPool extends TestCase {

    private final TestingThreadFactory threadFactory = new TestingThreadFactory();

    public void testPoolExpansion() throws InterruptedException {
        int MAX_SIZE = 10;
        ExecutorService exec = Executors.newFixedThreadPool(MAX_SIZE);

        for (int i = 0; i < 10 * MAX_SIZE; i++)
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(Long.MAX_VALUE);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        for (int i = 0;
             i < 20 && threadFactory.numCreated.get() < MAX_SIZE;
             i++)
            Thread.sleep(100);
        assertEquals(threadFactory.numCreated.get(), MAX_SIZE);
        exec.shutdownNow();
    }
}

产生更多的交替操作

public synchronized void transferCredits(Account from,
                                         Account to,
                                         int amount) {
    from.setBalance(from.getBalance() - amount);
    if (random.nextInt(1000) > THRESHOLD)
        Thread.yield();//切换到另一线程
    to.setBalance(to.getBalance() + amount);
}

性能测试

性能测试通常是功能测试的延伸。事实上,在性能测试中应该包含一些基本的功能测试,从而确保不会对错误地代码进行性能测试。 虽然在性能测试与功能测试之间肯定会存在重叠之处,但它们的目标是不同的。性能测试将衡量典型测试用例中的端到端性能。通常,要获得一组合理地使用场景并不容易,理想情况下,在测试中应该反映被测试对象在应用程序中的实际用法。

在PutTakeTest中增加计时功能

public class TimedPutTakeTest extends PutTakeTest {
       private BarrierTimer timer = new BarrierTimer();

       public TimedPutTakeTest(int cap, int pairs, int trials) {
              super(cap, pairs, trials);
              barrier = new CyclicBarrier(nPairs * 2 + 1, timer);
       }

       public void test() {
              try {
                     timer.clear();
                     for (int i = 0; i < nPairs; i++) {
                            pool.execute(new PutTakeTest.Producer());
                            pool.execute(new PutTakeTest.Consumer());
                     }
                     barrier.await();//等待所有线程都准备好后开始往下执行
                     barrier.await();//等待所有线都执行完后开始往下执行
                     //每个元素完成处理所需要的时间
                     long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);
                     System.out.print("Throughput: " + nsPerItem + " ns/item");
                     assertEquals(putSum.get(), takeSum.get());
              } catch (Exception e) {
                     throw new RuntimeException(e);
              }
       }

       public static void main(String[] args) throws Exception {
              int tpt = 100000; // 每对线程(生产-消费)需处理的元素个数
              //测试缓存容量分别为1、10、100、1000的情况
              for (int cap = 1; cap <= 1000; cap *= 10) {
                     System.out.println("Capacity: " + cap);
                     //测试工作线程数1、2、4、8、16、32、64、128的情况
                     for (int pairs = 1; pairs <= 128; pairs *= 2) {
                            TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);
                            System.out.print("Pairs: " + pairs + "\t");

                            //测试两次                         
                            t.test();//第一次
                            System.out.print("\t");
                            Thread.sleep(1000);

                            t.test();//第二次
                            System.out.println();
                            Thread.sleep(1000);
                     }
              }
              PutTakeTest.pool.shutdown();
       }

       //关卡动作,在最后一个线程达到后执行。在该测试中会执行两次:
       //一次是执行任务前,二是所有任务都执行完后
       static class BarrierTimer implements Runnable {
              private boolean started;//是否是第一次执行关卡活动
              private long startTime, endTime;

              public synchronized void run() {
                     long t = System.nanoTime();
                     if (!started) {//第一次关卡活动走该分支
                            started = true;
                            startTime = t;
                     } else
                            //第二次关卡活动走该分支
                            endTime = t;
              }

              public synchronized void clear() {
                     started = false;
              }

              public synchronized long getTime() {//任务所耗时间
                     return endTime - startTime;
              }
       }
}

多种算法的比较

BoundedBuffer性能不高的主要原因:put和take操作分别都有多个操作可能遇到竞争——获取一个信号量,获取一个锁、释放信号量 在测试的过程中发现LinkedBlockingQueue的伸缩性好于ArrayBlockingQueue,这主要是因为链接队列的put和take操作允许有比基于数组的队列更好的并发访问,好的链接队列算法允许队列的头和尾彼此独立地更新。LinkedBlockingQueue中好的并发算法抵消了创建节点元素的开销,那么这种算法通常具有更高的可伸缩性。这似乎与传统性能调优相违背

响应性衡量

所谓的公平信号量是获得锁的顺序与线程启动顺序有关,但不代表100%地获得信号量,仅仅是在概率上能得到保证。而非公平信号量就是无关的了。

缓存过小,将导致非常多的上下文切换次数,这即是在非公平模式中也会导致很低的吞吐量。

因此,***除非线程由于密集的同步需求而被持续地阻塞,否则非公平的信号量通常能实现更好的吞吐量,而公平的信号量则实现更低的变动性。***因为这些结果之间的差异非常大,所以Semaphore要求客户选择针对哪个特性进行优化。

避免性能测试的陷阱

垃圾回收

由于执行时序无法预测,因此增长测试时间,可以引发多次垃圾回收,从而得到更精确的结果。

动态编译

由于编译的执行时机无法预测,增长程序运行时间,这样编译过程以及解释执行总是总运行时间的很小一部分。还可以,放弃程序预热阶段,只测试编译后的程序。

对代码路径的不真实采样

不真实的竞争程度

无用代码的消除

其他的测试方法

代码审查

静态分析工具

不一致的同步

调用Thread.run

未被释放的锁

空的同步块

双重检查加锁

在构造函数中启动另一个线程

通知错误

条件等待中的错误

对Lock和Condition的误用

在休眠或者等待的同时持有一个锁

自旋循环

面向方面的测试技术

AOP

分析与检测工具