老大甩给我 30G 文件,让我几天内全部导入到数据库( 二 )

上述代码中,每读取到一行内容,就会直接交给线程池来执行 。
我们知道线程池原理如下:
如果核心线程数未满,将会直接创建线程执行任务 。
如果核心线程数已满,将会把任务放入到队列中 。
如果队列已满,将会再创建线程执行任务 。
如果最大线程数已满,队列也已满,那么将会执行拒绝策略 。
线程池执行流程图
由于我们上述线程池设置的核心线程数为 5,很快就到达了最大核心线程数,后续任务只能被加入队列 。
为了后续任务不被线程池拒绝,我们可以采用如下方案:
以上两种方案都存在同样的问题,第一种是相当于将文件所有内容加载到内存,将会占用过多内存 。
而第二种创建过多的线程,同样也会占用过多内存 。
一旦内存占用过多,GC 无法清理,就可能会引起频繁的 「Full GC」,甚至导致 「OOM」,导致程序导入速度过慢 。
当然了,我们还可以第三种方案,综合前两种,设置合适队列长度,以及合适最大线程数 。不过呢,「合适」这个度真不好把握,另外也还是有「OOM」问题 。
所以为了解决这个问题,日思夜想研究出两个解决方案:
批量执行
JDK 提供的 ,可以让主线程等待子线程都执行完成之后,再继续往下执行 。
利用这个特性,我们可以改造多线程导入的代码,主体逻辑如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {// 存储每个任务执行的行数List lines = Lists.newArrayList();// 存储异步任务List tasks = Lists.newArrayList();while (iterator.hasNext()) {String line = iterator.nextLine();lines.add(line);// 设置每个线程执行的行数if (lines.size() == 1000) {// 新建异步任务,注意这里需要创建一个 Listtasks.add(new ConvertTask(Lists.newArrayList(lines)));lines.clear();}if (tasks.size() == 10) {asyncBatchExecuteTask(tasks);}}// 文件读取结束,但是可能还存在未被内容tasks.add(new ConvertTask(Lists.newArrayList(lines)));// 最后再执行一次asyncBatchExecuteTask(tasks);}
这段代码中,每个异步任务将会导入 1000 行数据,等积累了 10 个异步任务,然后将会调用 k 使用线程池异步执行 。
/*** 批量执行任务** @param tasks*/private static void asyncBatchExecuteTask(List tasks) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(tasks.size());for (ConvertTask task : tasks) {task.setCountDownLatch(countDownLatch);executorService.submit(task);}// 主线程等待异步线程 countDownLatch 执行结束countDownLatch.await();// 清空,重新添加任务tasks.clear();}
k 方法内将会创建 ,然后主线程内调用 await方法等待所有异步线程执行结束 。

老大甩给我 30G 文件,让我几天内全部导入到数据库

文章插图
异步任务逻辑如下:
/*** 异步任务* 等数据导入完成之后,一定要调用 countDownLatch.countDown()* 不然,这个主线程将会被阻塞,*/private static class ConvertTask implements Runnable {private CountDownLatch countDownLatch;private List lines;public ConvertTask(List lines) {this.lines = lines;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {for (String line : lines) {convertToDB(line);}} finally {countDownLatch.countDown();}}}
任务类逻辑就非常简单,遍历所有行,将其导入到数据库中 。所有数据导入结束,调用 # 。
一旦所有异步线程执行结束,调用 #,主线程将会被唤醒,继续执行文件读取 。