JMX可视化监控线程池

4,851 阅读7分钟

前两天阅读公司代码看到了用JMX监控定时任务信息和状态,JMX这个单词感觉很熟于是便去查阅了一下,并写了监控线程池的Demo

通过阅读本篇文章你将了解到:

  • JMX介绍
  • 线程池介绍
  • JMX监控线程池应用

什么是JMX

JMX简介

JMX(Java Management Extensions),监控管理框架,通过使用JMX可以监控和管理应用程序。JMX最常见的场景是监控Java程序的基本信息和运行情况,任何Java程序都可以开启JMX,然后使用JConsoleVisual VM进行预览

JMX架构

总共分为三层,分发层、代理层、设备层
分发层:根据不同的协议定义了对代理层进行各种操作的管理接口,简单的来说是监控指标的查看方式,可以是HTTP连接、RMI连接、SNMP连接
代理层:管理MBean,通过将MBean注册到代理层实现MBean的管理,除了注册MBean,还可以注册Adapter,代理层在应用中一般都是MBeanService
设备层:监控指标抽象出的类,可以分为以下几种:

  • Standard MBean
  • Dynamic MBean
  • Open MBean
  • Model MBean
  • MXBean

应用中一般使用Standard MBean比较多,所以这里只介绍Standard MBean,使用Standard MBean需要满足一定的规则,规则如下:

  • 定义一个接口,接口名必须为XXXXMBean的格式,必须MBean结尾
  • 如果接口为XXXXMBean,则接口实现类必须为MBean,否则程序将报错
  • 接口中通过getset方法表示监控指标是否可读、可写。比如getXXX()抽象方法,则XXX就是监控的指标,getXXX()表示XXX性能指标可读,setXXX()方法表示该监控指标可写
  • 参数和返回类型只能是简单的引用类型(如String)和基本数据类型,不可以是自定义类型,如果返回值为自定义类型可以选择MXBean

线程池简单介绍

线程池是线程的管理工具,通过使用线程池可以复用线程降低资源消耗、提高响应速度、提高线程的可管理性。如果在系统中大量使用线程池,就必须对线程池进行监控方便出错时定位问题。可以通过线程池提供的参数进行监控,线程池提供的参数如下:

方法 含义
getActiveCount 线程池中正在执行任务的线程数量
getCompletedTaskCount 线程池已完成的任务数量
getCorePoolSize 线程池的核心线程数量
getLargestPoolSize 线程池曾经创建过的最大线程数量
getMaximumPoolSize 线程池的最大线程数量
getPoolSize 线程池当前的线程数量
getTaskCount 线程池需要执行的任务数量

应用

介绍完JMX及线程池以后,写一个JMX监控线程池的Demo,总不能纸上谈兵吧

  • 定义线程池监控类:ThreadPoolMonitor.java

    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * ActiveCount
         * */
        int ac = 0;
    
        /**
         * 当前所有线程消耗的时间
         * */
        private AtomicLong totalCostTime = new AtomicLong();
    
        /**
         * 当前执行的线程总数
         * */
        private AtomicLong totalTasks = new AtomicLong();
    
        /**
         * 线程池名称
         */
        private String poolName;
    
        /**
         * 最短 执行时间
         * */
        private long minCostTime;
    
        /**
         * 最长执行时间
         * */
        private long maxCostTime;
    
    
        /**
         * 保存任务开始执行的时间
         */
        private ThreadLocal<Long> startTime = new ThreadLocal<>();
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              Executors.defaultThreadFactory(), poolName);
        }
    
        public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, String poolName) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
            this.poolName = poolName;
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
            return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        public static ExecutorService newCachedThreadPool(String poolName) {
            return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
        }
    
        public static ExecutorService newSingleThreadExecutor(String poolName) {
            return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
        }
    
        /**
         * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
         */
        @Override
        public void shutdown() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            super.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            // 统计已执行任务、正在执行任务、未执行任务数量
            logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
              this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
            return super.shutdownNow();
        }
    
        /**
         * 任务执行之前,记录任务开始时间
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            startTime.set(System.currentTimeMillis());
        }
    
        /**
         * 任务执行之后,计算任务结束时间
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long costTime = System.currentTimeMillis() - startTime.get();
            startTime.remove();  //删除,避免占用太多内存
            //设置最大最小执行时间
            maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
            if (totalTasks.get() == 0) {
                minCostTime = costTime;
            }
            minCostTime = minCostTime < costTime ? minCostTime : costTime;
            totalCostTime.addAndGet(costTime);
            totalTasks.incrementAndGet();
    
            logger.info("{}-pool-monitor: " +
                          "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
                          "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                          "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                    this.poolName,
                    costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
                    this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
        }
    
        public int getAc() {
            return ac;
        }
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        public float getAverageCostTime() {
            return totalCostTime.get() / totalTasks.get();
        }
    
        /**
         * 线程最大耗时
         * */
        public long getMaxCostTime() {
            return maxCostTime;
        }
    
        /**
         * 线程最小耗时
         * */
        public long getMinCostTime() {
            return minCostTime;
        }
    
        /**
         * 生成线程池所用的线程,改写了线程池默认的线程工厂
         */
        static class EventThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            /**
             * 初始化线程工厂
             *
             * @param poolName 线程池名称
             */
            EventThreadFactory(String poolName) {
                SecurityManager s = System.getSecurityManager();
                group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    }
    

    通过继承线程池来自定义线程池,并在构造函数中加入了poolName标明是哪一个线程池,同时重写了beforeExecuteafterExecuteterminated等方法,在beforeExecute方法中记录线程池执行的时间,在afterExecute方法中计算线程执行的耗时、最大耗时、最小耗时、平均耗时。重写线程池生成线程的方法,指定了生成的线程名

  • 定义一个MBeanThreadPoolParamMBean.java
    MBean其实并不是一个实体类而是一个接口,里面定义了监控的指标

    public interface ThreadPoolParamMBean {
        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        int getActiveCount();
    
        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        long getCompletedTaskCount();
    
        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        int getCorePoolSize();
    
        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        int getLargestPoolSize();
    
        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        int getMaximumPoolSize();
    
        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        int getPoolSize();
    
        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        long getTaskCount();
    
        /**
         * 线程最大耗时
         *
         * @return
         * */
        long getMaxCostTime();
    
        /**
         * 线程最小耗时
         *
         * @return
         * */
        long getMinCostTime();
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        float getAverageCostTime();
    }
    
  • 定义一个MBean实现类:ThreadPoolParam.java
    定义的是静态MBean,所以接口实现类必须满足规定,即xxxMBean,实现类为xxx

    public class ThreadPoolParam implements ThreadPoolParamMBean  {
        private ThreadPoolMonitor threadPoolMonitor;
    
        public ThreadPoolParam(ExecutorService es) {
            this.threadPoolMonitor = (ThreadPoolMonitor) es;
        }
    
        /**
         * 线程池中正在执行任务的线程数量
         *
         * @return
         */
        @Override
        public int getActiveCount() {
            return threadPoolMonitor.getAc();
        }
    
        /**
         * 线程池已完成的任务数量
         *
         * @return
         */
        @Override
        public long getCompletedTaskCount() {
            return threadPoolMonitor.getCompletedTaskCount();
        }
    
        /**
         * 线程池的核心线程数量
         *
         * @return
         */
        @Override
        public int getCorePoolSize() {
            return threadPoolMonitor.getCorePoolSize();
        }
    
        /**
         * 线程池曾经创建过的最大线程数量
         *
         * @return
         */
        @Override
        public int getLargestPoolSize() {
            return threadPoolMonitor.getLargestPoolSize();
        }
    
        /**
         * 线程池的最大线程数量
         *
         * @return
         */
        @Override
        public int getMaximumPoolSize() {
            return threadPoolMonitor.getMaximumPoolSize();
        }
    
        /**
         * 线程池当前的线程数量
         *
         * @return
         */
        @Override
        public int getPoolSize() {
            return threadPoolMonitor.getPoolSize();
        }
    
        /**
         * 线程池需要执行的任务数量
         *
         * @return
         */
        @Override
        public long getTaskCount() {
            return threadPoolMonitor.getTaskCount();
        }
    
        /**
         * 线程最大耗时
         *
         * @return
         * */
        @Override
        public long getMaxCostTime() {
            return threadPoolMonitor.getMaxCostTime();
        }
    
        /**
         * 线程最小耗时
         *
         * @return
         * */
        @Override
        public long getMinCostTime() {
            return threadPoolMonitor.getMinCostTime();
        }
    
        /**
         * 线程平均耗时
         *
         * @return
         * */
        @Override
        public float getAverageCostTime() {
            return threadPoolMonitor.getAverageCostTime();
        }
    }
    

    监控的参数指标通过线程池得到

  • 测试类:Test.java

    public class Test {
        private static Random random = new Random();
        public static void main(String[] args) throws MalformedObjectNameException,  InterruptedException {
            ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
            ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
    
            ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
            ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
    
            MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
            MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
    
            //http连接的方式查看监控任务
            HtmlAdaptor.start();
    
            executeTask(es1);
            executeTask(es2);
            Thread.sleep(1000 * 60 * 60);
        }
    
        private static void executeTask(ExecutorService es) {
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    int temp = i;
                    es.submit(() -> {
                        //随机睡眠时间
                        try {
                            Thread.sleep(random.nextInt(60) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(temp);
                    });
                }
            }).start();
        }
    }
    

    说明:

    • MBeanServerUtil.registerMBean()注册监控的类
    • HtmlAdaptor.start()开启HTTP连接的方式查看监控任务

    启动程序后打开http://localhost:8082/如下图:

点击test-pool-1下的type=threadPoolParam

通过刷新获取线程池最新的监控指标 test-pool-1type=threadPoolParam这些属性是在ObjectName中定义的属性值

总结

使用JMX监控线程池只是JMX一个功能,本篇文章只是学以致用,更多有关JMX以及线程池的内容可以查阅其他资料。文章若有错误欢迎指正

最后附:项目代码,欢迎forkstar,【我都划重点了就star一下】