Netty空闲检测&Keepalive

4 阅读7分钟

前言

Netty的空闲检测和Keepalive机制都是为了确保客户端和服务器之间的连接仍然有效,防止连接断开。但它们在实现方式和原理上有所不同。

Netty的空闲检测机制是一种自定义的、基于应用层的机制。它主要通过定时发送和接收特定的消息(心跳包)来检测连接是否仍然处于活动状态。具体来说,Netty提供了IdleStateHandler类来实现心跳检测。在初始化ChannelPipeline时,我们可以添加IdleStateHandler实例,并设置读、写超时时间。当在指定时间内没有读或写操作时,IdleStateHandler会触发相应的事件。然后,我们可以添加一个自定义的事件处理类来处理这些事件,当检测到空闲状态时,发送心跳检测信息或者断开连接。

而Keepalive机制则是TCP/IP协议栈提供的一种机制,它依赖于操作系统实现。当TCP连接建立后,如果一段时间内(通常是2小时)双方都没有数据交互,那么操作系统会自动发送一个探测帧(keepalive probe)来查看对方是否还在线。如果对方无响应,操作系统会多次发送探测帧,直到确定连接已经断开。这种机制无需应用程序干预,但缺点是默认的心跳时间可能较长,不够灵活。

在Netty中,我们可以选择使用空闲检测机制或Keepalive机制,或者同时使用两者来确保连接的稳定性。具体选择哪种方式,需要根据应用的具体需求和网络环境来决定。例如,如果应用对连接的实时性要求较高,或者网络环境不稳定,可能需要使用更频繁的空闲检测;而如果应用主要处理长时间无交互的连接,或者对操作系统的默认心跳时间满意,那么可以选择使用Keepalive机制。


空闲检测

Netty中使用了IdleStateHandler来进行空闲检测,客户端和服务端保持长连接需要通过一个检测机制来确保链接的有效性,在链接处于空闲状态或者一方宕机又或者网络延迟,在这种情况下就要确认链接是否有效,无效链接就需要客户端和服务端都关闭当前链路,释放文件句柄资源。

IdleStateHandler 有三个主要参数:

  • readerIdleTime:一个IdleStateEvent,其状态是IdleState.READER_IDLE时的指定时间段内,没有执行读操作将被触发。 指定0以禁用。
  • writerIdleTime:一个IdleStateEvent,其状态是IdleState.WRITER_IDLE时的指定时间段内,没有执行写操作将被触发。 指定0以禁用。
  • allIdleTime:一个IdleStateEvent,其状态是IdleState.ALL_IDLE时的指定时间段内,没有进行读取和写入都将被触发。 指定0以禁用

其默认单位是秒,当然也有可以指定单位的构造,还能附带考虑buf处理数据时间的构造,这里就不多介绍了。

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

通常我们在实现idle检测时,需要对IdleStateHandler 进行扩展,来满足我们的业务需求,代码如下:

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
/**
 * idle检测
 *
 */
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
    public ServerIdleCheckHandler() {
        super(0, 0, 120, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) {
            log.info("idle check happen, so close the connection");
            ctx.close();
            return;
        }
        super.channelIdle(ctx, evt);
    }
}

实现机制:

  • IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。

  • 内部有 3 个定时任务,分别对应读事件、写事件、读写事件。通常用户监听读写事件就足够了。

   private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class WriterIdleTimeoutTask extends AbstractIdleTask {

        WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long lastWriteTime = IdleStateHandler.this.lastWriteTime;
            long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
            if (nextDelay <= 0) {
                // Writer is idle - set a new timeout and notify the callback.
                writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstWriterIdleEvent;
                firstWriterIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Write occurred before the timeout - set a new timeout with shorter delay.
                writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class AllIdleTimeoutTask extends AbstractIdleTask {

        AllIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstAllIdleEvent;
                firstAllIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

Keepalive机制

Netty本身并不直接提供Keepalive机制,因为Keepalive是TCP/IP协议栈的一部分,由操作系统实现。TCP Keepalive机制用于检测连接的死活,通过在一定时间内没有数据交互时,自动发送探测包来确认对方是否仍然在线。如果探测包没有得到响应,操作系统会多次尝试,直到确定连接已经断开。

在Netty中,你无法直接配置或控制TCP Keepalive机制,因为它是由底层操作系统和网络栈管理的。然而,你可以通过配置操作系统的网络设置来启用或调整TCP Keepalive的相关参数。

如果你希望在Netty中实现类似Keepalive的心跳检测机制,你需要在应用层自己实现。这通常通过定时发送和接收特定的心跳消息来完成。Netty提供了强大的定时器和事件驱动模型,使得在应用层实现心跳检测变得相对简单。你可以使用Netty的定时任务功能(如HashedWheelTimer)来定期发送心跳消息,并在接收到心跳响应时进行相应的处理。

总结来说,Netty本身不直接提供Keepalive机制,但你可以在应用层使用Netty的功能来实现类似的心跳检测机制,以确保连接的稳定性。同时,你也可以通过配置操作系统来启用和调整TCP Keepalive的相关参数。选择使用哪种方式取决于你的具体需求和网络环境。

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT) {
            log.info("all idle happen. so need to send keepalive to keep connection not closed by server");
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
        }
        super.userEventTriggered(ctx, evt);
    }
}

总结

Keepalive模式与空闲检测的结合在网络连接管理中具有显著的优势。以下是它们结合使用的主要优势:

  • 提高连接的稳定性:Keepalive模式通过定期发送探测帧来检查连接是否仍然有效,这有助于及时发现并处理因网络故障或对方主机崩溃导致的连接中断。而空闲检测则可以监测长时间没有数据传输的连接,确保这些连接不会因为长时间的静默而被误判为已断开。两者的结合使用可以大大提高连接的稳定性,减少不必要的连接中断。
  • 优化资源利用:空闲检测机制有助于识别并关闭长时间未使用的连接,从而释放服务器资源,如内存和处理能力。这对于资源有限的服务器环境尤为重要,可以避免资源浪费,提高资源利用率。同时,Keepalive模式可以减少因频繁创建和关闭连接而产生的开销,进一步提高系统性能。
  • 客户端加上空闲检测+keepalive 客户端在设定的时间内不发送数据就发送一个心跳信号。避免连接被断开,启用不频繁的心跳。
  • 提高用户体验:对于依赖网络连接的应用程序来说,稳定的连接和良好的性能是提升用户体验的关键因素。Keepalive模式与空闲检测的结合使用可以确保应用程序在网络环境不稳定时仍然能够保持有效的连接,减少因连接问题导致的用户体验下降。