阅读 247

高性能 Netty 之 TCP 粘包/拆包

粘包/拆包

上一篇文章讲了关于 Netty 初体验。这篇文章主要是讲解 Netty 是如何解决 TCP 粘包/拆包 的问题。

TCP 粘包/拆包 其实是网络编程最经常用到的。只要一天依赖着 TCP,那么你就跑不掉。例如你做网络应用,如果你使用 Socket 进行网络通信,那你其实相当于在传输层操作(因为 Socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层,它是一组接口)。所以你要注意 TCP 粘包/拆包 的问题;或者你做硬件通信,目前挺多硬件公司喜欢利用 TCP 的优势创建物联网,所以这个问题也需要注意。

下面讲讲什么是 TCP 粘包/拆包!

TCP 拆包

传输控制协议 TCP 是 Internet 一个重要的传输协议。TCP 提供了面向连接/可靠/有序/字节流传输服务。所谓的字节流,因为 TCP 协议是一个流协议。所谓的流,就是没有界限的一串数据。由于 TCP 处于传输层,它并不明白上层的业务(也不需要,因为一般是上层依赖的下层而不是互相依赖),所以 TCP 传送的过程中,只管根据网络情况,传输介质等因素来找到一次传输的最佳长度

所以这样就延伸出来一个问题: 既然 TCP 不了解上层业务数据的具体含义,它是怎么对缓冲区的数据进行划分发送呢?如果随机划分是不是会造成一个请求被切割成两半或者更多?

答案是确定的,这种情况之下由于 TCP 会根据自己的策略/根据当前的环境去计算每次发送的字节流的长短,然后以最合适的长度拆分进行发送。这样会导致一个完整的包可能被 TCP 拆分一次或多次进行发送。这就是 TCP 拆包 行为。

上面只是讲了大致上一个原理,但如果讲得更细的话 TCP 会进行拆包的原因从各个网络层来看有以下几种:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区大小。在开发者开发的过程中,最常见的就是代码通过Socket来编程式发起与远程主机的链接。当写入套接字缓冲区的字节大小超越了最大值,就会被拆包。
  2. 进行MSS大小的TCP分段。由于TCP连接的两端都设有发送缓存与接受缓存,用来临时存放双向通信的数据,在发送时候,应用程序在把数据传给TCP缓存后,就可以做自己的事情,而TCP在适当的时机进行发送。但是TCP每次从缓存获取的数据量受限于一个参数-最大报文段长度(Maximum Segment Size,MSS)。这个参数的大小通常是与本地主机发送的最大链路层帧长度来设置,设置MSS要保证一个TCP报文段(封装在一个IP的数据报中)加上TCP与IP首部长度(通常40个字节)将适合单个链路层。需要注意的是,MSS指的是报文段中应用层的数据的最大长度而不包括TCP首部的TCP报文段的最大长度。所以MSS限制了报文段数据字段的最大长度导致拆包。
  3. 以太网帧的payload大于MTU进行IP分片。这个属于链路层的数据分片。不同的数据链路层协议所能够承受的网络层数据报的最大长度不尽相同,以太网帧可以承载的数据量最大为 1500 字节,而有一些数据链路层协议所能承载的数据最大长度远小于这个值。一个数据链路层所能承载的最大数据量称为最大传输单元(Maxumum Transmission Unit,MTU)。当承载的数据超过了这个长度,则会将IP数据进行分片。

可能你会问,如果每个请求数据量并不大,那不就避免了 TCP 进行拆包吗?那就是我们将要讲的另外一个问题: TCP 粘包

TCP 粘包

事实上,我们在不同的业务操作,每次发送的数据量是不定的。有可能仅仅是为了发送一个心跳,下一秒就是一个大文件传输。所以我们只能面对去解决多变的业务场景。TCP 协议为了提高传输效率,同时兼容更加复杂的环境,往往做了这方面的优化。例如说,发送端为了提高网络传输的成功率(),一般都是收集到足够的数据才发送一个 TCP 段。这样如果你有多个请求的数据量极小,那么 TCP 发送出去的数据可能包含了多个请求的数据,这就导致了多个数据包在一起了;而同样,由于接收方不及时接收缓冲区的包,造成多个包接收(客户端发送了一段数据,服务端只收了一小部分,服务端下次再收的时候还是从缓冲区拿上次遗留的数据,产生粘包)。这就是 TCP 的粘包行为。

我们已经讲了 TCP 拆包/粘包 的大致原理。下面我们讲讲我们该怎么解决。

拆包/粘包的解决方案

TCP 这方面,其实发送端真的很爽。它可以随便发,它发长发短并没有人管它。但是苦的是接收方,因为它鬼知道从哪里到哪里才是一个完整的包。所以,解决的核心在于,在传输层的上层,我们要做好约定,签好协议,大家都按照这个走,约束了发送方的行为同时给了一颗定心丸给接收方,那就皆大欢喜了。

一般这个问题的解决方案有以下几种:

  1. 传输的消息是定长的。只有长度都知道了,接收方就可以每次获取指定长度就完事了
  2. 在数据包的尾部加上回车符作为分割条件(结束条件)
  3. 将消息分为消息头和消息体。消息头中包含了消息的总长度。(一般来说消息头的第一个字段可以被设计表示为消息总长度)
  4. 更加复杂的应用层协议

Netty 如何应对?

其实问题找出来了,那么只剩下解决问题的手段了。很明显,我们可以在通过 Socket 将数据传进来的时候就开始分析数据流,然后封装成对应的数据报即可。上篇文章说过,Netty 支持扩展性事件驱动模型,仅仅需要通过对每一种解决方案做出处理就行了。例如 Netty 实现的编解码器 LineBasedFrameDecoderStringDecoder 是通过换行符号 \n\r\n 来作为结束符号结束读取。

那我们来写示范 demo。

示范DEMO[通过换行符作为结束标志位]

示范 demo 的核心思想很简单:我们要做到每个请求的请求数据都能够明确的分开,也就是无论是服务端还是客户端都能从数据中分辨出单个请求并输出返回。

服务端

首先是服务端的启动代码 TimeServer

TimeServer.java

public class TimeServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap s = new ServerBootstrap();
            s.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            ChannelFuture f = s.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e) {

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}
复制代码

TimeServer 和上一篇文章的代码类似,不同的地方在于其 childHandler 初始化的内容 ChildChannelHandler

ChildChannelHandler.java

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel ch) throws Exception {
        //1 
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        //2
        ch.pipeline().addLast(new StringDecoder());
        //3
        ch.pipeline().addLast(new TimeServerHandler());
    }
}
复制代码

上面分为了三步:

  1. 以行为编解码基础的 LineBasedFrameDecoder,并设置最大行字节为 1024
  2. 使用 StringDecoder 将链条中的前结点编解码结果解码为字符串
  3. 根据链条中的前结点的编解码结果进行业务逻辑处理
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 由于 StringDecoder 已经解码称为 string,所以这里可以直接转换
        String body = (String) msg;
        System.out.println("the time server receive order " + body + "; the counter is : " + ++counter);
        // 判断
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
        //获取事件
        currentTime = currentTime + System.getProperty("line.separator");
        // 根据大小初始化 ByteBuf 并刷到缓存
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
复制代码

客户端

客户端代码给服务端代码类似,先看下 TimeClient.java

public class TimeClient {

    public void connect(int port, String host) {
        // 创建客户端处理 IO 读写的 NioEventLoopGroup 线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建客户端辅助启动类
            Bootstrap b = new Bootstrap();

            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //调用connect发起异步请求,调用同步方法等待成功
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e ) {

        }finally {
            group.shutdownGracefully();
        }
    }


    public static void main(String[] args) {
        new TimeClient().connect(8080, "127.0.0.1");
    }
}
复制代码

同样,由于当服务器在返回数据的时候,客户端也要做数据解析防止粘包/拆包。客户端直接通过匿名函数来添加 LineBasedFrameDecoder / LineBasedFrameDecoder / TimeClientHandler

所以我们直接看下 TimeClientHandler 里面有哪些改变。

TimeClientHandler.java

public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = Logger.getLogger(title04.TimeClientHandler.class.getName());

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i=0;i<100;i++) {
            message= Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    //当客户端和服务端tcp链路建立成功之后,netty的nio线程会调用channelActive方法
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("now is : " + body + " ; the counter is : " + ++counter);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warning("unexpected exception from downstream: " + cause.getMessage());
        ctx.close();
    }
}

复制代码

TimeClientHandler 在其 channelActive 方法是写了一段循环一百次的代码,那就客户端相当于发送了 100 次请求,那么服务端也相当于应该能够拆分 100 次请求的数据包。

那么可能有人问,如果我并不是使用换行符来作为数据报的结束标志位,那不是不能用了?其实很简单,既然 Netty 都能使用换行符来拆分了,那么肯定也能抽象出一套逻辑来实现自定义符号。下面来讲讲对应的实现类!

示范DEMO[自定义分隔符作为结束标志位]

自定义分隔符就是避免开发面对的使用场景并不是使用换行符作为标志位。Netty 通过实现了 DelimiterBasedFrameDecoder 来自动完成以分隔符做为结束标志的消息解码。

服务端

我们直接上 demo。服务端依旧分为两部分:启动器处理器 启动器 EchoServer.java

public class EchoServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap s = new ServerBootstrap();
            s.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //添加日志
                    .handler(new LoggingHandler())
                    //处理
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //根据 "$_" 作为分隔符,然后进行分割
                            ByteBuf delimitoer = Unpooled.copiedBuffer("$_".getBytes());
                            //根据指定分隔符来切割信息流的开始和结束
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimitoer));
                            //字符串解码
                            ch.pipeline().addLast(new StringDecoder());
                            //将字符传递给服务端处理器
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            ChannelFuture f = s.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e) {

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer().bind(8080);
    }
}
复制代码

上面的代码与之前的例子区别不大,主要是在初始化 ChannelHandler 的地方加多了 DelimiterBasedFrameDecoder 进行解决粘包/拆包问题。服务端大致上已经将数据处理整齐了,接下来服务端处理器会比较轻松 EchoServerHandler.java

EchoServerHandler.java

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("this is "+ ++count + " times receive client:[" + body + "]" );
        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

客户端

在客户端的改动与服务端一致,主要还是 DelimiterBasedFrameDecoder 的添加。

public class EchoClient {

    public void connect(int port, String host) {
        // 创建客户端处理 IO 读写的 NioEventLoopGroup 线程组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建客户端辅助启动类
            Bootstrap b = new Bootstrap();

            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimitoer = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimitoer));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //调用connect发起异步请求,调用同步方法等待成功
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e ) {

        }finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new EchoClient().connect(8080, "127.0.0.1");
    }
}
复制代码

而客户端处理器需要输出结果,查看是不是有分隔符没分割成功的问题

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private int counter;
    static final String ECHO_REQ = "Hi, Lilinfeng. welcome to Netty.$_";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i=0;i < 100;i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("this is "+ ++counter + " times receive server :[" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

示范DEMO[定长编解码作为结束标志位]

定长编解码相当于解决 TCP 解决粘包/拆包问题的第三种方案。服务端依旧与之前的例子相似,添加上对应的编解码器即可。

服务端

EchoServer


public class EchoServer {

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap s = new ServerBootstrap();
            s.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception { 
                            //添加定长编解码器
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            ChannelFuture f = s.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e) {

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer().bind(8080);
    }
}
复制代码

EchoServerHandler 处理器比较简单,直接输出就可以了。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("receive client :[ " + msg + "]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

客户端

至于客户端也同样操作,两者差距不大,这里就不贴代码了。

结语

我们整篇文章记录了以下内容

  1. TCP粘贴/拆包 是什么
  2. TCP粘贴/拆包 是因为什么而出现的
  3. TCP粘贴/拆包 的解决方案是什么
  4. Netty 对于 TCP粘贴/拆包 给出的解决方案

同时我们发现,在 Netty 当中,如果我们有了解决问题新的替代方案,我们在代码层面的改动是非常小的。这一点也体现了 Netty 事物驱动强大的扩展性。

完!