阅读 788

服务如何优雅关闭

背景

很多时候服务都有平滑退出的需求,例如RPC服务在停止之后需要从注册服务摘除节点、从消息队列已经消费的消息需要正常处理完成等。一般地我们希望能让服务在退出前能执行完当前正在执行的任务,这个时候就需要我们在JVM关闭的时候运行一些清理现场的代码。

方案

ShutdownHook

JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,允许用户注册一个JVM关闭的钩子。这个钩子可以在以下几种场景被调用:

  • 程序正常退出;
  • 使用System.exit();
  • 终端使用Ctrl+C触发的终端;
  • 系统关闭;
  • 使用kill pid命令干掉进程;

一般地发布系统会通过kill命令来停止服务。这个时候服务可以接收到关闭信号并执行钩子程序进行清理工作。

场景示例

假设以下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当我们停止服务的时候,希望队列的消息都能正常处理完成,代码示例如下:

/**
 * 服务关闭测试
 */
public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压情况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("try close...");
            // 先关闭生产
            producerTask.setStopped();
            // 再关闭消费
            consumeTask.setStopped();
            try {
                System.out.println("close wait...");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            System.out.println("close finished...");
        }));
    }
}
复制代码

执行结果如下所示,可以看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。

钩子示例结果

潜在问题

在使用ShutdownHook的时候,我们往往控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,假若是独立注册钩子,在更复杂的项目里面是不是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,我们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),然后就会偶尔出现服务无法关闭的问题。原因正是线程池先被关闭,kafka队列却还在消费消息,导致消费线程一直在等待。

Signal

Java同时提供了signal信号机制,我们的服务也可以接收到关闭信号。

使用Signal机制有以下原因:

  • ShutdownHook执行顺序无法保障,第三方组件也可能注册,导致业务自定义的退出流程依赖的资源会被提前关闭和清理;
  • Signal是非公开API,第三方组件基本很少使用,我们可以在内部托管服务关闭的执行顺序;
  • 在完成清理工作后可以执行exit调用,保证资源清理不会影响ShutdownHook的退出清理逻辑;

这里核心的原因还是希望能完全保证服务关闭的顺序,避免出现问题。我们在服务内部按顺序维护关闭任务,上述代码调整后如下所示:

public class TermHelper {

    private static AtomicBoolean signalTriggered = new AtomicBoolean(false);
    private static AtomicBoolean stopping = new AtomicBoolean(false);
    private static AtomicBoolean registeredHolder = new AtomicBoolean(false);

    private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>();

    private static void tryRegisterOnlyOnce() {
        boolean previousRegistered = registeredHolder.getAndSet(true);
        if (!previousRegistered) {
            registerTermSignal();
        }
    }

    private static void registerTermSignal() {
        Signal.handle(new Signal("TERM"), signal -> {
            boolean previous = signalTriggered.getAndSet(true);
            if (previous) {
                System.out.println("Term has been triggered.");
                return;
            }
            termAndExit();
        });
    }

    public static void addTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addLast(runnable);
    }

    public static void addFirstTerm(Runnable runnable) {
        tryRegisterOnlyOnce();
        terms.addFirst(runnable);
    }

    private static void termAndExit() {
        try {
            Thread current = Thread.currentThread();
            current.setName(current.getName() + "(退出线程)");
            System.out.println("do term cleanup....");
            doTerm();
            System.out.println("exit success.");
            System.exit(0);
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void doTerm() {
        boolean previousStopping = stopping.getAndSet(true);
        if (previousStopping) {
            System.out.println("Term routine already running, wait until done!");
            return;
        }
        for (Runnable runnable : terms) {
            try {
                System.out.println("execute term runnable : " + runnable);
                runnable.run();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
复制代码

TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。我们也可以在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。

public class ShutDownTest {

    private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50);

    private static AtomicLong taskId = new AtomicLong(0);

    // 生产任务
    private static class ProduceTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                long element = taskId.incrementAndGet();
                queue.add(element);
                System.out.println("add element : " + element);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                }
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop producer.");
        }
    }

    // 消费任务
    private static class ConsumeTask implements Runnable {

        private AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get() || queue.size() > 0) {
                try {
                    long element = queue.take();
                    System.out.println("consume element : " + element);
                    doWork();
                } catch (InterruptedException e) {
                }
            }
        }

        private void doWork() {
            try {
                // 消费速度比生产速度稍慢,模拟积压情况
                Thread.sleep(60);
            } catch (InterruptedException e) {
            }
        }

        public void setStopped() {
            stopped.compareAndSet(false, true);
            System.out.println("stop consumer.");
        }
    }

    public static void main(String[] args) {
        final ProduceTask producerTask = new ProduceTask();
        final Thread producerThread = new Thread(producerTask);

        final ConsumeTask consumeTask = new ConsumeTask();
        Thread consumeThread = new Thread(consumeTask);

        // 先启动消费
        consumeThread.start();
        // 再启动生产
        producerThread.start();

        TermHelper.addFirstTerm(() -> {
            // 关闭生产
            producerTask.setStopped();
        });

        TermHelper.addTerm(() -> {
            // 再关闭消费
            consumeTask.setStopped();
        });

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("shut down hook...");
        }));
    }
}
复制代码

执行结果如下所示。需要注意的是我们只注册了TERM信号,所以需要通过kill -TERM的方式关闭服务。从图中可以看到我们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。

image-20190407212611347

小结

若需要平滑停止服务,我们一般可以通过ShutdownHook和Signal来实现。ShutdownHook一般比较难保证关闭任务的执行顺序,这个时候可以考虑使用Signal机制来完全托管我们关闭服务的执行顺序。

关注下面的标签,发现更多相似文章
评论