Netty实战之使用Netty解析交通部JT808协议

8,706 阅读11分钟

1.写此文的目的

使用Netty也有一段时间了,对Netty也有个大概的了解。回想起刚刚使用Netty的时候踩了很多坑,很多Netty的组件也不会使用,或者说用得不够好,不能称之为"最佳实践"。此文的目的便是带领大家使用Netty构建出一个完整的项目,将自己在实际开发经验中整理出的一些最佳实践分享出来,当然这些最佳实践不一定就是真正的最佳实践,只是自己在开发中整理的,或者参考其他优秀的代码一起整理出的,大家如果有什么不同意见或者更好的实践,欢迎大家在评论区分享,大家一起学习一起进步!

2. 项目准备

先奉上完整版代码 zpsw/jt808-netty

开发环境:IDEA+JDK1.8+Maven

使用框架: Netty + Spring Boot + Spring Data JPA

其他工具: lombok(没用过的同学建议了解一下,很方便)

3. 开发过程

3.1.认识JT808协议
3.2.构建编/解码器
3.3.构建业务Handler
3.4.Channel的高效管理方式
3.5.一些改进

3.1 认识JT808协议

下面简单介绍一下JT808协议的格式说明,完全版在JT808协议技术规范.pdf

其中消息体属性中我们先只关注消息体长度,不关注其他,分包情况先不考虑。

根据消息头和消息体我们可以抽象出一个最基本的数据结构

@Data
public class DataPacket {

    protected Header header = new Header(); //消息头
    protected ByteBuf byteBuf; //消息流

    @Data
    public static class Header {
        private short msgId;// 消息ID 2字节
        private short msgBodyProps;//消息体属性 2字节
        private String terminalPhone; // 终端手机号 6字节
        private short flowId;// 流水号 2字节

        //获取包体长度
        public short getMsgBodyLength() {
            return (short) (msgBodyProps & 0x3ff);
        }

        //获取加密类型 3bits
        public byte getEncryptionType() {
            return (byte) ((msgBodyProps & 0x1c00) >> 10);
        }

        //是否分包
        public boolean hasSubPackage() {
            return ((msgBodyProps & 0x2000) >> 13) == 1;
        }
    }
}

我们可以先将Header解析出来,然后由子类自己解析包体

 public void parse() {
        try{
            this.parseHead();
            //验证包体长度
            if (this.header.getMsgBodyLength() != this.byteBuf.readableBytes()) {
                throw new RuntimeException("包体长度有误");
            }
            this.parseBody();//由子类重写
        }finally {
            ReferenceCountUtil.safeRelease(this.byteBuf);//注意释放
        }
    }

    protected void parseHead() {
        header.setMsgId(byteBuf.readShort());
        header.setMsgBodyProps(byteBuf.readShort());
        header.setTerminalPhone(BCD.BCDtoString(readBytes(6)));
        header.setFlowId(byteBuf.readShort());
    }
    protected void parseBody() {

    }

其中readByte(int length)方法是对ByteBuf.readBytes(byte[] dst)的一个简单封装

public byte[] readBytes(int length) {
        byte[] bytes = new byte[length];
        this.byteBuf.readBytes(bytes);
        return bytes;
}

因为没有在Netty官方的Api中找到类似的方法,所以自己定义了一个

另外定义一个方法用于响应重写。

响应重写:

 public ByteBuf toByteBufMsg() {
        ByteBuf bb = ByteBufAllocator.DEFAULT.heapBuffer();
        bb.writeInt(0);//先占4字节用来写msgId和msgBodyProps
        bb.writeBytes(BCD.toBcdBytes(StringUtils.leftPad(this.header.getTerminalPhone(), 12, "0")));
        bb.writeShort(this.header.getFlowId());
        return bb;
}
**
"最佳实践":尽量使用内存池分配ByteBuf,效率相比非池化Unpooled.buffer()高很多,但是得注意释放,否则会内存泄漏
在ChannelPipeLine中我们可以使用ctx.alloc()或者channel.alloc()获取Netty默认内存分配器,
其他地方不一定要建立独有的内存分配器,可以通过ByteBufAllocator.DEFAULT获取,跟前面获取的是同一个(不特别配置的话)。
**

这里当我们将响应转化为ByteBuf写出去的时候,此时并不知道消息体的具体长度,所有此时我们先占住位置,回头再来写。

所有的消息都继承自DataPacket,我们挑出一个字段相对较多的-》 位置上报消息

然后我们建立位置上报消息的数据结构,先看位置消息的格式

建立结构如下:

@Data
public class LocationMsg extends DataPacket {

    private int alarm; //告警信息 4字节
    private int statusField;//状态 4字节
    private float latitude;//纬度 4字节
    private float longitude;//经度 4字节
    private short elevation;//海拔高度 2字节
    private short speed; //速度 2字节
    private short direction; //方向 2字节
    private String time; //时间 6字节BCD

    public LocationMsg(ByteBuf byteBuf) {
        super(byteBuf);
    }
    
    @Override
    public void parseBody() {
        ByteBuf bb = this.byteBuf;
        this.setAlarm(bb.readInt());
        this.setStatusField(bb.readInt());
        this.setLatitude(bb.readUnsignedInt() * 1.0F / 1000000);
        this.setLongitude(bb.readUnsignedInt() * 1.0F / 1000000);
        this.setElevation(bb.readShort());
        this.setSpeed(bb.readShort());
        this.setDirection(bb.readShort());
        this.setTime(BCD.toBcdTimeString(readBytes(6)));
    }
}

所有的消息如果没有自己的应答的话,需要默认应答,默认应答格式如下

@Data
public class CommonResp extends DataPacket {

    private short replyFlowId; //应答流水号 2字节
    private short replyId; //应答 ID  2字节
    private byte result;    //结果 1字节

    public CommonResp() {
        this.getHeader().setMsgId(JT808Const.SERVER_RESP_COMMON);
    }

    @Override
    public ByteBuf toByteBufMsg() {
        ByteBuf bb = super.toByteBufMsg();
        bb.writeShort(replyFlowId);
        bb.writeShort(replyId);
        bb.writeByte(result);
        return bb;
    }
}

3.2 构建编/解码器

解码器

前面协议可以看到,标识位为0x7e,所以我们第一个解码器可以用Netty自带的DelimiterBasedFrameDecoder,其中的delimiters自然就是0x7e了。(Netty有很多自带的编解码器,建议先确认Netty自带的不能满足需求,再自己自定义)

经过DelimiterBasedFrameDecoder帮我们截断之后,信息就到了我们自己的解码器中了,我们的目的是将ByteBuf转化为我们前面定义的数据结构。 定义解码器

public class JT808Decoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    
    }
}

第一步:转义还原,转义规则如下

0x7d 0x01 -> 0x7d

0x7d 0x02 -> 0x7e

public ByteBuf revert(byte[] raw) {
        int len = raw.length;
        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(len);//DataPacket parse方法回收
        for (int i = 0; i < len; i++) {
            if (raw[i] == 0x7d && raw[i + 1] == 0x01) {
                buf.writeByte(0x7d);
                i++;
            } else if (raw[i] == 0x7d && raw[i + 1] == 0x02) {
                buf.writeByte(0x7e);
                i++;
            } else {
                buf.writeByte(raw[i]);
            }
        }
        return buf;
    }

第二步:校验

    byte pkgCheckSum = escape.getByte(escape.writerIndex() - 1);
    escape.writerIndex(escape.writerIndex() - 1);//排除校验码
    byte calCheckSum = JT808Util.XorSumBytes(escape);
    if (pkgCheckSum != calCheckSum) {
        log.warn("校验码错误,pkgCheckSum:{},calCheckSum:{}", pkgCheckSum, calCheckSum);
        ReferenceCountUtil.safeRelease(escape);//一定不要漏了释放
        return null;
    }

第三步:解码

 public DataPacket parse(ByteBuf bb) {
        DataPacket packet = null;
        short msgId = bb.getShort(bb.readerIndex());
        switch (msgId) {
            case TERNIMAL_MSG_HEARTBEAT:
                packet = new HeartBeatMsg(bb);
                break;
            case TERNIMAL_MSG_LOCATION:
                packet = new LocationMsg(bb);
                break;
            case TERNIMAL_MSG_REGISTER:
                packet = new RegisterMsg(bb);
                break;
            case TERNIMAL_MSG_AUTH:
                packet = new AuthMsg(bb);
                break;
            case TERNIMAL_MSG_LOGOUT:
                packet = new LogOutMsg(bb);
                break;
            default:
                packet = new DataPacket(bb);
                break;
        }
        packet.parse();
        return packet;
    }

然后我们将消息out.add(msg)就可以让消息到我们的业务Handler中了。

编码器

编码器需要讲我们的DataPacket转化为ByteBuf,然后再转义发送出去。 定义编码器


public class JT808Encoder extends MessageToByteEncoder<DataPacket> {
    protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out) throws Exception {
    
    }
}

第一步:转换

ByteBuf bb = msg.toByteBufMsg();

还记得我们DataPacket转换header时占用了4个字节等到后面覆盖吗

        bb.markWriterIndex();//标记一下,先到前面去写覆盖的,然后回到标记写校验码
        short bodyLen = (short) (bb.readableBytes() - 12);
        short bodyProps = createDefaultMsgBodyProperty(bodyLen);
        //覆盖占用的4字节
        bb.writerIndex(0);
        bb.writeShort(msg.getHeader().getMsgId());
        bb.writeShort(bodyProps);
        bb.resetWriterIndex();
        bb.writeByte(JT808Util.XorSumBytes(bb));

第二步:转义

 public ByteBuf escape(ByteBuf raw) {
        int len = raw.readableBytes();
        ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(len + 12);//假设最多有12个需要转义
        buf.writeByte(JT808Const.PKG_DELIMITER);
        while (len > 0) {
            byte b = raw.readByte();
            if (b == 0x7e) {
                buf.writeByte(0x7d);
                buf.writeByte(0x02);
            } else if (b == 0x7d) {
                buf.writeByte(0x7d);
                buf.writeByte(0x01);
            } else {
                buf.writeByte(b);
            }
            len--;
        }
        ReferenceCountUtil.safeRelease(raw);
        buf.writeByte(JT808Const.PKG_DELIMITER);
        return buf;
    }
**
"最佳实践":我们这里返回ByteBuf是写出去的,所以采用directBuffer效率更高
**

转义完成,就直接发送出去了,当然不能忘了释放。

        ByteBuf escape = escape(bb);
        out.writeBytes(escape);
        ReferenceCountUtil.safeRelease(escape);

3.3 构建业务Handler

解码器中我们返回的是DataPacket对象,所以编写Handler此时我们有两种选择:

一种是定义一个Handler接收DataPacket然后判断具体类型,如下图

@Component
@ChannelHandler.Sharable
public class JT808ServerHandler extends SimpleChannelInboundHandler<DataPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DataPacket msg) throws Exception {
        log.debug(msg.toString());
        if (msg instanceof AuthMsg || msg instanceof HeartBeatMsg || msg instanceof LocationMsg || msg instanceof LogOutMsg) {
            CommonResp resp = CommonResp.success(msg, getFlowId(ctx));
            ctx.writeAndFlush(resp);
        } else if (msg instanceof RegisterMsg) {
            RegisterResp resp = RegisterResp.success(msg, getFlowId(ctx));
            ctx.writeAndFlush(resp);
        }
    }

}

另一种是每个DataPacket的子类型都定义一个Handler,如下图

public class LocationMsgHandler extends SimpleChannelInboundHandler<LocationMsg> 
public class HeartBeatMsgHandler extends SimpleChannelInboundHandler<HeartBeatMsg> 
public class RegisterMsgHandler extends SimpleChannelInboundHandler<LogOutMsg> 

这里我选择第二种,一个原因是因为代码风格好,另一个原因后面会具体说明。

这里列举一个LocationMsgHandler的详细代码,将位置保存到数据库然后回复设备


@Slf4j
@Component
@ChannelHandler.Sharable
public class LocationMsgHandler extends BaseHandler<LocationMsg> {

    @Autowired
    private LocationRepository locationRespository;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LocationMsg msg) throws Exception {
        log.debug(msg.toString());
        locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
        CommonResp resp = CommonResp.success(msg, getSerialNumber(ctx.channel()));
        write(ctx, resp);
    }
}

BaseHandler继承SimpleChannelInboundHandler ,里面定义了一些通用的方法,例如getSerialNumber()获取应答的流水号

    private static final AttributeKey<Short> SERIAL_NUMBER = AttributeKey.newInstance("serialNumber");

    public short getSerialNumber(Channel channel){
        Attribute<Short> flowIdAttr = channel.attr(SERIAL_NUMBER);
        Short flowId = flowIdAttr.get();
        if (flowId == null) {
            flowId = 0;
        } else {
            flowId++;
        }
        flowIdAttr.set(flowId);
        return flowId;
    }

我们将流水号存入Channel内部,方便维护。

3.4.Channel的高效管理方式

假设现在出现了一个需求,我们需要找到一个特定的连接发送一条消息,在我们这个项目里,特定指的是根据header中的手机号找到连接并发送消息。我们可以自己维护一个Map用来存放所有Channel,但是这样就浪费了Netty自带的DefaultChannelGroup提供的一系列方法了。所以我们改进一下,定义一个ChannelManager,内部采用DefaultChannelGroup维护Channel,自己维护手机号->ChannelId的映射关系。

@Component
public class ChannelManager {

    private static final AttributeKey<String> TERMINAL_PHONE = AttributeKey.newInstance("terminalPhone");
    
    private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private Map<String, ChannelId> channelIdMap = new ConcurrentHashMap<>();

    private ChannelFutureListener remover = future ->
            channelIdMap.remove(future.channel().attr(TERMINAL_PHONE).get());


    public boolean add(String terminalPhone, Channel channel) {
        boolean added = channelGroup.add(channel);
        if (added) {
            channel.attr(TERMINAL_PHONE).set(terminalPhone);
            channel.closeFuture().addListener(remover);
            channelIdMap.put(terminalPhone, channel.id());
        }
        return added;
    }

    public boolean remove(String terminalPhone) {
        return channelGroup.remove(channelIdMap.remove(terminalPhone));
    }

    public Channel get(String terminalPhone) {
        return channelGroup.find(channelIdMap.get(terminalPhone));
    }

    public ChannelGroup getChannelGroup() {
        return channelGroup;
    }
}

我们定义了一个ChannelFutureListener,当channel关闭时,会执行这个回调,帮助我们维护自己的channelIdMap不至于太过臃肿,提升效率,DefaultChannelGroup中也是如此,所以不必担心Channel都不存在了 还占用着内存这种情况。另外我们可以将DefaultChannelGroup提供出去,以便某些时候进行广播。

3.5.一些改进

1.我们的LocationMsgHandler中出现了数据库操作

        locationRespository.save(LocationEntity.parseFromLocationMsg(msg));

然而在Netty中,默认情况下Handler由Reactor线程驱动,一旦阻塞就会大大降低并发能力,所以我们定义一个专门的EventExecutorGroup(不认识的话可以先理解为线程池),用来驱动耗时的Handler,只要在初始化Channel时指定即可。前面所说的每个DataPacket子类型定义一个Handler的另一个好处就体现在这里,我们可以让那些耗时的Handler用专门的业务线程池去驱动,而不耗时的Handler由默认的Reactor线程驱动,增加了灵活性。

        pipeline.addLast(heartBeatMsgHandler);
        pipeline.addLast(businessGroup,locationMsgHandler);//因为locationMsgHandler中涉及到数据库操作,所以放入businessGroup
        pipeline.addLast(authMsgHandler);
        pipeline.addLast(registerMsgHandler);
        pipeline.addLast(logOutMsgHandler);

这里可以利用定义Handler在pipeline中的顺序,提升一点性能,因为在pipeline中找到对应的handler时间复杂度是线性O(n),如果消息分类过多,这里最好的方式是使用策略模式将消息分类和Handler定义成Map或者Array,达到最佳性能。

2.接上面的,现在我们LocationMsgHandler由businessGroup驱动了,然而写响应的时候还是会移交给Reactor线程,所以为了减少一些判断提升略微的性能,我们可以将write(ctx, resp);改为

workerGroup.execute(() -> write(ctx, resp));

其中的workerGroup正是启动引导中的,我们借助Spring把它单独定义成了bean,用的时候直接注解引入即可

serverBootstrap.group(bossGroup, workerGroup)

3.借助Spring的力量我们可以将几乎所有的组件定义成单例,提升了略微的性能,除了编码器和解码器,因为他们有一些属性需要维护,不能定义为单例。

结束语

一直看到这里的朋友感谢你们的耐心,这是我第一次写文章,有错误的地方还请多多包涵。

另外将完全版代码奉上 zpsw/jt808-netty

这也是个人开源的第一个项目,如果对你有帮助,给个Star将不胜感激。

附上一些其他的Netty最佳实践

  • writeAndFlush不要一直调用, 是否可以通过调用write,并且在适当的时间flush,因为每次系统flush都是一次系统调用,如果可以的话write的调用次数也应该减少,因为它会经过整个pipeline(github.com/netty/netty…)
  • 如果你不是很关注write的结果,可以使用channel.voidPromise(),可以减少对象的创建
  • 一直写对于处理能力较弱的接受者来说,可能会引起OutMemoryError,关注channel.isWritable()和channelhandler中的cahnnelWritabilityChanged()将会很有帮助,channel.bytesBeforeUnwritable和channel.bytesBeforeWritable()同样值得关注
  • 关注write_buffer_high_water_mark和write_buffer_low_water_mark的配置, 例如high:32kb(default 64kb), low:8kb(default 32kb)
  • 可以通过channelpipeline 触发custome events (pipeline.fireUserEventTriggered(MyCustomEvent)), 可以在DuplexChannelHandler中处理相应的事件
  • Netty从4.1开始默认使用池化的pooledBytebuffer配置Channel
  • 永远优先使用direct buffer,除非你需要在channeloutboundhandler中操作byte[]时才使用heap buffer,这是经验法则
  • ByteBuf.forEachByte()比通过范围检查来查找更快,因为
    • 可以消除范围检查
    • 很多提前创建好的ByteBufProcessor对象而且可以共享
    • 更容易被JIT内联编译
    • 可以使用在ByteBuf中寻找特定的编码
  • 一些其他buffer技巧
    • alloc() 优于 Unpooled
    • slice(), duplicate() 优于 copy
    • 批量操作优于循环
  • 永远不要阻塞EventLoop
    • Thread.sleep()
    • CountDownLatch.await() 或者java.util.concurrent包下的任何阻塞操作
    • 密集的长时间计算
    • 需要花上一段时间的阻塞操作(例如数据库查询)
  • 通过EventLoop执行任务可以减少需要的线程数量并使其线程安全(只能是轻量级任务)
  • 尽可能重复使用EventLoopGroup
  • 代理类应用可以通过在双方的Channel中共享EventLoop达到最小的上下文切换开销,如下
  public class ProxyHandler extends ChannelInboundHandlerAdapter{
        public void channelActive(ChannelHandlerContext ctx){
                final channel inboundchannel = ctx.channel();
                Bootstrap  b = new Bootstrap();
                b.group(inboundchannel.eventLoop());
                ...
                ChannelFuture f = b.connect(remoteHost, remotePort);
                ...
            }
  • 当从Eventloop外部操作Channel时,尽可能合并操作,以减少唤醒和对象创建的开销
  //不推荐,会创建两个Runnable对象
  channel.write(msg1); 
  channel.writeAndFlush(msg3);
  //组合操作,推荐
  channel.eventLoop().execute(new Runnable(){
     public void run(){
  	    channel.write(msg1); 
  	    channel.writeAndFlush(msg3);
  }});	
  • ChannelHandlerContext的writeAndFlush优于Channel的,因为最短的路径获得最高的性能
  • 如果没有状态就共享ChannelHandlers(@ChannelHandler.Shareable)
  • 一旦某个ChannelHandler不再需要了,就应该立刻移除它,这样可以保持channelPipeLine尽可能短,减少通行的负载(例如UniPortHandler)
  • 尽量在Linux中使用Epoll
    • NIO下更少的GC
    • 更少的同步
    • 边缘触发
    • 支持TCP_CORK(通过Channel Option)

另外给新手安利一个网络调试工具NetAssist网络调试助手

再见