并发优化 - 降低锁颗粒

2,628 阅读3分钟

github

java并发模型是基于内存共享,多线程共享变量一般就会涉及到加锁,这里介绍几种降低锁竞争的方式,最终效果都是降低锁的颗粒或者降低锁的竞争次数。

常见锁优化方式

  1. 减少锁的持有时间。

    例如对一个方法加锁不如对其中的同步代码行加锁。

  2. 读写锁。

    可只对锁操作加锁,读不加锁。这样读、读之间不互斥, 读、写和写、读互斥,可使用J.U.C中的ReadWriteLock

  3. 减少锁颗粒。

    如ConcurrentHashMap中对segment加锁,而不是整个map加锁。比如map长度为128,分成8份,8份之间不互斥,8份内部才互斥,可以有效降低锁的竞争。

  4. 乐观锁。

    使用CAS算法,和期待值对比,如果一样则执行,不一样则重试等方式。

  5. 锁粗化。

    如果一个方法有好几行都是同步代码,对这几行单独加锁,不如对这个方法加锁,可以减少锁的竞争次数。

场景

服务A和服务B都会调用服务C的接口poll,同一个服务调用时互斥需要加锁,不同服务之间调用不互斥。 比如A服务有两个实例A1、A2,B服务有两个实例B1、B2, A1和B1共同调用poll接口时可以并行访问,A1和A2共同访问时必须串行执行, 即必须有一个请求执行完才可以执行完下一个。

  • 方式一:弱引用+synchronized

弱引用作用可以加快垃圾回收,这里的服务名很少,所以生成的锁对象很少可以不使用弱引用。可是其他场景可能要生成的锁对象有很多,可以使用弱引用加快垃圾回收。

@Component
public class StringLockProvider {

    private final Map<Mutex, WeakReference<Mutex>> mutexMap = new WeakHashMap<>();

    public Mutex getMutex(String id) {
        if (id == null) {
            throw new NullPointerException();
        }
        Mutex key = new MutexImpl(id);
        synchronized (mutexMap) {
            return mutexMap.computeIfAbsent(key, WeakReference::new).get();
        }
    }

    public interface Mutex {
    }

    private static class MutexImpl implements Mutex {
        private final String code;

        private MutexImpl(String id) {
            this.code = id;
        }

        public boolean equals(Object o) {
            if (o == null) {
                return false;
            }
            if (this.getClass() == o.getClass()) {
                return this.code.equals(o.toString());
            }
            return false;
        }

        public int hashCode() {
            return code.hashCode();
        }

        public String toString() {
            return code;
        }
    }
}

@RestController
public class PollController {

    @Autowired
    private StringLockProvider lockProvider;

    /**
     * @param service 拉取的服务名
     */
    @GetMapping("/v1/data/{service}")
    public List<String> poll(@PathVariable("service") String service) {
        List<String> data = new ArrayList<>(2 << 4);
        synchronized (lockProvider.getMutex(service)) {
            //同步代码,data.add
        }
        return data;
    }
}

  • 方式二:弱引用+ReentrantLock

此种效果和弱引用+synchronized一致, 此处不同主要在于ReentrantLock支持公平锁, 谁也等待谁先获取, 而synchronized非公平锁,随机选一个获取锁,当一个线程一直获取不到锁,需要等待较长时间,可能造成该接口超时。

@RestController
public class PollController {

    private final Map<String, WeakReference<ReentrantLock>> mutexMap = new ConcurrentHashMap<>();

    /**
     * @param service 拉取的服务名
     */
    @GetMapping("/v1/data/{service}")
    public List<String> poll(@PathVariable("service") String service) {
        List<String> data = new ArrayList<>(2 << 4);
        ReentrantLock lock = getReentrantLock(service);
        lock.lock();
        try {
            //同步代码。data.add
        } finally {
            lock.unlock();
        }
        return data;
    }

    private ReentrantLock getReentrantLock(String id) {
        if (id == null) {
            throw new NullPointerException();
        }
        return mutexMap.computeIfAbsent(id, it -> new WeakReference<>(new ReentrantLock(true))).get();
    }
}
  • 方式三:CAS乐观锁+循坏

此处使用了while循环,使用不当会造成线程阻塞,阻塞过多可能会造成死机!

@RestController
public class PollController {

    private final Map<String, WeakReference<AtomicBoolean>> atomicMap = new ConcurrentHashMap<>();

    /**
     * @param service 拉取的服务名
     */
    @GetMapping("/v1/data/{service}")
    public List<String> poll(@PathVariable("service") String service) {
        List<String> data = new ArrayList<>(2 << 4);
        AtomicBoolean atomic = getAtomicBoolean(service);
        //直到预期的值为true,才会成功,否则循环
        while (!atomic.compareAndSet(true, false)) {
           //Thread.sleep(100)
        }
        try {
            //同步代码。data.add
        }finally {
            atomic.set(true);
        }
        return data;
    }

    private AtomicBoolean getAtomicBoolean(String id) {
        if (id == null) {
            throw new NullPointerException();
        }
        return atomicMap.computeIfAbsent(id, it -> new WeakReference<>(new AtomicBoolean(true))).get();
    }
}