Java使用并发阻塞队列实现生产者与消费者

Java 新民 718℃ 已收录 0评论

代码如下:

	package TestQueue;

	import java.util.concurrent.BlockingQueue;
	import java.util.concurrent.LinkedBlockingQueue;
	import java.util.concurrent.TimeUnit;
	import java.util.concurrent.atomic.AtomicInteger;

	/**
	 * 生产者线程
	 * @author admin
	 *
	 */
	class ProducerThread extends Thread{
		private BlockingQueue queue;
		private volatile boolean flag = true;
		private static AtomicInteger count = new AtomicInteger();
		
		public  ProducerThread(BlockingQueue queue) {
			this.queue = queue; 
		}
		
		@Override
		public void run() {
			System.out.println("生产者线程启动。。。");
			try {
				while(flag){
					System.out.println("正在生产数据。。。");
					int data = count.incrementAndGet();
					// 添加队列
					boolean offer = queue.offer(data);
					if(offer){
						System.out.println("生产者添加队列"+data+"成功");
					}else{
						System.out.println("生产者添加队列"+data+"失败");
					}
					Thread.sleep(1000);
				}
				
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}finally{
				System.out.println("生产者线程停止");
			}
		}
		
		public void stopThread(){
			this.flag = false;
		}
	}

	/**
	 * 消费者线程
	 * @author admin
	 *
	 */
	class ConsumerThread extends Thread{
		
		private BlockingQueue queue;
		private volatile boolean flag = true;
		
		public  ConsumerThread(BlockingQueue queue) {
			this.queue = queue; 
		}
		
		
		@Override
		public void run() {
		
			System.out.println("消费者线程启动....");
			try {
				while(flag){
					Integer data = (Integer)queue.poll(2, TimeUnit.SECONDS);
					if(data != null){
						System.out.println("消费者获取"+data+"成功");
					}else{
						System.out.println("消费者获取"+data+"失败");
						this.flag = false;
					}
				}
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}finally{
				System.out.println("消费者线程停止");
			}
		}
		
	}

	public class Demo01 {
		public static void main(String[] args) throws InterruptedException {
			BlockingQueue queue = new LinkedBlockingQueue<>(10);
			
			ProducerThread producer1 = new ProducerThread(queue);
			ProducerThread producer2 = new ProducerThread(queue);
			ConsumerThread consumer = new ConsumerThread(queue);
			
			producer1.start();
			producer2.start();
			consumer.start();
			
			Thread.sleep(10*1000);
			producer1.stopThread();
			producer2.stopThread();
		}
	}

本站文章如未注明,均为原创丨本网站采用BY-NC-SA协议进行授权,转载请注明转自:https://www.snowruin.com/?p=1734
喜欢 (3)or分享 (0)
发表我的评论
取消评论
表情 代码 贴图 加粗 链接 私信 删除线 签到

Hi,请填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址