java 多线程并发设计模式之四: 生产者消费者模式
By:Roy.LiuLast updated:2014-03-27
生产者消费者模式是一个经典的多线程设计模式,其核心思想是:有两类线程和一个内存缓冲区或者队列, 一类线程发起任务,并提交到队列中。另一类线程用来处理这些任务,叫做消费者线程. 这两类线程进行通信的桥梁是内存缓冲区,从而实现了解耦,生产者不知道消费者的存在,消费者也不知道生产者的存在. 二者的处理速度无论快慢,都可以通过内存缓冲区得到协调.
在下面的例子中,用 BlockingQueue 作为内存缓冲区。 PCData 类作为要处理的任务。
生产者线程
消费者线程
要处理的任务载体
测试生产者,消费者 主程序
所有源代码提供下载:
java thread procedure consume pattern sample download
在下面的例子中,用 BlockingQueue 作为内存缓冲区。 PCData 类作为要处理的任务。
生产者线程
package com.yihaomen.produceconsume; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueuequeue; private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME = 1000; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { PCData data = null; Random r = new Random(); System.out.println("start producer id="+Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(r.nextInt(SLEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data+" is put into queue"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.err.println("failed to put data��" + data); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop() { isRunning = false; } }
消费者线程
package com.yihaomen.produceconsume; import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable { private BlockingQueuequeue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { System.out.println("start Consumer id=" + Thread.currentThread().getId()); Random r = new Random(); try { while(true){ PCData data = queue.take(); if (null != data) { int re = data.getData() * data.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(), data.getData(), re)); Thread.sleep(r.nextInt(SLEEPTIME)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
要处理的任务载体
package com.yihaomen.produceconsume; public final class PCData { private final int intData; public PCData(int d){ intData=d; } public PCData(String d){ intData=Integer.valueOf(d); } public int getData(){ return intData; } @Override public String toString(){ return "data:"+intData; } }
测试生产者,消费者 主程序
package com.yihaomen.produceconsume; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueuequeue = new LinkedBlockingQueue (10); //建立生产者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); //建立消费者 Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); //线程池 ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(10 * 1000); //停止生产者 producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); service.shutdown(); } }
所有源代码提供下载:
java thread procedure consume pattern sample download
From:一号门
Next:Spring MVC 程序首页的设置
COMMENTS