阅读 9

常见的并发模式

Immutablility模式

解决并发问题最粗暴的方式是让共享变量只有读操作,没有写操作。

Immutablility模式要求对象一旦被创建后,状态就不能再发生改变。要创建不可变对象,就要设计不可变类。将类设计为final类,类中的属性都使用final修饰,然后只提供获取属性的方法。这个类就是不可变类。JDK中的String和基础类型的包装类都是不可变类。

public final class String {
  private final char value[];
  // 字符替换
  String replace(char oldChar, 
      char newChar) {
    // 无需替换,直接返回 this  
    if (oldChar == newChar){
      return this;
    }

    int len = value.length;
    int i = -1;
    /* avoid getfield opcode */
    char[] val = value; 
    // 定位到需要替换的字符位置
    while (++i < len) {
      if (val[i] == oldChar) {
        break;
      }
    }
    // 未找到 oldChar,无需替换
    if (i >= len) {
      return this;
    } 
    // 创建一个 buf[],这是关键
    // 用来保存替换后的字符串
    char buf[] = new char[len];
    for (int j = 0; j < i; j++) {
      buf[j] = val[j];
    }
    while (i < len) {
      char c = val[i];
      buf[i] = (c == oldChar) ? 
        newChar : c;
      i++;
    }
    // 创建一个新的字符串返回
    // 原字符串不会发生任何变化
    return new String(buf, true);
  }
}
复制代码

涉及到需要修改不可变类的属性时,其实都是返回这个类的新对象。

如果担心修改太多导致频繁创建不可变类,占用太多的内存,那么可以结合享元模式进行对象的共享。JDK中的基础类型包装类都使用了享元模式。

享元模式就是一个对象池,创建对象之前,现在对象池里面查找是否存在,如果存在,就用已存在的对象,如果不存在,那就创建新对象,然后加入对象池中。

public final class Long {
    Long valueOf(long l) {
      final int offset = 128;
      // [-128,127] 直接的数字做了缓存
      if (l >= -128 && l <= 127) { 
        return LongCache
          .cache[(int)l + offset];
      }
      return new Long(l);
    }
    // 缓存,等价于对象池
    // 仅缓存 [-128,127] 直接的数字
    static class LongCache {
      static final Long cache[] 
        = new Long[-(-128) + 127 + 1];
    
      static {
        for(int i=0; i<cache.length; i++)
          cache[i] = new Long(i-128);
      }
    }
}
复制代码

正是因为使用了享元模式,String和所有基础类型的包装类,都不适合作为锁。这会导致看上去好像私有的锁,其实是共有的。

class A {
  Long al=Long.valueOf(1);
  public void setAX(){
    synchronized (al) {
      // 省略代码无数
    }
  }
}
class B {
  Long bl=Long.valueOf(1);
  public void setBY(){
    synchronized (bl) {
      // 省略代码无数
    }
  }
}
复制代码

上面代码中,al和bl是同一把锁。会导致相互阻塞。

如何解决不可变对象的原子性问题。

public class SafeWM {

  class WMRange{
    final int upper;
    final int lower;
    WMRange(int upper,int lower){
    // 省略构造函数实现
    }
  }
  
  final AtomicReference<WMRange>
    rf = new AtomicReference<>(
      new WMRange(0,0)
    );
    
  // 设置库存上限
  void setUpper(int v){
    while(true){
      WMRange or = rf.get();
      // 检查参数合法性
      if(v < or.lower){
        throw new IllegalArgumentException();
      }
      WMRange nr = new
          WMRange(v, or.lower);
      if(rf.compareAndSet(or, nr)){
        return;
      }
    }
  }
}
复制代码

Copy-on-Write模式

如果需要修改不可变对象的属性,参考String的replace方法,其实就是创建一个新的String对象,然后返回。也就是Copy-on-Write的方式。

不仅仅是对于不可变对象属性的写操作可以使用Copy-on-Write模式,其他的比如:

  1. CopyOnWriteArrayList和CopyOnWriteArraySet这两个容器也是使用Copy-on-Write模式,由于读操作无锁,所以读操作的性能很优越。
  2. 类Unix系统使用fork操作创建新进程时,在父子进程需要对各自的进程进行写操作之前,它们共享同一个内存地址。这样子可以起到延迟复制内存的作用。
  3. 函数式编程的基础是不可变性,所以它也基于Copy-on-Write模式,再需要改变的时候进行复制。
// 路由信息
public final class Router{
  private final String  ip;
  private final Integer port;
  private final String  iface;
  // 构造函数
  public Router(String ip, 
      Integer port, String iface){
    this.ip = ip;
    this.port = port;
    this.iface = iface;
  }
  // 重写 equals 方法
  public boolean equals(Object obj){
    if (obj instanceof Router) {
      Router r = (Router)obj;
      return iface.equals(r.iface) &&
             ip.equals(r.ip) &&
             port.equals(r.port);
    }
    return false;
  }
  public int hashCode() {
    // 省略 hashCode 相关代码
  }
}
// 路由表信息
public class RouterTable {
  //Key: 接口名
  //Value: 路由集合
  ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> 
    rt = new ConcurrentHashMap<>();
  // 根据接口名获取路由表
  public Set<Router> get(String iface){
    return rt.get(iface);
  }
  // 删除路由
  public void remove(Router router) {
    Set<Router> set=rt.get(router.iface);
    if (set != null) {
      set.remove(router);
    }
  }
  // 增加路由
  public void add(Router router) {
    Set<Router> set = rt.computeIfAbsent(
      route.iface, r -> 
        new CopyOnWriteArraySet<>());
    set.add(router);
  }
}
复制代码

ThreadLocal模式

解决并发问题的根本可以消除对象间的共享性。使用局部变量和ThreadLocal可以避免变量共享,实现线程封闭。

例如,

  1. 为每个线程分配一个独立的id:
static class ThreadId {
  static final AtomicLong 
  nextId=new AtomicLong(0);
  // 定义 ThreadLocal 变量
  static final ThreadLocal<Long> 
  tl=ThreadLocal.withInitial(
    ()->nextId.getAndIncrement());
  // 此方法会为每个线程分配一个唯一的 Id
  static long get(){
    return tl.get();
  }
}
复制代码
  1. 解决SimpleDateFormat的线程安全问题:
static class SafeDateFormat {
  // 定义 ThreadLocal 变量
  static final ThreadLocal<DateFormat>
  tl=ThreadLocal.withInitial(
    ()-> new SimpleDateFormat(
      "yyyy-MM-dd HH:mm:ss"));
      
  static DateFormat get(){
    return tl.get();
  }
}
// 不同线程执行下面代码
// 返回的 df 是不同的
DateFormat df =
  SafeDateFormat.get();
复制代码

JDK中ThreadLocal的实现方式:

class Thread {
  // 内部持有 ThreadLocalMap
  ThreadLocal.ThreadLocalMap 
    threadLocals;
}
class ThreadLocal<T>{
  public T get() {
    // 首先获取线程持有的
    //ThreadLocalMap
    ThreadLocalMap map =
      Thread.currentThread()
        .threadLocals;
    // 在 ThreadLocalMap 中
    // 查找变量
    Entry e = 
      map.getEntry(this);
    return e.value;  
  }
  static class ThreadLocalMap{
    // 内部是数组而不是 Map
    Entry[] table;
    // 根据 ThreadLocal 查找 Entry
    Entry getEntry(ThreadLocal key){
      // 省略查找逻辑
    }
    //Entry 定义
    static class Entry extends
    WeakReference<ThreadLocal>{
      Object value;
    }
  }
}
复制代码

可以看到,ThreadLocal只是一个代理工具类,他并不持有线程的相关信息。所有和线程相关的信息都保存在Thread类中,包括ThreadLocalMap,而且ThreadLocalMap里面的ThreadLocal是弱引用,只要Thread被回收,ThreadLocalMap就能被回收。

那么这里出现一个问题了,就是,如果Thread长时间没有被回收,那么会导致,ThreadLocalMap也不能被回收,ThreadLocalMap容器的内容也不能被回收。此时看看ThreadLocalMap的key和value,key是ThreadLocal,是弱引用,所以当ThreadLocal自己回收的时候,不会造成内存泄漏,但是value是强引用,不能被回收。需要手动回收value。

ExecutorService es;
ThreadLocal tl;
es.execute(()->{
  //ThreadLocal 增加变量
  tl.set(obj);
  try {
    // 省略业务逻辑代码
  }finally {
    // 手动清理 ThreadLocal 
    tl.remove();
  }
});
复制代码

Guarded Suspension模式

Guarded Suspension模式提供了异步转同步的方式。

Guarded Suspension模式由上图组成,首先,一个对象GuardedObject,内部有一个受保护的成员变量,以及两个方法get(Predicate p)和onChanged(T obj)。GuardedObject的get方法对应一个请求,方法中的参数p代表了请求完成的前提条件,当和onChanged方法触发时,会改变前提条件。

class GuardedObject<T>{
  // 受保护的对象
  T obj;
  final Lock lock = 
    new ReentrantLock();
  final Condition done =
    lock.newCondition();
  final int timeout=2;
  // 保存所有 GuardedObject
  final static Map<Object, GuardedObject> 
  gos=new ConcurrentHashMap<>();
  // 静态方法创建 GuardedObject
  static <K> GuardedObject 
      create(K key){
    GuardedObject go=new GuardedObject();
    gos.put(key, go);
    return go;
  }
  static <K, T> void 
      fireEvent(K key, T obj){
    GuardedObject go=gos.remove(key);
    if (go != null){
      go.onChanged(obj);
    }
  }
  // 获取受保护对象  
  T get(Predicate<T> p) {
    lock.lock();
    try {
      //MESA 管程推荐写法
      while(!p.test(obj)){
        done.await(timeout, 
          TimeUnit.SECONDS);
      }
    }catch(InterruptedException e){
      throw new RuntimeException(e);
    }finally{
      lock.unlock();
    }
    // 返回非空的受保护对象
    return obj;
  }
  // 事件通知方法
  void onChanged(T obj) {
    lock.lock();
    try {
      this.obj = obj;
      done.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
复制代码

外部使用例子:

// 处理浏览器发来的请求
Respond handleWebReq(){
  int id= 序号生成器.get();
  // 创建一消息
  Message msg1 = new 
    Message(id,"{...}");
  // 创建 GuardedObject 实例
  GuardedObject<Message> go=
    GuardedObject.create(id);  
  // 发送消息
  send(msg1);
  // 等待 MQ 消息
  Message r = go.get(
    t->t != null);  
}
void onMessage(Message msg){
  // 唤醒等待的线程
  GuardedObject.fireEvent(
    msg.id, msg);
}
复制代码

Balking模式

当流程的执行顺序依赖于某个共享变量的场景,可以归纳为多线程if模式。

Balking模式的经典实现:

boolean changed=false;
// 自动存盘操作
void autoSave(){
  synchronized(this){
    if (!changed) {
      return;
    }
    changed = false;
  }
  // 执行存盘操作
  // 省略且实现
  this.execSave();
}
// 编辑操作
void edit(){
  // 省略编辑逻辑
  ......
  change();
}
// 改变状态
void change(){
  synchronized(this){
    changed = true;
  }
}
复制代码

如果对于共享变量不要求原子性,那么可以使用volitile来实现。

使用volatile实现Balking模式:

// 路由表信息
public class RouterTable {
  //Key: 接口名
  //Value: 路由集合
  ConcurrentHashMap<String, CopyOnWriteArraySet<Router>> 
    rt = new ConcurrentHashMap<>();    
  // 路由表是否发生变化
  volatile boolean changed;
  // 将路由表写入本地文件的线程池
  ScheduledExecutorService ses=
    Executors.newSingleThreadScheduledExecutor();
  // 启动定时任务
  // 将变更后的路由表写入本地文件
  public void startLocalSaver(){
    ses.scheduleWithFixedDelay(()->{
      autoSave();
    }, 1, 1, MINUTES);
  }
  // 保存路由表到本地文件
  void autoSave() {
    if (!changed) {
      return;
    }
    changed = false;
    // 将路由表写入本地文件
    // 省略其方法实现
    this.save2Local();
  }
  // 删除路由
  public void remove(Router router) {
    Set<Router> set=rt.get(router.iface);
    if (set != null) {
      set.remove(router);
      // 路由表已发生变化
      changed = true;
    }
  }
  // 增加路由
  public void add(Router router) {
    Set<Router> set = rt.computeIfAbsent(
      route.iface, r -> 
        new CopyOnWriteArraySet<>());
    set.add(router);
    // 路由表已发生变化
    changed = true;
  }
}
复制代码

双重检查实现Balking模式:

class Singleton{
  private static volatile 
    Singleton singleton;
  // 构造方法私有化  
  private Singleton() {}
  // 获取实例(单例)
  public static Singleton 
  getInstance() {
    // 第一次检查
    if(singleton==null){
      synchronize{Singleton.class){
        // 获取锁后二次检查
        if(singleton==null){
          singleton=new Singleton();
        }
      }
    }
    return singleton;
  }
}
复制代码

双重检查中使用volatile关键字来禁止重排序,然后第一次检查是为了性能考量,第二次检查是为了安全性考量。

Thread-Per-Message模式

并发模式中用于解决分工的模式通常有:Thread-Per-Message模式,Worker Thread模式,生产者-消费者模式。

Thread-Per-Message模式的思想是,为每个任务分配一个线程。

final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 处理请求    
try {
  while (true) {
    // 接收请求
    SocketChannel sc = ssc.accept();
    // 每个请求都创建一个线程
    new Thread(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 关闭 Socket
        sc.close();
      }catch(Exception e){
        throw new UncheckedIOException(e);
      }
    }).start();
  }
} finally {
  ssc.close();
}   
复制代码

但是这样再高并发条件下,会创建大量线程,造成内存飙升,甚至造成OOM。

解决方法是引入线程池,或者引入轻量级线程。

使用Fiber轻量级线程实现Thread-Per-Message模式:

final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 处理请求
try{
  while (true) {
    // 接收请求
    final SocketChannel sc = 
      serverSocketChannel.accept();
    Fiber.schedule(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        LockSupport.parkNanos(2000*1000000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip()
        sc.write(wb);
        // 关闭 Socket
        sc.close();
      } catch(Exception e){
        throw new UncheckedIOException(e);
      }
    });
  }//while
}finally{
  ssc.close();
}
复制代码

Worker Thread模式

Worker Thread模式中的Worker Thread相当于上图中的工人。所以Worker Thread模式相当于JDK中的线程池的实现。可以避免重复创建,销毁线程,控制线程的上限以及接收指定任务的设置。

Worker Thread模式的经典实现:

ExecutorService es = new ThreadPoolExecutor(
  50, 500,
  60L, TimeUnit.SECONDS,
  // 注意要创建有界队列
  new LinkedBlockingQueue<Runnable>(2000),
  // 建议根据业务需求实现 ThreadFactory
  r->{
    return new Thread(r, "echo-"+ r.hashCode());
  },
  // 建议根据业务需求实现 RejectedExecutionHandler
  new ThreadPoolExecutor.CallerRunsPolicy());

final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 处理请求    
try {
  while (true) {
    // 接收请求
    SocketChannel sc = ssc.accept();
    // 将请求处理任务提交给线程池
    es.execute(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 关闭 Socket
        sc.close();
      }catch(Exception e){
        throw new UncheckedIOException(e);
      }
    });
  }
} finally {
  ssc.close();
  es.shutdown();
}   
复制代码

当往线程池里面提交任务时,需要注意任务间不要有依赖性。

如上图,第一阶段的任务需要等待第二阶段的任务完成才能返回。

//L1、L2 阶段共用的线程池
ExecutorService es = Executors.
  newFixedThreadPool(2);
//L1 阶段的闭锁    
CountDownLatch l1=new CountDownLatch(2);
for (int i=0; i<2; i++){
  System.out.println("L1");
  // 执行 L1 阶段任务
  es.execute(()->{
    //L2 阶段的闭锁 
    CountDownLatch l2=new CountDownLatch(2);
    // 执行 L2 阶段子任务
    for (int j=0; j<2; j++){
      es.execute(()->{
        System.out.println("L2");
        l2.countDown();
      });
    }
    // 等待 L2 阶段任务执行完
    l2.await();
    l1.countDown();
  });
}
// 等着 L1 阶段任务执行完
l1.await();
System.out.println("end");
复制代码

上面的程序会停留在l2.await();这里,因为线程池中只有两个线程,在for的外循环就会创建两个线程,然后线程池会等带外循环的两个任务执行完后,在执行内循环的两个子任务。所以会一直阻塞在l2.await();这里。

尽量为不同的任务创建线程池。

生产者-消费者模式

Java线程池的本质就是生产者-消费者模式。

生产者-消费者模式的核心是一个任务队列,生产者线程产生任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务,并执行。

生产者-消费者模式可以实现系统之间的解偶。生产者线程和消费者线程之间不直接接触,它们之间的通信通过消息队列来进行。

生产者-消费者模式还可以实现异步调用,还能够平衡生产者和消费者线程之间的速度。生产者只需要将任务添加到任务队列,无需等待任务被消费者线程执行,由此实现生产者和消费者之间的异步。

要是不需要平衡生产者和消费者线程之间的速度,那么可以使用线程池的方式来执行任务。假设生产者和消费者的效率为1:3,那么就需要创建3个生产者线程来应付1个消费者。使用生产者-消费者模式,可以通过控制消费者的效率,这样就能控制创建的线程数。

class Task
var bq = LinkedBlockingQueue<Task>(2000)//指定任务队列容量,避免任务太多OOM

fun start() {
    var es = Executors.newFixedThreadPool(5)//启动5个消费者线程,执行批量任务
    for (i in 0..5) {
        es.execute {
            while (true) {
                var ts = pollTasks()//获取批量任务
                execTasks(ts)//执行任务
            }
        }
    }
}

fun pollTasks(): List<Task> {
    var ts = LinkedList<Task>()
    var t = bq.take()//阻塞式获取任务
    while (t != null) {
        ts.add(t)
        t = bq.poll()//非阻塞式获取任务
    }
    return ts
}

fun execTasks(ts: List<Task>) {

}
复制代码

之所以首先采用阻塞式获取,是为了避免如果任务队列中没有任务,这样的方式可以避免无线循环。

生产者-消费者模式还支持分阶段提交任务。 比如,提交日志模块的操作要求:

  1. ERROR 级别日志马上刷盘
  2. 日志积累到500条立即刷盘
  3. 5秒刷盘一次
enum class LEVEL {
    INFO, ERROR
}

class LogMsg(var level: LEVEL, msg: String)

val bq = LinkedBlockingQueue<LogMsg>()

val batchSize = 500

val es = Executors.newFixedThreadPool(1)

fun start() {
    val file = File.createTempFile("log", ".log")
    val writer = FileWriter(file)

    es.execute {
        try {
            var curIdx = 0//未刷盘日志数量
            var preFT = System.currentTimeMillis()
            while (true) {
                val log = bq.poll(5, TimeUnit.SECONDS)
                //写日志
                if (log != null) {
                    writer.write(log.toString())
                    ++curIdx
                }
                //如果不存在未刷盘数据,则无需刷盘
                if (curIdx <= 0) {
                    continue
                }
                //根据规则刷盘
                if (log != null && log.level == LEVEL.ERROR || curIdx == batchSize || System.currentTimeMillis() - preFT > 5000) {
                    writer.flush()
                    curIdx = 0
                    preFT = System.currentTimeMillis()
                }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            try {
                writer.flush()
                writer.close()
            } catch (e: IOException) {
                e.printStackTrace()
            }
        }
    }
}
复制代码

使用方式

建议针对不同情况使用不同的解决方式。利于沟通与代码的理解性。

避免共享的设计模式

Immutability模式,Copy-on-Write模式,ThreadLocal模式本质上都是为了避免共享。

使用时需要注意Immutability模式的属性的不可变性,Copy-on-Write模式需要注意拷贝的性能问题,,ThreadLocal模式需要注意异步执行问题。

多线程版本的if模式

Guarded Suspension模式和Balking模式属于多线程版本的if模式。

Guarded Suspension模式需要注意性能。

// 获取受保护对象  
T get(Predicate<T> p) {
  try {
    //obj 的可见性无法保证
    while(!p.test(obj)){
      TimeUnit.SECONDS
        .sleep(timeout);
    }
  }catch(InterruptedException e){
    throw new RuntimeException(e);
  }
  // 返回非空的受保护对象
  return obj;
}
// 事件通知方法
void onChanged(T obj) {
  this.obj = obj;
}
复制代码

由于obj没有使用volatile修饰,可能导致可见性问题,会等带更长的时间。

Balking模式需要注意竞态问题。

class Test{
  volatile boolean inited = false;
  int count = 0;
  void init(){
    // 存在竞态条件
    if(inited){
      return;
    }
    // 有可能多个线程执行到这里
    inited = true;
    // 计算 count 的值
    count = calc();
  }
}  
复制代码

分工模式

Thread-Per-Message 模式、Worker Thread 模式和生产者 - 消费者模式属于最简单实用的多线程分工方法。

Thread-Per-Message 模式需要注意线程的创建,销毁以及是否会导致OOM。

Worker Thread 模式需要注意死锁问题,提交的任务之间不要有依赖性。

ExecutorService pool = Executors
  .newSingleThreadExecutor();
// 提交主任务
pool.submit(() -> {
  try {
    // 提交子任务并等待其完成,
    // 会导致线程死锁
    String qq=pool.submit(()->"QQ").get();
    System.out.println(qq);
  } catch (Exception e) {
  }
});
复制代码

生产者 - 消费者模式可以直接使用线程池来实现。停止方式如下:

class Logger {
  // 用于终止日志执行的“毒丸”
  final LogMsg poisonPill = 
    new LogMsg(LEVEL.ERROR, "");
  // 任务队列  
  final BlockingQueue<LogMsg> bq
    = new BlockingQueue<>();
  // 只需要一个线程写日志
  ExecutorService es = 
    Executors.newFixedThreadPool(1);
  // 启动写日志线程
  void start(){
    File file=File.createTempFile(
      "foo", ".log");
    final FileWriter writer=
      new FileWriter(file);
    this.es.execute(()->{
      try {
        while (true) {
          LogMsg log = bq.poll(
            5, TimeUnit.SECONDS);
          // 如果是“毒丸”,终止执行  
          if(poisonPill.equals(logMsg)){
            break;
          }  
          // 省略执行逻辑
        }
      } catch(Exception e){
      } finally {
        try {
          writer.flush();
          writer.close();
        }catch(IOException e){}
      }
    });  
  }
  // 终止写日志线程
  public void stop() {
    // 将“毒丸”对象加入阻塞队列
    bq.add(poisonPill);
    es.shutdown();
  }
}
复制代码
关注下面的标签,发现更多相似文章
评论