Reactor网络编程模型是java.util.concurrent包的作者Dung Lea在《Scalable IO in Java》中提出的高性能的网络编程服务架构模式,Netty,Mina等高性能的NIO服务框架都采用的是Reactor模式。
1. 网络服务
网络服务以不同的形式存在,如web服务、分布式服务等,但是它们大多具备相同的处理流程。
- 接收请求,从网络I/O中读取字节流数据。
- 对请求进行解码,将字节流转为约定的对象。
- 处理业务逻辑。
- 对响应进行编码,将经过业务逻辑处理后转为字节流。
- 返回响应,发送响应到网络I/O。
2. 传统的服务设计模式
在网络服务中为每一个连接的处理开启一个新的线程,如下图:
在看我如何把NIO拉下神坛中已经分析过这种模式严重依赖线程,系统伸缩性差,无法应对海量的请求。
3. 高性能可伸缩的服务设计模式
在构建高性能可伸缩的网络服务的过程中,我们希望达到以下的目标:
-
在海量请求高负载的情况下能够优雅降级。
-
硬件资源的升级能够持续的为系统带来性能提升(cpu,内存,磁盘,带宽)。
-
还要满足可用性和性能目标
- 低延时
- 高负载
- 可调节的服务质量
分治法通常是最好的实现任何可伸缩性目标的方法。
4. 分治法(Divide and Conquer)
- 将完整的处理过程分解为若干个小任务。
- 每个小任务执行一个动作,并且不产生阻塞。
- 在任务准备好的时候去执行它。这里,一个I/O事件通常被作为触发器。
java.nio提供了实现分治法的基本机制
- 非阻塞的读写。
- 基于I/O事件分发相关的任务(读就绪,写就绪,连接事件)。
基于事件驱动的设计模式,为高性能网络服务架构带来丰富的可扩展性。
5. 基于事件驱动的设计模式(Event-driven Designs)
基于事件驱动的设计模式通常比其他的选择更加有效。
- 占用的资源更少:不需要为每个客户端开启一个新的线程。
- 开销小:更少的线程上下文切换,更少的锁。
但是任务调度相对要慢一些,并且更难编程。因为必须手动将操作绑定到事件,相关的功能必须拆封成简单的非阻塞操作,类似于GUI事件驱动机制,当然不可能消除所有的阻塞,例如GC,页中断(page faults)等。必追踪服务的逻辑状态(因为是事件驱动的,所以需要根据状态判断执行的动作)。
5.1 AWT中的事件驱动设计
6. Reactor模式
- Reactor模式通过分配适当的处理器(Handler)来响应IO事件,类似于AWT的线程。
- 每个Handler执行非阻塞操作,类似于动作监听(ActionListeners)。
- 通过将Handler绑定到事件进行管理,类似于AWT的添加动作监听(addActionListeners)。
Reactor模式核心角色
- Reactor:将IO事件分发给Acceptor或者对应的Handler。
- Acceptor:处理新的客户端连接,创建连接对应的Handler。
- Handler:执行非阻塞的读写任务。
6.1 单线程模式
- Reator线程初始化
@Slf4j
public class ServerReactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
@Setter
private volatile boolean stop = false;
// ************ Reactor 1: Setup ***********************
public ServerReactor(int port, int backlog) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port), backlog);
serverSocket.setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
// 将channel注册到多路复用器上,并监听ACCEPT事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.info("ServerSocket-Key: [{}]",selectionKey);
// 添加Acceptor处理新连接
selectionKey.attach(new Acceptor(selector, selectionKey, serverSocketChannel));
}
@Override
public void run() {
// ************ Reactor 2: Dispatch Loop ***********************
try {
// 无限的接收客户端连接
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,否则会导致事件重复消费
it.remove();
KeyUtil.keyOps(key);
try {
dispatch(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
log.error("{}", e);
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
log.error("{}", e);
}
}
}
private void dispatch(SelectionKey key) {
// 如果是连接事件获取是Acceptor
// 如果是是IO读写事件获取是对应的Handler
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
- 第二步:Acceptor创建
@Slf4j
public class Acceptor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
final SelectionKey selectionKey;
// ************ Reactor 3: Acceptor ***********************
public Acceptor(Selector selector, SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.selectionKey = selectionKey;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
if (selectionKey.isValid() && selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress());
// 创建对应的Handler
new BasicHandler(selector, socketChannel);
}
} catch (IOException e) {
log.error("{}", e);
}
}
}
- 第三步:Handler创建
@Slf4j
public class BasicHandler implements Runnable {
final SelectionKey selectionKey;
final SocketChannel socketChannel;
// ************ Reactor 4: Handler ***********************
public BasicHandler(Selector selector, SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
this.socketChannel.configureBlocking(false);
selectionKey = this.socketChannel.register(selector, 0);
// attach替换
selectionKey.attach(this);
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
log.info("Socket-Key: [{}]", selectionKey);
}
@Override
public void run() {
try {
if (selectionKey.isReadable()) {
doRead();
} else if (selectionKey.isWritable()) {
doWrite();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void doWrite() {
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (scanner.hasNext()) {
try {
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(scanner.nextLine().getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
} catch (Exception e) {
}
}
}).start();
selectionKey.interestOps(SelectionKey.OP_READ);
}
protected void doRead() throws IOException {
// setup1: ****一次性读取数据***
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
// 业务处理
process(readBuffer);
selectionKey.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) {
selectionKey.cancel();
socketChannel.close();
}
}
protected void process(ByteBuffer readBuffer) {
// setup2: ****解码***
// setup3: ****处理数据***
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
log.info("recv client content: " + new String(bytes));
try {
TimeUnit.SECONDS.sleep(10);
log.info("业务处理完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
// setup4: ****编码***
}
}
该模型适用于Handler能快速处理业务逻辑的场景。串行化处理可以最大限度减少锁的竞争,但是不能充分利用CPU多核的资源。
6.2 单Reactor多Work线程模型
为了充分利用多核的优势,我们可以采用多线程模式用于处理非IO操作来获得更高的伸缩性,一般来说业务处理(process方法)会有耗时比较长的可能,而业务处理阻塞会影响Reacotr的性能,所以我们可以把业务处理的**非IO操作(编/解码,业务逻辑)**交给Work线程池来处理。
- 卸载非IO操作来提升Reactor线程的处理性能,类似于POSA2中Proactor的设计。
- 比将非IO操作重新设计为事件驱动的方式更简单。
- 很难与IO重叠处理,最好能先将所有输入读入缓冲区。
- 使用线程池对线程资源进行调优与控制,通常情况下需要的线程数量比客户端数量少很多。
@Slf4j
public class MultiThreadHandler extends BasicHandler {
static final int threadPoolSize = Runtime.getRuntime().availableProcessors() + 1;
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadPoolSize,
threadPoolSize,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000));
public MultiThreadHandler(Selector selector, SocketChannel socketChannel) throws IOException {
super(selector, socketChannel);
}
@Override
protected void doRead() throws IOException {
// setup1: ****一次性读取数据***
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
// 线程池进行业务处理
executor.execute(new Processer(readBuffer));
selectionKey.interestOps(SelectionKey.OP_WRITE);
} else if (readBytes < 0) {
selectionKey.cancel();
socketChannel.close();
}
}
class Processer implements Runnable {
final ByteBuffer buffer;
Processer(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
process(buffer);
}
}
}
6.3 多Reactor多Worker线程模型
为了进一步协调CPU和IO读写效率,提升系统的资源利用率。可以将Reactor拆封为两个部分,MainReactor负责监听socket,用于处理accept事件,将建立的连接分发给SubReactor,SubReactor由线程池执行,负责处理IO读写事件。Netty采用的就是这种模式。
@Slf4j
public class MainReactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
@Setter
private volatile boolean stop = false;
public MainReactor(int port, int backlog) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port), backlog);
serverSocket.setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
// 将channel注册到多路复用器上,并监听ACCEPT事件
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.info("ServerSocket-Key: [{}]", selectionKey);
// 添加Acceptor处理新连接
selectionKey.attach(new MultiAcceptor(selectionKey, serverSocketChannel));
}
@Override
public void run() {
try {
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,否则会导致事件重复消费
it.remove();
KeyUtil.keyOps(key);
try {
dispatch(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
log.error("{}", e);
}
}
private void dispatch(SelectionKey key) {
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
@Slf4j
public class MultiAcceptor implements Runnable {
static final int subReactorSize = Runtime.getRuntime().availableProcessors();
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
subReactorSize,
subReactorSize,
60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(1000), new NameThreadFactory("subReactor")
);
private Selector[] selectors = new Selector[subReactorSize]; // 多路复用器个数和SubReactor线程个数相同
private ServerSocketChannel serverSocketChannel;
final SelectionKey selectionKey;
private volatile int next = 0;
public MultiAcceptor(SelectionKey selectionKey, ServerSocketChannel serverSocketChannel) throws IOException {
this.selectionKey = selectionKey;
this.serverSocketChannel = serverSocketChannel;
init();
}
public void init() throws IOException {
for (int i = 0; i < subReactorSize; i++) {
selectors[i] = Selector.open();
}
}
@Override
public synchronized void run() {
try {
if (selectionKey.isValid() && selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
log.info("channel [{}->{}] establish", socketChannel.getRemoteAddress(), socketChannel.getLocalAddress());
if (socketChannel != null) {
executor.execute(new SubReactor(selectors[next], socketChannel));
}
if (++next == selectors.length)
next = 0;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Slf4j
public class SubReactor implements Runnable {
private Selector selector;
public SubReactor(Selector selector, SocketChannel socketChannel) throws IOException {
this.selector = selector;
new MultiThreadHandler(selector, socketChannel);
}
@Setter
private boolean stop = false;
@Override
public void run() {
// ************ Reactor 2: Dispatch Loop ***********************
try {
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,否则会导致事件重复消费
it.remove();
KeyUtil.keyOps(key);
try {
dispatch(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
log.error("{}", e);
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
log.error("{}", e);
}
}
}
private void dispatch(SelectionKey key) {
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
拆分后的Reactor模型,扩展性更强。不同的Reactor职责更加明确,一般来说一个MainReactor线程和多个SubReactor线程就能处理百万级别的客户端连接。
参考文献: