java 多线程并发设计模式之二: Master worker 模式应用

在多线程程序设计中Master worker 模式是常用的并行模式之一,核心思想是由两类进程协助完成的,Master 进程负责接收和分配任务并保存结果集,Worker 负责处理任务, 并把结果返回给Master 进程. 这类设计模式最大的好处是 将一个大任务分配成若干个小任务并行执行。下面是一个简单的Master-Worker模式的框架

Master.java
程序代码 程序代码

package com.yihaomen.masterwork;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
    //任务队列
    protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
    //Worker线程队列
    protected Map<String,Thread> threadMap=new HashMap<String,Thread>();
    //子任务处理结果集
    protected Map<String,Object> resultMap = new ConcurrentHashMap<String,Object>();
    
    //是否所有的子任务都结束了
    public boolean isComplete(){
        for(Map.Entry<String,Thread> entry:threadMap.entrySet()){
            if(entry.getValue().getState()!=Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }
    
    //Master的构造,需要一个Worker进程逻辑,和需要的Worker进程数量
    public Master(Worker worker,int countWorker){
        worker.setWorkQueue(workQueue);
        worker.setResultMap(resultMap);
        for(int i=0;i<countWorker ;i++)
            threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
    }
    
    //提交一个任务
    public void submit(Object job){
        workQueue.add(job);
    }
    
    //返回子任务结果集
    public Map<String,Object>  getResultMap(){
        return resultMap;
    }
    
    //开始运行所有的Worker进程,进行处理
    public void execute(){
        for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
            entry.getValue().start();
        }
    }
}



Worker.java
程序代码 程序代码

package com.yihaomen.masterwork;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable{
    //任务队列,用于取得子任务
    protected Queue<Object> workQueue;
    //子任务处理结果集
    protected Map<String,Object> resultMap;
    public void setWorkQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
    
    //子任务处理的逻辑,在子类中实现具体逻辑
    public Object handle(Object input){
        return input;
    }
    
    @Override
    public void run() {
        while (true) {
             //获取子任务
             Object input = workQueue.poll();
             if (input == null) break;
             //处理子任务
             Object re=handle(input);
             //将处理结果写入结果集
             resultMap.put(Integer.toString(input.hashCode()), re);
        }
    }
}



注意这里面用到了java concurrent 包中的. ConcurrentLinkedQueue, ConcurrentHashMap, 这是线程安全的类. 下面是一个测试例子,计算1-100 的立方和. 过程如下:首先继承 Worker 类 实现一个 PlusWorker 的类,orerride handle 方法. 然后提交100个任务,开始计算.

程序代码 程序代码

package com.yihaomen.masterwork;

import java.util.Map;
import java.util.Set;

import org.junit.Test;

public class TestMasterWorker {

    public class PlusWorker extends Worker{
        public Object handle(Object input){
            Integer i=(Integer)input;
            return i*i*i;
        }
    }
    
    @Test
    public void testMasterWorker() {
        Master m=new Master(new PlusWorker(),5);
        for(int i=0;i<100;i++)
            m.submit(i);
        m.execute();
        int re=0;
        Map<String,Object> resultMap=m.getResultMap();
        while(resultMap.size()>0 || !m.isComplete()){
            Set<String> keys=resultMap.keySet();
            String key=null;
            for(String k:keys){
                key=k;
                break;
            }
            Integer i=null;
            if(key!=null)
                i=(Integer)resultMap.get(key);
            if(i!=null)
                re+=i;
            if(key!=null)
                resultMap.remove(key);
        }
        
        System.out.println("testMasterWorker:"+re);
    }
    
    @Test
    public void testPlus(){
        int re=0;
        for(int i=0;i<100;i++){
            re+=i*i*i;
        }
        System.out.println("testPlus:"+re);
    }

}



由此可见master-worker 模式适合与将大任务化成小任务并行执行的情况,各个小任务基本独立运行. 测试例子下载:

下载文件 java multiple master worker pattern


除非申明,文章均为一号门原创,转载请注明本文地址,谢谢!
文章来自: 本站原创
引用通告: 查看所有引用 | 我要引用此文章
Tags: 多线程
相关日志:
评论: 2 | 引用: 0 | 查看次数: -
回复回复放学不走[2015-01-21 04:32 PM | del]
很完美..赞一个.
问下博主,这个队列ConcurrentLinkedQueue一般用于什么任务?其实我想问的是,对于多线程读写大文件,这个适用吗?
回复回复亮剑[2014-04-01 09:33 PM | del]
看到你的文章,才明白我还是在山脚下
发表评论
昵 称:
密 码: 游客发言不需要密码.
内 容:
验证码: 验证码
选 项:
虽然发表评论不用注册,但是为了保护您的发言权,建议您注册帐号.