Fork/join实现Excel批量导入去重

3,254 阅读4分钟

1.概述

最近工作中,有需要使用Fork/join 优化 的业务,本文我已实际案例出发,代码落地的形式来实现优化。

Fork/Join 是什么?

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。


2. 业务逻辑

后台Excel批量导入学生,导入的时候需要与数据库中的数据进行比较,判断username是否重复,如果重复则不导入。(ps:实际业务中可能是其他数据进行去重,逻辑是一样的,本文中用学生数据作为案例)

3.已知

数据库 student表 10w条数据

excel中 1w条数据

3.思路

3.1 第一版本思路

为了减少数据库的压力,我们

第一步把所有数据读取到Java内存中


第二步 excel 数据 本身去重


第三步 遍历excel数据,每一个username拿去与dbData判断是否有相等的


在 数据库10w excel 1w 条的数据情况下 筛选耗时 8049ms

3.2 第二步思路(加入Fork/join)

结合业务,我的思路是选择在上述的第三步优化,把之前的 1w条数据筛选 拆分成2个 5000的任务 同时进行

这样在其他因素不变的情况下 ,第三步筛选的效率会提高大概一倍

实现

第一步 编写一个有返回值的任务类,定义好

THRESHOLD_NUM (单个线程处理数据个数)

start ,end (下标)

第二步 在 compute 实现 里面实现逻辑任务的拆分

当处理数据小于等于 单个线程处理的数据值 ,则进行去重的业务

当处理数据小于等于 单个线程处理的数据值 ,则需要对任务的拆分与结果集的合并(ps:递归调用)

第三步业务层调用

在 数据库10w excel 1w 条的数据情况下 筛选耗时 4319ms

效率提高接近一倍

4.代码

package com.itbbs.service;


import com.itbbs.ArrayListUtil;
import com.itbbs.DataUtil;
import com.itbbs.pojo.Student;
import com.itbbs.task.DistinctTask;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

/**
 * 学生操作service
 * tjx
 */
public class StudentService {


    /**
     * 批量导入学生
     */
    public void importStudent(List<Student> dbData,List<Student> excelData){

        // excel自身去重
        excelData = excelData.stream()
                .filter(ArrayListUtil.distinctByKey(Student::getUsername)) //根据username 去重
                .collect(Collectors.toList());
        //不重复数据
        List<Student> repetitionData = new ArrayList<>();
        long s = System.currentTimeMillis();
        //遍历所以excel数据
        for (Student data : excelData) {
            String username = data.getUsername();
            //判断是否存在于 dbData中
            if (!ArrayListUtil.isInclude(dbData, username)) {
                //如果不存在则添加到不重复集合中
                repetitionData.add(data);
            }
        }

        long e = System.currentTimeMillis();
        System.out.println("筛选耗时:"+(e-s)+"ms");
        //在数据库不重复的数据里面 筛选出不重复数据
        repetitionData =repetitionData.stream()
                .sorted(Comparator.comparing(Student::getUsername))//根据username 排序
                .collect(Collectors.toList());

//        repetitionData.forEach(p-> System.out.println(p.getUsername()));

    }


    /**
     * 批量导入学生
     */
    public void importStudent2(List<Student> dbData,List<Student> excelData){
        // 自身去重
        excelData = excelData.stream()
                .filter(ArrayListUtil.distinctByKey(Student::getUsername)) //根据username 去重
                .collect(Collectors.toList());

        long s = System.currentTimeMillis();
        //获取不重复数据
        ForkJoinPool fjp = new ForkJoinPool();
        DistinctTask task  = new DistinctTask(0,excelData.size(),dbData,excelData);
        List<Student> repetitionData = fjp.invoke(task);
        long e = System.currentTimeMillis();
        System.out.println("筛选耗时:"+(e-s)+"ms");
        //在数据库不重复的数据里面 筛选出不重复数据
        repetitionData =repetitionData.stream()
                .sorted(Comparator.comparing(Student::getUsername))//根据username 排序
                .collect(Collectors.toList());
//        repetitionData.forEach(p-> System.out.println(p.getUsername()));
    }
    public static void main(String[] args) {
        // 模拟获取数据库
        List<Student> dbData = DataUtil.getDbData();

        // 模拟获取excel数据
        List<Student> excelData = DataUtil.getExcelData();
        new StudentService().importStudent(dbData,excelData);
        new StudentService().importStudent2(dbData,excelData);
    }
}
package com.itbbs.task;

import com.itbbs.ArrayListUtil;
import com.itbbs.pojo.Student;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class DistinctTask extends RecursiveTask<List<Student>> {

    //单个任务处理数据
    private static final int THRESHOLD_NUM = 5000;
    //下标
    private int start, end;

    //需要处理的数据
    private List<Student> dbData;
    private List<Student> excelData;

    public DistinctTask(int start, int end, List<Student> dbData, List<Student> excelData) {
        this.start = start;
        this.end = end;
        this.dbData = dbData;
        this.excelData = excelData;
    }

    @Override
    protected List<Student> compute() {
        //获取当前下标下的数据
        excelData = excelData.subList(start,end);
        //获取需要计算的数据量
        int size = excelData.size();
        if(size<=THRESHOLD_NUM){
            //计算
            List<Student> repetitionData = new ArrayList<>();
            //遍历所以excel数据
            for (Student data : excelData) {
                String username = data.getUsername();
                //判断是否存在于 dbData中
                if (!ArrayListUtil.isInclude(dbData, username)) {
                    //如果不存在则添加到不重复集合中
                    repetitionData.add(data);
                }
            }
            return repetitionData;





        }else{
            //拆分
            int middle = (start + end) / 2;
            DistinctTask left = new DistinctTask(start,middle,dbData,excelData);
            DistinctTask right = new DistinctTask(middle+1,end,dbData,excelData);
            //执行子任务
            left.fork();
            right.fork();
            //获取子任务结果
            //join() 方法会阻塞到结果算出来
            List<Student> lResult = left.join();
            List<Student> rResult = right.join();
            //何并结果
            lResult.addAll(rResult);
            return lResult;
        }
    }
}
package com.itbbs;

import com.itbbs.pojo.Student;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * 数据源 工具类
 * tjx
 */
public class DataUtil {


    /**
     * 模拟 获取数据库
     * @return
     */
    public static List<Student> getDbData(){
        List<Student> result = new ArrayList<Student>();
        Random random = new Random();
        for (int i = 0; i <100000 ; i++) {
            Student student = new Student();
            student.setUsername(random.nextInt(99)+"");
            result.add(student);
        }
        return result;
    }


    /**
     * 模拟 获取excel数据
     * @return
     */
    public static List<Student> getExcelData(){
        List<Student> result = new ArrayList<Student>();
        Random random = new Random();
        for (int i = 0; i <10000 ; i++) {
            Student student = new Student();
            student.setUsername(random.nextInt(100000)+"");
            result.add(student);
        }
        return result;
    }


}
package com.itbbs;

import com.itbbs.pojo.Student;
import org.apache.commons.lang3.ArrayUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

public class ArrayListUtil {

    /**	 * 去重复元素	 * @param keyExtractor	 * @param <T>	 * @return	 */	public static <T> Predicate<T> distinctByKey(Function<? super T, Object> keyExtractor) {
        Map<Object, Boolean> map = new ConcurrentHashMap<>();		return t -> map.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
    }

    /**
     * 判断 list 是否包含 targetValue
     * @param list
     * @param targetValue
     * @return
     */
    public static boolean isInclude(List<Student> list, String targetValue){
        return  ArrayUtils.contains(list.toArray(),targetValue);
    }
}
package com.itbbs.pojo;

public class Student {

    private String username;

    private String password;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Student) {
            Student student = (Student) obj;
            return (username.equals(student.username));
        }
        return super.equals(obj);
    }

    @Override
    public int hashCode() {
        Student student = (Student) this;
        return student.username.hashCode();
    }
}