Master-Worker模式

3,991 阅读2分钟

模式介绍

Master-Worker模式是常用的并行设计模式。核心思想是,系统由两个角色组成,Master和Worker,Master负责接收和分配任务,Worker负责处理子任务。任务处理过程中,Master还负责监督任务进展和Worker的健康状态;Master将接收Client提交的任务,并将任务的进展汇总反馈给Client
如下图所示:





Worker:用于实际处理一个任务;

Task:具体任务实体

Master:任务的分配和最终结果的合成;
Main:启动程序,调度开启Master

代码实现

/**
 * @author mouliu
 * @create 2018-04-07-下午10:03
 */
public class Task {

    private int id;
    private int price ;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    }
}
/**
 * @author mouliu
 * @create 2018-04-09-上午9:22
 */
public class Main {
    public static void main(String[] args){
        Master master = new Master(new Worker(),20);

        Random r = new Random();
        for (int i=1;i<=100;i++){
            Task t = new Task();
            t.setId(i);
            t.setPrice(r.nextInt(1000));
            master.submit(t);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while (true){
            if (master.isComplete()){
                long end = System.currentTimeMillis()-start;
                int priceResult = master.getResult();
                System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
                break;
            }
        }
    }
}
/**
 * @author mouliu
 * @create 2018-04-07-下午10:07
 */
public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task>workQueue;
    private ConcurrentHashMap<String,Object>resultMap;

    public void setWorkQueue(ConcurrentLinkedQueue<Task>workQueue){
        this.workQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<String,Object> resultMap){
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while (true){
            Task input = this.workQueue.poll();
            if (input==null){
                break;
            }
            Object output = handle(input);
            this.resultMap.put(Integer.toString(input.getId()),output);
        }
    }

    private Object handle(Task input){
        Object output = null;
        try {
            Thread.sleep(500);
            output = input.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }
}
/**
 * @author mouliu
 * @create 2018-04-07-下午10:02
 */
public class Master {
    //1、有一个盛放任务的容器
    private ConcurrentLinkedQueue<Task>workQueue =
            new ConcurrentLinkedQueue<>();
    //2、需要有一个盛放worker的集合
    private HashMap<String,Thread> workers = new HashMap<>();

    //3、需要有一个盛放每一个worker执行任务的结果集合
    private ConcurrentHashMap<String,Object> resultMap=
            new ConcurrentHashMap<>();

    //4、构造方法(创建给定数量的线程)
    Master(Worker worker,int workerCount){
        worker.setWorkQueue(this.workQueue);
        worker.setResultMap(this.resultMap);

        for (int i=0;i<workerCount;i++){
            this.workers.put(Integer.toString(i),new Thread(worker));
        }
    }
    //5、需要一个提交任务的方法
    public void submit(Task task){
        this.workQueue.add(task);
    }
    //6、需要有一个执行的方法,启动所有的worker方法去执行任务
    public void execute(){
        for (Map.Entry<String,Thread>me:workers.entrySet()){
            me.getValue().start();
        }
    }
    //7、判断是否运行结束的方法
    public boolean isComplete(){
        for (Map.Entry<String,Thread>me:workers.entrySet()){
            if (me.getValue().getState()!=Thread.State.TERMINATED){
                return  false;
            }
        }
        return true;
    }
    //8、计算结果方法
    public int getResult(){
        int priceResult = 0;
        for (Map.Entry<String,Object>me:resultMap.entrySet()){
            priceResult+=(Integer) me.getValue();
        }
        return priceResult;
    }
}

总结:

Master-Worker模式可以将大任务划分为小任务的场景,是一种分流处理的设计理念。通过多线程或者多进程多机器的模式,可以将小任务处理分发给更多的CPU处理可以提高任务的完成速度,提高系统的性能。