上述代码中,每读取到一行内容,就会直接交给线程池来执行 。
我们知道线程池原理如下:
如果核心线程数未满,将会直接创建线程执行任务 。
如果核心线程数已满,将会把任务放入到队列中 。
如果队列已满,将会再创建线程执行任务 。
如果最大线程数已满,队列也已满,那么将会执行拒绝策略 。
线程池执行流程图
由于我们上述线程池设置的核心线程数为 5,很快就到达了最大核心线程数,后续任务只能被加入队列 。
为了后续任务不被线程池拒绝,我们可以采用如下方案:
以上两种方案都存在同样的问题,第一种是相当于将文件所有内容加载到内存,将会占用过多内存 。
而第二种创建过多的线程,同样也会占用过多内存 。
一旦内存占用过多,GC 无法清理,就可能会引起频繁的 「Full GC」,甚至导致 「OOM」,导致程序导入速度过慢 。
当然了,我们还可以第三种方案,综合前两种,设置合适队列长度,以及合适最大线程数 。不过呢,「合适」这个度真不好把握,另外也还是有「OOM」问题 。
所以为了解决这个问题,日思夜想研究出两个解决方案:
批量执行
JDK 提供的 ,可以让主线程等待子线程都执行完成之后,再继续往下执行 。
利用这个特性,我们可以改造多线程导入的代码,主体逻辑如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {// 存储每个任务执行的行数List lines = Lists.newArrayList();// 存储异步任务List
这段代码中,每个异步任务将会导入 1000 行数据,等积累了 10 个异步任务,然后将会调用 k 使用线程池异步执行 。
/*** 批量执行任务** @param tasks*/private static void asyncBatchExecuteTask(List
k 方法内将会创建 ,然后主线程内调用 await方法等待所有异步线程执行结束 。
文章插图
异步任务逻辑如下:
/*** 异步任务* 等数据导入完成之后,一定要调用 countDownLatch.countDown()* 不然,这个主线程将会被阻塞,*/private static class ConvertTask implements Runnable {private CountDownLatch countDownLatch;private List lines;public ConvertTask(List lines) {this.lines = lines;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {for (String line : lines) {convertToDB(line);}} finally {countDownLatch.countDown();}}}
任务类逻辑就非常简单,遍历所有行,将其导入到数据库中 。所有数据导入结束,调用 # 。
一旦所有异步线程执行结束,调用 #,主线程将会被唤醒,继续执行文件读取 。
- 三国最聪明的谋士排名,谁能给我三国历史上谋士排名
- 上班第一天,老板就甩来30G文件是种什么体验...
- 这10部经典恐怖片,曾给我们的成长,带来惊吓和快乐 十个最恐怖的吉尼斯记录
- 一线豪车品牌有哪些,谁给我个世界豪车标志排行榜啊?
- 最年轻的世界级湾区,甩出一个“王炸” 广州的世界之最
- 詹姆斯最新得分排名,詹姆斯现在得分排名第几,在NBA历史上
- 我国的三款武器成世界顶尖,甩美国至少八条街,东风17只是其一 中国之最*事科技
- 什么叫甩尾,什么叫甩尾
- GOAT!C罗创造历史:仅用26分钟戴帽,3大纪录诞生,甩开梅西 历史带帽之最
- 京东买完降价可以退差价吗,在京东订单提交成功后商品降价了,可以退给我差价吗?