`
truemylife
  • 浏览: 228343 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

一个任务队列的BlockingQueue实现

阅读更多

一、Concurrent简单介绍

Concurrentjdk1.5推出来的对多线程实现的进一步封装,它大大的简化了多线程开发。concurrent包分成了三个部分,分别是java.util.concurrentjava.util.concurrent.atomicjava.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

       Executor:具体Runnable任务的执行者。

ExecutorService:一个线程池管理者,其实现类有多种,比如普通线程池,定时调度线程池ScheduledExecutorService等,我们能把一个

Runnable,Callable提交到池中让其调度。

Future:是与Runnable,Callable进行交互的接口,比如一个线程执行结束后取返回的结果等等,还提供了cancel终止线程。

BlockingQueue:阻塞队列。

 

 

 

二、一个任务队列的BlockingQueue实现

 

public int FetchInvertal = 1000;
public Datum newdatum = null;
// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
// 理论上Integer.MAX_VALUE.的任务排队
final BlockingQueue<Datum> queue = new LinkedBlockingQueue<Datum>();
// 线程池
// 其中一个线程,取出待处理资料,put到queue
// 其余四个线程处理具体业务
final ExecutorService fetchdataservice = Executors
				.newSingleThreadExecutor();
final ExecutorService convertservice = Executors.newFixedThreadPool(4);
//计数
final AtomicInteger wc = new AtomicInteger();
final AtomicReference<String> atomstarttime=new AtomicReference<String>(null);
// FetchList线程
Runnable fetchlist = new Runnable() {
public void run() {
while (true) {
	try {
		List<Datum> tmpls = getLatestDatums("1", tomstarttime.get());
		if (tmpls != null && tmpls.size() > 0) {
		queue.addAll(tmpls);
		atomstarttime.set(tmpls.get(tmpls.size() - 1).getAddtime());
		tmpls=null;
		}
// 每隔一秒钟就检测一下是否有新的待处理的数据
		Thread.sleep(FetchInvertal);
		} catch (InterruptedException ex) {
	}
}
}
};
fetchdataservice.submit(fetchlist);
fetchdataservice.shutdown();

// 四个处理线程
for (int index = 0; index < 4; index++) {
		Runnable exewrite = new Runnable() {
		public void run() {
		int port = 8100 + wc.getAndIncrement();
		while (true) {
		try {
//如果队列里没有数据,会自动退出当前进程,为了防止进程被停掉,先判断是否有任务队列 
//队列里去看有没有任务,一直循环。
			if (queue.size() > 0) {
					Datum datum = queue.take();
					if (datum != null) {
					if (dobiz==true) {//此条件是伪码
										// 若成功,自动审核通过
						updateDatumState(datum.getUuid(), "3");
					} else {
						// 若不成功,自动审核不通过
						updateDatumState(datum.getUuid(), "2");
					}
				}
			} else {
					Thread.sleep(2000);//如果队列里没有任务了,睡眠两秒
			}
		} catch (InterruptedException e) {
	}
}
}
};
convertservice.submit(exewrite);
}
convertservice.shutdown();
}

private List<Datum> getLatestDatums(String status, String starttime) {
		//返回最新产生的待处理的任务列表
	}

private String updateDatumState(String uuid, String status) {
		//更新当前任务状态,打上已处理完成,或处理异常的标志
	}

 

 

 

 

 

分享到:
评论

相关推荐

    BlockingQueue队列自定义超时时间取消线程池任务

    定义全局线程池,将用户的请求放入自定义队列中,排队等候线程调用,等待超时则自动取消该任务,实现超时可取消的异步任务

    Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...

    第7章-JUC多线程v1.1.pdf

    如果当前线程池中的线程数目&gt;=corePollSize, 则每来一个任务, 会尝试将其添加到缓冲队列中, 如果添加成功, 则该任务会等待空闲线程将其取出去执行, 如果添加失败(一般是以为任务队列已经满了), 则会尝试创建新的线程...

    java线程池概念.txt

    因为项目需要,还涉及到排队下载的功能,所以就选择了线程池来管理线程以及线程池里面的任务队列workQueue来实现项目所需的功能;  b:在实际使用中,服务器在创建和销毁线程上花费的时间和消耗的系统资源都相当大...

    Java开发基于多线程和NIO实现聊天室源码+项目说明(含服务端+客户端).zip

    - 阻塞队列BlockingQueue,生产者消费者模式 - Selector - Channel - ByteBuffer - ProtoStuff 高性能序列化 - HttpClient连接池 - Spring依赖注入 - lombok简化POJO开发 - 原子变量 - 内置锁 - ...

    高级开发并发面试题和答案.pdf

    面试高级开发的期间整理的面试题目,记录我面试遇到过的并发题目以及答案 目录 并发 常说的并发问题是哪些;资源竞争、死锁、事务、可见性 ...实现一个阻塞队列(用Condition写生产者与消费者就)?BlockingQueue

    Chat:Java NIO+多线程实现聊天室

    Java基于多线程和NIO实现聊天室涉及到的技术点线程池ThreadPoolExecutor阻塞队列BlockingQueue,生产者消费者模式SelectorChannelByteBufferProtoStuff 高性能序列化HttpClient连接池Spring依赖注入lombok简化POJO...

    Java并发编程(学习笔记).xmind

    如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝? 在执行一个任务之前或之后,应该进行什么(What)动作? 使用Exector框架 ...

    DC_Tower_Elevator:直流塔式电梯挑战赛

    每个电梯都由一个线程表示。 所有电梯并行运行,并一直在等待请求。 假设问题是生产者/消费者目标,则使用BlockingQueue来完成此任务。 之所以选择BlockingQueue,是因为数据类型必须是线程安全的,并且必须实现所...

    Java并发编程实战

    第一部分 基础知识 第2章 线程安全性 2.1 什么是线程安全性 2.2 原子性 2.2.1 竞态条件 2.2.2 示例:延迟初始化中的竞态条件 2.2.3 复合操作 2.3 加锁机制 2.3.1 内置锁 2.3.2 重入 2.4 用锁来保护状态 ...

    Java 并发编程实战

    第一部分 基础知识 第2章 线程安全性 2.1 什么是线程安全性 2.2 原子性 2.2.1 竞态条件 2.2.2 示例:延迟初始化中的竞态条件 2.2.3 复合操作 2.3 加锁机制 2.3.1 内置锁 2.3.2 重入 2.4 用锁来保护状态 ...

    Scaling-Threadpool-Server

    简单扩展服务器 说明 这是利用可配置线程池和Java NIO的简单可伸缩服务器的示例。 客户端用于生成随机生成的字符串,并以设置的时间间隔将消息发送到服务器...实现阻塞队列以同时容纳工作人员和任务 用于存储选择器中的

    Java JDK实例宝典

    6 一个时钟程序 第11章 Java多媒体 11. 1 滚动的消息 11. 2 三维弹球 11. 3 贪吃蛇游戏 11. 4 Java声音处理 11. 5 媒体播放器 第12章 反射 12. 1 instanceof操作符 12. 2 获取...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段30讲、如何实现一个自己的显式锁Lock精讲下(让锁具备超时功能).mp4 │ 高并发编程第一阶段31讲、如何给你的应用程序注入钩子程序,Linux下演示.mp4 │ 高并发编程第一阶段32讲、如何捕获...

    javaSE代码实例

    1.4 第一个Java程序 8 1.4.1 开发源代码 8 1.4.2 编译运行 9 1.5 小结 11 第2章 基本数据类型——构建Java 大厦的基础 12 2.1 源代码注释 12 2.1.1 单行注释 12 2.1.2 区域注释 12 2.1.3 文档...

Global site tag (gtag.js) - Google Analytics